Skip to content

Commit

Permalink
Add jaeger-v2 kafka exporter and receiver configuration
Browse files Browse the repository at this point in the history
Signed-off-by: James Ryans <james.ryans2012@gmail.com>
  • Loading branch information
james-ryans committed Jan 16, 2024
1 parent 663a04e commit 998fe18
Show file tree
Hide file tree
Showing 29 changed files with 1,087 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ cmd/collector/collector
cmd/collector/collector-*
cmd/ingester/ingester
cmd/ingester/ingester-*
cmd/jaeger/integration/results
cmd/remote-storage/remote-storage
cmd/remote-storage/remote-storage-*
cmd/es-index-cleaner/es-index-cleaner-*
Expand Down
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
SHELL := /bin/bash
JAEGER_IMPORT_PATH = github.com/jaegertracing/jaeger
STORAGE_PKGS = ./plugin/storage/integration/...
OTEL_INTEGRATION_PATH = ./cmd/jaeger/integration/...

# These DOCKER_xxx vars are used when building Docker images.
DOCKER_NAMESPACE?=jaegertracing
Expand Down Expand Up @@ -141,6 +142,15 @@ index-rollover-integration-test: docker-images-elastic
go clean -testcache
bash -c "set -e; set -o pipefail; $(GOTEST) -tags index_rollover -coverpkg=./... -coverprofile cover-index-rollover.out $(STORAGE_PKGS) $(COLORIZE)"

# Don't detect data race because testbed has race condition issue
.PHONY: otel-integration-test
otel-integration-test: GOTEST := GOCACHE=$(GOCACHE) $(GO) test -v
otel-integration-test:
# Expire tests results for storage integration tests since the environment might change
# even though the code remains the same.
go clean -testcache
bash -c "set -e; set -o pipefail; $(GOTEST) -coverpkg=./... -coverprofile cover.out $(OTEL_INTEGRATION_PATH) $(COLORIZE)"

.PHONY: cover
cover: nocover
bash -c "set -e; set -o pipefail; $(GOTEST) -tags=memory_storage_integration -timeout 5m -coverprofile cover.out ./... | tee test-results.json"
Expand Down
30 changes: 30 additions & 0 deletions cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
service:
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
exporters: [kafka]

receivers:
otlp:
protocols:
grpc:
http:

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

zipkin:

processors:
batch:

exporters:
kafka:
brokers:
- localhost:9092
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift
31 changes: 31 additions & 0 deletions cmd/jaeger/ingester-with-remote.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
service:
extensions: [jaeger_storage]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter] # same as in the default cmd/jaeger-v2/config.yaml
telemetry:
metrics:
address: 0.0.0.0:8889 # to avoid port conflict with collector-with-kafka.yaml

extensions:
jaeger_storage:
grpc:
memstore:
server: localhost:17271
connection-timeout: 5s

receivers:
kafka:
brokers:
- localhost:9092
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift
initial_offset: earliest # consume messages from the beginning

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: memstore
32 changes: 32 additions & 0 deletions cmd/jaeger/ingester.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
service:
extensions: [jaeger_storage]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter] # same as in the default cmd/jaeger-v2/config.yaml
telemetry:
metrics:
address: 0.0.0.0:8889 # to avoid port conflict with collector-with-kafka.yaml

extensions:
jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000

receivers:
kafka:
brokers:
- localhost:9092
encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift
initial_offset: earliest # consume messages from the beginning

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: memstore
16 changes: 16 additions & 0 deletions cmd/jaeger/integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Integration

Jaeger v2 integration tests are built on top of [OTEL Testbed module](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/testbed). OTEL Testbed provide comprehensive tools for conducting end-to-end tests for the OTEL Collector, such as reproducible short-term benchmarks, correctness tests, long-running stability tests and maximum load stress tests. To learn more about OTEL Testbed, please refer to the their [README](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/testbed/README.md)

## kafka_test

Kafka e2e test checks if the pipelines through `kafka` and finally at `remote-storage` have stored match exactly with the provided data using `GoldenDataProvider` (Provides data from the "Golden" dataset generated using pairwise combinatorial testing a.k.a PICT techniques for use in correctness tests) and validated using `CorrectnessTestValidator`.

The pipelines are checked in 2 steps, which the first test case verifies if the spans sent to Kafka are correct, and the second one checks the spans stored in the remote storage.
![kafka diagram](kafka_diagram.jpeg)

To conduct the tests, run the following command:

```
scripts/otel-kafka-integration-test.sh [kafka_version=latest] [remote_storage_version=latest]
```
1 change: 1 addition & 0 deletions cmd/jaeger/integration/datareceivers/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
FIXME
59 changes: 59 additions & 0 deletions cmd/jaeger/integration/datareceivers/jaegerstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package datareceivers

import (
"context"
"fmt"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/jaegertracing/jaeger/cmd/jaeger/integration/receivers/storagereceiver"
)

type jaegerStorageDataReceiver struct {
Port int
receiver receiver.Traces
}

func NewJaegerStorageDataReceiver(port int) testbed.DataReceiver {
return &jaegerStorageDataReceiver{Port: port}
}

func (dr *jaegerStorageDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error {
factory := storagereceiver.NewFactory()
cfg := factory.CreateDefaultConfig().(*storagereceiver.Config)
cfg.GRPC.RemoteServerAddr = fmt.Sprintf("localhost:%d", dr.Port)
cfg.GRPC.RemoteConnectTimeout = time.Duration(5 * time.Second)
// TODO add support for other backends

var err error
set := receivertest.NewNopCreateSettings()
dr.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc)
if err != nil {
return err
}

return dr.receiver.Start(context.Background(), componenttest.NewNopHost())
}

func (dr *jaegerStorageDataReceiver) Stop() error {
return dr.receiver.Shutdown(context.Background())
}

func (dr *jaegerStorageDataReceiver) GenConfigYAMLStr() string {
return fmt.Sprintf(`
jaeger_storage_receiver:
grpc-plugin:
server: localhost:%d`, dr.Port)
}

func (dr *jaegerStorageDataReceiver) ProtocolName() string {
return "jaeger_storage_receiver"
}
57 changes: 57 additions & 0 deletions cmd/jaeger/integration/datareceivers/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package datareceivers

import (
"context"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
)

type kafkaDataReceiver struct {
testbed.DataReceiverBase
receiver receiver.Traces
}

func NewKafkaDataReceiver(port int) testbed.DataReceiver {
return &kafkaDataReceiver{DataReceiverBase: testbed.DataReceiverBase{Port: port}}
}

func (dr *kafkaDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error {
factory := kafkareceiver.NewFactory()
cfg := factory.CreateDefaultConfig().(*kafkareceiver.Config)
cfg.Brokers = []string{fmt.Sprintf("localhost:%d", dr.Port)}
cfg.GroupID = "testbed_collector"

var err error
set := receivertest.NewNopCreateSettings()
dr.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc)
if err != nil {
return err
}

return dr.receiver.Start(context.Background(), componenttest.NewNopHost())
}

func (dr *kafkaDataReceiver) Stop() error {
return dr.receiver.Shutdown(context.Background())
}

func (dr *kafkaDataReceiver) GenConfigYAMLStr() string {
return fmt.Sprintf(`
kafka:
brokers:
- localhost:%d
encoding: otlp_proto`, dr.Port)
}

func (dr *kafkaDataReceiver) ProtocolName() string {
return "kafka"
}
Loading

0 comments on commit 998fe18

Please sign in to comment.