Skip to content

Commit

Permalink
Merge branch 'main' into chore/replicated-merge-tree
Browse files Browse the repository at this point in the history
  • Loading branch information
prashant-shahi committed Mar 12, 2024
2 parents 9529c14 + d5d2d80 commit 2358078
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 93 deletions.
2 changes: 1 addition & 1 deletion exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {
}
}
}
err := s.Writer.WriteBatchOfSpans(batchOfSpans)
err := s.Writer.WriteBatchOfSpans(ctx, batchOfSpans)
if err != nil {
zap.S().Error("Error in writing spans to clickhouse: ", err)
return err
Expand Down
3 changes: 2 additions & 1 deletion exporter/clickhousetracesexporter/clickhouse_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package clickhousetracesexporter

import (
"context"
"flag"
"fmt"

Expand All @@ -39,7 +40,7 @@ type Factory struct {

// Writer writes spans to storage.
type Writer interface {
WriteBatchOfSpans(span []*Span) error
WriteBatchOfSpans(ctx context.Context, span []*Span) error
}

type writerMaker func(WriterOptions) (Writer, error)
Expand Down
28 changes: 12 additions & 16 deletions exporter/clickhousetracesexporter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ func NewSpanWriter(options WriterOptions) *SpanWriter {
return writer
}

func (w *SpanWriter) writeIndexBatch(batchSpans []*Span) error {

ctx := context.Background()
func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error

Expand Down Expand Up @@ -164,9 +162,7 @@ func (w *SpanWriter) writeIndexBatch(batchSpans []*Span) error {
return err
}

func (w *SpanWriter) writeTagBatch(batchSpans []*Span) error {

ctx := context.Background()
func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) error {
var tagKeyStatement driver.Batch
var tagStatement driver.Batch
var err error
Expand Down Expand Up @@ -298,9 +294,7 @@ func (w *SpanWriter) writeTagBatch(batchSpans []*Span) error {
return err
}

func (w *SpanWriter) writeErrorBatch(batchSpans []*Span) error {

ctx := context.Background()
func (w *SpanWriter) writeErrorBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error

Expand Down Expand Up @@ -357,8 +351,7 @@ func stringToBool(s string) bool {
return false
}

func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error {
ctx := context.Background()
func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error

Expand All @@ -380,6 +373,9 @@ func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error {
usageMap := span.TraceModel
usageMap.TagMap = map[string]string{}
serialized, err = json.Marshal(span.TraceModel)
if err != nil {
return err
}
serializedUsage, err := json.Marshal(usageMap)

if err != nil {
Expand Down Expand Up @@ -413,27 +409,27 @@ func (w *SpanWriter) writeModelBatch(batchSpans []*Span) error {
}

// WriteBatchOfSpans writes the encoded batch of spans
func (w *SpanWriter) WriteBatchOfSpans(batch []*Span) error {
func (w *SpanWriter) WriteBatchOfSpans(ctx context.Context, batch []*Span) error {
if w.spansTable != "" {
if err := w.writeModelBatch(batch); err != nil {
if err := w.writeModelBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err))
return err
}
}
if w.indexTable != "" {
if err := w.writeIndexBatch(batch); err != nil {
if err := w.writeIndexBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to index table: ", zap.Error(err))
return err
}
}
if w.errorTable != "" {
if err := w.writeErrorBatch(batch); err != nil {
if err := w.writeErrorBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to error table: ", zap.Error(err))
return err
}
}
if w.attributeTable != "" && w.attributeKeyTable != "" {
if err := w.writeTagBatch(batch); err != nil {
if err := w.writeTagBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err))
return err
}
Expand Down
3 changes: 3 additions & 0 deletions processor/signozspanmetricsprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type Config struct {
MetricsFlushInterval time.Duration `mapstructure:"metrics_flush_interval"`

EnableExpHistogram bool `mapstructure:"enable_exp_histogram"`

MaxServicesToTrack int `mapstructure:"max_services_to_track"`
MaxOperationsToTrackPerService int `mapstructure:"max_operations_to_track_per_service"`
}

// GetAggregationTemporality converts the string value given in the config into a AggregationTemporality.
Expand Down
62 changes: 36 additions & 26 deletions processor/signozspanmetricsprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,33 @@ import (
func TestLoadConfig(t *testing.T) {
defaultMethod := "GET"
testcases := []struct {
configFile string
wantMetricsExporter string
wantLatencyHistogramBuckets []time.Duration
wantDimensions []Dimension
wantDimensionsCacheSize int
wantAggregationTemporality string
wantMetricsFlushInterval time.Duration
configFile string
wantMetricsExporter string
wantLatencyHistogramBuckets []time.Duration
wantDimensions []Dimension
wantDimensionsCacheSize int
wantAggregationTemporality string
wantMetricsFlushInterval time.Duration
wantMaxServicesToTrack int
wantMaxOperationsToTrackPerService int
}{
{
configFile: "config-2-pipelines.yaml",
wantMetricsExporter: "prometheus",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: 500,
wantMetricsFlushInterval: 30 * time.Second,
configFile: "config-2-pipelines.yaml",
wantMetricsExporter: "prometheus",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: 500,
wantMetricsFlushInterval: 30 * time.Second,
wantMaxServicesToTrack: 256,
wantMaxOperationsToTrackPerService: 2048,
},
{
configFile: "config-3-pipelines.yaml",
wantMetricsExporter: "otlp/spanmetrics",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: defaultDimensionsCacheSize,
wantMetricsFlushInterval: 60 * time.Second,
configFile: "config-3-pipelines.yaml",
wantMetricsExporter: "otlp/spanmetrics",
wantAggregationTemporality: cumulative,
wantDimensionsCacheSize: defaultDimensionsCacheSize,
wantMetricsFlushInterval: 60 * time.Second,
wantMaxServicesToTrack: 256,
wantMaxOperationsToTrackPerService: 2048,
},
{
configFile: "config-full.yaml",
Expand All @@ -74,9 +80,11 @@ func TestLoadConfig(t *testing.T) {
{"http.method", &defaultMethod},
{"http.status_code", nil},
},
wantDimensionsCacheSize: 1500,
wantAggregationTemporality: delta,
wantMetricsFlushInterval: 60 * time.Second,
wantDimensionsCacheSize: 1500,
wantAggregationTemporality: delta,
wantMetricsFlushInterval: 60 * time.Second,
wantMaxServicesToTrack: 512,
wantMaxOperationsToTrackPerService: 69420,
},
}
for _, tc := range testcases {
Expand All @@ -103,12 +111,14 @@ func TestLoadConfig(t *testing.T) {
require.NotNil(t, cfg)
assert.Equal(t,
&Config{
MetricsExporter: tc.wantMetricsExporter,
LatencyHistogramBuckets: tc.wantLatencyHistogramBuckets,
Dimensions: tc.wantDimensions,
DimensionsCacheSize: tc.wantDimensionsCacheSize,
AggregationTemporality: tc.wantAggregationTemporality,
MetricsFlushInterval: tc.wantMetricsFlushInterval,
MetricsExporter: tc.wantMetricsExporter,
LatencyHistogramBuckets: tc.wantLatencyHistogramBuckets,
Dimensions: tc.wantDimensions,
DimensionsCacheSize: tc.wantDimensionsCacheSize,
AggregationTemporality: tc.wantAggregationTemporality,
MetricsFlushInterval: tc.wantMetricsFlushInterval,
MaxServicesToTrack: tc.wantMaxServicesToTrack,
MaxOperationsToTrackPerService: tc.wantMaxOperationsToTrackPerService,
},
cfg.Processors[component.NewID(typeStr)],
)
Expand Down
16 changes: 10 additions & 6 deletions processor/signozspanmetricsprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ const (
// The value of "type" key in configuration.
typeStr = "signozspanmetrics"
// The stability level of the processor.
stability = component.StabilityLevelBeta
stability = component.StabilityLevelBeta
maxNumberOfServicesToTrack = 256
maxNumberOfOperationsToTrackPerService = 2048
)

// NewFactory creates a factory for the spanmetrics processor.
Expand All @@ -44,11 +46,13 @@ func NewFactory() processor.Factory {

func createDefaultConfig() component.Config {
return &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
skipSanitizeLabel: dropSanitizationFeatureGate.IsEnabled(),
MetricsFlushInterval: 60 * time.Second,
EnableExpHistogram: false,
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
skipSanitizeLabel: dropSanitizationFeatureGate.IsEnabled(),
MetricsFlushInterval: 60 * time.Second,
EnableExpHistogram: false,
MaxServicesToTrack: maxNumberOfServicesToTrack,
MaxOperationsToTrackPerService: maxNumberOfOperationsToTrackPerService,
}
}

Expand Down
Loading

0 comments on commit 2358078

Please sign in to comment.