diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 98efb6eb4ce..4ce110f9348 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -114,7 +114,12 @@ func AddOTELFlags(flagSet *flag.FlagSet) { flagSet.String( KafkaConsumerConfigPrefix+SuffixEncoding, DefaultEncoding, - fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \""))) + fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join( + append(kafka.AllEncodings, + kafka.EncodingZipkinJSON, + kafka.EncodingZipkinProto, + kafka.EncodingOTLPProto, + ), "\", \""))) auth.AddFlags(KafkaConsumerConfigPrefix, flagSet) } diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go index 3b4a3a2fd4f..37356a4b59e 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go @@ -145,6 +145,14 @@ func MustOtelEncodingForJaegerEncoding(jaegerEncoding string) string { return "jaeger_proto" case kafka.EncodingJSON: return "jaeger_json" + case kafka.EncodingOTLPProto: + return "otlp_proto" + case kafka.EncodingZipkinProto: + return "zipkin_proto" + case kafka.EncodingZipkinJSON: + return "zipkin_json" + case kafka.EncodingZipkinThrift: + return "zipkin_thrift" } panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.") diff --git a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go index 921161bc199..ee5e50ffe72 100644 --- a/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go +++ b/cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go @@ -104,6 +104,22 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) { in: kafka.EncodingJSON, expected: "jaeger_json", }, + { + in: kafka.EncodingOTLPProto, + expected: "otlp_proto", + }, + { + in: kafka.EncodingZipkinProto, + expected: "zipkin_proto", + }, + { + in: kafka.EncodingZipkinJSON, + expected: "zipkin_json", + }, + { + in: kafka.EncodingZipkinThrift, + expected: "zipkin_thrift", + }, { in: "not-an-encoding", expectsPanic: true, diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 57af48c6343..d7f89c150ae 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -34,6 +34,12 @@ const ( EncodingProto = "protobuf" // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" + // EncodingZipkinProto is used for spans encoded as Zipkin Protobuf. + EncodingZipkinProto = "zipkin-proto" + // EncodingZipkinJSON is used for spans encoded as Zipkin JSON. + EncodingZipkinJSON = "zipkin-json" + // EncodingOTLPProto is used for spans encoded as OTLP Protobuf. + EncodingOTLPProto = "otlp-proto" configPrefix = "kafka.producer" suffixBrokers = ".brokers"