Add experimental support for APM latency with Exp Histogram (#288)
srikanthccv committed Feb 13, 2024
1 parent 44c3807 commit 936dc65
Showing 3 changed files with 136 additions and 0 deletions.
2 changes: 2 additions & 0 deletions processor/signozspanmetricsprocessor/config.go
@@ -83,6 +83,8 @@ type Config struct {

// MetricsFlushInterval is the time period between when metrics are flushed or emitted to the configured MetricsExporter.
MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`

// EnableExpHistogram enables additionally recording the APM latency as an exponential histogram (experimental).
EnableExpHistogram bool `mapstructure:"enable_exp_histogram"`
}
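
The flag defaults to false (see the factory.go change below), so the exponential histogram output is strictly opt-in. A minimal sketch of enabling it programmatically from inside this package; the collector YAML equivalent implied by the mapstructure tag is enable_exp_histogram: true under this processor's configuration:

cfg := createDefaultConfig().(*Config) // defaults leave EnableExpHistogram false
cfg.EnableExpHistogram = true          // opt in to the experimental exponential histogram for APM latency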

// GetAggregationTemporality converts the string value given in the config into an AggregationTemporality.
1 change: 1 addition & 0 deletions processor/signozspanmetricsprocessor/factory.go
@@ -48,6 +48,7 @@ func createDefaultConfig() component.Config {
DimensionsCacheSize: defaultDimensionsCacheSize,
skipSanitizeLabel: dropSanitizationFeatureGate.IsEnabled(),
MetricsFlushInterval: 60 * time.Second,
EnableExpHistogram: false,
}
}

133 changes: 133 additions & 0 deletions processor/signozspanmetricsprocessor/processor.go
Expand Up @@ -26,6 +26,7 @@ import (
"time"
"unicode"

"github.com/lightstep/go-expohisto/structure"
"github.com/tilinna/clock"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
@@ -67,6 +68,10 @@ type exemplarData struct {
value float64
}

// exponentialHistogram wraps the go-expohisto aggregator that backs the experimental exponential histogram latency metric.
type exponentialHistogram struct {
histogram *structure.Histogram[float64]
}

type metricKey string

type processorImp struct {
@@ -80,6 +85,7 @@ type processorImp struct {

// Additional dimensions to add to metrics.
dimensions []dimension // signoz_latency metric
expDimensions []dimension // signoz_latency exphisto metric
callDimensions []dimension // signoz_calls_total metric
dbCallDimensions []dimension // signoz_db_latency_* metric
externalCallDimensions []dimension // signoz_external_call_latency_* metric
@@ -91,6 +97,8 @@ type processorImp struct {
histograms map[metricKey]*histogramData // signoz_latency metric
latencyBounds []float64

expHistograms map[metricKey]*exponentialHistogram

callHistograms map[metricKey]*histogramData // signoz_calls_total metric
callLatencyBounds []float64

@@ -105,6 +113,7 @@ type processorImp struct {
// An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
// e.g. { "foo/barOK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }}
metricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
expHistogramKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
callMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
dbCallMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
externalCallMetricKeyToDimensions *cache.Cache[metricKey, pcommon.Map]
@@ -146,6 +155,39 @@ type histogramData struct {
exemplarsData []exemplarData
}

// expoHistToExponentialDataPoint copies the state of a go-expohisto aggregator (count, sum, min/max, zero count,
// scale and both bucket halves) into the given pmetric exponential histogram data point.
func expoHistToExponentialDataPoint(agg *structure.Histogram[float64], dp pmetric.ExponentialHistogramDataPoint) {
dp.SetCount(agg.Count())
dp.SetSum(agg.Sum())
if agg.Count() != 0 {
dp.SetMin(agg.Min())
dp.SetMax(agg.Max())
}

dp.SetZeroCount(agg.ZeroCount())
dp.SetScale(agg.Scale())

for _, half := range []struct {
inFunc func() *structure.Buckets
outFunc func() pmetric.ExponentialHistogramDataPointBuckets
}{
{agg.Positive, dp.Positive},
{agg.Negative, dp.Negative},
} {
in := half.inFunc()
out := half.outFunc()
out.SetOffset(in.Offset())
out.BucketCounts().EnsureCapacity(int(in.Len()))

for i := uint32(0); i < in.Len(); i++ {
out.BucketCounts().Append(in.At(i))
}
}
}
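
A rough illustration of what the conversion produces, sketched as a hypothetical test that would sit alongside this file (it relies on the structure and pmetric packages imported above plus the standard testing package; the latency values are invented):

func TestExpoHistToExponentialDataPointSketch(t *testing.T) {
	agg := new(structure.Histogram[float64])
	agg.Init(structure.NewConfig(structure.WithMaxSize(structure.DefaultMaxSize)))
	for _, ms := range []float64{2.5, 40, 750} { // three hypothetical span latencies in ms
		agg.Update(ms)
	}

	dp := pmetric.NewExponentialHistogramDataPoint()
	expoHistToExponentialDataPoint(agg, dp)

	// The data point now mirrors the aggregator: count 3, sum 792.5, min 2.5, max 750,
	// the automatically chosen scale, and positive bucket counts reachable via dp.Positive().
	if dp.Count() != 3 || dp.Sum() != 792.5 {
		t.Fatalf("unexpected count/sum: %d %v", dp.Count(), dp.Sum())
	}
}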

func (h *exponentialHistogram) Observe(value float64) {
h.histogram.Update(value)
}

func newProcessor(logger *zap.Logger, instanceID string, config component.Config, ticker *clock.Ticker) (*processorImp, error) {
logger.Info("Building signozspanmetricsprocessor")
pConfig := config.(*Config)
@@ -185,6 +227,11 @@ func newProcessor(logger *zap.Logger, instanceID string, config component.Config
if err != nil {
return nil, err
}
expHistogramKeyToDimensionsCache, err := cache.NewCache[metricKey, pcommon.Map](pConfig.DimensionsCacheSize)
if err != nil {
return nil, err
}

callMetricKeyToDimensionsCache, err := cache.NewCache[metricKey, pcommon.Map](pConfig.DimensionsCacheSize)
if err != nil {
return nil, err
@@ -209,6 +256,7 @@ func newProcessor(logger *zap.Logger, instanceID string, config component.Config
config: *pConfig,
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
histograms: make(map[metricKey]*histogramData),
expHistograms: make(map[metricKey]*exponentialHistogram),
callHistograms: make(map[metricKey]*histogramData),
dbCallHistograms: make(map[metricKey]*histogramData),
externalCallHistograms: make(map[metricKey]*histogramData),
@@ -217,11 +265,13 @@ func newProcessor(logger *zap.Logger, instanceID string, config component.Config
dbCallLatencyBounds: bounds,
externalCallLatencyBounds: bounds,
dimensions: newDimensions(pConfig.Dimensions),
expDimensions: newDimensions(pConfig.Dimensions),
callDimensions: newDimensions(callDimensions),
dbCallDimensions: newDimensions(dbCallDimensions),
externalCallDimensions: newDimensions(externalCallDimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
expHistogramKeyToDimensions: expHistogramKeyToDimensionsCache,
callMetricKeyToDimensions: callMetricKeyToDimensionsCache,
dbCallMetricKeyToDimensions: dbMetricKeyToDimensionsCache,
externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensionsCache,
@@ -431,7 +481,12 @@ func (p *processorImp) buildMetrics() (pmetric.Metrics, error) {
return pmetric.Metrics{}, err
}

if err := p.collectExpHistogramMetrics(ilm); err != nil {
return pmetric.Metrics{}, err
}

p.metricKeyToDimensions.RemoveEvictedItems()
p.expHistogramKeyToDimensions.RemoveEvictedItems()
p.callMetricKeyToDimensions.RemoveEvictedItems()
p.dbCallMetricKeyToDimensions.RemoveEvictedItems()
p.externalCallMetricKeyToDimensions.RemoveEvictedItems()
@@ -457,6 +512,12 @@ func (p *processorImp) buildMetrics() (pmetric.Metrics, error) {
}
}

for key := range p.expHistograms {
if !p.expHistogramKeyToDimensions.Contains(key) {
delete(p.expHistograms, key)
}
}

// If delta metrics, reset accumulated data
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
p.resetAccumulatedMetrics()
@@ -507,6 +568,31 @@ func (p *processorImp) collectLatencyMetrics(ilm pmetric.ScopeMetrics) error {
return nil
}

// collectExpHistogramMetrics collects the accumulated exponential histogram latency metrics, writing the data
// into the given instrumentation library metrics.
func (p *processorImp) collectExpHistogramMetrics(ilm pmetric.ScopeMetrics) error {
mExpLatency := ilm.Metrics().AppendEmpty()
mExpLatency.SetName("signoz_latency")
mExpLatency.SetUnit("ms")
mExpLatency.SetEmptyExponentialHistogram().SetAggregationTemporality(p.config.GetAggregationTemporality())
dps := mExpLatency.ExponentialHistogram().DataPoints()
dps.EnsureCapacity(len(p.expHistograms))
timestamp := pcommon.NewTimestampFromTime(time.Now())
for key, hist := range p.expHistograms {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(p.startTimestamp)
dp.SetTimestamp(timestamp)
expoHistToExponentialDataPoint(hist.histogram, dp)
dimensions, err := p.getDimensionsByExpHistogramKey(key)
if err != nil {
return err
}

dimensions.CopyTo(dps.At(dps.Len() - 1).Attributes())
}
return nil
}
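
For readers consuming the emitted data points: under the OpenTelemetry exponential histogram definition, positive bucket j of a point covers (base^(offset+j), base^(offset+j+1)] where base = 2^(2^-scale). A small illustrative helper, not part of this commit, using only the standard library math package:

// lowerBound returns the exclusive lower bound, in the metric's unit (milliseconds here),
// of positive bucket j for a data point with the given scale and offset.
func lowerBound(scale, offset int32, j int) float64 {
	base := math.Exp2(math.Exp2(float64(-scale))) // base = 2^(2^-scale)
	return math.Pow(base, float64(int(offset)+j)) // base^(offset+j)
}

For example, scale 3 and offset 24 give bucket 0 a lower bound of 2^(24/8) = 8 ms.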

// collectDBCallMetrics collects the raw latency sum and count metrics, writing the data
// into the given instrumentation library metrics.
func (p *processorImp) collectDBCallMetrics(ilm pmetric.ScopeMetrics) error {
@@ -625,6 +711,14 @@ func (p *processorImp) getDimensionsByMetricKey(k metricKey) (pcommon.Map, error
return pcommon.Map{}, fmt.Errorf("value not found in metricKeyToDimensions cache by key %q", k)
}

// getDimensionsByExpHistogramKey gets dimensions from `expHistogramKeyToDimensions` cache.
func (p *processorImp) getDimensionsByExpHistogramKey(k metricKey) (pcommon.Map, error) {
if attributeMap, ok := p.expHistogramKeyToDimensions.Get(k); ok {
return attributeMap, nil
}
return pcommon.Map{}, fmt.Errorf("value not found in expHistogramKeyToDimensions cache by key %q", k)
}

// getDimensionsByCallMetricKey gets dimensions from `callMetricKeyToDimensions` cache.
func (p *processorImp) getDimensionsByCallMetricKey(k metricKey) (pcommon.Map, error) {
if attributeMap, ok := p.callMetricKeyToDimensions.Get(k); ok {
@@ -792,6 +886,14 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.S
p.cache(serviceName, span, key, resourceAttr)
p.updateHistogram(key, latencyInMilliseconds, span.TraceID(), span.SpanID())

if p.config.EnableExpHistogram {
p.keyBuf.Reset()
buildKey(p.keyBuf, serviceName, span, p.expDimensions, resourceAttr)
expKey := metricKey(p.keyBuf.String())
p.expHistogramCache(serviceName, span, expKey, resourceAttr)
p.updateExpHistogram(expKey, latencyInMilliseconds, span.TraceID(), span.SpanID())
}

p.keyBuf.Reset()
buildKey(p.keyBuf, serviceName, span, p.callDimensions, resourceAttr)
callKey := metricKey(p.keyBuf.String())
Expand Down Expand Up @@ -853,11 +955,13 @@ func (p *processorImp) aggregateMetrics(traces ptrace.Traces) {
// metricKeyToDimensions.
func (p *processorImp) resetAccumulatedMetrics() {
p.histograms = make(map[metricKey]*histogramData)
p.expHistograms = make(map[metricKey]*exponentialHistogram)
p.callHistograms = make(map[metricKey]*histogramData)
p.dbCallHistograms = make(map[metricKey]*histogramData)
p.externalCallHistograms = make(map[metricKey]*histogramData)

p.metricKeyToDimensions.Purge()
p.expHistogramKeyToDimensions.Purge()
p.callMetricKeyToDimensions.Purge()
p.dbCallMetricKeyToDimensions.Purge()
p.externalCallMetricKeyToDimensions.Purge()
@@ -881,6 +985,24 @@ func (p *processorImp) updateHistogram(key metricKey, latency float64, traceID p
histo.exemplarsData = append(histo.exemplarsData, exemplarData{traceID: traceID, spanID: spanID, value: latency})
}

// updateExpHistogram records the given latency against the exponential histogram for the given metric key,
// creating and initializing the aggregator on first use.
func (p *processorImp) updateExpHistogram(key metricKey, latency float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
histo, ok := p.expHistograms[key]
if !ok {
histogram := new(structure.Histogram[float64])
cfg := structure.NewConfig(
structure.WithMaxSize(structure.DefaultMaxSize),
)
histogram.Init(cfg)

histo = &exponentialHistogram{
histogram: histogram,
}
p.expHistograms[key] = histo
}

histo.Observe(latency)
}
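
The aggregator is created lazily per metric key and capped at go-expohisto's default maximum bucket count; when new observations fall outside what that many buckets can cover at the current scale, the library lowers the scale on its own. A tiny sketch of that behaviour with invented values, reusing the structure package imported above:

h := new(structure.Histogram[float64])
h.Init(structure.NewConfig(structure.WithMaxSize(structure.DefaultMaxSize)))
h.Update(0.5)      // a sub-millisecond latency
h.Update(120000.0) // a two-minute outlier stretches the recorded range
// Count and Sum still reflect every observation; Scale has been reduced only as far
// as needed for both extremes to fit within the configured bucket budget.
_, _ = h.Count(), h.Scale()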

func (p *processorImp) updateCallHistogram(key metricKey, latency float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
histo, ok := p.callHistograms[key]
if !ok {
@@ -1111,6 +1233,17 @@ func (p *processorImp) cache(serviceName string, span ptrace.Span, k metricKey,
}
}

// expHistogramCache caches the dimension key-value map for the metricKey if there is a cache miss.
// This enables a lookup of the dimension key-value map when constructing the metric like so:
//
// LabelsMap().InitFromMap(p.expHistogramKeyToDimensions[key])
func (p *processorImp) expHistogramCache(serviceName string, span ptrace.Span, k metricKey, resourceAttrs pcommon.Map) {
// Use Get to ensure any existing key has its recent-ness updated.
if _, has := p.expHistogramKeyToDimensions.Get(k); !has {
p.expHistogramKeyToDimensions.Add(k, p.buildDimensionKVs(serviceName, span, p.expDimensions, resourceAttrs))
}
}

// callCache caches the dimension key-value map for the metricKey if there is a cache miss.
// This enables a lookup of the dimension key-value map when constructing the metric like so:
//
