Skip to content

Commit

Permalink
src: limit foreground tasks draining loop
Browse files Browse the repository at this point in the history
Foreground tasks that repost themselves can force the draining loop
to run indefinitely long without giving other tasks chance to run.

This limits the foreground task draining loop to run only the tasks
that were in the tasks queue at the beginning of the loop.

PR-URL: nodejs#19987
Fixes: nodejs#19937
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Yang Guo <yangguo@chromium.org>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Khaidi Chu <i@2333.moe>
  • Loading branch information
ulan authored and blattersturm committed Nov 2, 2018
1 parent 3f753ca commit 623e2b2
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 7 deletions.
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@
'test/cctest/test_base64.cc',
'test/cctest/test_node_postmortem_metadata.cc',
'test/cctest/test_environment.cc',
'test/cctest/test_platform.cc',
'test/cctest/test_util.cc',
'test/cctest/test_url.cc'
],
Expand Down
2 changes: 1 addition & 1 deletion src/inspector_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class NodeInspectorClient : public V8InspectorClient {
terminated_ = false;
running_nested_loop_ = true;
while (!terminated_ && channel_->waitForFrontendMessage()) {
platform_->FlushForegroundTasks(env_->isolate());
while (platform_->FlushForegroundTasks(env_->isolate())) {}
}
terminated_ = false;
running_nested_loop_ = false;
Expand Down
22 changes: 18 additions & 4 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void PerIsolatePlatformData::PostDelayedTask(
}

PerIsolatePlatformData::~PerIsolatePlatformData() {
FlushForegroundTasksInternal();
while (FlushForegroundTasksInternal()) {}
CancelPendingDelayedTasks();

uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
Expand Down Expand Up @@ -227,7 +227,13 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
});
});
}
while (std::unique_ptr<Task> task = foreground_tasks_.Pop()) {
// Move all foreground tasks into a separate queue and flush that queue.
// This way tasks that are posted while flushing the queue will be run on the
// next call of FlushForegroundTasksInternal.
std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
while (!tasks.empty()) {
std::unique_ptr<Task> task = std::move(tasks.front());
tasks.pop();
did_work = true;
RunForegroundTask(std::move(task));
}
Expand Down Expand Up @@ -258,8 +264,8 @@ void NodePlatform::CallDelayedOnForegroundThread(Isolate* isolate,
std::unique_ptr<Task>(task), delay_in_seconds);
}

void NodePlatform::FlushForegroundTasks(v8::Isolate* isolate) {
ForIsolate(isolate)->FlushForegroundTasksInternal();
bool NodePlatform::FlushForegroundTasks(v8::Isolate* isolate) {
return ForIsolate(isolate)->FlushForegroundTasksInternal();
}

void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {
Expand Down Expand Up @@ -352,4 +358,12 @@ void TaskQueue<T>::Stop() {
tasks_available_.Broadcast(scoped_lock);
}

template <class T>
std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
Mutex::ScopedLock scoped_lock(lock_);
std::queue<std::unique_ptr<T>> result;
result.swap(task_queue_);
return result;
}

} // namespace node
10 changes: 8 additions & 2 deletions src/node_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class TaskQueue {
void Push(std::unique_ptr<T> task);
std::unique_ptr<T> Pop();
std::unique_ptr<T> BlockingPop();
std::queue<std::unique_ptr<T>> PopAll();
void NotifyOfCompletion();
void BlockingDrain();
void Stop();
Expand Down Expand Up @@ -65,7 +66,9 @@ class PerIsolatePlatformData :
void ref();
int unref();

// Returns true iff work was dispatched or executed.
// Returns true if work was dispatched or executed. New tasks that are
// posted during flushing of the queue are postponed until the next
// flushing.
bool FlushForegroundTasksInternal();
void CancelPendingDelayedTasks();

Expand Down Expand Up @@ -130,7 +133,10 @@ class NodePlatform : public MultiIsolatePlatform {
double CurrentClockTimeMillis() override;
v8::TracingController* GetTracingController() override;

void FlushForegroundTasks(v8::Isolate* isolate);
// Returns true if work was dispatched or executed. New tasks that are
// posted during flushing of the queue are postponed until the next
// flushing.
bool FlushForegroundTasks(v8::Isolate* isolate);

void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override;
void UnregisterIsolate(IsolateData* isolate_data) override;
Expand Down
55 changes: 55 additions & 0 deletions test/cctest/test_platform.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "node_internals.h"
#include "libplatform/libplatform.h"

#include <string>
#include "gtest/gtest.h"
#include "node_test_fixture.h"

// This task increments the given run counter and reposts itself until the
// repost counter reaches zero.
class RepostingTask : public v8::Task {
public:
explicit RepostingTask(int repost_count,
int* run_count,
v8::Isolate* isolate,
node::NodePlatform* platform)
: repost_count_(repost_count),
run_count_(run_count),
isolate_(isolate),
platform_(platform) {}

// v8::Task implementation
void Run() final {
++*run_count_;
if (repost_count_ > 0) {
--repost_count_;
platform_->CallOnForegroundThread(isolate_,
new RepostingTask(repost_count_, run_count_, isolate_, platform_));
}
}

private:
int repost_count_;
int* run_count_;
v8::Isolate* isolate_;
node::NodePlatform* platform_;
};

class PlatformTest : public EnvironmentTestFixture {};

TEST_F(PlatformTest, SkipNewTasksInFlushForegroundTasks) {
v8::Isolate::Scope isolate_scope(isolate_);
const v8::HandleScope handle_scope(isolate_);
const Argv argv;
Env env {handle_scope, argv};
int run_count = 0;
platform->CallOnForegroundThread(
isolate_, new RepostingTask(2, &run_count, isolate_, platform.get()));
EXPECT_TRUE(platform->FlushForegroundTasks(isolate_));
EXPECT_EQ(1, run_count);
EXPECT_TRUE(platform->FlushForegroundTasks(isolate_));
EXPECT_EQ(2, run_count);
EXPECT_TRUE(platform->FlushForegroundTasks(isolate_));
EXPECT_EQ(3, run_count);
EXPECT_FALSE(platform->FlushForegroundTasks(isolate_));
}

0 comments on commit 623e2b2

Please sign in to comment.