[kafka] Add option to supply destination topic through context (#34503)
**Description:**

Add an option to get the destination topic from the context. This allows
upstream connectors to control the destination topic without polluting the
data being written.


**Link to tracking Issue:**
Fixes #34432

**Testing:**
Added unit tests.

**Documentation:**
Updated the component readme with the added setting.

---------

Co-authored-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
Co-authored-by: Sean Marciniak <30928402+MovieStoreGuy@users.noreply.github.com>
3 people committed Sep 17, 2024
1 parent 0e945b6 commit c1e2df6
Showing 19 changed files with 203 additions and 20 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafka-topic-context.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add option to supply destination topic through context.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34503, 34432]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -149,6 +149,7 @@ pkg/batchpersignal/ @open-teleme
pkg/datadog/ @open-telemetry/collector-contrib-approvers @mx-psi @dineshg13 @liustanley @songy23 @mackjmr @ankitpatel96
pkg/experimentalmetricmetadata/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick
pkg/golden/ @open-telemetry/collector-contrib-approvers @djaglowski @atoulme
pkg/kafka/topic/ @open-telemetry/collector-contrib-approvers @pavolloffay @MovieStoreGuy
pkg/ottl/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @kentquirk @bogdandrutu @evan-bradley
pkg/pdatatest/ @open-telemetry/collector-contrib-approvers @djaglowski @fatsheep9146
pkg/pdatautil/ @open-telemetry/collector-contrib-approvers @dmitryax
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
@@ -145,6 +145,7 @@ body:
- pkg/datadog
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
- pkg/ottl
- pkg/pdatatest
- pkg/pdatautil
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
@@ -139,6 +139,7 @@ body:
- pkg/datadog
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
- pkg/ottl
- pkg/pdatatest
- pkg/pdatautil
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
@@ -139,6 +139,7 @@ body:
- pkg/datadog
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
- pkg/ottl
- pkg/pdatatest
- pkg/pdatautil
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
@@ -144,6 +144,7 @@ body:
- pkg/datadog
- pkg/experimentalmetricmetadata
- pkg/golden
- pkg/kafka/topic
- pkg/ottl
- pkg/pdatatest
- pkg/pdatautil
1 change: 1 addition & 0 deletions cmd/otelcontribcol/builder-config.yaml
@@ -398,6 +398,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter => ../../exporter/fileexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry => ../../pkg/resourcetotelemetry
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
- github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => ../../pkg/kafka/topic
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter => ../../exporter/opencensusexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter => ../../exporter/opensearchexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders => ../../internal/metadataproviders
3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
@@ -658,6 +658,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.109.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog v0.0.0-00010101000000-000000000000 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.109.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.109.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.109.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.109.0 // indirect
@@ -1183,6 +1184,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourceto

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => ../../pkg/kafka/topic

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter => ../../exporter/opencensusexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter => ../../exporter/opensearchexporter
10 changes: 8 additions & 2 deletions exporter/kafkaexporter/README.md
@@ -24,8 +24,8 @@ The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of kafka brokers.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
- `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. This option, when set, will take precedence over the default topic. If `topic_from_attribute` is not set, the message's topic will be set to the value of the configuration option `topic` instead.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the default kafka topic to export to. See [Destination Topic](#destination-topic) below for more details.
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
@@ -105,3 +105,9 @@ exporters:
- localhost:9092
protocol_version: 2.0.0
```
## Destination Topic
The destination topic can be defined in several ways, resolved in the following order of precedence:
1. When `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
2. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used.
3. Finally, the `topic` configuration is used as a default/fallback destination.
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/go.mod
@@ -10,6 +10,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.109.0
@@ -106,6 +107,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => ../../pkg/kafka/topic

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger => ../../pkg/translator/jaeger

retract (
32 changes: 18 additions & 14 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -19,6 +19,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic"
)

var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
@@ -40,8 +41,8 @@ func (ke kafkaErrors) Error() string {
return fmt.Sprintf("Failed to deliver %d messages due to %s", ke.count, ke.err)
}

func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces) error {
messages, err := e.marshaler.Marshal(td, getTopic(&e.cfg, td.ResourceSpans()))
func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces) error {
messages, err := e.marshaler.Marshal(td, getTopic(ctx, &e.cfg, td.ResourceSpans()))
if err != nil {
return consumererror.NewPermanent(err)
}
@@ -98,8 +99,8 @@ type kafkaMetricsProducer struct {
logger *zap.Logger
}

func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.Metrics) error {
messages, err := e.marshaler.Marshal(md, getTopic(&e.cfg, md.ResourceMetrics()))
func (e *kafkaMetricsProducer) metricsDataPusher(ctx context.Context, md pmetric.Metrics) error {
messages, err := e.marshaler.Marshal(md, getTopic(ctx, &e.cfg, md.ResourceMetrics()))
if err != nil {
return consumererror.NewPermanent(err)
}
@@ -156,8 +157,8 @@ type kafkaLogsProducer struct {
logger *zap.Logger
}

func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) error {
messages, err := e.marshaler.Marshal(ld, getTopic(&e.cfg, ld.ResourceLogs()))
func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) error {
messages, err := e.marshaler.Marshal(ld, getTopic(ctx, &e.cfg, ld.ResourceLogs()))
if err != nil {
return consumererror.NewPermanent(err)
}
@@ -283,16 +284,19 @@ type resource interface {
Resource() pcommon.Resource
}

func getTopic[T resource](cfg *Config, resources resourceSlice[T]) string {
if cfg.TopicFromAttribute == "" {
return cfg.Topic
}
for i := 0; i < resources.Len(); i++ {
rv, ok := resources.At(i).Resource().Attributes().Get(cfg.TopicFromAttribute)
if ok && rv.Str() != "" {
return rv.Str()
func getTopic[T resource](ctx context.Context, cfg *Config, resources resourceSlice[T]) string {
if cfg.TopicFromAttribute != "" {
for i := 0; i < resources.Len(); i++ {
rv, ok := resources.At(i).Resource().Attributes().Get(cfg.TopicFromAttribute)
if ok && rv.Str() != "" {
return rv.Str()
}
}
}
contextTopic, ok := topic.FromContext(ctx)
if ok {
return contextTopic
}
return cfg.Topic
}

105 changes: 101 additions & 4 deletions exporter/kafkaexporter/kafka_exporter_test.go
@@ -23,6 +23,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic"
)

func TestNewExporter_err_version(t *testing.T) {
@@ -205,6 +206,22 @@ func TestTracesPusher_attr(t *testing.T) {
require.NoError(t, err)
}

func TestTracesPusher_ctx(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaTracesProducer{
producer: producer,
marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.tracesPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateTraces(2))
require.NoError(t, err)
}

func TestTracesPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
@@ -271,6 +288,22 @@ func TestMetricsDataPusher_attr(t *testing.T) {
require.NoError(t, err)
}

func TestMetricsDataPusher_ctx(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaMetricsProducer{
producer: producer,
marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.metricsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateMetrics(2))
require.NoError(t, err)
}

func TestMetricsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
@@ -337,6 +370,22 @@ func TestLogsDataPusher_attr(t *testing.T) {
require.NoError(t, err)
}

func TestLogsDataPusher_ctx(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
producer.ExpectSendMessageAndSucceed()

p := kafkaLogsProducer{
producer: producer,
marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false),
}
t.Cleanup(func() {
require.NoError(t, p.Close(context.Background()))
})
err := p.logsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateLogs(1))
require.NoError(t, err)
}

func TestLogsDataPusher_err(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
@@ -410,6 +459,7 @@ func Test_GetTopic(t *testing.T) {
tests := []struct {
name string
cfg Config
ctx context.Context
resource any
wantTopic string
}{
@@ -419,6 +469,7 @@
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "resource-attr-val-1",
},
@@ -428,6 +479,7 @@
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateTraces(1).ResourceSpans(),
wantTopic: "resource-attr-val-1",
},
@@ -437,6 +489,7 @@
TopicFromAttribute: "resource-attr",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateLogs(1).ResourceLogs(),
wantTopic: "resource-attr-val-1",
},
@@ -446,14 +499,58 @@
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: context.Background(),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "defaultTopic",
},

{
name: "Valid metric context, return topic name",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "context-topic",
},
{
name: "Valid trace context, return topic name",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateTraces(1).ResourceSpans(),
wantTopic: "context-topic",
},
{
name: "Valid log context, return topic name",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: topic.WithTopic(context.Background(), "context-topic"),
resource: testdata.GenerateLogs(1).ResourceLogs(),
wantTopic: "context-topic",
},

{
name: "Attribute not found",
cfg: Config{
TopicFromAttribute: "nonexistent_attribute",
Topic: "defaultTopic",
},
ctx: context.Background(),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "defaultTopic",
},
{
name: "TopicFromAttribute not set, return default topic",
name: "TopicFromAttribute, return default topic",
cfg: Config{
Topic: "defaultTopic",
},
ctx: context.Background(),
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
wantTopic: "defaultTopic",
},
@@ -464,11 +561,11 @@
topic := ""
switch r := tests[i].resource.(type) {
case pmetric.ResourceMetricsSlice:
topic = getTopic(&tests[i].cfg, r)
topic = getTopic(tests[i].ctx, &tests[i].cfg, r)
case ptrace.ResourceSpansSlice:
topic = getTopic(&tests[i].cfg, r)
topic = getTopic(tests[i].ctx, &tests[i].cfg, r)
case plog.ResourceLogsSlice:
topic = getTopic(&tests[i].cfg, r)
topic = getTopic(tests[i].ctx, &tests[i].cfg, r)
}
assert.Equal(t, tests[i].wantTopic, topic)
})
1 change: 1 addition & 0 deletions pkg/kafka/topic/Makefile
@@ -0,0 +1 @@
include ../../../Makefile.Common
4 changes: 4 additions & 0 deletions pkg/kafka/topic/README.md
@@ -0,0 +1,4 @@
# Kafka Topic Context Accessor

This module is used for accessing the topic within a context.
See the [kafka exporter readme](../../../exporter/kafkaexporter/README.md#destination-topic) for more details.
3 changes: 3 additions & 0 deletions pkg/kafka/topic/go.mod
@@ -0,0 +1,3 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic

go 1.22