123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- var util = require('../core').util;
- var Transform = require('stream').Transform;
- var allocBuffer = util.buffer.alloc;
// Size, in bytes, of the big-endian unsigned length prefix that opens every message.
var LENGTH_PREFIX_BYTES = 4;

/**
 * Transform stream that re-frames an arbitrary sequence of byte chunks into
 * whole event messages.
 *
 * Every message begins with a 4-byte big-endian unsigned integer holding the
 * total message length (the 4 prefix bytes included). Incoming chunks may split
 * a message -- or the length prefix itself -- at any byte boundary; bytes are
 * buffered until a message is complete, and each complete message is pushed
 * downstream as a single Buffer that still contains its length prefix.
 *
 * @constructor
 * @extends Transform
 * @param {Object} [options] Options forwarded unchanged to the Transform constructor.
 */
function EventMessageChunkerStream(options) {
  Transform.call(this, options);

  // Total length of the message being assembled; 0 while no message is open.
  this.currentMessageTotalLength = 0;
  // Bytes buffered so far. While the length prefix is being read this counts
  // prefix bytes (0..4); afterwards it is the write offset into currentMessage.
  this.currentMessagePendingLength = 0;
  /** @type {Buffer} */
  this.currentMessage = null;
  /** @type {Buffer} */
  this.messageLengthBuffer = null;
}

// Inherit from Transform and restore `constructor`, which would otherwise be
// resolved through the prototype chain and report Transform.
EventMessageChunkerStream.prototype = Object.create(Transform.prototype, {
  constructor: {
    value: EventMessageChunkerStream,
    enumerable: false,
    writable: true,
    configurable: true
  }
});

/**
 * Consumes one chunk, pushing every message that becomes complete.
 *
 * A malformed length prefix (smaller than the prefix itself) or a failed
 * allocation is reported through `callback` rather than thrown, so it surfaces
 * as a stream error. The rejected prefix is kept, so any later write fails the
 * same way instead of re-synchronising on corrupt data.
 *
 * @param {Buffer} chunk
 * @param {string} encoding Unused; chunks are treated as raw bytes.
 * @param {function(Error=)} callback Invoked exactly once per chunk.
 */
EventMessageChunkerStream.prototype._transform = function(chunk, encoding, callback) {
  var chunkLength = chunk.length;
  var currentOffset = 0;

  while (currentOffset < chunkLength) {
    // Open a new message if none is in progress.
    if (!this.currentMessage) {
      var bytesRemaining = chunkLength - currentOffset;

      // The length prefix can itself straddle two chunks, so it is collected
      // in its own small buffer until all of its bytes have arrived.
      if (!this.messageLengthBuffer) {
        this.messageLengthBuffer = allocBuffer(LENGTH_PREFIX_BYTES);
      }
      var numBytesForTotal = Math.min(
        LENGTH_PREFIX_BYTES - this.currentMessagePendingLength, // prefix bytes still missing
        bytesRemaining // bytes left in chunk
      );
      chunk.copy(
        this.messageLengthBuffer,
        this.currentMessagePendingLength,
        currentOffset,
        currentOffset + numBytesForTotal
      );
      this.currentMessagePendingLength += numBytesForTotal;
      currentOffset += numBytesForTotal;

      if (this.currentMessagePendingLength < LENGTH_PREFIX_BYTES) {
        // Chunk ended inside the prefix; wait for the next chunk.
        break;
      }

      // NOTE(review): the length is taken from the input as-is and is not
      // capped here -- confirm whether an upper bound should be enforced.
      try {
        this.allocateMessage(this.messageLengthBuffer.readUInt32BE(0));
      } catch (err) {
        callback(err);
        return;
      }
      this.messageLengthBuffer = null;
    }

    // Copy as much of the chunk as the open message can still take.
    var numBytesToWrite = Math.min(
      this.currentMessageTotalLength - this.currentMessagePendingLength, // bytes left to complete message
      chunkLength - currentOffset // bytes left in the original chunk
    );
    chunk.copy(
      this.currentMessage, // target buffer
      this.currentMessagePendingLength, // target offset
      currentOffset, // chunk offset
      currentOffset + numBytesToWrite // chunk end to write
    );
    this.currentMessagePendingLength += numBytesToWrite;
    currentOffset += numBytesToWrite;

    // Emit the message once every byte has arrived, then reset for the next one.
    if (this.currentMessageTotalLength && this.currentMessageTotalLength === this.currentMessagePendingLength) {
      this.push(this.currentMessage);
      this.currentMessage = null;
      this.currentMessageTotalLength = 0;
      this.currentMessagePendingLength = 0;
    }
  }

  callback();
};

/**
 * Called when the input ends. Fails if any bytes of an unfinished message --
 * including a partially received length prefix -- are still buffered.
 *
 * @param {function(Error=, Buffer=)} callback
 */
EventMessageChunkerStream.prototype._flush = function(callback) {
  var hasOpenMessage = Boolean(this.currentMessageTotalLength);

  if (hasOpenMessage && this.currentMessageTotalLength === this.currentMessagePendingLength) {
    callback(null, this.currentMessage);
    return;
  }

  // Either a message body is incomplete, or the input stopped part-way through
  // a length prefix (bytes pending but no message allocated yet).
  if (hasOpenMessage || this.currentMessagePendingLength > 0) {
    callback(new Error('Truncated event message received.'));
    return;
  }

  callback();
};

/**
 * Allocates the buffer for a new message and writes its length prefix.
 * Instance state is only updated after validation and allocation succeed.
 *
 * @param {number} size Size of the message to be allocated, length prefix included.
 * @throws {Error} If size is not a number or is smaller than the length prefix.
 * @api private
 */
EventMessageChunkerStream.prototype.allocateMessage = function(size) {
  if (typeof size !== 'number') {
    throw new Error('Attempted to allocate an event message where size was not a number: ' + size);
  }
  // Written as a negated >= so that NaN is rejected as well.
  if (!(size >= LENGTH_PREFIX_BYTES)) {
    throw new Error(
      'Attempted to allocate an event message smaller than its ' +
      LENGTH_PREFIX_BYTES + '-byte length prefix: ' + size
    );
  }

  var message = allocBuffer(size);
  message.writeUInt32BE(size, 0);

  this.currentMessageTotalLength = size;
  // The prefix has just been written, so the next payload byte lands after it.
  this.currentMessagePendingLength = LENGTH_PREFIX_BYTES;
  this.currentMessage = message;
};
/**
 * Module surface: exposes the EventMessageChunkerStream constructor under its own name.
 *
 * @api private
 */
module.exports = { EventMessageChunkerStream: EventMessageChunkerStream };
|