event-message-chunker-stream.js 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. var util = require('../core').util;
  2. var Transform = require('stream').Transform;
  3. var allocBuffer = util.buffer.alloc;
  4. /** @type {Transform} */
  5. function EventMessageChunkerStream(options) {
  6. Transform.call(this, options);
  7. this.currentMessageTotalLength = 0;
  8. this.currentMessagePendingLength = 0;
  9. /** @type {Buffer} */
  10. this.currentMessage = null;
  11. /** @type {Buffer} */
  12. this.messageLengthBuffer = null;
  13. }
  14. EventMessageChunkerStream.prototype = Object.create(Transform.prototype);
  15. /**
  16. *
  17. * @param {Buffer} chunk
  18. * @param {string} encoding
  19. * @param {*} callback
  20. */
  21. EventMessageChunkerStream.prototype._transform = function(chunk, encoding, callback) {
  22. var chunkLength = chunk.length;
  23. var currentOffset = 0;
  24. while (currentOffset < chunkLength) {
  25. // create new message if necessary
  26. if (!this.currentMessage) {
  27. // working on a new message, determine total length
  28. var bytesRemaining = chunkLength - currentOffset;
  29. // prevent edge case where total length spans 2 chunks
  30. if (!this.messageLengthBuffer) {
  31. this.messageLengthBuffer = allocBuffer(4);
  32. }
  33. var numBytesForTotal = Math.min(
  34. 4 - this.currentMessagePendingLength, // remaining bytes to fill the messageLengthBuffer
  35. bytesRemaining // bytes left in chunk
  36. );
  37. chunk.copy(
  38. this.messageLengthBuffer,
  39. this.currentMessagePendingLength,
  40. currentOffset,
  41. currentOffset + numBytesForTotal
  42. );
  43. this.currentMessagePendingLength += numBytesForTotal;
  44. currentOffset += numBytesForTotal;
  45. if (this.currentMessagePendingLength < 4) {
  46. // not enough information to create the current message
  47. break;
  48. }
  49. this.allocateMessage(this.messageLengthBuffer.readUInt32BE(0));
  50. this.messageLengthBuffer = null;
  51. }
  52. // write data into current message
  53. var numBytesToWrite = Math.min(
  54. this.currentMessageTotalLength - this.currentMessagePendingLength, // number of bytes left to complete message
  55. chunkLength - currentOffset // number of bytes left in the original chunk
  56. );
  57. chunk.copy(
  58. this.currentMessage, // target buffer
  59. this.currentMessagePendingLength, // target offset
  60. currentOffset, // chunk offset
  61. currentOffset + numBytesToWrite // chunk end to write
  62. );
  63. this.currentMessagePendingLength += numBytesToWrite;
  64. currentOffset += numBytesToWrite;
  65. // check if a message is ready to be pushed
  66. if (this.currentMessageTotalLength && this.currentMessageTotalLength === this.currentMessagePendingLength) {
  67. // push out the message
  68. this.push(this.currentMessage);
  69. // cleanup
  70. this.currentMessage = null;
  71. this.currentMessageTotalLength = 0;
  72. this.currentMessagePendingLength = 0;
  73. }
  74. }
  75. callback();
  76. };
  77. EventMessageChunkerStream.prototype._flush = function(callback) {
  78. if (this.currentMessageTotalLength) {
  79. if (this.currentMessageTotalLength === this.currentMessagePendingLength) {
  80. callback(null, this.currentMessage);
  81. } else {
  82. callback(new Error('Truncated event message received.'));
  83. }
  84. } else {
  85. callback();
  86. }
  87. };
  88. /**
  89. * @param {number} size Size of the message to be allocated.
  90. * @api private
  91. */
  92. EventMessageChunkerStream.prototype.allocateMessage = function(size) {
  93. if (typeof size !== 'number') {
  94. throw new Error('Attempted to allocate an event message where size was not a number: ' + size);
  95. }
  96. this.currentMessageTotalLength = size;
  97. this.currentMessagePendingLength = 4;
  98. this.currentMessage = allocBuffer(size);
  99. this.currentMessage.writeUInt32BE(size, 0);
  100. };
  101. /**
  102. * @api private
  103. */
  104. module.exports = {
  105. EventMessageChunkerStream: EventMessageChunkerStream
  106. };