Skip to content

Commit

Permalink
Signozlogspipelineprocessor: use processorhelper.NewLogsProcessor for…
Browse files Browse the repository at this point in the history
… creating processor (#398)

Using the standard helper simplifies the implementation and should also
take care of common processor observability concerns
  • Loading branch information
raj-k-singh committed Sep 14, 2024
1 parent 9bac5fc commit f3af05c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 31 deletions.
26 changes: 22 additions & 4 deletions processor/signozlogspipelineprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package signozlogspipelineprocessor
import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand All @@ -30,19 +32,35 @@ func createDefaultConfig() component.Config {
}
}

var processorCapabilities = consumer.Capabilities{MutatesData: true}

func createLogsProcessor(
_ context.Context,
ctx context.Context,
set processor.CreateSettings,
cfg component.Config,
nextConsumer consumer.Logs) (processor.Logs, error) {
nextConsumer consumer.Logs,
) (processor.Logs, error) {
pCfg, ok := cfg.(*Config)
if !ok {
return nil, errors.New("could not initialize signozlogspipeline processor")
}

if len(pCfg.BaseConfig.Operators) == 0 {
return nil, errors.New("no operators were configured for signozlogspipeline processor")
}

return newProcessor(pCfg, nextConsumer, set.TelemetrySettings)
proc, err := newLogsPipelineProcessor(pCfg, set.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("couldn't build \"signozlogspipeline\" processor %w", err)
}

return processorhelper.NewLogsProcessor(
ctx,
set,
cfg,
nextConsumer,
proc.ProcessLogs,
processorhelper.WithStart(proc.Start),
processorhelper.WithShutdown(proc.Shutdown),
processorhelper.WithCapabilities(processorCapabilities),
)
}
21 changes: 5 additions & 16 deletions processor/signozlogspipelineprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,20 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"

_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok" // ensure grok parser gets registered.
)

func newProcessor(
func newLogsPipelineProcessor(
processorConfig *Config,
nextConsumer consumer.Logs,
telemetrySettings component.TelemetrySettings,
) (*logsPipelineProcessor, error) {

sink := &stanzaToOtelConsumer{
nextConsumer: nextConsumer,
logger: telemetrySettings.Logger,
logger: telemetrySettings.Logger,
}

stanzaPipeline, err := pipeline.Config{
Expand All @@ -40,7 +37,6 @@ func newProcessor(

return &logsPipelineProcessor{
telemetrySettings: telemetrySettings,
nextConsumer: nextConsumer,

processorConfig: processorConfig,
stanzaPipeline: stanzaPipeline,
Expand All @@ -49,10 +45,8 @@ func newProcessor(
}, nil
}

// Implements processor.Logs
type logsPipelineProcessor struct {
telemetrySettings component.TelemetrySettings
nextConsumer consumer.Logs

processorConfig *Config
stanzaPipeline *pipeline.DirectedPipeline
Expand Down Expand Up @@ -99,12 +93,7 @@ func (p *logsPipelineProcessor) Shutdown(ctx context.Context) error {
return nil
}

// baseConsumer interface
func (p *logsPipelineProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}

func (p *logsPipelineProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
func (p *logsPipelineProcessor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
p.telemetrySettings.Logger.Debug(
"logsPipelineProcessor received logs",
zap.Any("logs", ld),
Expand All @@ -118,8 +107,8 @@ func (p *logsPipelineProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) e
}
}

// Flush processed entries as a single plog.Logs to next consumer.
return p.sink.flush(ctx)
// Return processed entries as a single plog.Logs
return p.sink.flush(ctx), nil
}

// Helpers below have been brought in as is from stanza adapter for logstransform
Expand Down
15 changes: 4 additions & 11 deletions processor/signozlogspipelineprocessor/stanza_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

// A stanza operator that consumes stanza entries and converts them to pdata.Log
// for passing them on to the next otel consumer.
type stanzaToOtelConsumer struct {
nextConsumer consumer.Logs
logger *zap.Logger
logger *zap.Logger

// One plog.Logs can contain many log records. While the otel processor ConsumeLogs works
// with one plog.Logs at a time, the stanza pipeline works with one log entry at a time.
Expand Down Expand Up @@ -76,16 +75,10 @@ func (c *stanzaToOtelConsumer) Process(ctx context.Context, entry *entry.Entry)
// `processedEntries` accumulates processed entries for a single plog.Logs until the
// signozlogspipeline processor flushes these entries out - converting them back into
// a single plog.Logs that gets sent to `nextConsumer.ConsumeLogs`
func (c *stanzaToOtelConsumer) flush(ctx context.Context) error {
func (c *stanzaToOtelConsumer) flush(ctx context.Context) plog.Logs {
plogs := convertEntriesToPlogs(c.processedEntries)
c.processedEntries = []*entry.Entry{}

err := c.nextConsumer.ConsumeLogs(ctx, plogs)
if err != nil {
return err
}

return nil
return plogs
}

func (c *stanzaToOtelConsumer) Logger() *zap.Logger {
Expand Down

0 comments on commit f3af05c

Please sign in to comment.