diff --git a/components/components.go b/components/components.go index 6b784ad5..06eb0720 100644 --- a/components/components.go +++ b/components/components.go @@ -117,6 +117,7 @@ import ( "github.com/SigNoz/signoz-otel-collector/exporter/clickhousetracesexporter" "github.com/SigNoz/signoz-otel-collector/exporter/signozkafkaexporter" _ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" + "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor" "github.com/SigNoz/signoz-otel-collector/processor/signozspanmetricsprocessor" "github.com/SigNoz/signoz-otel-collector/processor/signoztailsampler" "github.com/SigNoz/signoz-otel-collector/processor/signoztransformprocessor" @@ -255,6 +256,7 @@ func Components() (otelcol.Factories, error) { logstransformprocessor.NewFactory(), signoztailsampler.NewFactory(), signoztransformprocessor.NewFactory(), + signozlogspipelineprocessor.NewFactory(), } for _, pr := range factories.Processors { processors = append(processors, pr) diff --git a/processor/signozlogspipelineprocessor/README.md b/processor/signozlogspipelineprocessor/README.md new file mode 100644 index 00000000..6ab68bd7 --- /dev/null +++ b/processor/signozlogspipelineprocessor/README.md @@ -0,0 +1,20 @@ +# Signoz Logs Pipeline Processor + +An otel processor for powering SigNoz logs pipelines. + +The goal of this processor is to achieve a seamless logs pipelines experience for SigNoz users +while exposing the same interface as the logstransform processor in opentelemetry-collector-contrib + +The implementation mostly borrows from, adapts and enhances the official logstransform processor (and stanza) +code to achieve goals of SigNoz pipelines + +On success, query service should be able to add logs pipelines to otel collector config using +this processor without having to do much translation or augmentation of the user defined config + +The following non-exhaustive list of capabilities/improvements on top of what logstransform provides are included +- sync ConsumeLogs i.e. ConsumeLogs should only return after the ConsumeLogs of the next processor in the pipeline has returned. + This helps with reliability of logs processing through the collector. + The otel-collector-contrib logstransform processor doesn't have a sync implementation for ConsumeLogs +- Enhanced addressing capabilities. + - should be able to refer to top level otlp log fields (like severity) in operator conditions + - should be able to use body.field in operator config if body is JSON without having to first parse body into an temporary attribute \ No newline at end of file diff --git a/processor/signozlogspipelineprocessor/config.go b/processor/signozlogspipelineprocessor/config.go new file mode 100644 index 00000000..d7819703 --- /dev/null +++ b/processor/signozlogspipelineprocessor/config.go @@ -0,0 +1,23 @@ +// Brought in as is from logstransform processor in opentelemetry-collector-contrib +package signozlogspipelineprocessor + +import ( + "errors" + + "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" +) + +type Config struct { + adapter.BaseConfig `mapstructure:",squash"` +} + +var _ component.Config = (*Config)(nil) + +func (cfg *Config) Validate() error { + if len(cfg.BaseConfig.Operators) == 0 { + return errors.New("no operators were configured for signozlogspipeline processor") + } + return nil +} diff --git a/processor/signozlogspipelineprocessor/config_test.go b/processor/signozlogspipelineprocessor/config_test.go new file mode 100644 index 00000000..29c6cda6 --- /dev/null +++ b/processor/signozlogspipelineprocessor/config_test.go @@ -0,0 +1,48 @@ +// Brought in as is from logstransform processor in opentelemetry-collector-contrib +package signozlogspipelineprocessor + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" +) + +func TestLoadConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NoError(t, component.UnmarshalConfig(cm, cfg)) + assert.Equal(t, &Config{ + BaseConfig: adapter.BaseConfig{ + Operators: []operator.Config{ + { + Builder: func() *regex.Config { + cfg := regex.NewConfig() + cfg.Regex = "^(?P