diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 9f79a07a6ffbbe..7c6671fcd0760d 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -310,8 +310,7 @@ function chunkInvalid(state, chunk) { // 'readable' event will be triggered. function needMoreData(state) { return !state.ended && - (state.needReadable || - state.length < state.highWaterMark || + (state.length < state.highWaterMark || state.length === 0); } @@ -536,7 +535,17 @@ function emitReadable_(stream) { if (!state.destroyed && (state.length || state.ended)) { stream.emit('readable'); } - state.needReadable = !state.flowing && !state.ended; + + // The stream needs another readable event if + // 1. It is not flowing, as the flow mechanism will take + // care of it. + // 2. It is not ended. + // 3. It is below the highWaterMark, so we can schedule + // another readable later. + state.needReadable = + !state.flowing && + !state.ended && + state.length <= state.highWaterMark; flow(stream); } diff --git a/test/parallel/test-stream-backpressure.js b/test/parallel/test-stream-backpressure.js new file mode 100644 index 00000000000000..03bcc233c87cf0 --- /dev/null +++ b/test/parallel/test-stream-backpressure.js @@ -0,0 +1,39 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const stream = require('stream'); + +let pushes = 0; +const total = 65500 + 40 * 1024; +const rs = new stream.Readable({ + read: common.mustCall(function() { + if (pushes++ === 10) { + this.push(null); + return; + } + + const length = this._readableState.length; + + // We are at most doing two full runs of _reads + // before stopping, because Readable is greedy + // to keep its buffer full + assert(length <= total); + + this.push(Buffer.alloc(65500)); + for (let i = 0; i < 40; i++) { + this.push(Buffer.alloc(1024)); + } + + // We will be over highWaterMark at this point + // but a new call to _read is scheduled anyway. + }, 11) +}); + +const ws = stream.Writable({ + write: common.mustCall(function(data, enc, cb) { + setImmediate(cb); + }, 41 * 10) +}); + +rs.pipe(ws);