diff --git a/processor/signozspanmetricsprocessor/config.go b/processor/signozspanmetricsprocessor/config.go
index f660af24..7594b35c 100644
--- a/processor/signozspanmetricsprocessor/config.go
+++ b/processor/signozspanmetricsprocessor/config.go
@@ -83,6 +83,10 @@ type Config struct {
 
 	// MetricsEmitInterval is the time period between when metrics are flushed or emitted to the configured MetricsExporter.
 	MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`
+
+	// EnableExpHistogram additionally aggregates span latencies into an
+	// exponential histogram. Defaults to false.
+	EnableExpHistogram bool `mapstructure:"enable_exp_histogram"`
 }
 
 // GetAggregationTemporality converts the string value given in the config into a AggregationTemporality.
diff --git a/processor/signozspanmetricsprocessor/factory.go b/processor/signozspanmetricsprocessor/factory.go
index d258ce00..a3927b05 100644
--- a/processor/signozspanmetricsprocessor/factory.go
+++ b/processor/signozspanmetricsprocessor/factory.go
@@ -48,6 +48,7 @@ func createDefaultConfig() component.Config {
 		DimensionsCacheSize:    defaultDimensionsCacheSize,
 		skipSanitizeLabel:      dropSanitizationFeatureGate.IsEnabled(),
 		MetricsFlushInterval:   60 * time.Second,
+		EnableExpHistogram:     false,
 	}
 }
 
diff --git a/processor/signozspanmetricsprocessor/processor.go b/processor/signozspanmetricsprocessor/processor.go
index 4ec6c764..e3f24774 100644
--- a/processor/signozspanmetricsprocessor/processor.go
+++ b/processor/signozspanmetricsprocessor/processor.go
@@ -26,6 +26,7 @@ import (
 	"time"
 	"unicode"
 
+	"github.com/lightstep/go-expohisto/structure"
 	"github.com/tilinna/clock"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
@@ -67,6 +68,12 @@ type exemplarData struct {
 	value   float64
 }
 
+// exponentialHistogram wraps the go-expohisto histogram that accumulates the
+// latencies observed for a single metric key.
+type exponentialHistogram struct {
+	histogram *structure.Histogram[float64]
+}
+
 type metricKey string
 
 type processorImp struct {
@@ -80,6 +87,7 @@ type processorImp struct {
 
 	// Additional dimensions to add to metrics.
 	dimensions             []dimension // signoz_latency metric
+	expDimensions          []dimension // signoz_latency exphisto metric
 	callDimensions         []dimension // signoz_calls_total metric
 	dbCallDimensions       []dimension // signoz_db_latency_* metric
 	externalCallDimensions []dimension // signoz_external_call_latency_* metric
@@ -91,6 +99,8 @@ type processorImp struct {
 	histograms    map[metricKey]*histogramData // signoz_latency metric
 	latencyBounds []float64
 
+	expHistograms map[metricKey]*exponentialHistogram
+
 	callHistograms    map[metricKey]*histogramData // signoz_calls_total metric
 	callLatencyBounds []float64
 
@@ -105,6 +115,7 @@ type processorImp struct {
 	// An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
 	// e.g. { "foo/barOK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }}
 	metricKeyToDimensions             *cache.Cache[metricKey, pcommon.Map]
+	expHistogramKeyToDimensions       *cache.Cache[metricKey, pcommon.Map]
 	callMetricKeyToDimensions         *cache.Cache[metricKey, pcommon.Map]
 	dbCallMetricKeyToDimensions       *cache.Cache[metricKey, pcommon.Map]
 	externalCallMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
@@ -146,6 +157,45 @@ type histogramData struct {
 	exemplarsData []exemplarData
 }
 
+// expoHistToExponentialDataPoint copies the aggregated state of agg (count,
+// sum, min/max, zero count, scale and both bucket ranges) into dp.
+func expoHistToExponentialDataPoint(agg *structure.Histogram[float64], dp pmetric.ExponentialHistogramDataPoint) {
+	dp.SetCount(agg.Count())
+	dp.SetSum(agg.Sum())
+	// Min and max are left unset when nothing has been observed.
+	if agg.Count() != 0 {
+		dp.SetMin(agg.Min())
+		dp.SetMax(agg.Max())
+	}
+
+	dp.SetZeroCount(agg.ZeroCount())
+	dp.SetScale(agg.Scale())
+
+	// The positive and negative ranges are copied the same way, so pair each
+	// source accessor with its destination accessor and loop over both.
+	for _, half := range []struct {
+		inFunc  func() *structure.Buckets
+		outFunc func() pmetric.ExponentialHistogramDataPointBuckets
+	}{
+		{agg.Positive, dp.Positive},
+		{agg.Negative, dp.Negative},
+	} {
+		in := half.inFunc()
+		out := half.outFunc()
+		out.SetOffset(in.Offset())
+		out.BucketCounts().EnsureCapacity(int(in.Len()))
+
+		for i := uint32(0); i < in.Len(); i++ {
+			out.BucketCounts().Append(in.At(i))
+		}
+	}
+}
+
+// Observe records a single value in the underlying histogram.
+func (h *exponentialHistogram) Observe(value float64) {
+	h.histogram.Update(value)
+}
+
 func newProcessor(logger *zap.Logger, instanceID string, config component.Config, ticker *clock.Ticker) (*processorImp, error) {
 	logger.Info("Building signozspanmetricsprocessor")
 	pConfig := config.(*Config)
@@ -185,6 +235,11 @@ func newProcessor(logger *zap.Logger, instanceID string, config component.Config
 	if err != nil {
 		return nil, err
 	}
+	expHistogramKeyToDimensionsCache, err := cache.NewCache[metricKey, pcommon.Map](pConfig.DimensionsCacheSize)
+	if err != nil {
+		return nil, err
+	}
+
 	callMetricKeyToDimensionsCache, err := cache.NewCache[metricKey, pcommon.Map](pConfig.DimensionsCacheSize)
 	if err != nil {
 		return nil, err
@@ -209,6 +264,7 @@ func newProcessor(logger *zap.Logger, instanceID string, config component.Config
 		config:                            *pConfig,
 		startTimestamp:                    pcommon.NewTimestampFromTime(time.Now()),
 		histograms:                        make(map[metricKey]*histogramData),
+		expHistograms:                     make(map[metricKey]*exponentialHistogram),
 		callHistograms:                    make(map[metricKey]*histogramData),
 		dbCallHistograms:                  make(map[metricKey]*histogramData),
 		externalCallHistograms:            make(map[metricKey]*histogramData),
@@ -217,11 +273,13 @@ func newProcessor(logger *zap.Logger, instanceID string, config component.Config
 		dbCallLatencyBounds:               bounds,
 		externalCallLatencyBounds:         bounds,
 		dimensions:                        newDimensions(pConfig.Dimensions),
+		expDimensions:                     newDimensions(pConfig.Dimensions),
 		callDimensions:                    newDimensions(callDimensions),
 		dbCallDimensions:                  newDimensions(dbCallDimensions),
 		externalCallDimensions:            newDimensions(externalCallDimensions),
 		keyBuf:                            bytes.NewBuffer(make([]byte, 0, 1024)),
 		metricKeyToDimensions:             metricKeyToDimensionsCache,
+		expHistogramKeyToDimensions:       expHistogramKeyToDimensionsCache,
 		callMetricKeyToDimensions:         callMetricKeyToDimensionsCache,
 		dbCallMetricKeyToDimensions:       dbMetricKeyToDimensionsCache,
 		externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensionsCache,
@@ -431,7 +489,16 @@ func (p *processorImp) buildMetrics() (pmetric.Metrics, error) {
 		return pmetric.Metrics{}, err
 	}
 
+	// Only emit the exponential histogram metric when the feature is enabled;
+	// otherwise an empty metric would be appended on every buildMetrics call.
+	if p.config.EnableExpHistogram {
+		if err := p.collectExpHistogramMetrics(ilm); err != nil {
+			return pmetric.Metrics{}, err
+		}
+	}
+
 	p.metricKeyToDimensions.RemoveEvictedItems()
+	p.expHistogramKeyToDimensions.RemoveEvictedItems()
 	p.callMetricKeyToDimensions.RemoveEvictedItems()
 	p.dbCallMetricKeyToDimensions.RemoveEvictedItems()
 	p.externalCallMetricKeyToDimensions.RemoveEvictedItems()
@@ -457,6 +524,13 @@ func (p *processorImp) buildMetrics() (pmetric.Metrics, error) {
 		}
 	}
 
+	// Drop exponential histograms whose dimensions are no longer cached.
+	for key := range p.expHistograms {
+		if !p.expHistogramKeyToDimensions.Contains(key) {
+			delete(p.expHistograms, key)
+		}
+	}
+
 	// If delta metrics, reset accumulated data
 	if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
 		p.resetAccumulatedMetrics()
@@ -507,6 +581,33 @@ func (p *processorImp) collectLatencyMetrics(ilm pmetric.ScopeMetrics) error {
 	return nil
 }
 
+// collectExpHistogramMetrics collects the exponential histogram latency metrics,
+// writing one data point per metric key into the given instrumentation library
+// metrics. It returns an error if the dimensions for a key are missing from the
+// expHistogramKeyToDimensions cache.
+func (p *processorImp) collectExpHistogramMetrics(ilm pmetric.ScopeMetrics) error {
+	mExpLatency := ilm.Metrics().AppendEmpty()
+	mExpLatency.SetName("signoz_latency")
+	mExpLatency.SetUnit("ms")
+	mExpLatency.SetEmptyExponentialHistogram().SetAggregationTemporality(p.config.GetAggregationTemporality())
+	dps := mExpLatency.ExponentialHistogram().DataPoints()
+	dps.EnsureCapacity(len(p.expHistograms))
+	timestamp := pcommon.NewTimestampFromTime(time.Now())
+	for key, hist := range p.expHistograms {
+		dimensions, err := p.getDimensionsByExpHistogramKey(key)
+		if err != nil {
+			return err
+		}
+
+		dp := dps.AppendEmpty()
+		dp.SetStartTimestamp(p.startTimestamp)
+		dp.SetTimestamp(timestamp)
+		expoHistToExponentialDataPoint(hist.histogram, dp)
+		dimensions.CopyTo(dp.Attributes())
+	}
+	return nil
+}
+
 // collectDBCallMetrics collects the raw latency sum and count metrics, writing the data
 // into the given instrumentation library metrics.
 func (p *processorImp) collectDBCallMetrics(ilm pmetric.ScopeMetrics) error {
@@ -625,6 +726,14 @@ func (p *processorImp) getDimensionsByMetricKey(k metricKey) (pcommon.Map, error
 	return pcommon.Map{}, fmt.Errorf("value not found in metricKeyToDimensions cache by key %q", k)
 }
 
+// getDimensionsByExpHistogramKey gets dimensions from `expHistogramKeyToDimensions` cache.
+func (p *processorImp) getDimensionsByExpHistogramKey(k metricKey) (pcommon.Map, error) {
+	if attributeMap, ok := p.expHistogramKeyToDimensions.Get(k); ok {
+		return attributeMap, nil
+	}
+	return pcommon.Map{}, fmt.Errorf("value not found in expHistogramKeyToDimensions cache by key %q", k)
+}
+
 // callMetricKeyToDimensions gets dimensions from `callMetricKeyToDimensions` cache.
 func (p *processorImp) getDimensionsByCallMetricKey(k metricKey) (pcommon.Map, error) {
 	if attributeMap, ok := p.callMetricKeyToDimensions.Get(k); ok {
@@ -792,6 +901,14 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.S
 	p.cache(serviceName, span, key, resourceAttr)
 	p.updateHistogram(key, latencyInMilliseconds, span.TraceID(), span.SpanID())
 
+	if p.config.EnableExpHistogram {
+		p.keyBuf.Reset()
+		buildKey(p.keyBuf, serviceName, span, p.expDimensions, resourceAttr)
+		expKey := metricKey(p.keyBuf.String())
+		p.expHistogramCache(serviceName, span, expKey, resourceAttr)
+		p.updateExpHistogram(expKey, latencyInMilliseconds, span.TraceID(), span.SpanID())
+	}
+
 	p.keyBuf.Reset()
 	buildKey(p.keyBuf, serviceName, span, p.callDimensions, resourceAttr)
 	callKey := metricKey(p.keyBuf.String())
@@ -853,11 +970,13 @@ func (p *processorImp) aggregateMetrics(traces ptrace.Traces) {
 // metricKeyToDimensions.
 func (p *processorImp) resetAccumulatedMetrics() {
 	p.histograms = make(map[metricKey]*histogramData)
+	p.expHistograms = make(map[metricKey]*exponentialHistogram)
 	p.callHistograms = make(map[metricKey]*histogramData)
 	p.dbCallHistograms = make(map[metricKey]*histogramData)
 	p.externalCallHistograms = make(map[metricKey]*histogramData)
 
 	p.metricKeyToDimensions.Purge()
+	p.expHistogramKeyToDimensions.Purge()
 	p.callMetricKeyToDimensions.Purge()
 	p.dbCallMetricKeyToDimensions.Purge()
 	p.externalCallMetricKeyToDimensions.Purge()
@@ -881,6 +1000,27 @@ func (p *processorImp) updateHistogram(key metricKey, latency float64, traceID p
 	histo.exemplarsData = append(histo.exemplarsData, exemplarData{traceID: traceID, spanID: spanID, value: latency})
 }
 
+// updateExpHistogram records latency in the exponential histogram stored under
+// key, creating the histogram on first use. traceID and spanID are currently
+// unused; they are accepted for signature parity with the other update* helpers.
+func (p *processorImp) updateExpHistogram(key metricKey, latency float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
+	histo, ok := p.expHistograms[key]
+	if !ok {
+		histogram := new(structure.Histogram[float64])
+		cfg := structure.NewConfig(
+			structure.WithMaxSize(structure.DefaultMaxSize),
+		)
+		histogram.Init(cfg)
+
+		histo = &exponentialHistogram{
+			histogram: histogram,
+		}
+		p.expHistograms[key] = histo
+	}
+
+	histo.Observe(latency)
+}
+
 func (p *processorImp) updateCallHistogram(key metricKey, latency float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
 	histo, ok := p.callHistograms[key]
 	if !ok {
@@ -1111,6 +1251,17 @@ func (p *processorImp) cache(serviceName string, span ptrace.Span, k metricKey,
 	}
 }
 
+// expHistogramCache caches the dimension key-value map for the metricKey if there is a cache miss.
+// This enables a lookup of the dimension key-value map when constructing the metric like so:
+//
+//	LabelsMap().InitFromMap(p.expHistogramKeyToDimensions[key])
+func (p *processorImp) expHistogramCache(serviceName string, span ptrace.Span, k metricKey, resourceAttrs pcommon.Map) {
+	// Use Get to ensure any existing key has its recent-ness updated.
+	if _, has := p.expHistogramKeyToDimensions.Get(k); !has {
+		p.expHistogramKeyToDimensions.Add(k, p.buildDimensionKVs(serviceName, span, p.expDimensions, resourceAttrs))
+	}
+}
+
 // callCache caches the dimension key-value map for the metricKey if there is a cache miss.
 // This enables a lookup of the dimension key-value map when constructing the metric like so:
 //