stringify-stream.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. const { Readable } = require('stream');
  2. const {
  3. normalizeReplacer,
  4. normalizeSpace,
  5. replaceValue,
  6. getTypeAsync,
  7. type: {
  8. PRIMITIVE,
  9. OBJECT,
  10. ARRAY,
  11. PROMISE,
  12. STRING_STREAM,
  13. OBJECT_STREAM
  14. }
  15. } = require('./utils');
  16. const noop = () => {};
  17. const hasOwnProperty = Object.prototype.hasOwnProperty;
  18. // TODO: Remove when drop support for Node.js 10
  19. // Node.js 10 has no well-formed JSON.stringify()
  20. // https://github.com/tc39/proposal-well-formed-stringify
  21. // Adopted code from https://bugs.chromium.org/p/v8/issues/detail?id=7782#c12
  22. const wellformedStringStringify = JSON.stringify('\ud800') === '"\\ud800"'
  23. ? JSON.stringify
  24. : s => JSON.stringify(s).replace(
  25. /\p{Surrogate}/gu,
  26. m => `\\u${m.charCodeAt(0).toString(16)}`
  27. );
  28. function push() {
  29. this.push(this._stack.value);
  30. this.popStack();
  31. }
  32. function pushPrimitive(value) {
  33. switch (typeof value) {
  34. case 'string':
  35. this.push(this.encodeString(value));
  36. break;
  37. case 'number':
  38. this.push(Number.isFinite(value) ? this.encodeNumber(value) : 'null');
  39. break;
  40. case 'boolean':
  41. this.push(value ? 'true' : 'false');
  42. break;
  43. case 'undefined':
  44. case 'object': // typeof null === 'object'
  45. this.push('null');
  46. break;
  47. default:
  48. this.destroy(new TypeError(`Do not know how to serialize a ${value.constructor && value.constructor.name || typeof value}`));
  49. }
  50. }
  51. function processObjectEntry(key) {
  52. const current = this._stack;
  53. if (!current.first) {
  54. current.first = true;
  55. } else {
  56. this.push(',');
  57. }
  58. if (this.space) {
  59. this.push(`\n${this.space.repeat(this._depth)}${this.encodeString(key)}: `);
  60. } else {
  61. this.push(this.encodeString(key) + ':');
  62. }
  63. }
  64. function processObject() {
  65. const current = this._stack;
  66. // when no keys left, remove obj from stack
  67. if (current.index === current.keys.length) {
  68. if (this.space && current.first) {
  69. this.push(`\n${this.space.repeat(this._depth - 1)}}`);
  70. } else {
  71. this.push('}');
  72. }
  73. this.popStack();
  74. return;
  75. }
  76. const key = current.keys[current.index];
  77. this.processValue(current.value, key, current.value[key], processObjectEntry);
  78. current.index++;
  79. }
  80. function processArrayItem(index) {
  81. if (index !== 0) {
  82. this.push(',');
  83. }
  84. if (this.space) {
  85. this.push(`\n${this.space.repeat(this._depth)}`);
  86. }
  87. }
  88. function processArray() {
  89. const current = this._stack;
  90. if (current.index === current.value.length) {
  91. if (this.space && current.index > 0) {
  92. this.push(`\n${this.space.repeat(this._depth - 1)}]`);
  93. } else {
  94. this.push(']');
  95. }
  96. this.popStack();
  97. return;
  98. }
  99. this.processValue(current.value, current.index, current.value[current.index], processArrayItem);
  100. current.index++;
  101. }
  102. function createStreamReader(fn) {
  103. return function() {
  104. const current = this._stack;
  105. const data = current.value.read(this._readSize);
  106. if (data !== null) {
  107. current.first = false;
  108. fn.call(this, data, current);
  109. } else {
  110. if ((current.first && !current.value._readableState.reading) || current.ended) {
  111. this.popStack();
  112. } else {
  113. current.first = true;
  114. current.awaiting = true;
  115. }
  116. }
  117. };
  118. }
  119. const processReadableObject = createStreamReader(function(data, current) {
  120. this.processValue(current.value, current.index, data, processArrayItem);
  121. current.index++;
  122. });
  123. const processReadableString = createStreamReader(function(data) {
  124. this.push(data);
  125. });
  126. class JsonStringifyStream extends Readable {
  127. constructor(value, replacer, space) {
  128. super({
  129. autoDestroy: true
  130. });
  131. this.getKeys = Object.keys;
  132. this.replacer = normalizeReplacer(replacer);
  133. if (Array.isArray(this.replacer)) {
  134. const allowlist = this.replacer;
  135. this.getKeys = (value) => allowlist.filter(key => hasOwnProperty.call(value, key));
  136. this.replacer = null;
  137. }
  138. this.space = normalizeSpace(space);
  139. this._depth = 0;
  140. this.error = null;
  141. this._processing = false;
  142. this._ended = false;
  143. this._readSize = 0;
  144. this._buffer = '';
  145. this._stack = null;
  146. this._visited = new WeakSet();
  147. this.pushStack({
  148. handler: () => {
  149. this.popStack();
  150. this.processValue({ '': value }, '', value, noop);
  151. }
  152. });
  153. }
  154. encodeString(value) {
  155. if (/[^\x20-\uD799]|[\x22\x5c]/.test(value)) {
  156. return wellformedStringStringify(value);
  157. }
  158. return '"' + value + '"';
  159. }
  160. encodeNumber(value) {
  161. return value;
  162. }
  163. processValue(holder, key, value, callback) {
  164. value = replaceValue(holder, key, value, this.replacer);
  165. let type = getTypeAsync(value);
  166. switch (type) {
  167. case PRIMITIVE:
  168. if (callback !== processObjectEntry || value !== undefined) {
  169. callback.call(this, key);
  170. pushPrimitive.call(this, value);
  171. }
  172. break;
  173. case OBJECT:
  174. callback.call(this, key);
  175. // check for circular structure
  176. if (this._visited.has(value)) {
  177. return this.destroy(new TypeError('Converting circular structure to JSON'));
  178. }
  179. this._visited.add(value);
  180. this._depth++;
  181. this.push('{');
  182. this.pushStack({
  183. handler: processObject,
  184. value,
  185. index: 0,
  186. first: false,
  187. keys: this.getKeys(value)
  188. });
  189. break;
  190. case ARRAY:
  191. callback.call(this, key);
  192. // check for circular structure
  193. if (this._visited.has(value)) {
  194. return this.destroy(new TypeError('Converting circular structure to JSON'));
  195. }
  196. this._visited.add(value);
  197. this.push('[');
  198. this.pushStack({
  199. handler: processArray,
  200. value,
  201. index: 0
  202. });
  203. this._depth++;
  204. break;
  205. case PROMISE:
  206. this.pushStack({
  207. handler: noop,
  208. awaiting: true
  209. });
  210. Promise.resolve(value)
  211. .then(resolved => {
  212. this.popStack();
  213. this.processValue(holder, key, resolved, callback);
  214. this.processStack();
  215. })
  216. .catch(error => {
  217. this.destroy(error);
  218. });
  219. break;
  220. case STRING_STREAM:
  221. case OBJECT_STREAM:
  222. callback.call(this, key);
  223. // TODO: Remove when drop support for Node.js 10
  224. // Used `_readableState.endEmitted` as fallback, since Node.js 10 has no `readableEnded` getter
  225. if (value.readableEnded || value._readableState.endEmitted) {
  226. return this.destroy(new Error('Readable Stream has ended before it was serialized. All stream data have been lost'));
  227. }
  228. if (value.readableFlowing) {
  229. return this.destroy(new Error('Readable Stream is in flowing mode, data may have been lost. Trying to pause stream.'));
  230. }
  231. if (type === OBJECT_STREAM) {
  232. this.push('[');
  233. this.pushStack({
  234. handler: push,
  235. value: this.space ? '\n' + this.space.repeat(this._depth) + ']' : ']'
  236. });
  237. this._depth++;
  238. }
  239. const self = this.pushStack({
  240. handler: type === OBJECT_STREAM ? processReadableObject : processReadableString,
  241. value,
  242. index: 0,
  243. first: false,
  244. ended: false,
  245. awaiting: !value.readable || value.readableLength === 0
  246. });
  247. const continueProcessing = () => {
  248. if (self.awaiting) {
  249. self.awaiting = false;
  250. this.processStack();
  251. }
  252. };
  253. value.once('error', error => this.destroy(error));
  254. value.once('end', () => {
  255. self.ended = true;
  256. continueProcessing();
  257. });
  258. value.on('readable', continueProcessing);
  259. break;
  260. }
  261. }
  262. pushStack(node) {
  263. node.prev = this._stack;
  264. return this._stack = node;
  265. }
  266. popStack() {
  267. const { handler, value } = this._stack;
  268. if (handler === processObject || handler === processArray || handler === processReadableObject) {
  269. this._visited.delete(value);
  270. this._depth--;
  271. }
  272. this._stack = this._stack.prev;
  273. }
  274. processStack() {
  275. if (this._processing || this._ended) {
  276. return;
  277. }
  278. try {
  279. this._processing = true;
  280. while (this._stack !== null && !this._stack.awaiting) {
  281. this._stack.handler.call(this);
  282. if (!this._processing) {
  283. return;
  284. }
  285. }
  286. this._processing = false;
  287. } catch (error) {
  288. this.destroy(error);
  289. return;
  290. }
  291. if (this._stack === null && !this._ended) {
  292. this._finish();
  293. this.push(null);
  294. }
  295. }
  296. push(data) {
  297. if (data !== null) {
  298. this._buffer += data;
  299. // check buffer overflow
  300. if (this._buffer.length < this._readSize) {
  301. return;
  302. }
  303. // flush buffer
  304. data = this._buffer;
  305. this._buffer = '';
  306. this._processing = false;
  307. }
  308. super.push(data);
  309. }
  310. _read(size) {
  311. // start processing
  312. this._readSize = size || this.readableHighWaterMark;
  313. this.processStack();
  314. }
  315. _finish() {
  316. this._ended = true;
  317. this._processing = false;
  318. this._stack = null;
  319. this._visited = null;
  320. if (this._buffer && this._buffer.length) {
  321. super.push(this._buffer); // flush buffer
  322. }
  323. this._buffer = '';
  324. }
  325. _destroy(error, cb) {
  326. this.error = this.error || error;
  327. this._finish();
  328. cb(error);
  329. }
  330. }
  331. module.exports = function createJsonStringifyStream(value, replacer, space) {
  332. return new JsonStringifyStream(value, replacer, space);
  333. };