Skip to content

Commit

Permalink
stream: switch _writableState.buffer to queue
Browse files Browse the repository at this point in the history
In cases where many small writes are made to a stream
lacking _writev, the array data structure backing the
WriteReq buffer would greatly increase GC pressure.

Specifically, in the fs.WriteStream case, the
clearBuffer routine would only clear a single WriteReq
from the buffer before exiting, but would cause the
entire backing array to be GC'd. Switching to [].shift
lessened pressure, but still the bulk of the time was
spent in memcpy.

This replaces that structure with a linked list-backed
queue so that adding and removing from the queue is O(1).
In the _writev case, collecting the buffer requires an
O(N) loop over the buffer, but that was already being
performed to collect callbacks, so slowdown should be
neglible.

PR-URL: nodejs/node-v0.x-archive#8826
Reviewed-by: Timothy J Fontaine <tjfontaine@gmail.com>
Reviewed-by: Trevor Norris <trev.norris@gmail.com>
  • Loading branch information
chrisdickinson committed Dec 18, 2014
1 parent 93533e9 commit 9158666
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
68 changes: 48 additions & 20 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ Writable.WritableState = WritableState;

var util = require('util');
var Stream = require('stream');
var debug = util.debuglog('stream');

util.inherits(Writable, Stream);

function WriteReq(chunk, encoding, cb) {
this.chunk = chunk;
this.encoding = encoding;
this.callback = cb;
this.next = null;
}

function WritableState(options, stream) {
Expand Down Expand Up @@ -109,7 +111,8 @@ function WritableState(options, stream) {
// the amount that is being written when _write is called.
this.writelen = 0;

this.buffer = [];
this.bufferedRequest = null;
this.lastBufferedRequest = null;

// number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
Expand All @@ -123,6 +126,23 @@ function WritableState(options, stream) {
this.errorEmitted = false;
}

WritableState.prototype.getBuffer = function writableStateGetBuffer() {
var current = this.bufferedRequest;
var out = [];
while (current) {
out.push(current);
current = current.next;
}
return out;
};

Object.defineProperty(WritableState.prototype, 'buffer', {
get: util.deprecate(function() {
return this.getBuffer();
}, '_writableState.buffer is deprecated. Use ' +
'_writableState.getBuffer() instead.')
});

function Writable(options) {
// Writable ctor is applied to Duplexes, though they're not
// instanceof Writable, they're instanceof Readable.
Expand Down Expand Up @@ -216,7 +236,7 @@ Writable.prototype.uncork = function() {
!state.corked &&
!state.finished &&
!state.bufferProcessing &&
state.buffer.length)
state.bufferedRequest)
clearBuffer(this, state);
}
};
Expand Down Expand Up @@ -255,8 +275,15 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;

if (state.writing || state.corked)
state.buffer.push(new WriteReq(chunk, encoding, cb));
if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
}
else
doWrite(stream, state, false, len, chunk, encoding, cb);

Expand Down Expand Up @@ -313,7 +340,7 @@ function onwrite(stream, er) {
if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.buffer.length) {
state.bufferedRequest) {
clearBuffer(stream, state);
}

Expand Down Expand Up @@ -349,52 +376,53 @@ function onwriteDrain(stream, state) {
// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
var entry = state.bufferedRequest;

if (stream._writev && state.buffer.length > 1) {
if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
var buffer = [];
var cbs = [];
for (var c = 0; c < state.buffer.length; c++)
cbs.push(state.buffer[c].callback);
while (entry) {
cbs.push(entry.callback);
buffer.push(entry);
entry = entry.next;
}

// count the one we are adding, as well.
// TODO(isaacs) clean this up
state.pendingcb++;
doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
state.lastBufferedRequest = null;
doWrite(stream, state, true, state.length, buffer, '', function(err) {
for (var i = 0; i < cbs.length; i++) {
state.pendingcb--;
cbs[i](err);
}
});

// Clear buffer
state.buffer = [];
} else {
// Slow case, write chunks one-by-one
for (var c = 0; c < state.buffer.length; c++) {
var entry = state.buffer[c];
while (entry) {
var chunk = entry.chunk;
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;

doWrite(stream, state, false, len, chunk, encoding, cb);

entry = entry.next;
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
c++;
break;
}
}

if (c < state.buffer.length)
state.buffer = state.buffer.slice(c);
else
state.buffer.length = 0;
if (entry === null)
state.lastBufferedRequest = null;
}

state.bufferedRequest = entry;
state.bufferProcessing = false;
}

Expand Down Expand Up @@ -435,7 +463,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
function needFinish(stream, state) {
return (state.ending &&
state.length === 0 &&
state.buffer.length === 0 &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ Socket.prototype.__defineGetter__('bytesWritten', function() {
data = this._pendingData,
encoding = this._pendingEncoding;

state.buffer.forEach(function(el) {
state.getBuffer().forEach(function(el) {
if (util.isBuffer(el.chunk))
bytes += el.chunk.length;
else
Expand Down
2 changes: 1 addition & 1 deletion test/simple/test-stream2-transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ test('writable side consumption', function(t) {
t.equal(tx._readableState.length, 10);
t.equal(transformed, 10);
t.equal(tx._transformState.writechunk.length, 5);
t.same(tx._writableState.buffer.map(function(c) {
t.same(tx._writableState.getBuffer().map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);

Expand Down

0 comments on commit 9158666

Please sign in to comment.