module gelf.transport;

private
import std.socket : Socket;
import gelf.protocol : Message;

public:

enum MAX_CHUNKS = 128; //"A message MUST NOT consist of more than 128 chunks."

/**
	This struct converts a Message into chunks as defined in http://docs.graylog.org/en/latest/pages/gelf.html#chunked-gelf

	Every chunk produced here is a 12 byte header (2 magic bytes, the 8 byte message id,
	1 byte sequence number, 1 byte chunk count) followed by a slice of the payload.

	It is an InputRange. The array returned by `front` is a view of an internal buffer
	that is rewritten on every call to `front`; copy it if it must outlive the current
	iteration step.
*/
struct Chunks {

	import std.outbuffer : OutBuffer;
	// Renamed so the range type does not shadow this struct's own name.
	import std.range : PayloadChunks = Chunks;
	import std.random : uniform;

	private :
		// Bytes written in front of every payload slice (see front()).
		enum uint HEADER_SIZE = 12;

		PayloadChunks!(const(ubyte)[]) byteChunks;
		OutBuffer buffer;
		ulong messageId;
		// Unsigned: with up to MAX_CHUNKS (128) chunks the counter reaches 128,
		// which does not fit into a signed byte.
		ubyte sequenceNo;
		ubyte total;

	public:

		/**
			Convert a Message into chunks. The message is serialized with
			`message.toBytes()` and is NOT compressed.

			Params:
				message = The message to split into chunks
				chunkSize = The size of each chunk in bytes, header included. Default : 8192

			Throws: MessageTooLargeException if # of chunks > 128.
					Exception if chunkSize is not larger than the 12 byte chunk header.
		*/
		this(Message message, uint chunkSize = 8192)
		{
			this(message.toBytes(), chunkSize);
		}

		/**
			In LDC 0.17.2, std.zlib.compress returns in const(void)[].
			This ctor is the fix for now. Remove when LDC catches up.

			Params:
				message = The (compressed) message bytes.
				chunkSize = The size of each chunk in bytes, header included. Default : 8192
				messageId = The messageId. Do not use this, it is only used for unittests. Default : uniform(0, ulong.max)

			Throws: MessageTooLargeException if # of chunks > 128.
					Exception if chunkSize is not larger than the 12 byte chunk header.
		*/
		this(const void[] message, uint chunkSize = 8192, ulong messageId = uniform(0, ulong.max))
		{
			// Keep the data const; the chunker only ever reads it.
			this(cast(const(ubyte)[]) message, chunkSize, messageId);
		}

		/**
			Use this when you've converted a message to bytes. Use this after you have compressed your message

			Params:
				message = The compressed message.
				chunkSize = The size of each chunk in bytes, header included. Default : 8192
				messageId = The messageId. Do not use this, it is only used for unittests. Default : uniform(0, ulong.max)

			Throws: MessageTooLargeException if # of chunks > 128.
					Exception if chunkSize is not larger than the 12 byte chunk header.
		*/
		this(const ubyte[] message, uint chunkSize = 8192, ulong messageId = uniform(0, ulong.max))
		{
			import std.conv : to;
			import std.exception : enforce;
			import std.range : chunks;

			// chunkSize is unsigned: without this guard `chunkSize - HEADER_SIZE` would be
			// zero (no room for payload) or wrap around to a huge value.
			enforce(chunkSize > HEADER_SIZE,
				"chunkSize must be larger than the " ~ to!string(HEADER_SIZE) ~ " byte chunk header");

			byteChunks = chunks(message, chunkSize - HEADER_SIZE);

			// Use the real chunk count, so a payload that is an exact multiple of the
			// per-chunk payload size is not over-counted by one.
			immutable count = byteChunks.length;
			if (count > MAX_CHUNKS) {
				throw new MessageTooLargeException("Message has " ~ to!string(count)
					~ " chunks. Cannot be larger than " ~ to!string(MAX_CHUNKS) ~ " chunks");
			}

			this.messageId = messageId;
			sequenceNo = 0;
			total = cast(ubyte) count; // count <= MAX_CHUNKS, so this cannot truncate

			buffer = new OutBuffer();
			buffer.reserve(chunkSize);
		}

		/// Builds the current chunk: header followed by the current payload slice.
		ubyte[] front() {
			buffer.offset = 0; // reuse the buffer from the start for every chunk
			buffer.write(cast(ubyte)0x1e); // GELF chunk magic bytes
			buffer.write(cast(ubyte)0x0f);
			buffer.write(messageId);
			buffer.write(sequenceNo);
			buffer.write(total);
			buffer.write(byteChunks[sequenceNo]);

			return buffer.toBytes();
		}

		/// Advances to the next chunk.
		void popFront() {
			sequenceNo++;
		}

		/// True once every chunk has been consumed.
		bool empty() {
			return sequenceNo >= byteChunks.length;
		}

		/// Total number of chunks the message was split into.
		auto length() {
			return byteChunks.length;
		}
}

/**
	This function provides a convenient way to send chunked GELF messages to Graylog.
	It automatically chunks a message based on $(D_PARAM packetSizeBytes).

	Params:
		socket = The socket every chunk is sent on
		message = The message to send
		packetSizeBytes = The size of each chunk in bytes, header included. Default : 8192
		compressed = If true, compress the message using zlib. Default : false

	Throws: MessageTooLargeException if # of chunks > 128.
			Exception if packetSizeBytes is not larger than the 12 byte chunk header.

	Examples:
	-------------------------
	auto s = new UdpSocket();
	s.connect(new InternetAddress("localhost", 12200));

	// Start netcat to watch this packet : `nc -lu 12200`
	s.sendChunked(gelfMessage, 500);
	-------------------------

	Returns: void
*/
auto sendChunked(Socket socket, Message message, uint packetSizeBytes = 8192, bool compressed = false)
{
	import std.zlib : compress;

	// Honour the caller supplied packet size instead of a hard-coded one.
	foreach(c; Chunks((compressed) ? cast(ubyte[])compress(message.toString()) : message.toBytes(), packetSizeBytes))
		socket.send(c);
}

/// Thrown when a message would need more than MAX_CHUNKS chunks.
class MessageTooLargeException : Exception {
	this(string msg) { super(msg); }
}

unittest {

	import std.bitmanip : littleEndianToNative;
	import std.exception : assertThrown;
	import std.random : uniform;
	import std.zlib : compress;

	// Number of chunks needed for `length` payload bytes (ceiling division).
	static size_t expectedChunks(size_t length, size_t payloadSize)
	{
		return (length + payloadSize - 1) / payloadSize;
	}

	auto m = Message("localhost","An alert message");
	auto raw = m.toBytes;

	auto mid = uniform(0, ulong.max);
	auto chunks = Chunks(raw, 20, mid);
	foreach(c; chunks) {
		assert(c[0 .. 2] == [0x1e, 0x0f]); //Magic bytes

		ubyte[8] bytes = c[2 .. 10];
		assert(littleEndianToNative!ulong(bytes) == mid); // 8byte message id

		assert(c[11] == expectedChunks(raw.length, 20 - 12)); //1 byte - Total number of chunks this message has
	}

	auto compressed = compress(raw);
	chunks = Chunks(compressed, 20, mid);
	foreach(c; chunks) {
		assert(c[11] == expectedChunks(compressed.length, 20 - 12)); //1 byte - Total number of chunks this message has
	}

	// Check if message is smaller than chunk length, then it makes one chunk only
	chunks = Chunks(raw, cast(uint)(raw.length + 12), mid);
	foreach(c; chunks) {
		assert(c[11] == 1); //Only 1 chunk
	}

	// A payload that is an exact multiple of the per-chunk payload size
	auto exact = Chunks(new ubyte[](16), 20, mid);
	assert(exact.length == 2);

	// Exactly MAX_CHUNKS chunks is still allowed and every chunk is produced
	auto full = Chunks(new ubyte[](8 * MAX_CHUNKS), 20, mid);
	assert(full.length == MAX_CHUNKS);
	size_t seen = 0;
	foreach(c; full) {
		assert(c.length == 20);
		assert(c[10] == seen);       // sequence number
		assert(c[11] == MAX_CHUNKS); // total
		seen++;
	}
	assert(seen == MAX_CHUNKS);

	// One byte more needs MAX_CHUNKS + 1 chunks
	assertThrown!MessageTooLargeException(Chunks(new ubyte[](8 * MAX_CHUNKS + 1), 20, mid));

	// A chunk size without room for any payload is rejected
	assertThrown(Chunks(new ubyte[](1), 12, mid));
	assertThrown(Chunks(new ubyte[](1), 5, mid));
}