Skip to content

Commit

Permalink
CPP-964 Add refresh-interval support for histogram metrics (#561)
Browse files Browse the repository at this point in the history
  • Loading branch information
absurdfarce committed Oct 23, 2023
1 parent 8d65d1e commit fc38817
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 33 deletions.
22 changes: 22 additions & 0 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -2934,6 +2934,28 @@ CASS_EXPORT void
cass_cluster_set_monitor_reporting_interval(CassCluster* cluster,
unsigned interval_secs);

/**
* Sets the amount of time after which metric histograms should be refreshed.
* Upon refresh histograms are reset to zero, effectively dropping any history to
* that point. Refresh occurs when a snapshot is requested so ths value should
* be thought of as a minimum time to refresh.
*
* If refresh is not enabled the driver will continue to accumulate histogram
* data over the life of a session; this is the default behaviour and replicates
* the behaviour of previous versions.
*
* Note that the specified interval must be > 0 otherwise CASS_ERROR_LIB_BAD_PARAMS
* will be returned.
*
* @public @memberof CassCluster
*
* @param cluster
* @param refresh_interval Minimum interval (in milliseconds) for refresh interval
*/
CASS_EXPORT CassError
cass_cluster_set_histogram_refresh_interval(CassCluster* cluster,
unsigned refresh_interval);

/***********************************************************************************
*
* Session
Expand Down
8 changes: 8 additions & 0 deletions src/cluster_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,14 @@ void cass_cluster_set_monitor_reporting_interval(CassCluster* cluster, unsigned
cluster->config().set_monitor_reporting_interval_secs(interval_secs);
}

CassError cass_cluster_set_histogram_refresh_interval(CassCluster* cluster, unsigned refresh_interval) {
if (refresh_interval <= 0) {
return CASS_ERROR_LIB_BAD_PARAMS;
}
cluster->config().set_cluster_histogram_refresh_interval(refresh_interval);
return CASS_OK;
}

void cass_cluster_free(CassCluster* cluster) { delete cluster->from(); }

} // extern "C"
10 changes: 9 additions & 1 deletion src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class Config {
, is_client_id_set_(false)
, host_listener_(new DefaultHostListener())
, monitor_reporting_interval_secs_(CASS_DEFAULT_CLIENT_MONITOR_EVENTS_INTERVAL_SECS)
, cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory()) {
, cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory())
, histogram_refresh_interval_(CASS_DEFAULT_HISTOGRAM_REFRESH_INTERVAL_NO_REFRESH) {
profiles_.set_empty_key(String());

// Assign the defaults to the cluster profile
Expand Down Expand Up @@ -392,6 +393,12 @@ class Config {
}
}

unsigned cluster_histogram_refresh_interval() const { return histogram_refresh_interval_; }

void set_cluster_histogram_refresh_interval(unsigned refresh_interval) {
histogram_refresh_interval_ = refresh_interval;
}

private:
void init_profiles();

Expand Down Expand Up @@ -441,6 +448,7 @@ class Config {
unsigned monitor_reporting_interval_secs_;
CloudSecureConnectionConfig cloud_secure_connection_config_;
ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory_;
unsigned histogram_refresh_interval_;
};

}}} // namespace datastax::internal::core
Expand Down
1 change: 1 addition & 0 deletions src/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
#define CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS 15
#define CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS 3
#define CASS_DEFAULT_TRACING_CONSISTENCY CASS_CONSISTENCY_ONE
#define CASS_DEFAULT_HISTOGRAM_REFRESH_INTERVAL_NO_REFRESH 0

// Request-level defaults
#define CASS_DEFAULT_CONSISTENCY CASS_CONSISTENCY_LOCAL_ONE
Expand Down
114 changes: 83 additions & 31 deletions src/metrics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "allocated.hpp"
#include "atomic.hpp"
#include "constants.hpp"
#include "get_time.hpp"
#include "scoped_lock.hpp"
#include "scoped_ptr.hpp"
#include "utils.hpp"
Expand Down Expand Up @@ -268,9 +269,15 @@ class Metrics : public Allocated {
int64_t percentile_999th;
};

Histogram(ThreadState* thread_state)
Histogram(ThreadState* thread_state, unsigned refresh_interval = CASS_DEFAULT_HISTOGRAM_REFRESH_INTERVAL_NO_REFRESH)
: thread_state_(thread_state)
, histograms_(new PerThreadHistogram[thread_state->max_threads()]) {
, histograms_(new PerThreadHistogram[thread_state->max_threads()])
, zero_snapshot_(Snapshot {0,0,0,0,0,0,0,0,0,0}) {

refresh_interval_ = refresh_interval;
refresh_timestamp_ = get_time_since_epoch_ms();
cached_snapshot_ = zero_snapshot_;

hdr_init(1LL, HIGHEST_TRACKABLE_VALUE, 3, &histogram_);
uv_mutex_init(&mutex_);
}
Expand All @@ -286,38 +293,76 @@ class Metrics : public Allocated {

void get_snapshot(Snapshot* snapshot) const {
ScopedMutex l(&mutex_);
hdr_histogram* h = histogram_;
for (size_t i = 0; i < thread_state_->max_threads(); ++i) {
histograms_[i].add(h);

// In the "no refresh" case (the default) fall back to the old behaviour; add per-thread
// timestamps to histogram_ (without any clearing of data) and return what's there.
if (refresh_interval_ == CASS_DEFAULT_HISTOGRAM_REFRESH_INTERVAL_NO_REFRESH) {

for (size_t i = 0; i < thread_state_->max_threads(); ++i) {
histograms_[i].add(histogram_);
}

if (histogram_->total_count == 0) {
// There is no data; default to 0 for the stats.
copy_snapshot(zero_snapshot_, snapshot);
} else {
histogram_to_snapshot(histogram_, snapshot);
}
return;
}

if (h->total_count == 0) {
// There is no data; default to 0 for the stats.
snapshot->max = 0;
snapshot->min = 0;
snapshot->mean = 0;
snapshot->stddev = 0;
snapshot->median = 0;
snapshot->percentile_75th = 0;
snapshot->percentile_95th = 0;
snapshot->percentile_98th = 0;
snapshot->percentile_99th = 0;
snapshot->percentile_999th = 0;
} else {
snapshot->max = hdr_max(h);
snapshot->min = hdr_min(h);
snapshot->mean = static_cast<int64_t>(hdr_mean(h));
snapshot->stddev = static_cast<int64_t>(hdr_stddev(h));
snapshot->median = hdr_value_at_percentile(h, 50.0);
snapshot->percentile_75th = hdr_value_at_percentile(h, 75.0);
snapshot->percentile_95th = hdr_value_at_percentile(h, 95.0);
snapshot->percentile_98th = hdr_value_at_percentile(h, 98.0);
snapshot->percentile_99th = hdr_value_at_percentile(h, 99.0);
snapshot->percentile_999th = hdr_value_at_percentile(h, 99.9);
// Refresh interval is in use. If we've exceeded the interval clear histogram_,
// compute a new aggregate histogram and build (and cache) a new snapshot. Otherwise
// just return the cached version.
uint64_t now = get_time_since_epoch_ms();
if (now - refresh_timestamp_ >= refresh_interval_) {

hdr_reset(histogram_);

for (size_t i = 0; i < thread_state_->max_threads(); ++i) {
histograms_[i].add(histogram_);
}

if (histogram_->total_count == 0) {
copy_snapshot(zero_snapshot_, &cached_snapshot_);
} else {
histogram_to_snapshot(histogram_, &cached_snapshot_);
}
refresh_timestamp_ = now;
}

copy_snapshot(cached_snapshot_, snapshot);
}

private:

void copy_snapshot(Snapshot from, Snapshot* to) const {
to->min = from.min;
to->max = from.max;
to->mean = from.mean;
to->stddev = from.stddev;
to->median = from.median;
to->percentile_75th = from.percentile_75th;
to->percentile_95th = from.percentile_95th;
to->percentile_98th = from.percentile_98th;
to->percentile_99th = from.percentile_99th;
to->percentile_999th = from.percentile_999th;
}

void histogram_to_snapshot(hdr_histogram* h, Snapshot* to) const {
to->min = hdr_min(h);
to->max = hdr_max(h);
to->mean = static_cast<int64_t>(hdr_mean(h));
to->stddev = static_cast<int64_t>(hdr_stddev(h));
to->median = hdr_value_at_percentile(h, 50.0);
to->percentile_75th = hdr_value_at_percentile(h, 75.0);
to->percentile_95th = hdr_value_at_percentile(h, 95.0);
to->percentile_98th = hdr_value_at_percentile(h, 98.0);
to->percentile_99th = hdr_value_at_percentile(h, 99.0);
to->percentile_999th = hdr_value_at_percentile(h, 99.9);
}


class WriterReaderPhaser {
public:
WriterReaderPhaser()
Expand Down Expand Up @@ -409,14 +454,19 @@ class Metrics : public Allocated {
hdr_histogram* histogram_;
mutable uv_mutex_t mutex_;

unsigned refresh_interval_;
mutable uint64_t refresh_timestamp_;
mutable Snapshot cached_snapshot_;
const Snapshot zero_snapshot_;

private:
DISALLOW_COPY_AND_ASSIGN(Histogram);
};

Metrics(size_t max_threads)
Metrics(size_t max_threads, unsigned histogram_refresh_interval)
: thread_state_(max_threads)
, request_latencies(&thread_state_)
, speculative_request_latencies(&thread_state_)
, request_latencies(&thread_state_, histogram_refresh_interval)
, speculative_request_latencies(&thread_state_, histogram_refresh_interval)
, request_rates(&thread_state_)
, total_connections(&thread_state_)
, connection_timeouts(&thread_state_)
Expand Down Expand Up @@ -447,6 +497,8 @@ class Metrics : public Allocated {
Counter connection_timeouts;
Counter request_timeouts;

unsigned histogram_refresh_interval;

private:
DISALLOW_COPY_AND_ASSIGN(Metrics);
};
Expand Down
2 changes: 1 addition & 1 deletion src/session_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Future::Ptr SessionBase::connect(const Config& config, const String& keyspace) {
random_.reset();
}

metrics_.reset(new Metrics(config.thread_count_io() + 1));
metrics_.reset(new Metrics(config.thread_count_io() + 1, config.cluster_histogram_refresh_interval()));

cluster_.reset();
ClusterConnector::Ptr connector(
Expand Down
86 changes: 86 additions & 0 deletions tests/src/unit/tests/test_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,92 @@ TEST(MetricsUnitTest, HistogramEmpty) {
EXPECT_EQ(snapshot.percentile_999th, 0);
}

TEST(MetricsUnitTest, HistogramWithRefreshInterval) {
unsigned refresh_interval = 1000;
Metrics::ThreadState thread_state(1);
Metrics::Histogram histogram(&thread_state, refresh_interval);

Metrics::Histogram::Snapshot snapshot;

// Retrieval before the first interval runs will simply return zeros
histogram.get_snapshot(&snapshot);
EXPECT_EQ(snapshot.min, 0);
EXPECT_EQ(snapshot.max, 0);
EXPECT_EQ(snapshot.median, 0);
EXPECT_EQ(snapshot.percentile_75th, 0);
EXPECT_EQ(snapshot.percentile_95th, 0);
EXPECT_EQ(snapshot.percentile_98th, 0);
EXPECT_EQ(snapshot.percentile_99th, 0);
EXPECT_EQ(snapshot.percentile_999th, 0);
EXPECT_EQ(snapshot.mean, 0);
EXPECT_EQ(snapshot.stddev, 0);

// Values added during the first interval (or for that matter any
// interval) will be buffered in per-thread counters and will be
// included in the next generated snapshot
for (uint64_t i = 1; i <= 100; ++i) {
histogram.record_value(i);
}
test::Utils::msleep(1.2 * refresh_interval);

histogram.get_snapshot(&snapshot);
EXPECT_EQ(snapshot.min, 1);
EXPECT_EQ(snapshot.max, 100);
EXPECT_EQ(snapshot.median, 50);
EXPECT_EQ(snapshot.percentile_75th, 75);
EXPECT_EQ(snapshot.percentile_95th, 95);
EXPECT_EQ(snapshot.percentile_98th, 98);
EXPECT_EQ(snapshot.percentile_99th, 99);
EXPECT_EQ(snapshot.percentile_999th, 100);
EXPECT_EQ(snapshot.mean, 50);
EXPECT_EQ(snapshot.stddev, 28);

// Generated snapshot should only include values added within
// the current interval
test::Utils::msleep(1.2 * refresh_interval);
for (uint64_t i = 101; i <= 200; ++i) {
histogram.record_value(i);
}

histogram.get_snapshot(&snapshot);
EXPECT_EQ(snapshot.min, 101);
EXPECT_EQ(snapshot.max, 200);
EXPECT_EQ(snapshot.median, 150);
EXPECT_EQ(snapshot.percentile_75th, 175);
EXPECT_EQ(snapshot.percentile_95th, 195);
EXPECT_EQ(snapshot.percentile_98th, 198);
EXPECT_EQ(snapshot.percentile_99th, 199);
EXPECT_EQ(snapshot.percentile_999th, 200);
EXPECT_EQ(snapshot.mean, 150);
EXPECT_EQ(snapshot.stddev, 28);
}

// Variant of the case above. If we have no requests for the entirety
// of the refresh interval make sure the stats return zero
TEST(MetricsUnitTest, HistogramWithRefreshIntervalNoActivity) {
unsigned refresh_interval = 1000;
Metrics::ThreadState thread_state(1);
Metrics::Histogram histogram(&thread_state, refresh_interval);

Metrics::Histogram::Snapshot snapshot;

// Initial refresh interval (where we always return zero) + another interval of
// no activity
test::Utils::msleep(2.2 * refresh_interval);

histogram.get_snapshot(&snapshot);
EXPECT_EQ(snapshot.min, 0);
EXPECT_EQ(snapshot.max, 0);
EXPECT_EQ(snapshot.median, 0);
EXPECT_EQ(snapshot.percentile_75th, 0);
EXPECT_EQ(snapshot.percentile_95th, 0);
EXPECT_EQ(snapshot.percentile_98th, 0);
EXPECT_EQ(snapshot.percentile_99th, 0);
EXPECT_EQ(snapshot.percentile_999th, 0);
EXPECT_EQ(snapshot.mean, 0);
EXPECT_EQ(snapshot.stddev, 0);
}

TEST(MetricsUnitTest, HistogramWithThreads) {
HistogramThreadArgs args[NUM_THREADS];

Expand Down

0 comments on commit fc38817

Please sign in to comment.