Skip to content

Commit

Permalink
Add EventsWaiter
Browse files Browse the repository at this point in the history
  • Loading branch information
liutiexing committed Oct 15, 2021
1 parent a5392b3 commit e206173
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 70 deletions.
4 changes: 3 additions & 1 deletion paddle/fluid/framework/new_executor/event_count.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@
#include <cstdlib>
#include <mutex>
#include <vector>
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"

namespace paddle {
namespace framework {

void* AlignedMalloc(size_t size, size_t alignment);
void AlignedFree(void* memory_ptr);

class EventCount {
public:
class Waiter;
Expand Down
5 changes: 3 additions & 2 deletions paddle/fluid/framework/new_executor/interpretercore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ InterpreterCore::InterpreterCore(const platform::Place& place,
main_program_(main_prog),
global_scope_(global_scope),
stream_analyzer_(place),
async_work_queue_(kHostNumThreads) {
async_work_queue_(kHostNumThreads, main_thread_blocker_) {
is_build_ = false;

feed_names_ = feed_names;
Expand Down Expand Up @@ -365,7 +365,8 @@ void InterpreterCore::ExecuteInstructionList(
}
}

async_work_queue_.WaitEmpty();
auto event_id = main_thread_blocker_.WaitEvent();
VLOG(3) << "event_id " << event_id;

PADDLE_ENFORCE_EQ(
op_run_number_.load(), vec_instr.size(),
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/new_executor/interpretercore.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class InterpreterCore {
InterpreterProfiler dry_run_profiler_;
StreamAnalyzer stream_analyzer_;
EventManager event_manager_;
EventsWaiter main_thread_blocker_;
interpretercore::AsyncWorkQueue async_work_queue_;

InterpreterCoreGarbageCollector gc_;
Expand Down
12 changes: 8 additions & 4 deletions paddle/fluid/framework/new_executor/interpretercore_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "paddle/fluid/framework/garbage_collector.h"
#include "paddle/fluid/framework/new_executor/new_executor_defs.h"
#include "paddle/fluid/framework/new_executor/workqueue.h"
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
Expand All @@ -53,16 +54,19 @@ using AtomicVectorSizeT = std::vector<std::unique_ptr<std::atomic<size_t>>>;

class AsyncWorkQueue {
public:
explicit AsyncWorkQueue(size_t host_num_threads)
AsyncWorkQueue(size_t host_num_threads, EventsWaiter* waiter)
: host_num_thread_(host_num_threads) {
std::vector<WorkQueueOptions> group_options;
// for execute host Kernel
group_options.emplace_back(/*num_threads*/ host_num_threads,
/*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true,
/*queue_empty_waiter*/ waiter);
// for launch device Kernel
group_options.emplace_back(/*num_threads*/ 1,
/*allow_spinning*/ true, /*track_task*/ true);
/*allow_spinning*/ true,
/*track_task*/ true,
/*queue_empty_waiter*/ waiter);
queue_group_ = CreateWorkQueueGroup(group_options);
}

Expand All @@ -71,7 +75,7 @@ class AsyncWorkQueue {
AtomicVectorSizeT& PrepareAtomicVarRef(
const std::vector<VariableMetaInfo>& vec_meta_info);

void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); }
// void WaitEmpty() { queue_group_->WaitQueueGroupEmpty(); }

void AddTask(const OpFuncType& op_func_type, std::function<void()> fn) {
queue_group_->AddTask(static_cast<size_t>(op_func_type), std::move(fn));
Expand Down
30 changes: 9 additions & 21 deletions paddle/fluid/framework/new_executor/nonblocking_threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
namespace paddle {
namespace framework {

template <typename Notifier>
class TaskTracker {
public:
TaskTracker() : wait_empty_cv_(1) {}
TaskTracker() = default;

explicit TaskTracker(Notifier& notifier) : notifier_(&notifier) {}

TaskTracker(const TaskTracker&) = delete;

Expand All @@ -33,32 +36,17 @@ class TaskTracker {

void SubCounter() {
if (1 == num_tasks_.fetch_sub(1, std::memory_order_relaxed)) {
wait_empty_cv_.Notify(true);
if (notifier_ != nullptr) {
notifier_->NotifyEvent();
}
}
}

// only one user can wait at any time
void WaitTaskNumToZero() {
bool waiting = false;
if (!wait_empty_.compare_exchange_strong(waiting, true,
std::memory_order_seq_cst,
std::memory_order_relaxed)) {
abort();
}
EventCount::Waiter* w = wait_empty_cv_.GetWaiter(0);
wait_empty_cv_.Prewait();
if (num_tasks_.load(std::memory_order_relaxed) == 0) {
wait_empty_cv_.CancelWait();
} else {
wait_empty_cv_.CommitWait(w);
}
wait_empty_.store(false);
}
uint64_t PendingTaskNum() { return num_tasks_.load(); }

private:
alignas(64) std::atomic<uint64_t> num_tasks_{0};
alignas(64) EventCount wait_empty_cv_;
alignas(64) std::atomic<bool> wait_empty_{false};
Notifier* notifier_{nullptr};
};

template <typename Environment>
Expand Down
46 changes: 18 additions & 28 deletions paddle/fluid/framework/new_executor/workqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ namespace paddle {
namespace framework {
namespace {

using TaskTracker = TaskTracker<EventsWaiter::Notifier>;

class WorkQueueImpl : public WorkQueue {
public:
explicit WorkQueueImpl(const WorkQueueOptions& options)
: WorkQueue(options), queue_(nullptr), tracker_(nullptr) {
if (options_.track_task) {
explicit WorkQueueImpl(const WorkQueueOptions& options) : WorkQueue(options) {
if (options_.track_task && options.queue_empty_waiter != nullptr) {
void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
tracker_ = new (storage) TaskTracker;
TaskTracker* tracker = reinterpret_cast<TaskTracker*>(storage);
auto event_id = options.queue_empty_waiter->RegisterEvent(
[tracker]() { return tracker->PendingTaskNum() == 0; });
auto notifier = options.queue_empty_waiter->GetEventNotifier(event_id);
tracker_ = new (storage) TaskTracker(notifier.get());
}
queue_ = new NonblockingThreadPool(options_.num_threads,
options_.allow_spinning);
Expand All @@ -44,20 +49,11 @@ class WorkQueueImpl : public WorkQueue {
queue_->AddTask(std::move(fn));
}

void WaitQueueEmpty() override {
if (tracker_ == nullptr) {
PADDLE_THROW(
platform::errors::Unavailable("set WorkQueueOptions.track_task = "
"true before call this interface."));
}
tracker_->WaitTaskNumToZero();
}

size_t NumThreads() const override { return queue_->NumThreads(); }

private:
NonblockingThreadPool* queue_;
TaskTracker* tracker_;
NonblockingThreadPool* queue_{nullptr};
TaskTracker* tracker_{nullptr};
};

class WorkQueueGroupImpl : public WorkQueueGroup {
Expand All @@ -69,8 +65,6 @@ class WorkQueueGroupImpl : public WorkQueueGroup {

void AddTask(size_t queue_idx, std::function<void()> fn) override;

void WaitQueueGroupEmpty() override;

size_t QueueNumThreads(size_t queue_idx) const override;

size_t QueueGroupNumThreads() const override;
Expand All @@ -92,9 +86,14 @@ WorkQueueGroupImpl::WorkQueueGroupImpl(
queues_storage_ = reinterpret_cast<NonblockingThreadPool*>(buffer);
for (size_t idx = 0; idx < num_queues; ++idx) {
const auto& options = queues_options_[idx];
if (options.track_task && tracker_ == nullptr) {
if (options.track_task && tracker_ == nullptr &&
options.queue_empty_waiter != nullptr) {
void* storage = AlignedMalloc(sizeof(TaskTracker), alignof(TaskTracker));
tracker_ = new (storage) TaskTracker;
TaskTracker* tracker = reinterpret_cast<TaskTracker*>(storage);
auto event_id = options.queue_empty_waiter->RegisterEvent(
[tracker]() { return tracker->PendingTaskNum() == 0; });
auto notifier = options.queue_empty_waiter->GetEventNotifier(event_id);
tracker_ = new (storage) TaskTracker(notifier.get());
}
queues_[idx] = new (&queues_storage_[idx])
NonblockingThreadPool(options.num_threads, options.allow_spinning);
Expand Down Expand Up @@ -124,15 +123,6 @@ void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
queues_[queue_idx]->AddTask(std::move(fn));
}

void WorkQueueGroupImpl::WaitQueueGroupEmpty() {
if (nullptr == tracker_) {
PADDLE_THROW(platform::errors::Unavailable(
"set WorkQueueOptions.track_task = true for at least one of queues "
"before call this interface."));
}
tracker_->WaitTaskNumToZero();
}

size_t WorkQueueGroupImpl::QueueNumThreads(size_t queue_idx) const {
assert(queue_idx < queues_.size());
return queues_.at(queue_idx)->NumThreads();
Expand Down
24 changes: 18 additions & 6 deletions paddle/fluid/framework/new_executor/workqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,29 @@
namespace paddle {
namespace framework {

class EventsWaiter;

struct WorkQueueOptions {
WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task)
: num_threads(num_threads),
allow_spinning(allow_spinning),
track_task(track_task) {}

WorkQueueOptions(size_t num_threads, bool allow_spinning, bool track_task,
EventsWaiter* waiter)
: num_threads(num_threads),
allow_spinning(allow_spinning),
track_task(track_task),
queue_empty_waiter(waiter) {}

size_t num_threads;
bool allow_spinning;
// If you need to blocking the calling thread to wait "queue empty", set
// track_task = true and set queue_empty_waiter. EventsWaiter::WaitEvent will
// block the calling thread until any of events (including "queue empty")
// occured.
bool track_task;
EventsWaiter* queue_empty_waiter{nullptr};
};

class WorkQueue {
Expand All @@ -44,9 +58,8 @@ class WorkQueue {

virtual void AddTask(std::function<void()> fn) = 0;

// set WorkQueueOptions.track_task = true before call this
// interface, otherwise will abort()
virtual void WaitQueueEmpty() = 0;
// See WorkQueueOptions.track_task for details
// virtual void WaitQueueEmpty() = 0;

virtual size_t NumThreads() const = 0;

Expand All @@ -67,9 +80,8 @@ class WorkQueueGroup {

virtual void AddTask(size_t queue_idx, std::function<void()> fn) = 0;

// set WorkQueueOptions.track_task = true for at least one of queues
// before call this interface, otherwise will abort()
virtual void WaitQueueGroupEmpty() = 0;
// See WorkQueueOptions.track_task for details
// virtual void WaitQueueGroupEmpty() = 0;

virtual size_t QueueNumThreads(size_t queue_idx) const = 0;

Expand Down
23 changes: 15 additions & 8 deletions paddle/fluid/framework/new_executor/workqueue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@
#include <atomic>
#include "glog/logging.h"
#include "gtest/gtest.h"
#include "paddle/fluid/framework/new_executor/workqueue_utils.h"

TEST(WorkQueue, TestSingleThreadedWorkQueue) {
VLOG(1) << "In Test";
using paddle::framework::WorkQueueOptions;
using paddle::framework::WorkQueue;
using paddle::framework::CreateSingleThreadedWorkQueue;
using paddle::framework::EventsWaiter;
std::atomic<bool> finished{false};
std::atomic<unsigned> counter{0};
constexpr unsigned kLoopNum = 1000000;
// CreateSingleThreadedWorkQueue
EventsWaiter events_waiter;
WorkQueueOptions options(/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true, events_waiter);
auto work_queue = CreateSingleThreadedWorkQueue(options);
// NumThreads
EXPECT_EQ(work_queue->NumThreads(), 1u);
Expand All @@ -42,7 +45,7 @@ TEST(WorkQueue, TestSingleThreadedWorkQueue) {
});
// WaitQueueEmpty
EXPECT_EQ(finished.load(), false);
work_queue->WaitQueueEmpty();
events_waiter.WaitEvent();
EXPECT_EQ(finished.load(), true);
EXPECT_EQ(counter.load(), kLoopNum);
}
Expand All @@ -52,13 +55,15 @@ TEST(WorkQueue, TestMultiThreadedWorkQueue) {
using paddle::framework::WorkQueueOptions;
using paddle::framework::WorkQueue;
using paddle::framework::CreateMultiThreadedWorkQueue;
using paddle::framework::EventsWaiter;
std::atomic<bool> finished{false};
std::atomic<unsigned> counter{0};
constexpr unsigned kExternalLoopNum = 100;
constexpr unsigned kLoopNum = 1000000;
// CreateMultiThreadedWorkQueue
EventsWaiter events_waiter;
WorkQueueOptions options(/*num_threads*/ 10, /*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true, events_waiter);
auto work_queue = CreateMultiThreadedWorkQueue(options);
// NumThreads
EXPECT_EQ(work_queue->NumThreads(), 10u);
Expand All @@ -75,7 +80,7 @@ TEST(WorkQueue, TestMultiThreadedWorkQueue) {
}
// WaitQueueEmpty
EXPECT_EQ(finished.load(), false);
work_queue->WaitQueueEmpty();
events_waiter.WaitEvent();
EXPECT_EQ(finished.load(), true);
EXPECT_EQ(counter.load(), kLoopNum * kExternalLoopNum);
}
Expand All @@ -84,15 +89,17 @@ TEST(WorkQueue, TestWorkQueueGroup) {
using paddle::framework::WorkQueueOptions;
using paddle::framework::WorkQueueGroup;
using paddle::framework::CreateWorkQueueGroup;
using paddle::framework::EventsWaiter;
std::atomic<bool> finished{false};
std::atomic<unsigned> counter{0};
constexpr unsigned kExternalLoopNum = 100;
constexpr unsigned kLoopNum = 1000000;
// CreateMultiThreadedWorkQueue
// ThreadedWorkQueueGroup
EventsWaiter events_waiter;
WorkQueueOptions sq_options(/*num_threads*/ 1, /*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true, events_waiter);
WorkQueueOptions mq_options(/*num_threads*/ 10, /*allow_spinning*/ true,
/*track_task*/ true);
/*track_task*/ true, events_waiter);
auto queue_group = CreateWorkQueueGroup({sq_options, mq_options});
// NumThreads
EXPECT_EQ(queue_group->QueueNumThreads(0), 1u);
Expand All @@ -113,6 +120,6 @@ TEST(WorkQueue, TestWorkQueueGroup) {
}
});
// WaitQueueGroupEmpty()
queue_group->WaitQueueGroupEmpty();
events_waiter.WaitEvent();
EXPECT_EQ(counter.load(), kLoopNum * kExternalLoopNum + kLoopNum);
}
Loading

0 comments on commit e206173

Please sign in to comment.