Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ExponentialHistogram #279

Merged
merged 3 commits into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package clickhousemetricsexporter
import (
"context"
"fmt"
"math"
"net/url"
"runtime/pprof"
"strings"
"sync"
"time"

chproto "github.com/ClickHouse/ch-go/proto"
clickhouse "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -50,6 +52,7 @@ const (
DISTRIBUTED_TIME_SERIES_TABLE_V4 = "distributed_time_series_v4"
DISTRIBUTED_SAMPLES_TABLE = "distributed_samples_v2"
DISTRIBUTED_SAMPLES_TABLE_V4 = "distributed_samples_v4"
DISTRIBUTED_EXP_HIST_TABLE = "distributed_exp_hist"
TIME_SERIES_TABLE = "time_series_v2"
temporalityLabel = "__temporality__"
envLabel = "env"
Expand Down Expand Up @@ -489,6 +492,88 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
ch.mWrittenTimeSeries.Add(float64(n))
ch.l.Debugf("Wrote %d new time series.", n)
}

err = func() error {
statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, count, sum, min, max, sketch) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_EXP_HIST_TABLE), driver.WithReleaseConnection())
if err != nil {
return err
}

for i, ts := range data.Timeseries {
fingerprint := fingerprints[i]
for _, s := range ts.Histograms {

sum := s.Sum
var count uint64
if x, ok := s.Count.(*prompb.Histogram_CountInt); ok {
count = uint64(x.CountInt)
} else if x, ok := s.Count.(*prompb.Histogram_CountFloat); ok {
count = uint64(x.CountFloat)
}
min, max := s.PositiveCounts[1], s.PositiveCounts[2]
gamma := math.Pow(2, math.Pow(2, float64(-s.Schema)))
positiveOffset := s.PositiveCounts[0]
negativeOffset := s.NegativeCounts[0]
var positivebinCounts []float64
for _, x := range s.PositiveDeltas {
positivebinCounts = append(positivebinCounts, float64(x))
}
var negativebinCounts []float64
for _, x := range s.NegativeDeltas {
negativebinCounts = append(negativebinCounts, float64(x))
}
var zeroCount int
if x, ok := s.ZeroCount.(*prompb.Histogram_ZeroCountInt); ok {
zeroCount = int(x.ZeroCountInt)
} else if x, ok := s.ZeroCount.(*prompb.Histogram_ZeroCountFloat); ok {
zeroCount = int(x.ZeroCountFloat)
}

sketch := chproto.DD{
Mapping: &chproto.IndexMapping{Gamma: gamma},
PositiveValues: &chproto.Store{
ContiguousBinIndexOffset: int32(positiveOffset),
ContiguousBinCounts: positivebinCounts,
},
NegativeValues: &chproto.Store{
ContiguousBinIndexOffset: int32(negativeOffset),
ContiguousBinCounts: negativebinCounts,
},
ZeroCount: float64(zeroCount),
}

meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
fingerprintToName[fingerprint][envLabel],
meta.Temporality.String(),
fingerprintToName[fingerprint][nameLabel],
fingerprint,
s.Timestamp,
count,
sum,
min,
max,
sketch,
)
if err != nil {
return err
}
}
}

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics)),
tag.Upsert(tableKey, DISTRIBUTED_EXP_HIST_TABLE),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()
if err != nil {
return err
}

return nil
}

Expand Down
19 changes: 18 additions & 1 deletion exporter/clickhousemetricsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
temporality = metric.Sum().AggregationTemporality()
case pmetric.MetricTypeHistogram:
temporality = metric.Histogram().AggregationTemporality()
case pmetric.MetricTypeExponentialHistogram:
temporality = metric.ExponentialHistogram().AggregationTemporality()
case pmetric.MetricTypeSummary:
temporality = pmetric.AggregationTemporalityUnspecified
default:
Expand Down Expand Up @@ -256,7 +258,22 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case pmetric.MetricTypeExponentialHistogram:
// TODO(srikanthccv): implement
// we don't support cumulative exponential histograms
if temporality == pmetric.AggregationTemporalityCumulative {
dropped++
prwe.logger.Warn("Dropped cumulative histogram metric", zap.String("name", metric.Name()))
continue
}

dataPoints := metric.ExponentialHistogram().DataPoints()
if dataPoints.Len() == 0 {
dropped++
prwe.logger.Warn("Dropped exponential histogram metric with no data points", zap.String("name", metric.Name()))
}

for x := 0; x < dataPoints.Len(); x++ {
addSingleExponentialHistogramDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
default:
dropped++
name := metric.Name()
Expand Down
66 changes: 66 additions & 0 deletions exporter/clickhousemetricsexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,72 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res
}
}

func addExpHistogram(tsMap map[string]*prompb.TimeSeries, h *prompb.Histogram, labels []prompb.Label,
metric pmetric.Metric) string {

if h == nil || labels == nil || tsMap == nil {
return ""
}

sig := timeSeriesSignature(metric, &labels)
ts, ok := tsMap[sig]

if ok {
ts.Histograms = append(ts.Histograms, *h)
} else {
newTs := &prompb.TimeSeries{
Labels: labels,
Histograms: []prompb.Histogram{*h},
}
tsMap[sig] = newTs
}

return sig
}

func addSingleExponentialHistogramDataPoint(
pt pmetric.ExponentialHistogramDataPoint,
resource pcommon.Resource,
metric pmetric.Metric,
namespace string,
tsMap map[string]*prompb.TimeSeries,
externalLabels map[string]string,
) {
time := convertTimeStamp(pt.Timestamp())
baseName := getPromMetricName(metric, namespace)

var positiveDeltas []int64
var negativeDeltas []int64

scale := int32(pt.Scale())
positiveOffset := int64(pt.Positive().Offset())
for _, i := range pt.Positive().BucketCounts().AsRaw() {
positiveDeltas = append(positiveDeltas, int64(i))
}
negativeOffset := int64(pt.Negative().Offset())
for _, i := range pt.Negative().BucketCounts().AsRaw() {
negativeDeltas = append(negativeDeltas, int64(i))
}
sum := pt.Sum()
count := pt.Count()
// Prometheus doesn't support min and max, we add them to PositiveCounts slice
// TODO(srikanthccv): Move to OTEL model
min := pt.Min()
max := pt.Max()

addExpHistogram(tsMap, &prompb.Histogram{
Schema: scale,
Sum: sum,
Count: &prompb.Histogram_CountInt{CountInt: count},
PositiveDeltas: positiveDeltas,
PositiveCounts: []float64{float64(positiveOffset), min, max},
NegativeDeltas: negativeDeltas,
NegativeCounts: []float64{float64(negativeOffset)},
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: pt.ZeroCount()},
Timestamp: time,
}, createAttributes(resource, pt.Attributes(), externalLabels, nameStr, baseName), metric)
}

func orderBySampleTimestamp(tsArray []prompb.TimeSeries) []prompb.TimeSeries {
for i := range tsArray {
sL := tsArray[i].Samples
Expand Down
33 changes: 17 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/SigNoz/signoz-otel-collector
go 1.21

require (
github.com/ClickHouse/ch-go v0.58.2
github.com/ClickHouse/clickhouse-go/v2 v2.15.0
github.com/IBM/sarama v1.41.2
github.com/Shopify/sarama v1.38.1
Expand All @@ -13,7 +14,7 @@ require (
github.com/golang-migrate/migrate/v4 v4.15.1
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.3.1
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v1.0.2
Expand Down Expand Up @@ -131,7 +132,7 @@ require (
github.com/prometheus/common v0.44.0
github.com/prometheus/prometheus v0.47.2
github.com/segmentio/ksuid v1.0.4
github.com/sirupsen/logrus v1.9.2
github.com/sirupsen/logrus v1.9.3
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.16.0
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -164,7 +165,7 @@ require (
go.opentelemetry.io/collector/receiver v0.88.0
go.opentelemetry.io/collector/receiver/otlpreceiver v0.88.0
go.opentelemetry.io/collector/semconv v0.88.0
go.opentelemetry.io/otel/trace v1.19.0
go.opentelemetry.io/otel/trace v1.22.0
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
Expand Down Expand Up @@ -199,7 +200,6 @@ require (
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
github.com/ClickHouse/ch-go v0.58.2 // indirect
github.com/DataDog/zstd v1.5.0 // indirect
github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.20.0 // indirect
Expand Down Expand Up @@ -253,7 +253,7 @@ require (
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
Expand Down Expand Up @@ -347,7 +347,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite v0.88.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.88.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc4 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/opencontainers/runtime-spec v1.0.3-0.20220909204839-494a5a6aca78 // indirect
github.com/opencontainers/selinux v1.10.0 // indirect
Expand Down Expand Up @@ -445,7 +445,7 @@ require (
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
Expand All @@ -465,7 +465,7 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.1 // indirect
github.com/klauspost/compress v1.17.5 // indirect
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect
github.com/leoluk/perflib_exporter v0.2.1 // indirect
github.com/lib/pq v1.10.9 // indirect
Expand All @@ -490,7 +490,7 @@ require (
github.com/paulmach/orb v0.10.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand Down Expand Up @@ -519,16 +519,16 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.20.0 // indirect
go.opentelemetry.io/contrib/zpages v0.45.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel v1.22.0 // indirect
go.opentelemetry.io/otel/bridge/opencensus v0.42.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.42.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.19.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
gonum.org/v1/gonum v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a // indirect
Expand All @@ -537,7 +537,8 @@ require (
)

replace (
github.com/ClickHouse/clickhouse-go/v2 v2.15.0 => github.com/SigNoz/clickhouse-go/v2 v2.15.1
github.com/ClickHouse/ch-go v0.58.2 => github.com/SigNoz/ch-go v0.61.2-dd
github.com/ClickHouse/clickhouse-go/v2 v2.15.0 => github.com/SigNoz/clickhouse-go/v2 v2.15.2
github.com/golang-migrate/migrate/v4 => github.com/SigNoz/golang-migrate/v4 v4.16.4
github.com/vjeantet/grok => github.com/signoz/grok v1.0.3

Expand Down
Loading
Loading