Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: remove queue implementation from JSStreamWrap #17918

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 70 additions & 89 deletions lib/internal/wrap_js_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ const uv = process.binding('uv');
const debug = util.debuglog('stream_wrap');
const errors = require('internal/errors');

const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');

function isClosing() { return this.owner.isClosing(); }
function onreadstart() { return this.owner.readStart(); }
function onreadstop() { return this.owner.readStop(); }
function onshutdown(req) { return this.owner.doShutdown(req); }
function onwrite(req, bufs) { return this.owner.doWrite(req, bufs); }

/* This class serves as a wrapper for when the C++ side of Node wants access
* to a standard JS stream. For example, TLS or HTTP do not operate on network
* resources conceptually, although that is the common case and what we are
Expand All @@ -27,12 +36,13 @@ class JSStreamWrap extends Socket {
debug('close');
this.doClose(cb);
};
handle.isAlive = () => this.isAlive();
handle.isClosing = () => this.isClosing();
handle.onreadstart = () => this.readStart();
handle.onreadstop = () => this.readStop();
handle.onshutdown = (req) => this.doShutdown(req);
handle.onwrite = (req, bufs) => this.doWrite(req, bufs);
// Inside of the following functions, `this` refers to the handle
// and `this.owner` refers to this JSStreamWrap instance.
handle.isClosing = isClosing;
handle.onreadstart = onreadstart;
handle.onreadstop = onreadstop;
handle.onshutdown = onshutdown;
handle.onwrite = onwrite;

stream.pause();
stream.on('error', (err) => this.emit('error', err));
Expand Down Expand Up @@ -60,7 +70,10 @@ class JSStreamWrap extends Socket {

super({ handle, manualStart: true });
this.stream = stream;
this._list = null;
this[kCurrentWriteRequest] = null;
this[kCurrentShutdownRequest] = null;

// Start reading.
this.read(0);
}

Expand All @@ -69,10 +82,6 @@ class JSStreamWrap extends Socket {
return JSStreamWrap;
}

isAlive() {
return true;
}

isClosing() {
return !this.readable || !this.writable;
}
Expand All @@ -88,33 +97,56 @@ class JSStreamWrap extends Socket {
}

doShutdown(req) {
assert.strictEqual(this[kCurrentShutdownRequest], null);
this[kCurrentShutdownRequest] = req;

// TODO(addaleax): It might be nice if we could get into a state where
// DoShutdown() is not called on streams while a write is still pending.
//
// Currently, the only part of the code base where that happens is the
// TLS implementation, which calls both DoWrite() and DoShutdown() on the
// underlying network stream inside of its own DoShutdown() method.
// Working around that on the native side is not quite trivial (yet?),
// so for now that is supported here.

if (this[kCurrentWriteRequest] !== null)
return this.on('drain', () => this.doShutdown(req));
assert.strictEqual(this[kCurrentWriteRequest], null);

const handle = this._handle;
const item = this._enqueue('shutdown', req);

this.stream.end(() => {
// Ensure that write was dispatched
setImmediate(() => {
if (!this._dequeue(item))
return;

handle.finishShutdown(req, 0);
this.finishShutdown(handle, 0);
});
});
return 0;
}

// handle === this._handle except when called from doClose().
finishShutdown(handle, errCode) {
// The shutdown request might already have been cancelled.
if (this[kCurrentShutdownRequest] === null)
return;
const req = this[kCurrentShutdownRequest];
this[kCurrentShutdownRequest] = null;
handle.finishShutdown(req, errCode);
}

doWrite(req, bufs) {
const self = this;
const handle = this._handle;
assert.strictEqual(this[kCurrentWriteRequest], null);
assert.strictEqual(this[kCurrentShutdownRequest], null);
this[kCurrentWriteRequest] = req;

var pending = bufs.length;
const handle = this._handle;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The linter would complain about that. :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing it, though. Same for the const handle = ... on line 112.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s passed to the self.finishWrite() call down on line 162

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, indeed. Unfortunate fold in the diff.

const self = this;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it still needed ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@billouboq it’s used inside the done() function below :)


// Queue the request to be able to cancel it
const item = this._enqueue('write', req);
let pending = bufs.length;

this.stream.cork();
for (var n = 0; n < bufs.length; n++)
this.stream.write(bufs[n], done);
for (var i = 0; i < bufs.length; ++i)
this.stream.write(bufs[i], done);
this.stream.uncork();

function done(err) {
Expand All @@ -126,93 +158,42 @@ class JSStreamWrap extends Socket {

let errCode = 0;
if (err) {
const code = uv[`UV_${err.code}`];
errCode = (err.code && code) ? code : uv.UV_EPIPE;
errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
}

// Ensure that write was dispatched
setImmediate(function() {
// Do not invoke callback twice
if (!self._dequeue(item))
return;

handle.finishWrite(req, errCode);
setImmediate(() => {
self.finishWrite(handle, errCode);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can avoid this closure completely with setImmediate(finishWrite, self, handle, errCode).

});
}

return 0;
}

_enqueue(type, req) {
const item = new QueueItem(type, req);
if (this._list === null) {
this._list = item;
return item;
}

item.next = this._list.next;
item.prev = this._list;
item.next.prev = item;
item.prev.next = item;

return item;
}

_dequeue(item) {
assert(item instanceof QueueItem);

var next = item.next;
var prev = item.prev;

if (next === null && prev === null)
return false;

item.next = null;
item.prev = null;

if (next === item) {
prev = null;
next = null;
} else {
prev.next = next;
next.prev = prev;
}

if (this._list === item)
this._list = next;
// handle === this._handle except when called from doClose().
finishWrite(handle, errCode) {
// The write request might already have been cancelled.
if (this[kCurrentWriteRequest] === null)
return;
const req = this[kCurrentWriteRequest];
this[kCurrentWriteRequest] = null;

return true;
handle.finishWrite(req, errCode);
}

doClose(cb) {
const handle = this._handle;

setImmediate(() => {
while (this._list !== null) {
const item = this._list;
const req = item.req;
this._dequeue(item);

const errCode = uv.UV_ECANCELED;
if (item.type === 'write') {
handle.finishWrite(req, errCode);
} else if (item.type === 'shutdown') {
handle.finishShutdown(req, errCode);
}
}

// Should be already set by net.js
assert.strictEqual(this._handle, null);

this.finishWrite(handle, uv.UV_ECANCELED);
this.finishShutdown(handle, uv.UV_ECANCELED);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remove this closure as well?

cb();
});
}
}

function QueueItem(type, req) {
this.type = type;
this.req = req;
this.prev = this;
this.next = this;
}

module.exports = JSStreamWrap;
1 change: 0 additions & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ class ModuleWrap;
V(internal_string, "internal") \
V(ipv4_string, "IPv4") \
V(ipv6_string, "IPv6") \
V(isalive_string, "isAlive") \
V(isclosing_string, "isClosing") \
V(issuer_string, "issuer") \
V(issuercert_string, "issuerCertificate") \
Expand Down
8 changes: 1 addition & 7 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,7 @@ AsyncWrap* JSStream::GetAsyncWrap() {


bool JSStream::IsAlive() {
HandleScope scope(env()->isolate());
Context::Scope context_scope(env()->context());
v8::Local<v8::Value> fn = object()->Get(env()->isalive_string());
if (!fn->IsFunction())
return false;
return MakeCallback(fn.As<v8::Function>(), 0, nullptr)
.ToLocalChecked()->IsTrue();
return true;
}


Expand Down