Skip to content

Commit

Permalink
Add support for ExponentialHistogram (#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Feb 10, 2024
1 parent 198881b commit 0485ef8
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 57 deletions.
85 changes: 85 additions & 0 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package clickhousemetricsexporter
import (
"context"
"fmt"
"math"
"net/url"
"runtime/pprof"
"strings"
"sync"
"time"

chproto "github.com/ClickHouse/ch-go/proto"
clickhouse "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -50,6 +52,7 @@ const (
DISTRIBUTED_TIME_SERIES_TABLE_V4 = "distributed_time_series_v4"
DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2"
DISTRIBUTED_SAMPLES_TABLE_V4 = "distributed_samples_v4"
DISTRIBUTED_EXP_HIST_TABLE = "distributed_exp_hist"
TIME_SERIES_TABLE = "time_series_v2"
temporalityLabel = "__temporality__"
envLabel = "env"
Expand Down Expand Up @@ -489,6 +492,88 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
ch.mWrittenTimeSeries.Add(float64(n))
ch.l.Debugf("Wrote %d new time series.", n)
}

err = func() error {
statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, count, sum, min, max, sketch) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_EXP_HIST_TABLE), driver.WithReleaseConnection())
if err != nil {
return err
}

for i, ts := range data.Timeseries {
fingerprint := fingerprints[i]
for _, s := range ts.Histograms {

sum := s.Sum
var count uint64
if x, ok := s.Count.(*prompb.Histogram_CountInt); ok {
count = uint64(x.CountInt)
} else if x, ok := s.Count.(*prompb.Histogram_CountFloat); ok {
count = uint64(x.CountFloat)
}
min, max := s.PositiveCounts[1], s.PositiveCounts[2]
gamma := math.Pow(2, math.Pow(2, float64(-s.Schema)))
positiveOffset := s.PositiveCounts[0]
negativeOffset := s.NegativeCounts[0]
var positivebinCounts []float64
for _, x := range s.PositiveDeltas {
positivebinCounts = append(positivebinCounts, float64(x))
}
var negativebinCounts []float64
for _, x := range s.NegativeDeltas {
negativebinCounts = append(negativebinCounts, float64(x))
}
var zeroCount int
if x, ok := s.ZeroCount.(*prompb.Histogram_ZeroCountInt); ok {
zeroCount = int(x.ZeroCountInt)
} else if x, ok := s.ZeroCount.(*prompb.Histogram_ZeroCountFloat); ok {
zeroCount = int(x.ZeroCountFloat)
}

sketch := chproto.DD{
Mapping: &chproto.IndexMapping{Gamma: gamma},
PositiveValues: &chproto.Store{
ContiguousBinIndexOffset: int32(positiveOffset),
ContiguousBinCounts: positivebinCounts,
},
NegativeValues: &chproto.Store{
ContiguousBinIndexOffset: int32(negativeOffset),
ContiguousBinCounts: negativebinCounts,
},
ZeroCount: float64(zeroCount),
}

meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
fingerprintToName[fingerprint][envLabel],
meta.Temporality.String(),
fingerprintToName[fingerprint][nameLabel],
fingerprint,
s.Timestamp,
count,
sum,
min,
max,
sketch,
)
if err != nil {
return err
}
}
}

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics)),
tag.Upsert(tableKey, DISTRIBUTED_EXP_HIST_TABLE),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()
if err != nil {
return err
}

return nil
}

Expand Down
19 changes: 18 additions & 1 deletion exporter/clickhousemetricsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
temporality = metric.Sum().AggregationTemporality()
case pmetric.MetricTypeHistogram:
temporality = metric.Histogram().AggregationTemporality()
case pmetric.MetricTypeExponentialHistogram:
temporality = metric.ExponentialHistogram().AggregationTemporality()
case pmetric.MetricTypeSummary:
temporality = pmetric.AggregationTemporalityUnspecified
default:
Expand Down Expand Up @@ -256,7 +258,22 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case pmetric.MetricTypeExponentialHistogram:
// TODO(srikanthccv): implement
// we don't support cumulative exponential histograms
if temporality == pmetric.AggregationTemporalityCumulative {
dropped++
prwe.logger.Warn("Dropped cumulative histogram metric", zap.String("name", metric.Name()))
continue
}

dataPoints := metric.ExponentialHistogram().DataPoints()
if dataPoints.Len() == 0 {
dropped++
prwe.logger.Warn("Dropped exponential histogram metric with no data points", zap.String("name", metric.Name()))
}

for x := 0; x < dataPoints.Len(); x++ {
addSingleExponentialHistogramDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
default:
dropped++
name := metric.Name()
Expand Down
66 changes: 66 additions & 0 deletions exporter/clickhousemetricsexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,72 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res
}
}

func addExpHistogram(tsMap map[string]*prompb.TimeSeries, h *prompb.Histogram, labels []prompb.Label,
metric pmetric.Metric) string {

if h == nil || labels == nil || tsMap == nil {
return ""
}

sig := timeSeriesSignature(metric, &labels)
ts, ok := tsMap[sig]

if ok {
ts.Histograms = append(ts.Histograms, *h)
} else {
newTs := &prompb.TimeSeries{
Labels: labels,
Histograms: []prompb.Histogram{*h},
}
tsMap[sig] = newTs
}

return sig
}

func addSingleExponentialHistogramDataPoint(
pt pmetric.ExponentialHistogramDataPoint,
resource pcommon.Resource,
metric pmetric.Metric,
namespace string,
tsMap map[string]*prompb.TimeSeries,
externalLabels map[string]string,
) {
time := convertTimeStamp(pt.Timestamp())
baseName := getPromMetricName(metric, namespace)

var positiveDeltas []int64
var negativeDeltas []int64

scale := int32(pt.Scale())
positiveOffset := int64(pt.Positive().Offset())
for _, i := range pt.Positive().BucketCounts().AsRaw() {
positiveDeltas = append(positiveDeltas, int64(i))
}
negativeOffset := int64(pt.Negative().Offset())
for _, i := range pt.Negative().BucketCounts().AsRaw() {
negativeDeltas = append(negativeDeltas, int64(i))
}
sum := pt.Sum()
count := pt.Count()
// Prometheus doesn't support min and max, we add them to PositiveCounts slice
// TODO(srikanthccv): Move to OTEL model
min := pt.Min()
max := pt.Max()

addExpHistogram(tsMap, &prompb.Histogram{
Schema: scale,
Sum: sum,
Count: &prompb.Histogram_CountInt{CountInt: count},
PositiveDeltas: positiveDeltas,
PositiveCounts: []float64{float64(positiveOffset), min, max},
NegativeDeltas: negativeDeltas,
NegativeCounts: []float64{float64(negativeOffset)},
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: pt.ZeroCount()},
Timestamp: time,
}, createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName), metric)
}

func orderBySampleTimestamp(tsArray []prompb.TimeSeries) []prompb.TimeSeries {
for i := range tsArray {
sL := tsArray[i].Samples
Expand Down
33 changes: 17 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/SigNoz/signoz-otel-collector
go 1.21

require (
github.com/ClickHouse/ch-go v0.58.2
github.com/ClickHouse/clickhouse-go/v2 v2.15.0
github.com/IBM/sarama v1.41.2
github.com/Shopify/sarama v1.38.1
Expand All @@ -13,7 +14,7 @@ require (
github.com/golang-migrate/migrate/v4 v4.15.1
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.3.1
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v1.0.2
Expand Down Expand Up @@ -131,7 +132,7 @@ require (
github.com/prometheus/common v0.44.0
github.com/prometheus/prometheus v0.47.2
github.com/segmentio/ksuid v1.0.4
github.com/sirupsen/logrus v1.9.2
github.com/sirupsen/logrus v1.9.3
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.16.0
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -164,7 +165,7 @@ require (
go.opentelemetry.io/collector/receiver v0.88.0
go.opentelemetry.io/collector/receiver/otlpreceiver v0.88.0
go.opentelemetry.io/collector/semconv v0.88.0
go.opentelemetry.io/otel/trace v1.19.0
go.opentelemetry.io/otel/trace v1.22.0
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
Expand Down Expand Up @@ -199,7 +200,6 @@ require (
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
github.com/ClickHouse/ch-go v0.58.2 // indirect
github.com/DataDog/zstd v1.5.0 // indirect
github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.20.0 // indirect
Expand Down Expand Up @@ -253,7 +253,7 @@ require (
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
Expand Down Expand Up @@ -347,7 +347,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.88.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.88.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc4 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/opencontainers/runtime-spec v1.0.3-0.20220909204839-494a5a6aca78 // indirect
github.com/opencontainers/selinux v1.10.0 // indirect
Expand Down Expand Up @@ -445,7 +445,7 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
Expand All @@ -465,7 +465,7 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/klauspost/compress v1.17.5 // indirect
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect
github.com/leoluk/perflib_exporter v0.2.1 // indirect
github.com/lib/pq v1.10.9 // indirect
Expand All @@ -490,7 +490,7 @@ require (
github.com/paulmach/orb v0.10.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand Down Expand Up @@ -519,16 +519,16 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.20.0 // indirect
go.opentelemetry.io/contrib/zpages v0.45.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel v1.22.0 // indirect
go.opentelemetry.io/otel/bridge/opencensus v0.42.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.42.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.19.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
gonum.org/v1/gonum v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect
Expand All @@ -537,7 +537,8 @@ require (
)

replace (
github.com/ClickHouse/clickhouse-go/v2 v2.15.0 => github.com/SigNoz/clickhouse-go/v2 v2.15.1
github.com/ClickHouse/ch-go v0.58.2 => github.com/SigNoz/ch-go v0.61.2-dd
github.com/ClickHouse/clickhouse-go/v2 v2.15.0 => github.com/SigNoz/clickhouse-go/v2 v2.15.2
github.com/golang-migrate/migrate/v4 => github.com/SigNoz/golang-migrate/v4 v4.16.4
github.com/vjeantet/grok => github.com/signoz/grok v1.0.3

Expand Down
Loading

0 comments on commit 0485ef8

Please sign in to comment.