
Commit

Add more metrics to ROV (#1465)
jcague committed Sep 30, 2019
1 parent a54eb4a commit 0694784
Showing 9 changed files with 204 additions and 1 deletion.
15 changes: 15 additions & 0 deletions erizo/src/erizo/thread/ThreadPool.cpp
@@ -6,6 +6,7 @@ constexpr int kNumThreadsPerScheduler = 2;

using erizo::ThreadPool;
using erizo::Worker;
using erizo::DurationDistribution;

ThreadPool::ThreadPool(unsigned int num_workers)
: workers_{}, scheduler_{std::make_shared<Scheduler>(kNumThreadsPerScheduler)} {
@@ -46,3 +47,17 @@ void ThreadPool::close() {
}
scheduler_->stop(true);
}

DurationDistribution ThreadPool::getDurationDistribution() {
DurationDistribution total_durations;
for (auto worker : workers_) {
total_durations += worker->getDurationDistribution();
}
return total_durations;
}

void ThreadPool::resetStats() {
for (auto worker : workers_) {
worker->resetStats();
}
}
3 changes: 3 additions & 0 deletions erizo/src/erizo/thread/ThreadPool.h
@@ -18,6 +18,9 @@ class ThreadPool {
void start();
void close();

void resetStats();
DurationDistribution getDurationDistribution();

private:
std::vector<std::shared_ptr<Worker>> workers_;
std::shared_ptr<Scheduler> scheduler_;
48 changes: 48 additions & 0 deletions erizo/src/erizo/thread/Worker.cpp
@@ -9,6 +9,7 @@
#include "lib/ClockUtils.h"

using erizo::Worker;
using erizo::DurationDistribution;
using erizo::SimulatedWorker;
using erizo::ScheduledTaskReference;

@@ -22,6 +23,30 @@ void ScheduledTaskReference::cancel() {
cancelled = true;
}

DurationDistribution::DurationDistribution()
: duration_0_10_ms{0},
duration_10_50_ms{0},
duration_50_100_ms{0},
duration_100_1000_ms{0},
duration_1000_ms{0} {}

void DurationDistribution::reset() {
duration_0_10_ms = 0;
duration_10_50_ms = 0;
duration_50_100_ms = 0;
duration_100_1000_ms = 0;
duration_1000_ms = 0;
}

DurationDistribution& DurationDistribution::operator+=(const DurationDistribution& durations) {
duration_0_10_ms += durations.duration_0_10_ms;
duration_10_50_ms += durations.duration_10_50_ms;
duration_50_100_ms += durations.duration_50_100_ms;
duration_100_1000_ms += durations.duration_100_1000_ms;
duration_1000_ms += durations.duration_1000_ms;
return *this;
}

Worker::Worker(std::weak_ptr<Scheduler> scheduler, std::shared_ptr<Clock> the_clock)
: scheduler_{scheduler},
clock_{the_clock},
@@ -106,11 +131,34 @@ std::function<void()> Worker::safeTask(std::function<void(std::shared_ptr<Worker
std::weak_ptr<Worker> weak_this = shared_from_this();
return [f, weak_this] {
if (auto this_ptr = weak_this.lock()) {
time_point start = this_ptr->clock_->now();
f(this_ptr);
time_point end = this_ptr->clock_->now();
this_ptr->addToStats(end - start);
}
};
}

void Worker::addToStats(duration task_duration) {
if (task_duration <= std::chrono::milliseconds(10)) {
durations_.duration_0_10_ms++;
} else if (task_duration <= std::chrono::milliseconds(50)) {
durations_.duration_10_50_ms++;
} else if (task_duration <= std::chrono::milliseconds(100)) {
durations_.duration_50_100_ms++;
} else if (task_duration <= std::chrono::milliseconds(1000)) {
durations_.duration_100_1000_ms++;
} else {
durations_.duration_1000_ms++;
}
}

void Worker::resetStats() {
task(safeTask([](std::shared_ptr<Worker> worker) {
worker->durations_.reset();
}));
}

SimulatedWorker::SimulatedWorker(std::shared_ptr<SimulatedClock> the_clock)
: Worker(std::make_shared<Scheduler>(1), the_clock), clock_{the_clock} {
}
20 changes: 20 additions & 0 deletions erizo/src/erizo/thread/Worker.h
@@ -26,6 +26,21 @@ class ScheduledTaskReference {
std::atomic<bool> cancelled;
};

class DurationDistribution {
public:
DurationDistribution();
~DurationDistribution() {}
void reset();
DurationDistribution& operator+=(const DurationDistribution& buf);

public:
uint duration_0_10_ms;
uint duration_10_50_ms;
uint duration_50_100_ms;
uint duration_100_1000_ms;
uint duration_1000_ms;
};

class Worker : public std::enable_shared_from_this<Worker> {
public:
typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
@@ -48,9 +63,13 @@ class Worker : public std::enable_shared_from_this<Worker> {

virtual void scheduleEvery(ScheduledTask f, duration period);

void resetStats();
DurationDistribution getDurationDistribution() { return durations_; }

private:
void scheduleEvery(ScheduledTask f, duration period, duration next_delay);
std::function<void()> safeTask(std::function<void(std::shared_ptr<Worker>)> f);
void addToStats(duration task_duration);

protected:
int next_scheduled_ = 0;
@@ -63,6 +82,7 @@ class Worker : public std::enable_shared_from_this<Worker> {
boost::thread_group group_;
std::atomic<bool> closed_;
boost::thread::id thread_id_;
DurationDistribution durations_;
};

class SimulatedWorker : public Worker {
22 changes: 22 additions & 0 deletions erizoAPI/ThreadPool.cc
@@ -11,6 +11,8 @@ using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Exception;

using erizo::DurationDistribution;

Nan::Persistent<Function> ThreadPool::constructor;

ThreadPool::ThreadPool() {
@@ -28,6 +30,8 @@ NAN_MODULE_INIT(ThreadPool::Init) {
// Prototype
Nan::SetPrototypeMethod(tpl, "close", close);
Nan::SetPrototypeMethod(tpl, "start", start);
Nan::SetPrototypeMethod(tpl, "getDurationDistribution", getDurationDistribution);
Nan::SetPrototypeMethod(tpl, "resetStats", resetStats);

constructor.Reset(tpl->GetFunction());
Nan::Set(target, Nan::New("ThreadPool").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked());
@@ -58,3 +62,21 @@ NAN_METHOD(ThreadPool::start) {

obj->me->start();
}

NAN_METHOD(ThreadPool::getDurationDistribution) {
ThreadPool* obj = Nan::ObjectWrap::Unwrap<ThreadPool>(info.Holder());
DurationDistribution duration_distribution = obj->me->getDurationDistribution();
v8::Local<v8::Array> array = Nan::New<v8::Array>(5);
Nan::Set(array, 0, Nan::New(duration_distribution.duration_0_10_ms));
Nan::Set(array, 1, Nan::New(duration_distribution.duration_10_50_ms));
Nan::Set(array, 2, Nan::New(duration_distribution.duration_50_100_ms));
Nan::Set(array, 3, Nan::New(duration_distribution.duration_100_1000_ms));
Nan::Set(array, 4, Nan::New(duration_distribution.duration_1000_ms));

info.GetReturnValue().Set(array);
}

NAN_METHOD(ThreadPool::resetStats) {
ThreadPool* obj = Nan::ObjectWrap::Unwrap<ThreadPool>(info.Holder());
obj->me->resetStats();
}
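
With getDurationDistribution and resetStats exposed through the NAN bindings above, the duration buckets can be read from Node.js. The following is a minimal usage sketch only, not part of this commit: the addon require path and the worker count are illustrative assumptions.

// Illustrative sketch (not part of the commit): reading the new thread-pool metrics from JS.
// The addon path below is an assumption; adjust it to wherever the erizoAPI addon is built.
const addon = require('./erizoAPI/build/Release/addon');

const threadPool = new addon.ThreadPool(10);
threadPool.start();

// ... schedule some work on the pool ...

// getDurationDistribution() returns a 5-element array of task counts per bucket:
// [0-10 ms, 10-50 ms, 50-100 ms, 100-1000 ms, >1000 ms]
const buckets = threadPool.getDurationDistribution();
console.log('task duration buckets:', buckets);

// Clear the per-worker counters so the next read covers a fresh interval.
threadPool.resetStats();
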
3 changes: 3 additions & 0 deletions erizoAPI/ThreadPool.h
@@ -35,6 +35,9 @@ class ThreadPool : public Nan::ObjectWrap {
*/
static NAN_METHOD(start);

static NAN_METHOD(getDurationDistribution);
static NAN_METHOD(resetStats);

static Nan::Persistent<v8::Function> constructor;
};

50 changes: 49 additions & 1 deletion erizo_controller/ROV/rovMetricsGatherer.js
@@ -8,6 +8,17 @@ class RovMetricsGatherer {
totalPublishers: new promClient.Gauge({ name: this.getNameWithPrefix('total_publishers'), help: 'total active publishers' }),
totalSubscribers: new promClient.Gauge({ name: this.getNameWithPrefix('total_subscribers'), help: 'total active subscribers' }),
activeErizoJsProcesses: new promClient.Gauge({ name: this.getNameWithPrefix('active_erizojs_processes'), help: 'active processes' }),
totalConnectionsFailed: new promClient.Gauge({ name: this.getNameWithPrefix('total_connections_failed'), help: 'connections failed' }),
taskDuration0To10ms: new promClient.Gauge({ name: this.getNameWithPrefix('task_duration_0_to_10_ms'), help: 'tasks lasted less than 10 ms' }),
taskDuration10To50ms: new promClient.Gauge({ name: this.getNameWithPrefix('task_duration_10_to_50_ms'), help: 'tasks lasted between 10 and 50 ms' }),
taskDuration50To100ms: new promClient.Gauge({ name: this.getNameWithPrefix('task_duration_50_to_100_ms'), help: 'tasks lasted between 50 and 100 ms' }),
taskDuration100To1000ms: new promClient.Gauge({ name: this.getNameWithPrefix('task_duration_100_to_1000_ms'), help: 'tasks lasted between 100 ms and 1 s' }),
taskDurationMoreThan1000ms: new promClient.Gauge({ name: this.getNameWithPrefix('task_duration_1000_ms'), help: 'tasks lasted more than 1 s' }),
connectionQualityHigh: new promClient.Gauge({ name: this.getNameWithPrefix('connection_quality_high'), help: 'connections with high quality' }),
connectionQualityMedium: new promClient.Gauge({ name: this.getNameWithPrefix('connection_quality_medium'), help: 'connections with medium quality' }),
connectionQualityLow: new promClient.Gauge({ name: this.getNameWithPrefix('connection_quality_low'), help: 'connections with low quality' }),
totalPublishersInErizoJS: new promClient.Gauge({ name: this.getNameWithPrefix('total_publishers_erizojs'), help: 'total active publishers in erizo js' }),
totalSubscribersInErizoJS: new promClient.Gauge({ name: this.getNameWithPrefix('total_subscribers_erizojs'), help: 'total active subscribers in erizo js' }),
};
this.log = logger;
}
@@ -80,12 +91,49 @@ class RovMetricsGatherer {
return Promise.resolve();
}

getErizoJSMetrics() {
this.log.debug('Getting total connections failed');
return this.rovClient.runInComponentList('console.log(JSON.stringify(context.getAndResetMetrics()))', this.rovClient.components.erizoJS)
.then((results) => {
let totalConnectionsFailed = 0;
let taskDurationDistribution = Array(5).fill(0);
let connectionLevels = Array(10).fill(0);
let publishers = 0;
let subscribers = 0;
results.forEach((result) => {
const parsedResult = JSON.parse(result);
totalConnectionsFailed += parsedResult.connectionsFailed;
taskDurationDistribution =
taskDurationDistribution.map((a, i) => a + parsedResult.durationDistribution[i]);
connectionLevels = connectionLevels.map((a, i) => a + parsedResult.connectionLevels[i]);
publishers += parsedResult.publishers;
subscribers += parsedResult.subscribers;
});
this.log.debug(`Total connections failed: ${totalConnectionsFailed}`);
this.prometheusMetrics.totalConnectionsFailed.set(totalConnectionsFailed);
this.prometheusMetrics.taskDuration0To10ms.set(taskDurationDistribution[0]);
this.prometheusMetrics.taskDuration10To50ms.set(taskDurationDistribution[1]);
this.prometheusMetrics.taskDuration50To100ms.set(taskDurationDistribution[2]);
this.prometheusMetrics.taskDuration100To1000ms.set(taskDurationDistribution[3]);
this.prometheusMetrics.taskDurationMoreThan1000ms.set(taskDurationDistribution[4]);

this.prometheusMetrics.connectionQualityHigh.set(connectionLevels[2]);
this.prometheusMetrics.connectionQualityMedium.set(connectionLevels[1]);
this.prometheusMetrics.connectionQualityLow.set(connectionLevels[0]);

this.prometheusMetrics.totalPublishersInErizoJS.set(publishers);
this.prometheusMetrics.totalSubscribersInErizoJS.set(subscribers);
return Promise.resolve();
});
}

gatherMetrics() {
return this.rovClient.updateComponentsList()
.then(() => this.getTotalRooms())
.then(() => this.getTotalClients())
.then(() => this.getTotalPublishersAndSubscribers())
.then(() => this.getActiveProcesses());
.then(() => this.getActiveProcesses())
.then(() => this.getErizoJSMetrics());
}
}

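getErizoJSMetrics above parses one JSON object per erizoJS process and sums the fields before exporting them as Prometheus gauges. A sketch of the payload shape it expects from context.getAndResetMetrics() follows; the values are invented for illustration, while the field names follow this commit.

// Illustrative payload only - numbers are made up, field names match getAndResetMetrics below.
const exampleParsedResult = {
  connectionsFailed: 2,
  totalConnections: 40,
  connectionLevels: [1, 3, 36, 0, 0, 0, 0, 0, 0, 0], // index = connection.qualityLevel; 0 = low, 1 = medium, 2 = high
  publishers: 12,
  subscribers: 28,
  durationDistribution: [950, 30, 12, 7, 1], // [0-10, 10-50, 50-100, 100-1000, >1000] ms buckets
};
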
40 changes: 40 additions & 0 deletions erizo_controller/erizoJS/erizoJSController.js
@@ -54,9 +54,16 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
}
};

const initMetrics = () => {
that.metrics = {
connectionsFailed: 0,
};
};


that.publishers = publishers;
that.ioThreadPool = io;
initMetrics();

const forEachPublisher = (action) => {
const publisherStreamIds = Object.keys(publishers);
@@ -81,6 +88,10 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
connectionId, connectionEvent, newStatus) => {
const rpcID = `erizoController_${erizoControllerId}`;
amqper.callRpc(rpcID, 'connectionStatusEvent', [clientId, connectionId, newStatus, connectionEvent]);

if (connectionEvent.type === 'failed') {
that.metrics.connectionsFailed += 1;
}
};

const getOrCreateClient = (erizoControllerId, clientId, singlePC = false) => {
@@ -631,5 +642,34 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
}
};

that.getAndResetMetrics = () => {
const metrics = Object.assign({}, that.metrics);
metrics.totalConnections = 0;
metrics.connectionLevels = Array(10).fill(0);
metrics.publishers = Object.keys(that.publishers).length;
let subscribers = 0;
Object.keys(that.publishers).forEach((streamId) => {
subscribers += that.publishers[streamId].numSubscribers;
});
metrics.subscribers = subscribers;

metrics.durationDistribution = threadPool.getDurationDistribution();
threadPool.resetStats();

clients.forEach((client) => {
const connections = client.getConnections();
metrics.totalConnections += connections.length;

connections.forEach((connection) => {
const level = connection.qualityLevel;
if (level >= 0 && level < metrics.connectionLevels.length) {
metrics.connectionLevels[level] += 1;
}
});
});
initMetrics();
return metrics;
};

return that;
};
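
getAndResetMetrics is read-and-clear: connectionsFailed and the thread-pool duration buckets are cleared on every call, while publishers, subscribers, totalConnections, and connectionLevels are recomputed from the live state. A hedged illustration of that behaviour follows; the controller variable name is an assumption.

// Illustrative only: two consecutive reads, assuming `controller` is this ErizoJSController instance.
const first = controller.getAndResetMetrics();
console.log(first.connectionsFailed);     // failures accumulated since the previous call
console.log(first.durationDistribution);  // thread-pool buckets accumulated since the previous reset

const second = controller.getAndResetMetrics();
console.log(second.connectionsFailed);    // 0 - cleared by initMetrics() during the first call
console.log(second.publishers);           // still reflects the current publishers map
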
4 changes: 4 additions & 0 deletions erizo_controller/erizoJS/models/Client.js
@@ -54,6 +54,10 @@ class Client extends EventEmitter {
log.debug(`Client connections list size after add : ${this.connections.size}`);
}

getConnections() {
return Array.from(this.connections.values());
}

forceCloseConnection(id) {
const connection = this.connections.get(id);
if (connection !== undefined) {
