Skip to content

Commit

Permalink
Signozlogspipelineprocessor: an otel processor dedicated to signoz lo…
Browse files Browse the repository at this point in the history
…gs pipelines (#386)

contributes to SigNoz/signoz#5894 and
SigNoz/signoz#4444
  • Loading branch information
raj-k-singh committed Sep 13, 2024
1 parent 7b1965b commit 9bac5fc
Show file tree
Hide file tree
Showing 11 changed files with 1,120 additions and 0 deletions.
2 changes: 2 additions & 0 deletions components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ import (
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousetracesexporter"
"github.com/SigNoz/signoz-otel-collector/exporter/signozkafkaexporter"
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
"github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor"
"github.com/SigNoz/signoz-otel-collector/processor/signozspanmetricsprocessor"
"github.com/SigNoz/signoz-otel-collector/processor/signoztailsampler"
"github.com/SigNoz/signoz-otel-collector/processor/signoztransformprocessor"
Expand Down Expand Up @@ -255,6 +256,7 @@ func Components() (otelcol.Factories, error) {
logstransformprocessor.NewFactory(),
signoztailsampler.NewFactory(),
signoztransformprocessor.NewFactory(),
signozlogspipelineprocessor.NewFactory(),
}
for _, pr := range factories.Processors {
processors = append(processors, pr)
Expand Down
20 changes: 20 additions & 0 deletions processor/signozlogspipelineprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Signoz Logs Pipeline Processor

An otel processor for powering SigNoz logs pipelines.

The goal of this processor is to provide a seamless logs pipelines experience for SigNoz users
while exposing the same interface as the logstransform processor in opentelemetry-collector-contrib.

The implementation mostly borrows from, adapts and enhances the official logstransform processor (and stanza)
code to achieve the goals of SigNoz pipelines.

On success, query service should be able to add logs pipelines to otel collector config using
this processor without having to do much translation or augmentation of the user defined config

The following is a non-exhaustive list of capabilities/improvements included on top of what logstransform provides:
- sync ConsumeLogs i.e. ConsumeLogs should only return after the ConsumeLogs of the next processor in the pipeline has returned.
This helps with reliability of logs processing through the collector.
The otel-collector-contrib logstransform processor doesn't have a sync implementation for ConsumeLogs
- Enhanced addressing capabilities.
- should be able to refer to top level otlp log fields (like severity) in operator conditions
- should be able to use body.field in operator config if body is JSON without having to first parse body into a temporary attribute
23 changes: 23 additions & 0 deletions processor/signozlogspipelineprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Brought in as is from logstransform processor in opentelemetry-collector-contrib
package signozlogspipelineprocessor

import (
"errors"

"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
)

// Config is the configuration for the signozlogspipeline processor.
// It embeds the stanza adapter's BaseConfig; the `squash` mapstructure tag
// flattens the embedded fields (e.g. Operators) into the top level of the
// processor's configuration instead of nesting them under a separate key.
type Config struct {
adapter.BaseConfig `mapstructure:",squash"`
}

// Compile-time assertion that *Config satisfies component.Config.
var _ component.Config = (*Config)(nil)

// Validate checks that at least one operator has been configured.
// It returns nil when the operator list is non-empty and an error otherwise.
func (cfg *Config) Validate() error {
	// Operators is promoted from the embedded adapter.BaseConfig.
	if hasOperators := len(cfg.Operators) > 0; hasOperators {
		return nil
	}
	return errors.New("no operators were configured for signozlogspipeline processor")
}
48 changes: 48 additions & 0 deletions processor/signozlogspipelineprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Brought in as is from logstransform processor in opentelemetry-collector-contrib
package signozlogspipelineprocessor

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
)

// TestLoadConfig loads testdata/config.yaml, unmarshals it into the factory's
// default config and verifies that the result is a single regex parser
// operator with severity and time parsing configured.
func TestLoadConfig(t *testing.T) {
	conf, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
	require.NoError(t, err)

	actual := NewFactory().CreateDefaultConfig()
	assert.NoError(t, component.UnmarshalConfig(conf, actual))

	// Expected regex parser: extracts time, sev and msg from each log line.
	regexCfg := regex.NewConfig()
	regexCfg.Regex = "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$"

	// Severity is parsed from the "sev" attribute.
	sevSource := entry.NewAttributeField("sev")
	severity := helper.NewSeverityConfig()
	severity.ParseFrom = &sevSource
	regexCfg.SeverityConfig = &severity

	// Timestamp is parsed from the "time" attribute using a strptime layout.
	timeSource := entry.NewAttributeField("time")
	timestamp := helper.NewTimeParser()
	timestamp.Layout = "%Y-%m-%d %H:%M:%S"
	timestamp.ParseFrom = &timeSource
	regexCfg.TimeParser = &timestamp

	expected := &Config{
		BaseConfig: adapter.BaseConfig{
			Operators: []operator.Config{
				{Builder: regexCfg},
			},
		},
	}
	assert.Equal(t, expected, actual)
}
48 changes: 48 additions & 0 deletions processor/signozlogspipelineprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Brought in as is from logstransform processor in opentelemetry-collector-contrib
// with identifiers changed for the new processor
package signozlogspipelineprocessor

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// NewFactory builds the processor.Factory for the "signozlogspipeline"
// component type. The factory supplies createDefaultConfig as its default
// configuration and registers createLogsProcessor as the logs processor
// constructor at development stability level.
func NewFactory() processor.Factory {
	componentType := component.MustNewType("signozlogspipeline")
	logsOption := processor.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment)
	return processor.NewFactory(componentType, createDefaultConfig, logsOption)
}

// createDefaultConfig returns a Config whose operator list is empty but non-nil.
//
// Note: This isn't a valid configuration on its own (no operators would lead
// to no work being done); operators are expected to be supplied by the user.
func createDefaultConfig() component.Config {
	base := adapter.BaseConfig{
		Operators: make([]operator.Config, 0),
	}
	return &Config{BaseConfig: base}
}

// createLogsProcessor constructs the logs processor from cfg.
//
// It returns an error when cfg is not a *Config or when no operators are
// configured; otherwise it delegates to newProcessor with the next consumer
// and the telemetry settings taken from set.
func createLogsProcessor(
	_ context.Context,
	set processor.CreateSettings,
	cfg component.Config,
	nextConsumer consumer.Logs,
) (processor.Logs, error) {
	procCfg, isProcCfg := cfg.(*Config)

	// Cases are evaluated in order, so procCfg is only dereferenced once the
	// type assertion is known to have succeeded.
	switch {
	case !isProcCfg:
		return nil, errors.New("could not initialize signozlogspipeline processor")
	case len(procCfg.BaseConfig.Operators) == 0:
		return nil, errors.New("no operators were configured for signozlogspipeline processor")
	}

	return newProcessor(procCfg, nextConsumer, set.TelemetrySettings)
}
72 changes: 72 additions & 0 deletions processor/signozlogspipelineprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Brought in as is from logstransform processor in opentelemetry-collector-contrib
package signozlogspipelineprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
)

// TestCreateDefaultConfig verifies that the factory's default config is
// non-nil and passes the collector's config struct checks.
func TestCreateDefaultConfig(t *testing.T) {
	defaultCfg := NewFactory().CreateDefaultConfig()

	structErr := componenttest.CheckConfigStruct(defaultCfg)
	assert.NoError(t, structErr)
	assert.NotNil(t, defaultCfg)
}

// TestCreateProcessor verifies that a logs processor can be created from a
// config containing a single valid regex parser operator.
func TestCreateProcessor(t *testing.T) {
	factory := NewFactory()

	// Regex parser extracting a date, a severity and a message.
	regexCfg := regex.NewConfig()
	regexCfg.Regex = "^(?P<time>\\d{4}-\\d{2}-\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$"

	// Severity is parsed from the "sev" attribute.
	sevSource := entry.NewAttributeField("sev")
	severity := helper.NewSeverityConfig()
	severity.ParseFrom = &sevSource
	regexCfg.SeverityConfig = &severity

	// Timestamp is parsed from the "time" attribute.
	timeSource := entry.NewAttributeField("time")
	timestamp := helper.NewTimeParser()
	timestamp.Layout = "%Y-%m-%d"
	timestamp.ParseFrom = &timeSource
	regexCfg.TimeParser = &timestamp

	procCfg := &Config{
		BaseConfig: adapter.BaseConfig{
			Operators: []operator.Config{
				{Builder: regexCfg},
			},
		},
	}

	proc, err := factory.CreateLogsProcessor(
		context.Background(),
		processortest.NewNopCreateSettings(),
		procCfg,
		consumertest.NewNop(),
	)
	assert.NoError(t, err)
	assert.NotNil(t, proc)
}

func TestInvalidOperators(t *testing.T) {
factory := NewFactory()
cfg := &Config{
BaseConfig: adapter.BaseConfig{
Operators: []operator.Config{
{
// invalid due to missing regex
Builder: regex.NewConfig(),
},
},
},
}

_, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, nil)
assert.Error(t, err)
}
Loading

0 comments on commit 9bac5fc

Please sign in to comment.