build-message.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. var util = require('../core').util;
  2. var crypto = util.crypto;
  3. var Int64 = require('./int64').Int64;
  4. var toBuffer = util.buffer.toBuffer;
  5. var allocBuffer = util.buffer.alloc;
  6. var Buffer = util.Buffer;
  7. /**
  8. * @api private
  9. */
  10. function buildMessage(message) {
  11. var formattedHeaders = buildHeaders(message.headers);
  12. var headerLengthBytes = allocBuffer(4);
  13. headerLengthBytes.writeUInt32BE(formattedHeaders.length, 0);
  14. var totalLengthBytes = allocBuffer(4);
  15. totalLengthBytes.writeUInt32BE(
  16. totalLengthBytes.length + // size of this buffer
  17. headerLengthBytes.length + // size of header length buffer
  18. 4 + // prelude crc32
  19. formattedHeaders.length + // total size of headers
  20. message.body.length + // total size of payload
  21. 4, // size of crc32 of the total message
  22. 0
  23. );
  24. var prelude = Buffer.concat([
  25. totalLengthBytes,
  26. headerLengthBytes
  27. ], totalLengthBytes.length + headerLengthBytes.length);
  28. var preludeCrc32 = crc32(prelude);
  29. var totalSansCrc32 = Buffer.concat([
  30. prelude, preludeCrc32, formattedHeaders, message.body
  31. ], prelude.length + preludeCrc32.length + formattedHeaders.length + message.body.length);
  32. var totalCrc32 = crc32(totalSansCrc32);
  33. return Buffer.concat([totalSansCrc32, totalCrc32]);
  34. }
  35. function buildHeaders(headers) {
  36. /** @type {Buffer[]} */
  37. var chunks = [];
  38. var totalSize = 0;
  39. var headerNames = Object.keys(headers);
  40. for (var i = 0; i < headerNames.length; i++) {
  41. var headerName = headerNames[i];
  42. var bytes = toBuffer(headerName);
  43. var headerValue = buildHeaderValue(headers[headerName]);
  44. var nameLength = allocBuffer(1);
  45. nameLength[0] = headerName.length;
  46. chunks.push(
  47. nameLength,
  48. bytes,
  49. headerValue
  50. );
  51. totalSize += nameLength.length + bytes.length + headerValue.length;
  52. }
  53. var out = allocBuffer(totalSize);
  54. var position = 0;
  55. for (var j = 0; j < chunks.length; j++) {
  56. var chunk = chunks[j];
  57. for (var k = 0; k < chunk.length; k++) {
  58. out[position] = chunk[k];
  59. position++;
  60. }
  61. }
  62. return out;
  63. }
  64. /**
  65. * @param {object} header
  66. * @param {'boolean'|'byte'|'short'|'integer'|'long'|'binary'|'string'|'timestamp'|'uuid'} header.type
  67. * @param {*} header.value
  68. * @returns {Buffer}
  69. */
  70. function buildHeaderValue(header) {
  71. switch (header.type) {
  72. case 'binary':
  73. var binBytes = allocBuffer(3);
  74. binBytes.writeUInt8(HEADER_VALUE_TYPE.byteArray, 0);
  75. binBytes.writeUInt16BE(header.value.length, 1);
  76. return Buffer.concat([
  77. binBytes, header.value
  78. ], binBytes.length + header.value.length);
  79. case 'boolean':
  80. var boolByte = allocBuffer(1);
  81. boolByte[0] = header.value ? HEADER_VALUE_TYPE.boolTrue : HEADER_VALUE_TYPE.boolFalse;
  82. return boolByte;
  83. case 'byte':
  84. var singleByte = allocBuffer(2);
  85. singleByte[0] = HEADER_VALUE_TYPE.byte;
  86. singleByte[1] = header.value;
  87. return singleByte;
  88. case 'integer':
  89. var intBytes = allocBuffer(5);
  90. intBytes.writeUInt8(HEADER_VALUE_TYPE.integer, 0);
  91. intBytes.writeInt32BE(header.value, 1);
  92. return intBytes;
  93. case 'long':
  94. var longBytes = allocBuffer(1);
  95. longBytes[0] = HEADER_VALUE_TYPE.long;
  96. return Buffer.concat([
  97. longBytes, header.value.bytes
  98. ], 9);
  99. case 'short':
  100. var shortBytes = allocBuffer(3);
  101. shortBytes.writeUInt8(HEADER_VALUE_TYPE.short, 0);
  102. shortBytes.writeInt16BE(header.value, 1);
  103. return shortBytes;
  104. case 'string':
  105. var utf8Bytes = toBuffer(header.value);
  106. var strBytes = allocBuffer(3);
  107. strBytes.writeUInt8(HEADER_VALUE_TYPE.string, 0);
  108. strBytes.writeUInt16BE(utf8Bytes.length, 1);
  109. return Buffer.concat([
  110. strBytes, utf8Bytes
  111. ], strBytes.length + utf8Bytes.length);
  112. case 'timestamp':
  113. var tsBytes = allocBuffer(1);
  114. tsBytes[0] = HEADER_VALUE_TYPE.timestamp;
  115. return Buffer.concat([
  116. tsBytes, Int64.fromNumber(header.value.valueOf()).bytes
  117. ], 9);
  118. case 'uuid':
  119. if (!UUID_PATTERN.test(header.value)) {
  120. throw new Error('Invalid UUID received: ' + header.value);
  121. }
  122. var uuidBytes = allocBuffer(1);
  123. uuidBytes[0] = HEADER_VALUE_TYPE.uuid;
  124. return Buffer.concat([
  125. uuidBytes, toBuffer(header.value.replace(/\-/g, ''), 'hex')
  126. ], 17);
  127. }
  128. }
  129. function crc32(buffer) {
  130. var crc32 = crypto.crc32(buffer);
  131. var crc32Buffer = allocBuffer(4);
  132. crc32Buffer.writeUInt32BE(crc32, 0);
  133. return crc32Buffer;
  134. }
  135. /**
  136. * @api private
  137. */
  138. var HEADER_VALUE_TYPE = {
  139. boolTrue: 0,
  140. boolFalse: 1,
  141. byte: 2,
  142. short: 3,
  143. integer: 4,
  144. long: 5,
  145. byteArray: 6,
  146. string: 7,
  147. timestamp: 8,
  148. uuid: 9,
  149. };
  150. var UUID_PATTERN = /^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$/;
  151. /**
  152. * @api private
  153. */
  154. module.exports = {
  155. buildMessage: buildMessage
  156. };