-
Notifications
You must be signed in to change notification settings - Fork 1
/
Scheduler.cpp
162 lines (148 loc) · 6.3 KB
/
Scheduler.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//
// Scheduler.cpp
//
//
// Created by Benedikt Hegner on 4/10/12.
// Copyright (c) 2012 CERN. All rights reserved.
//
// include tbb
#include "tbb/compat/thread"
// include fwk
#include "EventLoopManager.h"
#include "Scheduler.h"
// Task body: runs the algorithm referenced by task_ on the context of the
// event it was scheduled for, then hands task_ back to the scheduler via
// algo_is_done(). Always returns NULL (presumably TBB's "no follow-up task to
// run next" convention for task::execute - confirm against the TBB docs).
tbb::task* AlgoTask::execute(){
task_->algo_->body(task_->event_state_->context);
// Report completion; the scheduler takes task_ from here on.
scheduler_->algo_is_done(task_);
// Disabled accept/reject bookkeeping: no result of body() is captured here,
// and Scheduler::task_cleanup() marks every finished algorithm as ACCEPT.
//if (successful) task_->event_state_->algo_states[task_->algo_id_] = ACCEPT;
//else task_->event_state_->algo_states[task_->algo_id_] = REJECT;
return NULL;
}
// Builds an unconfigured scheduler bound to the whiteboard `wb`.
// The algorithm list, algorithm pool and loop manager start out as null
// pointers and are supplied later through initialise().
Scheduler::Scheduler(Whiteboard& wb)
  : algos_(0),
    algo_pool_(0),
    wb_(wb),
    new_events_queue_(),
    loop_manager_(0),
    has_to_stop_()
{
  // The stop flag is set by assignment rather than in the initializer list
  // (presumably because its type offers no value constructor - TODO confirm).
  has_to_stop_ = false;
}
// Derives the scheduling bit patterns from the algorithms' declared products.
//
// Returns one state_type per algorithm (same indexing as algos_): bit k is set
// when the algorithm consumes a product that algorithm k declares as output.
// As a side effect, termination_requirement_ is set to the pattern with one
// bit per algorithm, i.e. the state of an event on which every algorithm ran.
//
// If several algorithms declare the same output name, the one with the highest
// index is taken as the producer. Inputs that no algorithm produces add no
// requirement bit (they are assumed to be provided from outside the algorithm
// chain, like the "event" product written in operator()).
std::vector<state_type> Scheduler::compute_dependencies() {
  const unsigned int n_algos = algos_->size();
  std::vector<state_type> all_requirements(n_algos);

  // create the mapping productname : index of the producing algorithm
  std::map<std::string,unsigned int> product_indices;
  for (unsigned int i = 0; i < n_algos; ++i) {
    const std::vector<std::string>& outputs = (*algos_)[i]->get_outputs();
    for (unsigned int j = 0, n_outputs = outputs.size(); j < n_outputs; ++j){
      product_indices[outputs[j]] = i;
    }
  }

  // use the mapping to create a bit pattern of input requirements
  state_type termination_requirement(0);
  for (unsigned int i = 0; i < n_algos; ++i) {
    state_type requirements(0);
    const std::vector<std::string>& inputs = (*algos_)[i]->get_inputs();
    for (unsigned int j = 0, n_inputs = inputs.size(); j < n_inputs; ++j){
      // Look the producer up with find(): operator[] would default-insert
      // index 0 for an unknown product and thereby invent a dependency on
      // algorithm 0 (a self-dependency, hence a hang, if i == 0).
      std::map<std::string,unsigned int>::const_iterator producer =
          product_indices.find(inputs[j]);
      if (producer == product_indices.end()) {
        continue;
      }
      requirements[producer->second] = true;
    }
    all_requirements[i] = requirements;
    termination_requirement[i] = true;
  }
  termination_requirement_ = termination_requirement;
  return all_requirements;
}
// check for finished tasks, update event state and delete the AlgoTaskId
void Scheduler::task_cleanup(){
AlgoTaskId* result(0);
bool queue_full(false);
do {
queue_full = done_queue_.try_pop(result);
if (queue_full) {
state_type new_bits(result->event_state_->state);
new_bits[result->algo_id_] = true;
result->event_state_->state = new_bits;
result->event_state_->algo_states[result->algo_id_] = ACCEPT;
delete result;
}
} while (queue_full);
}
// Completion hook, called from AlgoTask::execute() once an algorithm body has
// run. Gives the algorithm back to the pool and queues `result` on done_queue_;
// the event-state update and the deletion of `result` happen later in
// task_cleanup(), which drains that queue.
void Scheduler::algo_is_done(AlgoTaskId* result){
// Release first so the algorithm can be acquired again for another event.
algo_pool_->release(result->algo_, result->algo_id_);
done_queue_.push(result);
// Former in-place bookkeeping, now performed by task_cleanup():
//state_type new_bits(result->event_state_->state);
//new_bits[result->algo_id_] = true;
//result->event_state_->state = new_bits;
//delete result;
}
// Requests processing of event `event_number` by queueing it on
// new_events_queue_; the main loop in operator()() pops it from there and sets
// the event up.
void Scheduler::start_event(unsigned int event_number){
new_events_queue_.push(event_number);
}
// Wires the scheduler to its collaborators. Only the pointers are stored.
//   algo_pool    - pool from which algorithms are acquired and released
//   algos        - list of algorithms; the index in this list is the algo id
//   loop_manager - notified via finished_event() when an event completes
void Scheduler::initialise(AlgoPool* algo_pool, const std::vector<AlgoBase*>* algos, EventLoopManager* loop_manager){
  this->loop_manager_ = loop_manager;
  this->algos_ = algos;
  this->algo_pool_ = algo_pool;
}
// Scheduler main loop; keeps running until has_to_stop_ becomes true.
// Each pass:
//   1. drains new_events_queue_ and creates an EventState for every popped
//      event that obtains a whiteboard context,
//   2. for every (algorithm, active event) pair whose required bits are all
//      set and whose algorithm state is still NOT_RUN, acquires the algorithm
//      from the pool and enqueues an AlgoTask for it,
//   3. folds finished tasks into the event states (task_cleanup),
//   4. releases the context of, reports, and deletes every event whose state
//      equals termination_requirement_,
//   5. yields the thread.
void Scheduler::operator()(){
  // get the requirement bit patterns, indexed by algo id (like algos_)
  std::vector<state_type> bits = compute_dependencies();
  // some book keeping
  size_t size = algos_->size();
  std::vector<EventState*> event_states(0); //TODO - has to move to init

  do {
    // BEGIN TODO: replace by thread safe code in start event
    // loop through all events that need to be started
    bool queue_full(false);
    do {
      unsigned int event_number(0);
      queue_full = new_events_queue_.try_pop(event_number);
      if (queue_full){
        Context* context(0);
        bool whiteboard_available = wb_.get_context(context);
        if (whiteboard_available){
          EventState* event_state = new EventState(size);
          event_states.push_back(event_state);
          event_state->context = context;
          context->write(event_number, "event","event");
        }
        // NOTE(review): when no context is available the popped event number
        // is dropped and never retried - confirm that callers only start an
        // event while a whiteboard slot is guaranteed to be free.
      }
    } while(queue_full);

    for (unsigned int algo = 0; algo < size; ++algo) {
      // loop through all currently active events
      for (unsigned int event_id = 0; event_id < event_states.size(); ++event_id) {
        EventState*& event_state = event_states[event_id];
        // extract event_id specific quantities
        state_type& current_event_bits = event_state->state;
        // tmp has a bit set for every requirement of `algo` that is still
        // missing from the event's state; all-zero means "ready to run"
        state_type tmp = (current_event_bits & bits[algo]) ^ bits[algo];
        // ... and the algo must not have been started for this event before
        std::vector<AlgoState>& algo_states = event_state->algo_states;
        if ((tmp==0) && (algo_states[algo] == NOT_RUN)) {
          // is there an available Algo instance one can use?
          AlgoBase* algo_instance(0);
          const bool algo_free = algo_pool_->acquire(algo_instance, algo);
          if (algo_free) {
            // NOTE(review): the task is built from (*algos_)[algo], not from
            // the acquired algo_instance - confirm the pool hands out the very
            // same objects, otherwise the wrong instance runs and is released.
            AlgoTaskId* task = new AlgoTaskId((*algos_)[algo],algo,event_state);
            tbb::task* t = new( tbb::task::allocate_root() ) AlgoTask(task, this);
            tbb::task::enqueue( *t);
            algo_states[algo] = SCHEDULED;
          }
        }
      }
    }

    task_cleanup();

    // check for finished events and clean up
    // erase() invalidates the erased position and everything behind it,
    // including a cached end(); so end() is re-read on every pass and the
    // iterator only advances when nothing was erased.
    for (std::vector<EventState*>::iterator i = event_states.begin(); i != event_states.end(); ) {
      if ((*i)->state == termination_requirement_) {
        Context*& context = (*i)->context;
        wb_.release_context(context);
        loop_manager_->finished_event();
        delete (*i);
        i = event_states.erase(i);
      } else {
        ++i;
      }
    }

    std::this_thread::yield();
  } while (not has_to_stop_);
  return;
}