1 module gelf.transport;
2 
3 private
4     import std.socket : Socket;
5     import gelf.protocol : Message;
6     
7 public:
8 
9     enum MAX_CHUNKS = 128; //"A message MUST NOT consist of more than 128 chunks."
10 
11     /**
12      This struct converts a Message into chunks as defined in http://docs.graylog.org/en/latest/pages/gelf.html#chunked-gelf
13      
14      It is an InputRange.
15      */
16     struct Chunks {
17 
18         import std.outbuffer : OutBuffer;
19         import std.range : Chunks;
20         import std.random : uniform;
21         
22         private :
23             Chunks!(const(ubyte)[]) byteChunks;
24             OutBuffer buffer;
25             ulong messageId;
26             byte sequenceNo;
27             ubyte total;
28         
29         public:
30         
31         /**
32          Convert a Message into chunks
33          
34          Params:
35              message   = The message to compress
36              chunkSize = The size of each chunk in bytes. Default : 8192
37              
38          Throws: MessageTooLargeException if # of chunks > 128.
39          */
40         this(Message message, uint chunkSize = 8192)
41         {
42             this(message.toBytes(), chunkSize);
43         }
44         
45         /**
46          In LDC 0.17.2, std.zlib.compress returns in const(void)[].
47          This ctor is the fix for now. Remove when LDC catches up.
48          */
49         this(const void[] message, uint chunkSize = 8192, ulong messageId = uniform(0, ulong.max))
50         {
51         	this(cast(ubyte[]) message, chunkSize, messageId);
52         }
53         
54         /**
55          Use this when you've converted a message to bytes. Use this after you have compressed your message
56          
57          Params:
58              message   = The compressed message.
59              chunkSize = The size of each chunk in bytes. Default : 8192 
60              messageId = The messageId. Do not use this, it is only used for unittests. Default : uniform(0, ulong.max) 
61              
62          Throws: MessageTooLargeException if # of chunks > 128.
63          */
64         this(const ubyte[] message, uint chunkSize = 8192, ulong messageId = uniform(0, ulong.max))
65         {
66             auto t = (message.length / (chunkSize - 12)) + 1;
67             if (t > MAX_CHUNKS) {
68                 import std.conv : to;
69                 throw new MessageTooLargeException("Message has "~ to!string(t) ~" chunks. Cannot be larger than " ~ MAX_CHUNKS ~ " chunks");
70             }
71             
72             this.messageId = messageId;
73             sequenceNo = 0;
74             
75             buffer = new OutBuffer();
76             buffer.reserve(chunkSize);
77 
78             import std.range : chunks;    
79             byteChunks = chunks(message, chunkSize - 12);
80             total = cast(ubyte)byteChunks.length();
81         }
82         
83         ubyte[] front() {
84             buffer.offset = 0;
85             buffer.write(cast(ubyte)0x1e);
86             buffer.write(cast(ubyte)0x0f);
87             buffer.write(messageId);
88             buffer.write(sequenceNo);
89             buffer.write(total);
90             buffer.write(byteChunks[sequenceNo]);
91             
92             return buffer.toBytes();
93         }
94     
95         void popFront() {
96             sequenceNo++;
97         }
98         
99         bool empty() {
100             return !(sequenceNo < byteChunks.length());
101         }
102         
103         auto length() {
104             return byteChunks.length();
105         }
106     }
107     
108     /**
109     This function provides a convenient way to send chunked GELF messages to Graylog.
110     It automatically chunks a message based on $(D_PARAM packetSizeBytes).
111 
112     Params:
113          packetSizeBytes = The size of each chunk in bytes. Default : 8192
114          compressed      = If true, compress the message using zlib. Default : false
115 
116     Throws: Exception if # of chunks > 128.
117 
118     Examples:
119     -------------------------
120     auto s = new UdpSocket();
121     s.connect(new InternetAddress("localhost", 12200));
122 
123     // Start netcat to watch this packet : `nc -lu 12200`
124     s.sendChunked(gelfMessage, 500);
125     -------------------------
126 
127     Returns: void
128     */
129     auto sendChunked(Socket socket, Message message, uint packetSizeBytes = 8192, bool compressed = false)
130     {
131         import std.zlib;
132         foreach(c; Chunks((compressed) ? cast(ubyte[])compress(message.toString()) : message.toBytes(), 500))
133             socket.send(c);
134     }
135 
136     class MessageTooLargeException : Exception {
137         this(string msg) { super(msg); }
138     }
139 
140 unittest {
141     
142     auto m = Message("localhost","An alert message");
143 //    import std.stdio;
144 //    writeln(m.toBytes, m.toBytes().length);
145 
146     import std.random : uniform;
147     auto mid = uniform(0, ulong.max);
148     auto chunks = Chunks(m.toBytes, 20, mid);
149     foreach(c; chunks) {
150 //        writeln(c);
151         assert(c[0 .. 2] == [0x1e, 0x0f]); //Magic bytes
152         
153         import std.bitmanip : littleEndianToNative;
154         
155         ubyte[8] bytes = c[2 .. 10];
156         assert(littleEndianToNative!ulong(bytes) == mid); // 8byte message id
157         
158         assert(c[11] == (m.toBytes().length / (20 - 12)) + 1); //1 byte - Total number of chunks this message has
159     }
160     
161     import std.zlib : compress;
162     
163     auto compressed = compress(m.toBytes);
164     chunks = Chunks(compressed, 20, mid);
165     foreach(c; chunks) {
166         assert(c[11] == (compressed.length / (20 - 12)) + 1); //1 byte - Total number of chunks this message has
167     }
168     
169     // Check if message is smaller than chunk length, then it makes one chunk only
170     chunks = Chunks(m.toBytes, cast(uint)(m.toBytes.length + 12), mid);
171     foreach(c; chunks) {
172         assert(c[11] == 1); //Only 1 chunk
173     }
174 }