Skip to content

Commit

Permalink
src: keep track of open requests
Browse files Browse the repository at this point in the history
Workers cannot shut down while requests are open, so keep a counter
that is increased whenever libuv requests are made and decreased
whenever their callback is called.

This also applies to other embedders, who may want to shut down
an `Environment` instance early.

Many thanks for Stephen Belanger for reviewing the original version of
this commit in the Ayo.js project.

Fixes: #20517
Refs: ayojs/ayo#85
PR-URL: #19377
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax committed May 14, 2018
1 parent 75aad90 commit 8995408
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 24 deletions.
9 changes: 9 additions & 0 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,15 @@ inline void Environment::CloseHandle(T* handle, OnCloseCallback callback) {
});
}

void Environment::IncreaseWaitingRequestCounter() {
request_waiting_++;
}

void Environment::DecreaseWaitingRequestCounter() {
request_waiting_--;
CHECK_GE(request_waiting_, 0);
}

inline uv_loop_t* Environment::event_loop() const {
return isolate_data()->event_loop();
}
Expand Down
6 changes: 4 additions & 2 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ Environment::Environment(IsolateData* isolate_data,
#if HAVE_INSPECTOR
inspector_agent_(new inspector::Agent(this)),
#endif
handle_cleanup_waiting_(0),
http_parser_buffer_(nullptr),
fs_stats_field_array_(isolate_, kFsStatsFieldsLength * 2),
context_(context->GetIsolate(), context) {
Expand Down Expand Up @@ -241,8 +240,11 @@ void Environment::CleanupHandles() {
hc.cb_(this, hc.handle_, hc.arg_);
handle_cleanup_queue_.clear();

while (handle_cleanup_waiting_ != 0 || !handle_wrap_queue_.IsEmpty())
while (handle_cleanup_waiting_ != 0 ||
request_waiting_ != 0 ||
!handle_wrap_queue_.IsEmpty()) {
uv_run(event_loop(), UV_RUN_ONCE);
}
}

void Environment::StartProfilerIdleNotifier() {
Expand Down
6 changes: 5 additions & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,9 @@ class Environment {
inline uv_check_t* immediate_check_handle();
inline uv_idle_t* immediate_idle_handle();

inline void IncreaseWaitingRequestCounter();
inline void DecreaseWaitingRequestCounter();

inline AsyncHooks* async_hooks();
inline ImmediateInfo* immediate_info();
inline TickInfo* tick_info();
Expand Down Expand Up @@ -833,7 +836,8 @@ class Environment {
HandleWrapQueue handle_wrap_queue_;
ReqWrapQueue req_wrap_queue_;
std::list<HandleCleanup> handle_cleanup_queue_;
int handle_cleanup_waiting_;
int handle_cleanup_waiting_ = 0;
int request_waiting_ = 0;

double* heap_statistics_buffer_ = nullptr;
double* heap_space_statistics_buffer_ = nullptr;
Expand Down
5 changes: 5 additions & 0 deletions src/node_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3388,6 +3388,9 @@ class Work : public node::AsyncResource {
// Establish a handle scope here so that every callback doesn't have to.
// Also it is needed for the exception-handling below.
v8::HandleScope scope(env->isolate);
node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
env_->DecreaseWaitingRequestCounter();

CallbackScope callback_scope(work);

NAPI_CALL_INTO_MODULE(env,
Expand Down Expand Up @@ -3488,6 +3491,8 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {

uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);

node::Environment* env_ = node::Environment::GetCurrent(env->isolate);
env_->IncreaseWaitingRequestCounter();
CALL_UV(env, uv_queue_work(event_loop,
w->Request(),
uvimpl::Work::ExecuteCallback,
Expand Down
13 changes: 11 additions & 2 deletions src/node_crypto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4645,9 +4645,12 @@ void PBKDF2Request::After() {


void PBKDF2Request::After(uv_work_t* work_req, int status) {
CHECK_EQ(status, 0);
std::unique_ptr<PBKDF2Request> req(
ContainerOf(&PBKDF2Request::work_req_, work_req));
req->env()->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED)
return;
CHECK_EQ(status, 0);
req->After();
}

Expand Down Expand Up @@ -4698,6 +4701,7 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
if (args[5]->IsFunction()) {
obj->Set(env->context(), env->ondone_string(), args[5]).FromJust();

env->IncreaseWaitingRequestCounter();
uv_queue_work(env->event_loop(),
req.release()->work_req(),
PBKDF2Request::Work,
Expand Down Expand Up @@ -4837,10 +4841,13 @@ void RandomBytesCheck(RandomBytesRequest* req, Local<Value> (*argv)[2]) {


void RandomBytesAfter(uv_work_t* work_req, int status) {
CHECK_EQ(status, 0);
std::unique_ptr<RandomBytesRequest> req(
ContainerOf(&RandomBytesRequest::work_req_, work_req));
Environment* env = req->env();
env->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED)
return;
CHECK_EQ(status, 0);
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> argv[2];
Expand Down Expand Up @@ -4880,6 +4887,7 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
if (args[1]->IsFunction()) {
obj->Set(env->context(), env->ondone_string(), args[1]).FromJust();

env->IncreaseWaitingRequestCounter();
uv_queue_work(env->event_loop(),
req.release()->work_req(),
RandomBytesWork,
Expand Down Expand Up @@ -4919,6 +4927,7 @@ void RandomBytesBuffer(const FunctionCallbackInfo<Value>& args) {
if (args[3]->IsFunction()) {
obj->Set(env->context(), env->ondone_string(), args[3]).FromJust();

env->IncreaseWaitingRequestCounter();
uv_queue_work(env->event_loop(),
req.release()->work_req(),
RandomBytesWork,
Expand Down
13 changes: 10 additions & 3 deletions src/node_zlib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class ZCtx : public AsyncWrap {
}

// async version
env->IncreaseWaitingRequestCounter();
uv_queue_work(env->event_loop(), work_req, ZCtx::Process, ZCtx::After);
}

Expand Down Expand Up @@ -361,10 +362,17 @@ class ZCtx : public AsyncWrap {

// v8 land!
static void After(uv_work_t* work_req, int status) {
CHECK_EQ(status, 0);

ZCtx* ctx = ContainerOf(&ZCtx::work_req_, work_req);
Environment* env = ctx->env();
ctx->write_in_progress_ = false;

env->DecreaseWaitingRequestCounter();
if (status == UV_ECANCELED) {
ctx->Close();
return;
}

CHECK_EQ(status, 0);

HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Expand All @@ -374,7 +382,6 @@ class ZCtx : public AsyncWrap {

ctx->write_result_[0] = ctx->strm_.avail_out;
ctx->write_result_[1] = ctx->strm_.avail_in;
ctx->write_in_progress_ = false;

// call the write() cb
Local<Function> cb = PersistentToLocal(env->isolate(),
Expand Down
43 changes: 28 additions & 15 deletions src/req_wrap-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ ReqWrap<T>::ReqWrap(Environment* env,
// FIXME(bnoordhuis) The fact that a reinterpret_cast is needed is
// arguably a good indicator that there should be more than one queue.
env->req_wrap_queue()->PushBack(reinterpret_cast<ReqWrap<uv_req_t>*>(this));

Reset();
}

template <typename T>
Expand All @@ -33,14 +35,21 @@ void ReqWrap<T>::Dispatched() {
req_.data = this;
}

template <typename T>
void ReqWrap<T>::Reset() {
original_callback_ = nullptr;
req_.data = nullptr;
}

template <typename T>
ReqWrap<T>* ReqWrap<T>::from_req(T* req) {
return ContainerOf(&ReqWrap<T>::req_, req);
}

template <typename T>
void ReqWrap<T>::Cancel() {
uv_cancel(reinterpret_cast<uv_req_t*>(&req_));
if (req_.data == this) // Only cancel if already dispatched.
uv_cancel(reinterpret_cast<uv_req_t*>(&req_));
}

// Below is dark template magic designed to invoke libuv functions that
Expand Down Expand Up @@ -95,7 +104,7 @@ struct CallLibuvFunction<ReqT, void(*)(ReqT*, Args...)> {
template <typename ReqT, typename T>
struct MakeLibuvRequestCallback {
static T For(ReqWrap<ReqT>* req_wrap, T v) {
static_assert(!std::is_function<T>::value,
static_assert(!is_callable<T>::value,
"MakeLibuvRequestCallback missed a callback");
return v;
}
Expand All @@ -109,6 +118,7 @@ struct MakeLibuvRequestCallback<ReqT, void(*)(ReqT*, Args...)> {

static void Wrapper(ReqT* req, Args... args) {
ReqWrap<ReqT>* req_wrap = ContainerOf(&ReqWrap<ReqT>::req_, req);
req_wrap->env()->DecreaseWaitingRequestCounter();
F original_callback = reinterpret_cast<F>(req_wrap->original_callback_);
original_callback(req, args...);
}
Expand All @@ -128,23 +138,26 @@ int ReqWrap<T>::Dispatch(LibuvFunction fn, Args... args) {

// This expands as:
//
// return fn(env()->event_loop(), req(), arg1, arg2, Wrapper, arg3, ...)
// ^ ^ ^
// | | |
// \-- Omitted if `fn` has no | |
// first `uv_loop_t*` argument | |
// | |
// A function callback whose first argument | |
// matches the libuv request type is replaced ---/ |
// by the `Wrapper` method defined above |
// |
// Other (non-function) arguments are passed -----/
// through verbatim
return CallLibuvFunction<T, LibuvFunction>::Call(
// int err = fn(env()->event_loop(), req(), arg1, arg2, Wrapper, arg3, ...)
// ^ ^ ^
// | | |
// \-- Omitted if `fn` has no | |
// first `uv_loop_t*` argument | |
// | |
// A function callback whose first argument | |
// matches the libuv request type is replaced ---/ |
// by the `Wrapper` method defined above |
// |
// Other (non-function) arguments are passed -----/
// through verbatim
int err = CallLibuvFunction<T, LibuvFunction>::Call(
fn,
env()->event_loop(),
req(),
MakeLibuvRequestCallback<T, Args>::For(this, args)...);
if (err >= 0)
env()->IncreaseWaitingRequestCounter();
return err;
}

} // namespace node
Expand Down
2 changes: 2 additions & 0 deletions src/req_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class ReqWrap : public AsyncWrap {
// Call this after the req has been dispatched, if that did not already
// happen by using Dispatch().
inline void Dispatched();
// Call this after a request has finished, if re-using this object is planned.
inline void Reset();
T* req() { return &req_; }
inline void Cancel();

Expand Down
10 changes: 9 additions & 1 deletion src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,16 @@ struct MallocedBuffer {
MallocedBuffer& operator=(const MallocedBuffer&) = delete;
};

} // namespace node
// Test whether some value can be called with ().
template<typename T, typename = void>
struct is_callable : std::is_function<T> { };

template<typename T>
struct is_callable<T, typename std::enable_if<
std::is_same<decltype(void(&T::operator())), void>::value
>::type> : std::true_type { };

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

Expand Down

0 comments on commit 8995408

Please sign in to comment.