diff --git a/exporter/clickhousetracesexporter/clickhouse_exporter.go b/exporter/clickhousetracesexporter/clickhouse_exporter.go index 38587981..91818af8 100644 --- a/exporter/clickhousetracesexporter/clickhouse_exporter.go +++ b/exporter/clickhousetracesexporter/clickhouse_exporter.go @@ -421,7 +421,7 @@ func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error { } } } - err := s.Writer.WriteBatchOfSpans(batchOfSpans) + err := s.Writer.WriteBatchOfSpans(ctx, batchOfSpans) if err != nil { zap.S().Error("Error in writing spans to clickhouse: ", err) return err diff --git a/exporter/clickhousetracesexporter/clickhouse_factory.go b/exporter/clickhousetracesexporter/clickhouse_factory.go index 2eeff059..d110c4d6 100644 --- a/exporter/clickhousetracesexporter/clickhouse_factory.go +++ b/exporter/clickhousetracesexporter/clickhouse_factory.go @@ -15,6 +15,7 @@ package clickhousetracesexporter import ( + "context" "flag" "fmt" @@ -39,7 +40,7 @@ type Factory struct { // Writer writes spans to storage. type Writer interface { - WriteBatchOfSpans(span []*Span) error + WriteBatchOfSpans(ctx context.Context, span []*Span) error } type writerMaker func(WriterOptions) (Writer, error) diff --git a/exporter/clickhousetracesexporter/writer.go b/exporter/clickhousetracesexporter/writer.go index c5a2d67e..e92c0e70 100644 --- a/exporter/clickhousetracesexporter/writer.go +++ b/exporter/clickhousetracesexporter/writer.go @@ -90,9 +90,7 @@ func NewSpanWriter(options WriterOptions) *SpanWriter { return writer } -func (w *SpanWriter) writeIndexBatch(batchSpans []*Span) error { - - ctx := context.Background() +func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) error { var statement driver.Batch var err error @@ -164,9 +162,7 @@ func (w *SpanWriter) writeIndexBatch(batchSpans []*Span) error { return err } -func (w *SpanWriter) writeTagBatch(batchSpans []*Span) error { - - ctx := context.Background() +func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) error { var tagKeyStatement driver.Batch var tagStatement driver.Batch var err error @@ -298,9 +294,7 @@ func (w *SpanWriter) writeTagBatch(batchSpans []*Span) error { return err } -func (w *SpanWriter) writeErrorBatch(batchSpans []*Span) error { - - ctx := context.Background() +func (w *SpanWriter) writeErrorBatch(ctx context.Context, batchSpans []*Span) error { var statement driver.Batch var err error @@ -357,8 +351,7 @@ func stringToBool(s string) bool { return false } -func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error { - ctx := context.Background() +func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) error { var statement driver.Batch var err error @@ -380,6 +373,9 @@ func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error { usageMap := span.TraceModel usageMap.TagMap = map[string]string{} serialized, err = json.Marshal(span.TraceModel) + if err != nil { + return err + } serializedUsage, err := json.Marshal(usageMap) if err != nil { @@ -413,27 +409,27 @@ func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error { } // WriteBatchOfSpans writes the encoded batch of spans -func (w *SpanWriter) WriteBatchOfSpans(batch []*Span) error { +func (w *SpanWriter) WriteBatchOfSpans(ctx context.Context, batch []*Span) error { if w.spansTable != "" { - if err := w.writeModelBatch(batch); err != nil { + if err := w.writeModelBatch(ctx, batch); err != nil { w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err)) return err } } if w.indexTable != "" { - if err := w.writeIndexBatch(batch); err != nil { + if err := w.writeIndexBatch(ctx, batch); err != nil { w.logger.Error("Could not write a batch of spans to index table: ", zap.Error(err)) return err } } if w.errorTable != "" { - if err := w.writeErrorBatch(batch); err != nil { + if err := w.writeErrorBatch(ctx, batch); err != nil { w.logger.Error("Could not write a batch of spans to error table: ", zap.Error(err)) return err } } if w.attributeTable != "" && w.attributeKeyTable != "" { - if err := w.writeTagBatch(batch); err != nil { + if err := w.writeTagBatch(ctx, batch); err != nil { w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err)) return err } diff --git a/processor/signozspanmetricsprocessor/config.go b/processor/signozspanmetricsprocessor/config.go index 7594b35c..af4c06d2 100644 --- a/processor/signozspanmetricsprocessor/config.go +++ b/processor/signozspanmetricsprocessor/config.go @@ -85,6 +85,9 @@ type Config struct { MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"` EnableExpHistogram bool `mapstructure:"enable_exp_histogram"` + + MaxServicesToTrack int `mapstructure:"max_services_to_track"` + MaxOperationsToTrackPerService int `mapstructure:"max_operations_to_track_per_service"` } // GetAggregationTemporality converts the string value given in the config into a AggregationTemporality. diff --git a/processor/signozspanmetricsprocessor/config_test.go b/processor/signozspanmetricsprocessor/config_test.go index db70f28d..3d519eaf 100644 --- a/processor/signozspanmetricsprocessor/config_test.go +++ b/processor/signozspanmetricsprocessor/config_test.go @@ -36,27 +36,33 @@ import ( func TestLoadConfig(t *testing.T) { defaultMethod := "GET" testcases := []struct { - configFile string - wantMetricsExporter string - wantLatencyHistogramBuckets []time.Duration - wantDimensions []Dimension - wantDimensionsCacheSize int - wantAggregationTemporality string - wantMetricsFlushInterval time.Duration + configFile string + wantMetricsExporter string + wantLatencyHistogramBuckets []time.Duration + wantDimensions []Dimension + wantDimensionsCacheSize int + wantAggregationTemporality string + wantMetricsFlushInterval time.Duration + wantMaxServicesToTrack int + wantMaxOperationsToTrackPerService int }{ { - configFile: "config-2-pipelines.yaml", - wantMetricsExporter: "prometheus", - wantAggregationTemporality: cumulative, - wantDimensionsCacheSize: 500, - wantMetricsFlushInterval: 30 * time.Second, + configFile: "config-2-pipelines.yaml", + wantMetricsExporter: "prometheus", + wantAggregationTemporality: cumulative, + wantDimensionsCacheSize: 500, + wantMetricsFlushInterval: 30 * time.Second, + wantMaxServicesToTrack: 256, + wantMaxOperationsToTrackPerService: 2048, }, { - configFile: "config-3-pipelines.yaml", - wantMetricsExporter: "otlp/spanmetrics", - wantAggregationTemporality: cumulative, - wantDimensionsCacheSize: defaultDimensionsCacheSize, - wantMetricsFlushInterval: 60 * time.Second, + configFile: "config-3-pipelines.yaml", + wantMetricsExporter: "otlp/spanmetrics", + wantAggregationTemporality: cumulative, + wantDimensionsCacheSize: defaultDimensionsCacheSize, + wantMetricsFlushInterval: 60 * time.Second, + wantMaxServicesToTrack: 256, + wantMaxOperationsToTrackPerService: 2048, }, { configFile: "config-full.yaml", @@ -74,9 +80,11 @@ func TestLoadConfig(t *testing.T) { {"http.method", &defaultMethod}, {"http.status_code", nil}, }, - wantDimensionsCacheSize: 1500, - wantAggregationTemporality: delta, - wantMetricsFlushInterval: 60 * time.Second, + wantDimensionsCacheSize: 1500, + wantAggregationTemporality: delta, + wantMetricsFlushInterval: 60 * time.Second, + wantMaxServicesToTrack: 512, + wantMaxOperationsToTrackPerService: 69420, }, } for _, tc := range testcases { @@ -103,12 +111,14 @@ func TestLoadConfig(t *testing.T) { require.NotNil(t, cfg) assert.Equal(t, &Config{ - MetricsExporter: tc.wantMetricsExporter, - LatencyHistogramBuckets: tc.wantLatencyHistogramBuckets, - Dimensions: tc.wantDimensions, - DimensionsCacheSize: tc.wantDimensionsCacheSize, - AggregationTemporality: tc.wantAggregationTemporality, - MetricsFlushInterval: tc.wantMetricsFlushInterval, + MetricsExporter: tc.wantMetricsExporter, + LatencyHistogramBuckets: tc.wantLatencyHistogramBuckets, + Dimensions: tc.wantDimensions, + DimensionsCacheSize: tc.wantDimensionsCacheSize, + AggregationTemporality: tc.wantAggregationTemporality, + MetricsFlushInterval: tc.wantMetricsFlushInterval, + MaxServicesToTrack: tc.wantMaxServicesToTrack, + MaxOperationsToTrackPerService: tc.wantMaxOperationsToTrackPerService, }, cfg.Processors[component.NewID(typeStr)], ) diff --git a/processor/signozspanmetricsprocessor/factory.go b/processor/signozspanmetricsprocessor/factory.go index a3927b05..421f070c 100644 --- a/processor/signozspanmetricsprocessor/factory.go +++ b/processor/signozspanmetricsprocessor/factory.go @@ -30,7 +30,9 @@ const ( // The value of "type" key in configuration. typeStr = "signozspanmetrics" // The stability level of the processor. - stability = component.StabilityLevelBeta + stability = component.StabilityLevelBeta + maxNumberOfServicesToTrack = 256 + maxNumberOfOperationsToTrackPerService = 2048 ) // NewFactory creates a factory for the spanmetrics processor. @@ -44,11 +46,13 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE", - DimensionsCacheSize: defaultDimensionsCacheSize, - skipSanitizeLabel: dropSanitizationFeatureGate.IsEnabled(), - MetricsFlushInterval: 60 * time.Second, - EnableExpHistogram: false, + AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE", + DimensionsCacheSize: defaultDimensionsCacheSize, + skipSanitizeLabel: dropSanitizationFeatureGate.IsEnabled(), + MetricsFlushInterval: 60 * time.Second, + EnableExpHistogram: false, + MaxServicesToTrack: maxNumberOfServicesToTrack, + MaxOperationsToTrackPerService: maxNumberOfOperationsToTrackPerService, } } diff --git a/processor/signozspanmetricsprocessor/processor.go b/processor/signozspanmetricsprocessor/processor.go index e3f24774..29cc04f6 100644 --- a/processor/signozspanmetricsprocessor/processor.go +++ b/processor/signozspanmetricsprocessor/processor.go @@ -54,6 +54,8 @@ const ( defaultDimensionsCacheSize = 1000 resourcePrefix = "resource_" + overflowServiceName = "overflow_service" + overflowOperation = "overflow_operation" ) var ( @@ -126,6 +128,10 @@ type processorImp struct { started bool shutdownOnce sync.Once + + serviceToOperations map[string]map[string]struct{} + maxNumberOfServicesToTrack int + maxNumberOfOperationsToTrackPerService int } type dimension struct { @@ -189,7 +195,7 @@ func (h *exponentialHistogram) Observe(value float64) { } func newProcessor(logger *zap.Logger, instanceID string, config component.Config, ticker *clock.Ticker) (*processorImp, error) { - logger.Info("Building signozspanmetricsprocessor") + logger.Info("Building signozspanmetricsprocessor with config", zap.Any("config", config)) pConfig := config.(*Config) bounds := defaultLatencyHistogramBucketsMs @@ -251,34 +257,37 @@ func newProcessor(logger *zap.Logger, instanceID string, config component.Config } return &processorImp{ - logger: logger, - instanceID: instanceID, - config: *pConfig, - startTimestamp: pcommon.NewTimestampFromTime(time.Now()), - histograms: make(map[metricKey]*histogramData), - expHistograms: make(map[metricKey]*exponentialHistogram), - callHistograms: make(map[metricKey]*histogramData), - dbCallHistograms: make(map[metricKey]*histogramData), - externalCallHistograms: make(map[metricKey]*histogramData), - latencyBounds: bounds, - callLatencyBounds: bounds, - dbCallLatencyBounds: bounds, - externalCallLatencyBounds: bounds, - dimensions: newDimensions(pConfig.Dimensions), - expDimensions: newDimensions(pConfig.Dimensions), - callDimensions: newDimensions(callDimensions), - dbCallDimensions: newDimensions(dbCallDimensions), - externalCallDimensions: newDimensions(externalCallDimensions), - keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)), - metricKeyToDimensions: metricKeyToDimensionsCache, - expHistogramKeyToDimensions: expHistogramKeyToDimensionsCache, - callMetricKeyToDimensions: callMetricKeyToDimensionsCache, - dbCallMetricKeyToDimensions: dbMetricKeyToDimensionsCache, - externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensionsCache, - attrsCardinality: make(map[string]map[string]struct{}), - excludePatternRegex: excludePatternRegex, - ticker: ticker, - done: make(chan struct{}), + logger: logger, + instanceID: instanceID, + config: *pConfig, + startTimestamp: pcommon.NewTimestampFromTime(time.Now()), + histograms: make(map[metricKey]*histogramData), + expHistograms: make(map[metricKey]*exponentialHistogram), + callHistograms: make(map[metricKey]*histogramData), + dbCallHistograms: make(map[metricKey]*histogramData), + externalCallHistograms: make(map[metricKey]*histogramData), + latencyBounds: bounds, + callLatencyBounds: bounds, + dbCallLatencyBounds: bounds, + externalCallLatencyBounds: bounds, + dimensions: newDimensions(pConfig.Dimensions), + expDimensions: newDimensions(pConfig.Dimensions), + callDimensions: newDimensions(callDimensions), + dbCallDimensions: newDimensions(dbCallDimensions), + externalCallDimensions: newDimensions(externalCallDimensions), + keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)), + metricKeyToDimensions: metricKeyToDimensionsCache, + expHistogramKeyToDimensions: expHistogramKeyToDimensionsCache, + callMetricKeyToDimensions: callMetricKeyToDimensionsCache, + dbCallMetricKeyToDimensions: dbMetricKeyToDimensionsCache, + externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensionsCache, + attrsCardinality: make(map[string]map[string]struct{}), + excludePatternRegex: excludePatternRegex, + ticker: ticker, + done: make(chan struct{}), + serviceToOperations: make(map[string]map[string]struct{}), + maxNumberOfServicesToTrack: pConfig.MaxServicesToTrack, + maxNumberOfOperationsToTrackPerService: pConfig.MaxOperationsToTrackPerService, }, nil } @@ -881,21 +890,21 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.S } // Always reset the buffer before re-using. p.keyBuf.Reset() - buildKey(p.keyBuf, serviceName, span, p.dimensions, resourceAttr) + p.buildKey(p.keyBuf, serviceName, span, p.dimensions, resourceAttr) key := metricKey(p.keyBuf.String()) p.cache(serviceName, span, key, resourceAttr) p.updateHistogram(key, latencyInMilliseconds, span.TraceID(), span.SpanID()) if p.config.EnableExpHistogram { p.keyBuf.Reset() - buildKey(p.keyBuf, serviceName, span, p.expDimensions, resourceAttr) + p.buildKey(p.keyBuf, serviceName, span, p.expDimensions, resourceAttr) expKey := metricKey(p.keyBuf.String()) p.expHistogramCache(serviceName, span, expKey, resourceAttr) p.updateExpHistogram(expKey, latencyInMilliseconds, span.TraceID(), span.SpanID()) } p.keyBuf.Reset() - buildKey(p.keyBuf, serviceName, span, p.callDimensions, resourceAttr) + p.buildKey(p.keyBuf, serviceName, span, p.callDimensions, resourceAttr) callKey := metricKey(p.keyBuf.String()) p.callCache(serviceName, span, callKey, resourceAttr) p.updateCallHistogram(callKey, latencyInMilliseconds, span.TraceID(), span.SpanID()) @@ -1073,10 +1082,19 @@ func (p *processorImp) resetExemplarData() { } func (p *processorImp) buildDimensionKVs(serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) pcommon.Map { + + spanName := span.Name() + if len(p.serviceToOperations) > p.maxNumberOfServicesToTrack { + serviceName = overflowServiceName + } + if len(p.serviceToOperations[serviceName]) > p.maxNumberOfOperationsToTrackPerService { + spanName = overflowOperation + } + dims := pcommon.NewMap() dims.EnsureCapacity(4 + len(optionalDims)) dims.PutStr(serviceNameKey, serviceName) - dims.PutStr(operationKey, span.Name()) + dims.PutStr(operationKey, spanName) dims.PutStr(spanKindKey, SpanKindStr(span.Kind())) dims.PutStr(statusCodeKey, StatusCodeStr(span.Status().Code())) for _, d := range optionalDims { @@ -1138,9 +1156,18 @@ func concatDimensionValue(dest *bytes.Buffer, value string, prefixSep bool) { // or resource attributes. If the dimension exists in both, the span's attributes, being the most specific, takes precedence. // // The metric key is a simple concatenation of dimension values, delimited by a null character. -func buildKey(dest *bytes.Buffer, serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) { +func (p *processorImp) buildKey(dest *bytes.Buffer, serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) { + spanName := span.Name() + if len(p.serviceToOperations) > p.maxNumberOfServicesToTrack { + p.logger.Warn("Too many services to track, using overflow service name", zap.Int("maxNumberOfServicesToTrack", p.maxNumberOfServicesToTrack)) + serviceName = overflowServiceName + } + if len(p.serviceToOperations[serviceName]) > p.maxNumberOfOperationsToTrackPerService { + p.logger.Warn("Too many operations to track, using overflow operation name", zap.Int("maxNumberOfOperationsToTrackPerService", p.maxNumberOfOperationsToTrackPerService)) + spanName = overflowOperation + } concatDimensionValue(dest, serviceName, false) - concatDimensionValue(dest, span.Name(), true) + concatDimensionValue(dest, spanName, true) concatDimensionValue(dest, SpanKindStr(span.Kind()), true) concatDimensionValue(dest, StatusCodeStr(span.Status().Code()), true) @@ -1149,6 +1176,11 @@ func buildKey(dest *bytes.Buffer, serviceName string, span ptrace.Span, optional concatDimensionValue(dest, v.AsString(), true) } } + if _, ok := p.serviceToOperations[serviceName]; !ok { + p.serviceToOperations[serviceName] = make(map[string]struct{}) + } + p.serviceToOperations[serviceName][spanName] = struct{}{} + } // buildCustomKey builds the metric key from the service name and span metadata such as operation, kind, status_code and diff --git a/processor/signozspanmetricsprocessor/processor_test.go b/processor/signozspanmetricsprocessor/processor_test.go index f2e7829b..2a77a823 100644 --- a/processor/signozspanmetricsprocessor/processor_test.go +++ b/processor/signozspanmetricsprocessor/processor_test.go @@ -411,9 +411,9 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) { func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *pcommon.Value, temporality string, logger *zap.Logger, excludePatterns []ExcludePattern) *processorImp { defaultNotInSpanAttrVal := pcommon.NewValueStr("defaultNotInSpanAttrVal") // use size 2 for LRU cache for testing purpose - metricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize) - callMetricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize) - dbCallMetricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize) + metricKeyToDimensions, _ := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize) + callMetricKeyToDimensions, _ := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize) + dbCallMetricKeyToDimensions, _ := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize) externalCallMetricKeyToDimensions, err := cache.NewCache[metricKey, pcommon.Map](DimensionsCacheSize) defaultDimensions := []dimension{ @@ -503,8 +503,11 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de dbCallMetricKeyToDimensions: dbCallMetricKeyToDimensions, externalCallMetricKeyToDimensions: externalCallMetricKeyToDimensions, - attrsCardinality: make(map[string]map[string]struct{}), - excludePatternRegex: excludePatternRegex, + attrsCardinality: make(map[string]map[string]struct{}), + serviceToOperations: make(map[string]map[string]struct{}), + maxNumberOfServicesToTrack: maxNumberOfServicesToTrack, + maxNumberOfOperationsToTrackPerService: maxNumberOfOperationsToTrackPerService, + excludePatternRegex: excludePatternRegex, } } @@ -749,15 +752,27 @@ func newOTLPExporters(t *testing.T) (component.ID, exporter.Metrics, exporter.Tr } func TestBuildKeySameServiceOperationCharSequence(t *testing.T) { + + // Prepare + mexp := &mocks.MetricsExporter{} + tcon := &mocks.TracesConsumer{} + + // Mocked metric exporter will perform validation on metrics, during p.ConsumeTraces() + mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil) + tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) + + defaultNullValue := pcommon.NewValueStr("defaultNullValue") + p := newProcessorImp(mexp, tcon, &defaultNullValue, pmetric.AggregationTemporalityCumulative.String(), zaptest.NewLogger(t), []ExcludePattern{}) + span0 := ptrace.NewSpan() span0.SetName("c") buf := &bytes.Buffer{} - buildKey(buf, "ab", span0, nil, pcommon.NewMap()) + p.buildKey(buf, "ab", span0, nil, pcommon.NewMap()) k0 := metricKey(buf.String()) buf.Reset() span1 := ptrace.NewSpan() span1.SetName("bc") - buildKey(buf, "a", span1, nil, pcommon.NewMap()) + p.buildKey(buf, "a", span1, nil, pcommon.NewMap()) k1 := metricKey(buf.String()) assert.NotEqual(t, k0, k1) assert.Equal(t, metricKey("ab\u0000c\u0000SPAN_KIND_UNSPECIFIED\u0000STATUS_CODE_UNSET"), k0) @@ -765,6 +780,17 @@ func TestBuildKeySameServiceOperationCharSequence(t *testing.T) { } func TestBuildKeyWithDimensions(t *testing.T) { + // Prepare + mexp := &mocks.MetricsExporter{} + tcon := &mocks.TracesConsumer{} + + // Mocked metric exporter will perform validation on metrics, during p.ConsumeTraces() + mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil) + tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) + + defaultNullValue := pcommon.NewValueStr("defaultNullValue") + p := newProcessorImp(mexp, tcon, &defaultNullValue, pmetric.AggregationTemporalityCumulative.String(), zaptest.NewLogger(t), []ExcludePattern{}) + defaultFoo := pcommon.NewValueStr("bar") for _, tc := range []struct { name string @@ -852,7 +878,7 @@ func TestBuildKeyWithDimensions(t *testing.T) { assert.NoError(t, span0.Attributes().FromRaw(tc.spanAttrMap)) span0.SetName("c") buf := &bytes.Buffer{} - buildKey(buf, "ab", span0, tc.optionalDims, resAttr) + p.buildKey(buf, "ab", span0, tc.optionalDims, resAttr) assert.Equal(t, tc.wantKey, buf.String()) }) } @@ -1011,3 +1037,71 @@ func TestProcessorUpdateExemplars(t *testing.T) { assert.NoError(t, err) assert.Empty(t, p.histograms[key].exemplarsData) } + +func TestBuildKeyWithDimensionsOverflow(t *testing.T) { + // Prepare + mexp := &mocks.MetricsExporter{} + tcon := &mocks.TracesConsumer{} + + // Mocked metric exporter will perform validation on metrics, during p.ConsumeTraces() + mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(nil) + tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) + + observedZapCore, observedLogs := observer.New(zap.DebugLevel) + observedLogger := zap.New(observedZapCore) + + defaultNullValue := pcommon.NewValueStr("defaultNullValue") + p := newProcessorImp(mexp, tcon, &defaultNullValue, pmetric.AggregationTemporalityCumulative.String(), observedLogger, []ExcludePattern{}) + resAttr := pcommon.NewMap() + + for i := 0; i <= p.maxNumberOfServicesToTrack; i++ { + span0 := ptrace.NewSpan() + span0.SetName("span") + buf := &bytes.Buffer{} + serviceName := fmt.Sprintf("service-%d", i) + p.buildKey(buf, serviceName, span0, []dimension{}, resAttr) + } + + // adding new service should result in overflow + span0 := ptrace.NewSpan() + span0.SetName("span") + buf := &bytes.Buffer{} + serviceName := fmt.Sprintf("service-%d", p.maxNumberOfServicesToTrack) + p.buildKey(buf, serviceName, span0, []dimension{}, resAttr) + + assert.Contains(t, buf.String(), overflowServiceName) + + found := false + for _, log := range observedLogs.All() { + if strings.Contains(log.Message, "Too many services to track, using overflow service name") { + found = true + } + } + assert.True(t, found) + + // reset to test operations + p.serviceToOperations = make(map[string]map[string]struct{}) + p.buildKey(buf, "simple_service", span0, []dimension{}, resAttr) + + for i := 0; i <= p.maxNumberOfOperationsToTrackPerService; i++ { + span0 := ptrace.NewSpan() + span0.SetName(fmt.Sprintf("operation-%d", i)) + buf := &bytes.Buffer{} + p.buildKey(buf, "simple_service", span0, []dimension{}, resAttr) + } + + // adding a new operation to service "simple_service" should result in overflow + span0 = ptrace.NewSpan() + span0.SetName(fmt.Sprintf("operation-%d", p.maxNumberOfOperationsToTrackPerService)) + buf = &bytes.Buffer{} + p.buildKey(buf, "simple_service", span0, []dimension{}, resAttr) + assert.Contains(t, buf.String(), overflowOperation) + + found = false + for _, log := range observedLogs.All() { + if strings.Contains(log.Message, "Too many operations to track, using overflow operation name") { + found = true + } + } + assert.True(t, found) +} diff --git a/processor/signozspanmetricsprocessor/testdata/config-full.yaml b/processor/signozspanmetricsprocessor/testdata/config-full.yaml index cf4a9b67..96630e76 100644 --- a/processor/signozspanmetricsprocessor/testdata/config-full.yaml +++ b/processor/signozspanmetricsprocessor/testdata/config-full.yaml @@ -38,6 +38,9 @@ processors: latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 100ms, 250ms] dimensions_cache_size: 1500 + max_services_to_track: 512 + max_operations_to_track_per_service: 69420 + # Additional list of dimensions on top of: # - service.name # - operation