From 21f72467c7e202690889fd73a7f8944536e3c6ef Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sun, 3 Apr 2016 00:37:49 +0200 Subject: [PATCH] stream: Fix readableState.awaitDrain mechanism In 68990948fe4 (https://github.com/nodejs/node/pull/2325), the conditions for increasing `readableState.awaitDrain` when writing to a piping destination returns false were changed so that they could not actually be met, effectively leaving `readableState.awaitDrain` with a constant value of 0. This patch changes the conditions to testing whether the stream for which `.write()` returned false is still a piping destination, which was likely the intention of the original patch. Fixes: https://github.com/nodejs/node/issues/5820 --- lib/_stream_readable.js | 6 +-- test/parallel/test-stream-pipe-await-drain.js | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) create mode 100644 test/parallel/test-stream-pipe-await-drain.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 338bf2a7539bfc..275691cbb2e805 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -538,9 +538,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // If the user unpiped during `dest.write()`, it is possible // to get stuck in a permanently paused state if that write // also returned false. - if (state.pipesCount === 1 && - state.pipes[0] === dest && - src.listenerCount('data') === 1 && + // => Check whether `dest` is still a piping destination. + if (((state.pipesCount === 1 && state.pipes === dest) || + (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) && !cleanedUp) { debug('false write response, pause', src._readableState.awaitDrain); src._readableState.awaitDrain++; diff --git a/test/parallel/test-stream-pipe-await-drain.js b/test/parallel/test-stream-pipe-await-drain.js new file mode 100644 index 00000000000000..fba99ed4563c14 --- /dev/null +++ b/test/parallel/test-stream-pipe-await-drain.js @@ -0,0 +1,40 @@ +'use strict'; +const common = require('../common'); +const stream = require('stream'); + +// This is very similar to test-stream-pipe-cleanup-pause.js. + +const reader = new stream.Readable(); +const writer1 = new stream.Writable(); +const writer2 = new stream.Writable(); + +// 560000 is chosen here because it is larger than the (default) highWaterMark +// and will cause `.write()` to return false +// See: https://github.com/nodejs/node/issues/5820 +const buffer = Buffer.allocUnsafe(560000); + +reader._read = function(n) {}; + +writer1._write = common.mustCall(function(chunk, encoding, cb) { + this.emit('chunk-received'); + cb(); +}, 1); +writer1.once('chunk-received', function() { + setImmediate(function() { + // This one should *not* get through to writer1 because writer2 is not + // "done" processing. + reader.push(buffer); + }); +}); + +// A "slow" consumer: +writer2._write = common.mustCall(function(chunk, encoding, cb) { + // Not calling cb here to "simulate" slow stream. + + // This should be called exactly once, since the first .write() call + // will return false. +}, 1); + +reader.pipe(writer1); +reader.pipe(writer2); +reader.push(buffer);