diff --git a/CHANGELOG.md b/CHANGELOG.md index bff75aefaa6..6847e047fc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `NewSplitDriver` for OTLP exporter that allows sending traces and metrics to different endpoints. (#1418) - Add codeql worfklow to GitHub Actions (#1428) - Added Gosec workflow to GitHub Actions (#1429) +- A new HTTP driver for OTLP exporter in `exporters/otlp/otlphttp`. Currently it only supports the binary protobuf payloads. (#1420) ### Changed @@ -29,6 +30,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Unify endpoint API that related to OTel exporter. (#1401) - Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (1430) - `SamplingResult` now passed a `Tracestate` from the parent `SpanContext` (#1432) +- Moved gRPC driver for OTLP exporter to `exporters/otlp/otlpgrpc`. (#1420) ### Removed diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index e9bcba7fdd5..d4f31d72aa6 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp" + "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" @@ -49,10 +50,10 @@ func initProvider() func() { // `localhost:30080` endpoint. Otherwise, replace `localhost` with the // endpoint of your cluster. 
If you run the app inside k8s, then you can // probably connect directly to the service through dns - driver := otlp.NewGRPCDriver( - otlp.WithInsecure(), - otlp.WithEndpoint("localhost:30080"), - otlp.WithGRPCDialOption(grpc.WithBlock()), // useful for testing + driver := otlpgrpc.NewDriver( + otlpgrpc.WithInsecure(), + otlpgrpc.WithEndpoint("localhost:30080"), + otlpgrpc.WithDialOption(grpc.WithBlock()), // useful for testing ) exp, err := otlp.NewExporter(ctx, driver) handleErr(err, "failed to create exporter") diff --git a/exporters/otlp/internal/otlptest/collector.go b/exporters/otlp/internal/otlptest/collector.go new file mode 100644 index 00000000000..c3b7e46e137 --- /dev/null +++ b/exporters/otlp/internal/otlptest/collector.go @@ -0,0 +1,137 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package otlptest + +import ( + "sort" + + collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" + collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" + commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" + metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" + resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1" + tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" +) + +// Collector is an interface that mock collectors should implement, +// so they can be used for end-to-end testing. +type Collector interface { + Stop() error + GetResourceSpans() []*tracepb.ResourceSpans + GetMetrics() []*metricpb.Metric +} + +// SpansStorage stores the spans. Mock collectors could use it to +// store spans they have received. +type SpansStorage struct { + rsm map[string]*tracepb.ResourceSpans + spanCount int +} + +// MetricsStorage stores the metrics. Mock collectors could use it to +// store metrics they have received. +type MetricsStorage struct { + metrics []*metricpb.Metric +} + +// NewSpansStorage creates a new spans storage. +func NewSpansStorage() SpansStorage { + return SpansStorage{ + rsm: make(map[string]*tracepb.ResourceSpans), + } +} + +// AddSpans adds spans to the spans storage. +func (s *SpansStorage) AddSpans(request *collectortracepb.ExportTraceServiceRequest) { + for _, rs := range request.GetResourceSpans() { + rstr := resourceString(rs.Resource) + if existingRs, ok := s.rsm[rstr]; !ok { + s.rsm[rstr] = rs + // TODO (rghetia): Add support for library Info. 
+ if len(rs.InstrumentationLibrarySpans) == 0 { + rs.InstrumentationLibrarySpans = []*tracepb.InstrumentationLibrarySpans{ + { + Spans: []*tracepb.Span{}, + }, + } + } + s.spanCount += len(rs.InstrumentationLibrarySpans[0].Spans) + } else { + if len(rs.InstrumentationLibrarySpans) > 0 { + newSpans := rs.InstrumentationLibrarySpans[0].GetSpans() + existingRs.InstrumentationLibrarySpans[0].Spans = + append(existingRs.InstrumentationLibrarySpans[0].Spans, + newSpans...) + s.spanCount += len(newSpans) + } + } + } +} + +// GetSpans returns the stored spans. +func (s *SpansStorage) GetSpans() []*tracepb.Span { + spans := make([]*tracepb.Span, 0, s.spanCount) + for _, rs := range s.rsm { + spans = append(spans, rs.InstrumentationLibrarySpans[0].Spans...) + } + return spans +} + +// GetResourceSpans returns the stored resource spans. +func (s *SpansStorage) GetResourceSpans() []*tracepb.ResourceSpans { + rss := make([]*tracepb.ResourceSpans, 0, len(s.rsm)) + for _, rs := range s.rsm { + rss = append(rss, rs) + } + return rss +} + +// NewMetricsStorage creates a new metrics storage. +func NewMetricsStorage() MetricsStorage { + return MetricsStorage{} +} + +// AddMetrics adds metrics to the metrics storage. +func (s *MetricsStorage) AddMetrics(request *collectormetricpb.ExportMetricsServiceRequest) { + for _, rm := range request.GetResourceMetrics() { + // TODO (rghetia) handle multiple resource and library info. + if len(rm.InstrumentationLibraryMetrics) > 0 { + s.metrics = append(s.metrics, rm.InstrumentationLibraryMetrics[0].Metrics...) + } + } +} + +// GetMetrics returns the stored metrics. +func (s *MetricsStorage) GetMetrics() []*metricpb.Metric { + // copy in order to not change. + m := make([]*metricpb.Metric, 0, len(s.metrics)) + return append(m, s.metrics...) 
+} + +func resourceString(res *resourcepb.Resource) string { + sAttrs := sortedAttributes(res.GetAttributes()) + rstr := "" + for _, attr := range sAttrs { + rstr = rstr + attr.String() + } + return rstr +} + +func sortedAttributes(attrs []*commonpb.KeyValue) []*commonpb.KeyValue { + sort.Slice(attrs[:], func(i, j int) bool { + return attrs[i].Key < attrs[j].Key + }) + return attrs +} diff --git a/exporters/otlp/internal/otlptest/data.go b/exporters/otlp/internal/otlptest/data.go new file mode 100644 index 00000000000..cac514d1694 --- /dev/null +++ b/exporters/otlp/internal/otlptest/data.go @@ -0,0 +1,138 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlptest + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/label" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/number" + exportmetric "go.opentelemetry.io/otel/sdk/export/metric" + exporttrace "go.opentelemetry.io/otel/sdk/export/trace" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/trace" +) + +// Used to avoid implementing locking functions for test +// checkpointsets. +type noopLocker struct{} + +// Lock implements sync.Locker, which is needed for +// exportmetric.CheckpointSet. 
+func (noopLocker) Lock() {} + +// Unlock implements sync.Locker, which is needed for +// exportmetric.CheckpointSet. +func (noopLocker) Unlock() {} + +// RLock implements exportmetric.CheckpointSet. +func (noopLocker) RLock() {} + +// RUnlock implements exportmetric.CheckpointSet. +func (noopLocker) RUnlock() {} + +// OneRecordCheckpointSet is a CheckpointSet that returns just one +// filled record. It may be useful for testing driver's metrics +// export. +type OneRecordCheckpointSet struct { + noopLocker +} + +var _ exportmetric.CheckpointSet = OneRecordCheckpointSet{} + +// ForEach implements exportmetric.CheckpointSet. It always invokes +// the callback once with always the same record. +func (OneRecordCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error { + desc := metric.NewDescriptor( + "foo", + metric.CounterInstrumentKind, + number.Int64Kind, + ) + res := resource.NewWithAttributes(label.String("a", "b")) + agg := sum.New(1) + if err := agg[0].Update(context.Background(), number.NewInt64Number(42), &desc); err != nil { + return err + } + start := time.Date(2020, time.December, 8, 19, 15, 0, 0, time.UTC) + end := time.Date(2020, time.December, 8, 19, 16, 0, 0, time.UTC) + labels := label.NewSet(label.String("abc", "def"), label.Int64("one", 1)) + rec := exportmetric.NewRecord(&desc, &labels, res, agg[0].Aggregation(), start, end) + return recordFunc(rec) +} + +// SingleSpanSnapshot returns a one-element slice with a snapshot. It +// may be useful for testing driver's trace export. 
+func SingleSpanSnapshot() []*exporttrace.SpanSnapshot { + sd := &exporttrace.SpanSnapshot{ + SpanContext: trace.SpanContext{ + TraceID: trace.TraceID{2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9}, + SpanID: trace.SpanID{3, 4, 5, 6, 7, 8, 9, 0}, + TraceFlags: trace.FlagsSampled, + }, + ParentSpanID: trace.SpanID{1, 2, 3, 4, 5, 6, 7, 8}, + SpanKind: trace.SpanKindInternal, + Name: "foo", + StartTime: time.Date(2020, time.December, 8, 20, 23, 0, 0, time.UTC), + EndTime: time.Date(2020, time.December, 0, 20, 24, 0, 0, time.UTC), + Attributes: []label.KeyValue{}, + MessageEvents: []exporttrace.Event{}, + Links: []trace.Link{}, + StatusCode: codes.Ok, + StatusMessage: "", + HasRemoteParent: false, + DroppedAttributeCount: 0, + DroppedMessageEventCount: 0, + DroppedLinkCount: 0, + ChildSpanCount: 0, + Resource: resource.NewWithAttributes(label.String("a", "b")), + InstrumentationLibrary: instrumentation.Library{ + Name: "bar", + Version: "0.0.0", + }, + } + return []*exporttrace.SpanSnapshot{sd} +} + +// EmptyCheckpointSet is a checkpointer that has no records at all. +type EmptyCheckpointSet struct { + noopLocker +} + +var _ exportmetric.CheckpointSet = EmptyCheckpointSet{} + +// ForEach implements exportmetric.CheckpointSet. It never invokes the +// callback. +func (EmptyCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error { + return nil +} + +// FailCheckpointSet is a checkpointer that returns an error during +// ForEach. +type FailCheckpointSet struct { + noopLocker +} + +var _ exportmetric.CheckpointSet = FailCheckpointSet{} + +// ForEach implements exportmetric.CheckpointSet. It always fails. 
+func (FailCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error { + return fmt.Errorf("fail") +} diff --git a/exporters/otlp/internal/otlptest/otlptest.go b/exporters/otlp/internal/otlptest/otlptest.go new file mode 100644 index 00000000000..5c5ec813992 --- /dev/null +++ b/exporters/otlp/internal/otlptest/otlptest.go @@ -0,0 +1,266 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlptest + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/exporters/otlp" + commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" + "go.opentelemetry.io/otel/label" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/number" + exportmetric "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// RunEndToEndTest can be used by protocol driver tests to validate +// themselves. 
+func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlp.Exporter, mcTraces, mcMetrics Collector) { + pOpts := []sdktrace.TracerProviderOption{ + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), + sdktrace.WithBatcher( + exp, + // add following two options to ensure flush + sdktrace.WithBatchTimeout(5), + sdktrace.WithMaxExportBatchSize(10), + ), + } + tp1 := sdktrace.NewTracerProvider(append(pOpts, + sdktrace.WithResource(resource.NewWithAttributes( + label.String("rk1", "rv11)"), + label.Int64("rk2", 5), + )))...) + + tp2 := sdktrace.NewTracerProvider(append(pOpts, + sdktrace.WithResource(resource.NewWithAttributes( + label.String("rk1", "rv12)"), + label.Float32("rk3", 6.5), + )))...) + + tr1 := tp1.Tracer("test-tracer1") + tr2 := tp2.Tracer("test-tracer2") + // Now create few spans + m := 4 + for i := 0; i < m; i++ { + _, span := tr1.Start(ctx, "AlwaysSample") + span.SetAttributes(label.Int64("i", int64(i))) + span.End() + + _, span = tr2.Start(ctx, "AlwaysSample") + span.SetAttributes(label.Int64("i", int64(i))) + span.End() + } + + selector := simple.NewWithInexpensiveDistribution() + processor := processor.New(selector, exportmetric.StatelessExportKindSelector()) + pusher := push.New(processor, exp) + pusher.Start() + + meter := pusher.MeterProvider().Meter("test-meter") + labels := []label.KeyValue{label.Bool("test", true)} + + type data struct { + iKind metric.InstrumentKind + nKind number.Kind + val int64 + } + instruments := map[string]data{ + "test-int64-counter": {metric.CounterInstrumentKind, number.Int64Kind, 1}, + "test-float64-counter": {metric.CounterInstrumentKind, number.Float64Kind, 1}, + "test-int64-valuerecorder": {metric.ValueRecorderInstrumentKind, number.Int64Kind, 2}, + "test-float64-valuerecorder": {metric.ValueRecorderInstrumentKind, number.Float64Kind, 2}, + "test-int64-valueobserver": {metric.ValueObserverInstrumentKind, number.Int64Kind, 3}, + "test-float64-valueobserver": 
{metric.ValueObserverInstrumentKind, number.Float64Kind, 3}, + } + for name, data := range instruments { + data := data + switch data.iKind { + case metric.CounterInstrumentKind: + switch data.nKind { + case number.Int64Kind: + metric.Must(meter).NewInt64Counter(name).Add(ctx, data.val, labels...) + case number.Float64Kind: + metric.Must(meter).NewFloat64Counter(name).Add(ctx, float64(data.val), labels...) + default: + assert.Failf(t, "unsupported number testing kind", data.nKind.String()) + } + case metric.ValueRecorderInstrumentKind: + switch data.nKind { + case number.Int64Kind: + metric.Must(meter).NewInt64ValueRecorder(name).Record(ctx, data.val, labels...) + case number.Float64Kind: + metric.Must(meter).NewFloat64ValueRecorder(name).Record(ctx, float64(data.val), labels...) + default: + assert.Failf(t, "unsupported number testing kind", data.nKind.String()) + } + case metric.ValueObserverInstrumentKind: + switch data.nKind { + case number.Int64Kind: + metric.Must(meter).NewInt64ValueObserver(name, + func(_ context.Context, result metric.Int64ObserverResult) { + result.Observe(data.val, labels...) + }, + ) + case number.Float64Kind: + callback := func(v float64) metric.Float64ObserverFunc { + return metric.Float64ObserverFunc(func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(v, labels...) }) + }(float64(data.val)) + metric.Must(meter).NewFloat64ValueObserver(name, callback) + default: + assert.Failf(t, "unsupported number testing kind", data.nKind.String()) + } + default: + assert.Failf(t, "unsupported metrics testing kind", data.iKind.String()) + } + } + + // Flush and close. + pusher.Stop() + func() { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := tp1.Shutdown(ctx); err != nil { + t.Fatalf("failed to shut down a tracer provider 1: %v", err) + } + if err := tp2.Shutdown(ctx); err != nil { + t.Fatalf("failed to shut down a tracer provider 2: %v", err) + } + }() + + // Wait >2 cycles. 
+ <-time.After(40 * time.Millisecond) + + // Now shutdown the exporter + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := exp.Shutdown(ctx); err != nil { + t.Fatalf("failed to stop the exporter: %v", err) + } + + // Shutdown the collector too so that we can begin + // verification checks of expected data back. + _ = mcTraces.Stop() + _ = mcMetrics.Stop() + + // Now verify that we only got two resources + rss := mcTraces.GetResourceSpans() + if got, want := len(rss), 2; got != want { + t.Fatalf("resource span count: got %d, want %d\n", got, want) + } + + // Now verify spans and attributes for each resource span. + for _, rs := range rss { + if len(rs.InstrumentationLibrarySpans) == 0 { + t.Fatalf("zero Instrumentation Library Spans") + } + if got, want := len(rs.InstrumentationLibrarySpans[0].Spans), m; got != want { + t.Fatalf("span counts: got %d, want %d", got, want) + } + attrMap := map[int64]bool{} + for _, s := range rs.InstrumentationLibrarySpans[0].Spans { + if gotName, want := s.Name, "AlwaysSample"; gotName != want { + t.Fatalf("span name: got %s, want %s", gotName, want) + } + attrMap[s.Attributes[0].Value.Value.(*commonpb.AnyValue_IntValue).IntValue] = true + } + if got, want := len(attrMap), m; got != want { + t.Fatalf("span attribute unique values: got %d want %d", got, want) + } + for i := 0; i < m; i++ { + _, ok := attrMap[int64(i)] + if !ok { + t.Fatalf("span with attribute %d missing", i) + } + } + } + + metrics := mcMetrics.GetMetrics() + assert.Len(t, metrics, len(instruments), "not enough metrics exported") + seen := make(map[string]struct{}, len(instruments)) + for _, m := range metrics { + data, ok := instruments[m.Name] + if !ok { + assert.Failf(t, "unknown metrics", m.Name) + continue + } + seen[m.Name] = struct{}{} + + switch data.iKind { + case metric.CounterInstrumentKind: + switch data.nKind { + case number.Int64Kind: + if dp := m.GetIntSum().DataPoints; assert.Len(t, dp, 1) { + assert.Equal(t, 
data.val, dp[0].Value, "invalid value for %q", m.Name) + } + case number.Float64Kind: + if dp := m.GetDoubleSum().DataPoints; assert.Len(t, dp, 1) { + assert.Equal(t, float64(data.val), dp[0].Value, "invalid value for %q", m.Name) + } + default: + assert.Failf(t, "invalid number kind", data.nKind.String()) + } + case metric.ValueObserverInstrumentKind: + switch data.nKind { + case number.Int64Kind: + if dp := m.GetIntGauge().DataPoints; assert.Len(t, dp, 1) { + assert.Equal(t, data.val, dp[0].Value, "invalid value for %q", m.Name) + } + case number.Float64Kind: + if dp := m.GetDoubleGauge().DataPoints; assert.Len(t, dp, 1) { + assert.Equal(t, float64(data.val), dp[0].Value, "invalid value for %q", m.Name) + } + default: + assert.Failf(t, "invalid number kind", data.nKind.String()) + } + case metric.ValueRecorderInstrumentKind: + switch data.nKind { + case number.Int64Kind: + assert.NotNil(t, m.GetIntHistogram()) + if dp := m.GetIntHistogram().DataPoints; assert.Len(t, dp, 1) { + count := dp[0].Count + assert.Equal(t, uint64(1), count, "invalid count for %q", m.Name) + assert.Equal(t, int64(data.val*int64(count)), dp[0].Sum, "invalid sum for %q (value %d)", m.Name, data.val) + } + case number.Float64Kind: + assert.NotNil(t, m.GetDoubleHistogram()) + if dp := m.GetDoubleHistogram().DataPoints; assert.Len(t, dp, 1) { + count := dp[0].Count + assert.Equal(t, uint64(1), count, "invalid count for %q", m.Name) + assert.Equal(t, float64(data.val*int64(count)), dp[0].Sum, "invalid sum for %q (value %d)", m.Name, data.val) + } + default: + assert.Failf(t, "invalid number kind", data.nKind.String()) + } + default: + assert.Failf(t, "invalid metrics kind", data.iKind.String()) + } + } + + for i := range instruments { + if _, ok := seen[i]; !ok { + assert.Fail(t, fmt.Sprintf("no metric(s) exported for %q", i)) + } + } +} diff --git a/exporters/otlp/options.go b/exporters/otlp/options.go index 8a8432e4ec5..7cfaa35d3cc 100644 --- a/exporters/otlp/options.go +++ 
b/exporters/otlp/options.go @@ -21,7 +21,7 @@ import ( const ( // DefaultCollectorPort is the port the Exporter will attempt connect to // if no collector port is provided. - DefaultCollectorPort uint16 = 55680 + DefaultCollectorPort uint16 = 4317 // DefaultCollectorHost is the host address the Exporter will attempt // connect to if no collector address is provided. DefaultCollectorHost string = "localhost" diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 1e90f7a7340..3380d6c2933 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -69,9 +69,7 @@ func NewUnstartedExporter(driver ProtocolDriver, opts ...ExporterOption) *Export } var ( - errNoClient = errors.New("no client") errAlreadyStarted = errors.New("already started") - errDisconnected = errors.New("exporter disconnected") ) // Start establishes connections to the OpenTelemetry collector. Starting an diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go deleted file mode 100644 index 4865518a877..00000000000 --- a/exporters/otlp/otlp_integration_test.go +++ /dev/null @@ -1,809 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package otlp_test - -import ( - "context" - "fmt" - "net" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "google.golang.org/grpc" - "google.golang.org/grpc/encoding/gzip" - - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/exporters/otlp" - commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" - "go.opentelemetry.io/otel/label" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/number" - exportmetric "go.opentelemetry.io/otel/sdk/export/metric" - exporttrace "go.opentelemetry.io/otel/sdk/export/trace" - "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" - "go.opentelemetry.io/otel/sdk/metric/controller/push" - processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" - "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace" -) - -func TestNewExporter_endToEnd(t *testing.T) { - tests := []struct { - name string - additionalOpts []otlp.GRPCConnectionOption - }{ - { - name: "StandardExporter", - }, - { - name: "WithCompressor", - additionalOpts: []otlp.GRPCConnectionOption{ - otlp.WithCompressor(gzip.Name), - }, - }, - { - name: "WithGRPCServiceConfig", - additionalOpts: []otlp.GRPCConnectionOption{ - otlp.WithGRPCServiceConfig("{}"), - }, - }, - { - name: "WithGRPCDialOptions", - additionalOpts: []otlp.GRPCConnectionOption{ - otlp.WithGRPCDialOption(grpc.WithBlock()), - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - newExporterEndToEndTest(t, test.additionalOpts) - }) - } -} - -func newGRPCExporter(t *testing.T, ctx context.Context, endpoint string, additionalOpts ...otlp.GRPCConnectionOption) *otlp.Exporter { - opts := []otlp.GRPCConnectionOption{ - otlp.WithInsecure(), - otlp.WithEndpoint(endpoint), - 
otlp.WithReconnectionPeriod(50 * time.Millisecond), - } - - opts = append(opts, additionalOpts...) - driver := otlp.NewGRPCDriver(opts...) - exp, err := otlp.NewExporter(ctx, driver) - if err != nil { - t.Fatalf("failed to create a new collector exporter: %v", err) - } - return exp -} - -func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTraces, mcMetrics *mockCollector) { - pOpts := []sdktrace.TracerProviderOption{ - sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithBatcher( - exp, - // add following two options to ensure flush - sdktrace.WithBatchTimeout(5), - sdktrace.WithMaxExportBatchSize(10), - ), - } - tp1 := sdktrace.NewTracerProvider(append(pOpts, - sdktrace.WithResource(resource.NewWithAttributes( - label.String("rk1", "rv11)"), - label.Int64("rk2", 5), - )))...) - - tp2 := sdktrace.NewTracerProvider(append(pOpts, - sdktrace.WithResource(resource.NewWithAttributes( - label.String("rk1", "rv12)"), - label.Float32("rk3", 6.5), - )))...) 
- - tr1 := tp1.Tracer("test-tracer1") - tr2 := tp2.Tracer("test-tracer2") - // Now create few spans - m := 4 - for i := 0; i < m; i++ { - _, span := tr1.Start(ctx, "AlwaysSample") - span.SetAttributes(label.Int64("i", int64(i))) - span.End() - - _, span = tr2.Start(ctx, "AlwaysSample") - span.SetAttributes(label.Int64("i", int64(i))) - span.End() - } - - selector := simple.NewWithInexpensiveDistribution() - processor := processor.New(selector, exportmetric.StatelessExportKindSelector()) - pusher := push.New(processor, exp) - pusher.Start() - - meter := pusher.MeterProvider().Meter("test-meter") - labels := []label.KeyValue{label.Bool("test", true)} - - type data struct { - iKind metric.InstrumentKind - nKind number.Kind - val int64 - } - instruments := map[string]data{ - "test-int64-counter": {metric.CounterInstrumentKind, number.Int64Kind, 1}, - "test-float64-counter": {metric.CounterInstrumentKind, number.Float64Kind, 1}, - "test-int64-valuerecorder": {metric.ValueRecorderInstrumentKind, number.Int64Kind, 2}, - "test-float64-valuerecorder": {metric.ValueRecorderInstrumentKind, number.Float64Kind, 2}, - "test-int64-valueobserver": {metric.ValueObserverInstrumentKind, number.Int64Kind, 3}, - "test-float64-valueobserver": {metric.ValueObserverInstrumentKind, number.Float64Kind, 3}, - } - for name, data := range instruments { - data := data - switch data.iKind { - case metric.CounterInstrumentKind: - switch data.nKind { - case number.Int64Kind: - metric.Must(meter).NewInt64Counter(name).Add(ctx, data.val, labels...) - case number.Float64Kind: - metric.Must(meter).NewFloat64Counter(name).Add(ctx, float64(data.val), labels...) - default: - assert.Failf(t, "unsupported number testing kind", data.nKind.String()) - } - case metric.ValueRecorderInstrumentKind: - switch data.nKind { - case number.Int64Kind: - metric.Must(meter).NewInt64ValueRecorder(name).Record(ctx, data.val, labels...) 
- case number.Float64Kind: - metric.Must(meter).NewFloat64ValueRecorder(name).Record(ctx, float64(data.val), labels...) - default: - assert.Failf(t, "unsupported number testing kind", data.nKind.String()) - } - case metric.ValueObserverInstrumentKind: - switch data.nKind { - case number.Int64Kind: - metric.Must(meter).NewInt64ValueObserver(name, - func(_ context.Context, result metric.Int64ObserverResult) { - result.Observe(data.val, labels...) - }, - ) - case number.Float64Kind: - callback := func(v float64) metric.Float64ObserverFunc { - return metric.Float64ObserverFunc(func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(v, labels...) }) - }(float64(data.val)) - metric.Must(meter).NewFloat64ValueObserver(name, callback) - default: - assert.Failf(t, "unsupported number testing kind", data.nKind.String()) - } - default: - assert.Failf(t, "unsupported metrics testing kind", data.iKind.String()) - } - } - - // Flush and close. - pusher.Stop() - func() { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - if err := tp1.Shutdown(ctx); err != nil { - t.Fatalf("failed to shut down a tracer provider 1: %v", err) - } - if err := tp2.Shutdown(ctx); err != nil { - t.Fatalf("failed to shut down a tracer provider 2: %v", err) - } - }() - - // Wait >2 cycles. - <-time.After(40 * time.Millisecond) - - // Now shutdown the exporter - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - if err := exp.Shutdown(ctx); err != nil { - t.Fatalf("failed to stop the exporter: %v", err) - } - - // Shutdown the collector too so that we can begin - // verification checks of expected data back. - _ = mcTraces.stop() - _ = mcMetrics.stop() - - // Now verify that we only got two resources - rss := mcTraces.getResourceSpans() - if got, want := len(rss), 2; got != want { - t.Fatalf("resource span count: got %d, want %d\n", got, want) - } - - // Now verify spans and attributes for each resource span. 
- for _, rs := range rss { - if len(rs.InstrumentationLibrarySpans) == 0 { - t.Fatalf("zero Instrumentation Library Spans") - } - if got, want := len(rs.InstrumentationLibrarySpans[0].Spans), m; got != want { - t.Fatalf("span counts: got %d, want %d", got, want) - } - attrMap := map[int64]bool{} - for _, s := range rs.InstrumentationLibrarySpans[0].Spans { - if gotName, want := s.Name, "AlwaysSample"; gotName != want { - t.Fatalf("span name: got %s, want %s", gotName, want) - } - attrMap[s.Attributes[0].Value.Value.(*commonpb.AnyValue_IntValue).IntValue] = true - } - if got, want := len(attrMap), m; got != want { - t.Fatalf("span attribute unique values: got %d want %d", got, want) - } - for i := 0; i < m; i++ { - _, ok := attrMap[int64(i)] - if !ok { - t.Fatalf("span with attribute %d missing", i) - } - } - } - - metrics := mcMetrics.getMetrics() - assert.Len(t, metrics, len(instruments), "not enough metrics exported") - seen := make(map[string]struct{}, len(instruments)) - for _, m := range metrics { - data, ok := instruments[m.Name] - if !ok { - assert.Failf(t, "unknown metrics", m.Name) - continue - } - seen[m.Name] = struct{}{} - - switch data.iKind { - case metric.CounterInstrumentKind: - switch data.nKind { - case number.Int64Kind: - if dp := m.GetIntSum().DataPoints; assert.Len(t, dp, 1) { - assert.Equal(t, data.val, dp[0].Value, "invalid value for %q", m.Name) - } - case number.Float64Kind: - if dp := m.GetDoubleSum().DataPoints; assert.Len(t, dp, 1) { - assert.Equal(t, float64(data.val), dp[0].Value, "invalid value for %q", m.Name) - } - default: - assert.Failf(t, "invalid number kind", data.nKind.String()) - } - case metric.ValueObserverInstrumentKind: - switch data.nKind { - case number.Int64Kind: - if dp := m.GetIntGauge().DataPoints; assert.Len(t, dp, 1) { - assert.Equal(t, data.val, dp[0].Value, "invalid value for %q", m.Name) - } - case number.Float64Kind: - if dp := m.GetDoubleGauge().DataPoints; assert.Len(t, dp, 1) { - assert.Equal(t, 
float64(data.val), dp[0].Value, "invalid value for %q", m.Name) - } - default: - assert.Failf(t, "invalid number kind", data.nKind.String()) - } - case metric.ValueRecorderInstrumentKind: - switch data.nKind { - case number.Int64Kind: - assert.NotNil(t, m.GetIntHistogram()) - if dp := m.GetIntHistogram().DataPoints; assert.Len(t, dp, 1) { - count := dp[0].Count - assert.Equal(t, uint64(1), count, "invalid count for %q", m.Name) - assert.Equal(t, int64(data.val*int64(count)), dp[0].Sum, "invalid sum for %q (value %d)", m.Name, data.val) - } - case number.Float64Kind: - assert.NotNil(t, m.GetDoubleHistogram()) - if dp := m.GetDoubleHistogram().DataPoints; assert.Len(t, dp, 1) { - count := dp[0].Count - assert.Equal(t, uint64(1), count, "invalid count for %q", m.Name) - assert.Equal(t, float64(data.val*int64(count)), dp[0].Sum, "invalid sum for %q (value %d)", m.Name, data.val) - } - default: - assert.Failf(t, "invalid number kind", data.nKind.String()) - } - default: - assert.Failf(t, "invalid metrics kind", data.iKind.String()) - } - } - - for i := range instruments { - if _, ok := seen[i]; !ok { - assert.Fail(t, fmt.Sprintf("no metric(s) exported for %q", i)) - } - } -} - -func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) { - mc := runMockCollectorAtEndpoint(t, "localhost:56561") - - defer func() { - _ = mc.stop() - }() - - <-time.After(5 * time.Millisecond) - - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, additionalOpts...) 
- defer func() { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - if err := exp.Shutdown(ctx); err != nil { - panic(err) - } - }() - - runEndToEndTest(t, ctx, exp, mc, mc) -} - -func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { - mc := runMockCollector(t) - defer func() { - _ = mc.stop() - }() - - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint) - defer func() { - if err := exp.Shutdown(ctx); err != nil { - panic(err) - } - }() - - // Invoke Start numerous times, should return errAlreadyStarted - for i := 0; i < 10; i++ { - if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") { - t.Fatalf("#%d unexpected Start error: %v", i, err) - } - } - - if err := exp.Shutdown(ctx); err != nil { - t.Fatalf("failed to Shutdown the exporter: %v", err) - } - // Invoke Shutdown numerous times - for i := 0; i < 10; i++ { - if err := exp.Shutdown(ctx); err != nil { - t.Fatalf(`#%d got error (%v) expected none`, i, err) - } - } -} - -func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { - mc := runMockCollector(t) - - reconnectionPeriod := 20 * time.Millisecond - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, - otlp.WithReconnectionPeriod(reconnectionPeriod)) - defer func() { - _ = exp.Shutdown(ctx) - }() - - // We'll now stop the collector right away to simulate a connection - // dying in the midst of communication or even not existing before. - _ = mc.stop() - - // In the test below, we'll stop the collector many times, - // while exporting traces and test to ensure that we can - // reconnect. - for j := 0; j < 3; j++ { - - // No endpoint up. 
- require.Error( - t, - exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}), - "transport: Error while dialing dial tcp %s: connect: connection refused", - mc.endpoint, - ) - - // Now resurrect the collector by making a new one but reusing the - // old endpoint, and the collector should reconnect automatically. - nmc := runMockCollectorAtEndpoint(t, mc.endpoint) - - // Give the exporter sometime to reconnect - <-time.After(reconnectionPeriod * 4) - - n := 10 - for i := 0; i < n; i++ { - require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "Resurrected"}})) - } - - nmaSpans := nmc.getSpans() - // Expecting 10 SpanSnapshots that were sampled, given that - if g, w := len(nmaSpans), n; g != w { - t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w) - } - - dSpans := mc.getSpans() - // Expecting 0 spans to have been received by the original but now dead collector - if g, w := len(dSpans), 0; g != w { - t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w) - } - _ = nmc.stop() - } -} - -// This test takes a long time to run: to skip it, run tests using: -short -func TestNewExporter_collectorOnBadConnection(t *testing.T) { - if testing.Short() { - t.Skipf("Skipping this long running test") - } - - ln, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to grab an available port: %v", err) - } - // Firstly close the "collector's" channel: optimistically this endpoint won't get reused ASAP - // However, our goal of closing it is to simulate an unavailable connection - _ = ln.Close() - - _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) - - endpoint := fmt.Sprintf("localhost:%s", collectorPortStr) - ctx := context.Background() - exp := newGRPCExporter(t, ctx, endpoint) - _ = exp.Shutdown(ctx) -} - -func TestNewExporter_withEndpoint(t *testing.T) { - mc := runMockCollector(t) - defer func() { - _ = mc.stop() - }() - - ctx := context.Background() - 
exp := newGRPCExporter(t, ctx, mc.endpoint) - _ = exp.Shutdown(ctx) -} - -func TestNewExporter_withHeaders(t *testing.T) { - mc := runMockCollector(t) - defer func() { - _ = mc.stop() - }() - - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint, - otlp.WithHeaders(map[string]string{"header1": "value1"})) - require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}})) - - defer func() { - _ = exp.Shutdown(ctx) - }() - - headers := mc.getHeaders() - require.Len(t, headers.Get("header1"), 1) - assert.Equal(t, "value1", headers.Get("header1")[0]) -} - -func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { - mc := runMockCollector(t) - - defer func() { - _ = mc.stop() - }() - - <-time.After(5 * time.Millisecond) - - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint) - - defer func() { - _ = exp.Shutdown(ctx) - }() - - tp := sdktrace.NewTracerProvider( - sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithBatcher( - exp, - // add following two options to ensure flush - sdktrace.WithBatchTimeout(5), - sdktrace.WithMaxExportBatchSize(10), - ), - ) - defer func() { _ = tp.Shutdown(ctx) }() - - tr := tp.Tracer("test-tracer") - testKvs := []label.KeyValue{ - label.Int("Int", 1), - label.Int32("Int32", int32(2)), - label.Int64("Int64", int64(3)), - label.Float32("Float32", float32(1.11)), - label.Float64("Float64", 2.22), - label.Bool("Bool", true), - label.String("String", "test"), - } - _, span := tr.Start(ctx, "AlwaysSample") - span.SetAttributes(testKvs...) - span.End() - - // Flush and close. - func() { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - if err := tp.Shutdown(ctx); err != nil { - t.Fatalf("failed to shut down a tracer provider: %v", err) - } - }() - - // Wait >2 cycles. 
- <-time.After(40 * time.Millisecond) - - // Now shutdown the exporter - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - if err := exp.Shutdown(ctx); err != nil { - t.Fatalf("failed to stop the exporter: %v", err) - } - - // Shutdown the collector too so that we can begin - // verification checks of expected data back. - _ = mc.stop() - - // Now verify that we only got one span - rss := mc.getSpans() - if got, want := len(rss), 1; got != want { - t.Fatalf("resource span count: got %d, want %d\n", got, want) - } - - expected := []*commonpb.KeyValue{ - { - Key: "Int", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_IntValue{ - IntValue: 1, - }, - }, - }, - { - Key: "Int32", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_IntValue{ - IntValue: 2, - }, - }, - }, - { - Key: "Int64", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_IntValue{ - IntValue: 3, - }, - }, - }, - { - Key: "Float32", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_DoubleValue{ - DoubleValue: 1.11, - }, - }, - }, - { - Key: "Float64", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_DoubleValue{ - DoubleValue: 2.22, - }, - }, - }, - { - Key: "Bool", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_BoolValue{ - BoolValue: true, - }, - }, - }, - { - Key: "String", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_StringValue{ - StringValue: "test", - }, - }, - }, - } - - // Verify attributes - if !assert.Len(t, rss[0].Attributes, len(expected)) { - t.Fatalf("attributes count: got %d, want %d\n", len(rss[0].Attributes), len(expected)) - } - for i, actual := range rss[0].Attributes { - if a, ok := actual.Value.Value.(*commonpb.AnyValue_DoubleValue); ok { - e, ok := expected[i].Value.Value.(*commonpb.AnyValue_DoubleValue) - if !ok { - t.Errorf("expected AnyValue_DoubleValue, got %T", expected[i].Value.Value) - continue - } - if !assert.InDelta(t, e.DoubleValue, a.DoubleValue, 0.01) { - continue - } - e.DoubleValue = 
a.DoubleValue - } - assert.Equal(t, expected[i], actual) - } -} - -type discCheckpointSet struct{} - -func (discCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error { - desc := metric.NewDescriptor( - "foo", - metric.CounterInstrumentKind, - number.Int64Kind, - ) - res := resource.NewWithAttributes(label.String("a", "b")) - agg := sum.New(1) - start := time.Now().Add(-20 * time.Minute) - end := time.Now() - labels := label.NewSet() - rec := exportmetric.NewRecord(&desc, &labels, res, agg[0].Aggregation(), start, end) - return recordFunc(rec) -} - -func (discCheckpointSet) Lock() {} -func (discCheckpointSet) Unlock() {} -func (discCheckpointSet) RLock() {} -func (discCheckpointSet) RUnlock() {} - -func discSpanSnapshot() *exporttrace.SpanSnapshot { - return &exporttrace.SpanSnapshot{ - SpanContext: trace.SpanContext{ - TraceID: trace.TraceID{2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9}, - SpanID: trace.SpanID{3, 4, 5, 6, 7, 8, 9, 0}, - TraceFlags: trace.FlagsSampled, - }, - ParentSpanID: trace.SpanID{1, 2, 3, 4, 5, 6, 7, 8}, - SpanKind: trace.SpanKindInternal, - Name: "foo", - StartTime: time.Now().Add(-20 * time.Minute), - EndTime: time.Now(), - Attributes: []label.KeyValue{}, - MessageEvents: []exporttrace.Event{}, - Links: []trace.Link{}, - StatusCode: codes.Ok, - StatusMessage: "", - HasRemoteParent: false, - DroppedAttributeCount: 0, - DroppedMessageEventCount: 0, - DroppedLinkCount: 0, - ChildSpanCount: 0, - Resource: resource.NewWithAttributes(label.String("a", "b")), - InstrumentationLibrary: instrumentation.Library{ - Name: "bar", - Version: "0.0.0", - }, - } -} - -func TestDisconnected(t *testing.T) { - ctx := context.Background() - // The endpoint is whatever, we want to be disconnected. But we - // setting a blocking connection, so dialing to the invalid - // endpoint actually fails. 
- exp := newGRPCExporter(t, ctx, "invalid", - otlp.WithReconnectionPeriod(time.Hour), - otlp.WithGRPCDialOption( - grpc.WithBlock(), - grpc.FailOnNonTempDialError(true), - ), - ) - defer func() { - assert.NoError(t, exp.Shutdown(ctx)) - }() - - assert.Error(t, exp.Export(ctx, discCheckpointSet{})) - assert.Error(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{discSpanSnapshot()})) -} - -type emptyCheckpointSet struct{} - -func (emptyCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error { - return nil -} - -func (emptyCheckpointSet) Lock() {} -func (emptyCheckpointSet) Unlock() {} -func (emptyCheckpointSet) RLock() {} -func (emptyCheckpointSet) RUnlock() {} - -func TestEmptyData(t *testing.T) { - mc := runMockCollectorAtEndpoint(t, "localhost:56561") - - defer func() { - _ = mc.stop() - }() - - <-time.After(5 * time.Millisecond) - - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint) - defer func() { - assert.NoError(t, exp.Shutdown(ctx)) - }() - - assert.NoError(t, exp.ExportSpans(ctx, nil)) - assert.NoError(t, exp.Export(ctx, emptyCheckpointSet{})) -} - -type failCheckpointSet struct{} - -func (failCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelector, recordFunc func(exportmetric.Record) error) error { - return fmt.Errorf("fail") -} - -func (failCheckpointSet) Lock() {} -func (failCheckpointSet) Unlock() {} -func (failCheckpointSet) RLock() {} -func (failCheckpointSet) RUnlock() {} - -func TestFailedMetricTransform(t *testing.T) { - mc := runMockCollectorAtEndpoint(t, "localhost:56561") - - defer func() { - _ = mc.stop() - }() - - <-time.After(5 * time.Millisecond) - - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint) - defer func() { - assert.NoError(t, exp.Shutdown(ctx)) - }() - - assert.Error(t, exp.Export(ctx, failCheckpointSet{})) -} - -func TestMultiConnectionDriver(t *testing.T) { - mcTraces := runMockCollector(t) - mcMetrics := 
runMockCollector(t) - - defer func() { - _ = mcTraces.stop() - _ = mcMetrics.stop() - }() - - <-time.After(5 * time.Millisecond) - - commonOpts := []otlp.GRPCConnectionOption{ - otlp.WithInsecure(), - otlp.WithReconnectionPeriod(50 * time.Millisecond), - otlp.WithGRPCDialOption(grpc.WithBlock()), - } - optsTraces := append([]otlp.GRPCConnectionOption{ - otlp.WithEndpoint(mcTraces.endpoint), - }, commonOpts...) - optsMetrics := append([]otlp.GRPCConnectionOption{ - otlp.WithEndpoint(mcMetrics.endpoint), - }, commonOpts...) - - tracesDriver := otlp.NewGRPCDriver(optsTraces...) - metricsDriver := otlp.NewGRPCDriver(optsMetrics...) - splitCfg := otlp.SplitConfig{ - ForMetrics: metricsDriver, - ForTraces: tracesDriver, - } - driver := otlp.NewSplitDriver(splitCfg) - ctx := context.Background() - exp, err := otlp.NewExporter(ctx, driver) - if err != nil { - t.Fatalf("failed to create a new collector exporter: %v", err) - } - defer func() { - assert.NoError(t, exp.Shutdown(ctx)) - }() - runEndToEndTest(t, ctx, exp, mcTraces, mcMetrics) -} diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index 470c465aae1..b36e27d2056 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -28,11 +28,38 @@ import ( metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" "go.opentelemetry.io/otel/exporters/otlp/internal/transform" - metricsdk "go.opentelemetry.io/otel/sdk/export/metric" tracesdk "go.opentelemetry.io/otel/sdk/export/trace" ) +func stubSpanSnapshot(count int) []*tracesdk.SpanSnapshot { + spans := make([]*tracesdk.SpanSnapshot, 0, count) + for i := 0; i < count; i++ { + spans = append(spans, new(tracesdk.SpanSnapshot)) + } + return spans +} + +type stubCheckpointSet struct { + limit int +} + +var _ metricsdk.CheckpointSet = stubCheckpointSet{} + +func (s stubCheckpointSet) ForEach(kindSelector 
metricsdk.ExportKindSelector, recordFunc func(metricsdk.Record) error) error { + for i := 0; i < s.limit; i++ { + if err := recordFunc(metricsdk.Record{}); err != nil { + return err + } + } + return nil +} + +func (stubCheckpointSet) Lock() {} +func (stubCheckpointSet) Unlock() {} +func (stubCheckpointSet) RLock() {} +func (stubCheckpointSet) RUnlock() {} + type stubProtocolDriver struct { started int stopped int @@ -42,8 +69,8 @@ type stubProtocolDriver struct { injectedStartError error injectedStopError error - rm []metricpb.ResourceMetrics - rs []tracepb.ResourceSpans + rm []metricsdk.Record + rs []tracesdk.SpanSnapshot } var _ otlp.ProtocolDriver = (*stubProtocolDriver)(nil) @@ -70,6 +97,39 @@ func (m *stubProtocolDriver) Stop(ctx context.Context) error { func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { m.metricsExported++ + return cps.ForEach(selector, func(record metricsdk.Record) error { + m.rm = append(m.rm, record) + return nil + }) +} + +func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { + m.tracesExported++ + for _, rs := range ss { + if rs == nil { + continue + } + m.rs = append(m.rs, *rs) + } + return nil +} + +type stubTransformingProtocolDriver struct { + rm []metricpb.ResourceMetrics + rs []tracepb.ResourceSpans +} + +var _ otlp.ProtocolDriver = (*stubTransformingProtocolDriver)(nil) + +func (m *stubTransformingProtocolDriver) Start(ctx context.Context) error { + return nil +} + +func (m *stubTransformingProtocolDriver) Stop(ctx context.Context) error { + return nil +} + +func (m *stubTransformingProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { rms, err := transform.CheckpointSet(parent, selector, cps, 1) if err != nil { return err @@ -83,8 +143,7 @@ func (m *stubProtocolDriver) ExportMetrics(parent context.Context, cps metricsdk 
return nil } -func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { - m.tracesExported++ +func (m *stubTransformingProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { for _, rs := range transform.SpanData(ss) { if rs == nil { continue @@ -94,13 +153,13 @@ func (m *stubProtocolDriver) ExportTraces(ctx context.Context, ss []*tracesdk.Sp return nil } -func (m *stubProtocolDriver) Reset() { +func (m *stubTransformingProtocolDriver) Reset() { m.rm = nil m.rs = nil } -func newExporter(t *testing.T, opts ...otlp.ExporterOption) (*otlp.Exporter, *stubProtocolDriver) { - driver := &stubProtocolDriver{} +func newExporter(t *testing.T, opts ...otlp.ExporterOption) (*otlp.Exporter, *stubTransformingProtocolDriver) { + driver := &stubTransformingProtocolDriver{} exp, err := otlp.NewExporter(context.Background(), driver, opts...) require.NoError(t, err) return exp, driver @@ -204,11 +263,13 @@ func TestSplitDriver(t *testing.T) { assert.Equal(t, 0, driverMetrics.tracesExported) assert.Equal(t, 0, driverMetrics.metricsExported) - assert.NoError(t, driver.ExportMetrics(ctx, discCheckpointSet{}, metricsdk.StatelessExportKindSelector())) - assert.NoError(t, driver.ExportTraces(ctx, []*tracesdk.SpanSnapshot{discSpanSnapshot()})) + recordCount := 5 + spanCount := 7 + assert.NoError(t, driver.ExportMetrics(ctx, stubCheckpointSet{recordCount}, metricsdk.StatelessExportKindSelector())) + assert.NoError(t, driver.ExportTraces(ctx, stubSpanSnapshot(spanCount))) assert.Len(t, driverTraces.rm, 0) - assert.Len(t, driverTraces.rs, 1) - assert.Len(t, driverMetrics.rm, 1) + assert.Len(t, driverTraces.rs, spanCount) + assert.Len(t, driverMetrics.rm, recordCount) assert.Len(t, driverMetrics.rs, 0) assert.Equal(t, 1, driverTraces.tracesExported) assert.Equal(t, 0, driverTraces.metricsExported) diff --git a/exporters/otlp/alignment_test.go b/exporters/otlp/otlpgrpc/alignment_test.go similarity index 87% rename from 
exporters/otlp/alignment_test.go rename to exporters/otlp/otlpgrpc/alignment_test.go index 276625637a1..ef3217b365b 100644 --- a/exporters/otlp/alignment_test.go +++ b/exporters/otlp/otlpgrpc/alignment_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp +package otlpgrpc import ( "os" @@ -26,8 +26,8 @@ import ( func TestMain(m *testing.M) { fields := []ottest.FieldOffset{ { - Name: "grpcConnection.lastConnectErrPtr", - Offset: unsafe.Offsetof(grpcConnection{}.lastConnectErrPtr), + Name: "connection.lastConnectErrPtr", + Offset: unsafe.Offsetof(connection{}.lastConnectErrPtr), }, } if !ottest.Aligned8Byte(fields, os.Stderr) { diff --git a/exporters/otlp/grpcconnection.go b/exporters/otlp/otlpgrpc/connection.go similarity index 56% rename from exporters/otlp/grpcconnection.go rename to exporters/otlp/otlpgrpc/connection.go index 14cfcd9e6e9..c469d099c3e 100644 --- a/exporters/otlp/grpcconnection.go +++ b/exporters/otlp/otlpgrpc/connection.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp // import "go.opentelemetry.io/otel/exporters/otlp" +package otlpgrpc import ( "context" @@ -26,7 +26,7 @@ import ( "google.golang.org/grpc/metadata" ) -type grpcConnection struct { +type connection struct { // Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines. 
lastConnectErrPtr unsafe.Pointer @@ -36,7 +36,7 @@ type grpcConnection struct { cc *grpc.ClientConn // these fields are read-only after constructor is finished - c grpcConnectionConfig + cfg config metadata metadata.MD newConnectionHandler func(cc *grpc.ClientConn) @@ -51,73 +51,73 @@ type grpcConnection struct { closeBackgroundConnectionDoneCh func(ch chan struct{}) } -func newGRPCConnection(c grpcConnectionConfig, handler func(cc *grpc.ClientConn)) *grpcConnection { - conn := new(grpcConnection) - conn.newConnectionHandler = handler - conn.c = c - if len(conn.c.headers) > 0 { - conn.metadata = metadata.New(conn.c.headers) +func newConnection(cfg config, handler func(cc *grpc.ClientConn)) *connection { + c := new(connection) + c.newConnectionHandler = handler + c.cfg = cfg + if len(c.cfg.headers) > 0 { + c.metadata = metadata.New(c.cfg.headers) } - conn.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { + c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { close(ch) } - return conn + return c } -func (oc *grpcConnection) startConnection(ctx context.Context) { - oc.stopCh = make(chan struct{}) - oc.disconnectedCh = make(chan bool) - oc.backgroundConnectionDoneCh = make(chan struct{}) +func (c *connection) startConnection(ctx context.Context) { + c.stopCh = make(chan struct{}) + c.disconnectedCh = make(chan bool) + c.backgroundConnectionDoneCh = make(chan struct{}) - if err := oc.connect(ctx); err == nil { - oc.setStateConnected() + if err := c.connect(ctx); err == nil { + c.setStateConnected() } else { - oc.setStateDisconnected(err) + c.setStateDisconnected(err) } - go oc.indefiniteBackgroundConnection() + go c.indefiniteBackgroundConnection() } -func (oc *grpcConnection) lastConnectError() error { - errPtr := (*error)(atomic.LoadPointer(&oc.lastConnectErrPtr)) +func (c *connection) lastConnectError() error { + errPtr := (*error)(atomic.LoadPointer(&c.lastConnectErrPtr)) if errPtr == nil { return nil } return *errPtr } -func (oc *grpcConnection) 
saveLastConnectError(err error) { +func (c *connection) saveLastConnectError(err error) { var errPtr *error if err != nil { errPtr = &err } - atomic.StorePointer(&oc.lastConnectErrPtr, unsafe.Pointer(errPtr)) + atomic.StorePointer(&c.lastConnectErrPtr, unsafe.Pointer(errPtr)) } -func (oc *grpcConnection) setStateDisconnected(err error) { - oc.saveLastConnectError(err) +func (c *connection) setStateDisconnected(err error) { + c.saveLastConnectError(err) select { - case oc.disconnectedCh <- true: + case c.disconnectedCh <- true: default: } - oc.newConnectionHandler(nil) + c.newConnectionHandler(nil) } -func (oc *grpcConnection) setStateConnected() { - oc.saveLastConnectError(nil) +func (c *connection) setStateConnected() { + c.saveLastConnectError(nil) } -func (oc *grpcConnection) connected() bool { - return oc.lastConnectError() == nil +func (c *connection) connected() bool { + return c.lastConnectError() == nil } const defaultConnReattemptPeriod = 10 * time.Second -func (oc *grpcConnection) indefiniteBackgroundConnection() { +func (c *connection) indefiniteBackgroundConnection() { defer func() { - oc.closeBackgroundConnectionDoneCh(oc.backgroundConnectionDoneCh) + c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh) }() - connReattemptPeriod := oc.c.reconnectionPeriod + connReattemptPeriod := c.cfg.reconnectionPeriod if connReattemptPeriod <= 0 { connReattemptPeriod = defaultConnReattemptPeriod } @@ -136,14 +136,14 @@ func (oc *grpcConnection) indefiniteBackgroundConnection() { // 2. Otherwise block until we are disconnected, and // then retry connecting select { - case <-oc.stopCh: + case <-c.stopCh: return - case <-oc.disconnectedCh: + case <-c.disconnectedCh: // Quickly check if we haven't stopped at the // same time. 
select { - case <-oc.stopCh: + case <-c.stopCh: return default: @@ -152,10 +152,10 @@ func (oc *grpcConnection) indefiniteBackgroundConnection() { // Normal scenario that we'll wait for } - if err := oc.connect(context.Background()); err == nil { - oc.setStateConnected() + if err := c.connect(context.Background()); err == nil { + c.setStateConnected() } else { - oc.setStateDisconnected(err) + c.setStateDisconnected(err) } // Apply some jitter to avoid lockstep retrials of other @@ -163,89 +163,89 @@ func (oc *grpcConnection) indefiniteBackgroundConnection() { // innocent DDOS, by clogging the machine's resources and network. jitter := time.Duration(rng.Int63n(maxJitterNanos)) select { - case <-oc.stopCh: + case <-c.stopCh: return case <-time.After(connReattemptPeriod + jitter): } } } -func (oc *grpcConnection) connect(ctx context.Context) error { - cc, err := oc.dialToCollector(ctx) +func (c *connection) connect(ctx context.Context) error { + cc, err := c.dialToCollector(ctx) if err != nil { return err } - oc.setConnection(cc) - oc.newConnectionHandler(cc) + c.setConnection(cc) + c.newConnectionHandler(cc) return nil } // setConnection sets cc as the client connection and returns true if // the connection state changed. -func (oc *grpcConnection) setConnection(cc *grpc.ClientConn) bool { - oc.mu.Lock() - defer oc.mu.Unlock() +func (c *connection) setConnection(cc *grpc.ClientConn) bool { + c.mu.Lock() + defer c.mu.Unlock() // If previous clientConn is same as the current then just return. // This doesn't happen right now as this func is only called with new ClientConn. // It is more about future-proofing. 
- if oc.cc == cc { + if c.cc == cc { return false } // If the previous clientConn was non-nil, close it - if oc.cc != nil { - _ = oc.cc.Close() + if c.cc != nil { + _ = c.cc.Close() } - oc.cc = cc + c.cc = cc return true } -func (oc *grpcConnection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { - endpoint := oc.c.collectorEndpoint +func (c *connection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { + endpoint := c.cfg.collectorEndpoint dialOpts := []grpc.DialOption{} - if oc.c.grpcServiceConfig != "" { - dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(oc.c.grpcServiceConfig)) + if c.cfg.serviceConfig != "" { + dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.serviceConfig)) } - if oc.c.clientCredentials != nil { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(oc.c.clientCredentials)) - } else if oc.c.canDialInsecure { + if c.cfg.clientCredentials != nil { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.cfg.clientCredentials)) + } else if c.cfg.canDialInsecure { dialOpts = append(dialOpts, grpc.WithInsecure()) } - if oc.c.compressor != "" { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(oc.c.compressor))) + if c.cfg.compressor != "" { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(c.cfg.compressor))) } - if len(oc.c.grpcDialOptions) != 0 { - dialOpts = append(dialOpts, oc.c.grpcDialOptions...) + if len(c.cfg.dialOptions) != 0 { + dialOpts = append(dialOpts, c.cfg.dialOptions...) } - ctx, cancel := oc.contextWithStop(ctx) + ctx, cancel := c.contextWithStop(ctx) defer cancel() - ctx = oc.contextWithMetadata(ctx) + ctx = c.contextWithMetadata(ctx) return grpc.DialContext(ctx, endpoint, dialOpts...) 
} -func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Context { - if oc.metadata.Len() > 0 { - return metadata.NewOutgoingContext(ctx, oc.metadata) +func (c *connection) contextWithMetadata(ctx context.Context) context.Context { + if c.metadata.Len() > 0 { + return metadata.NewOutgoingContext(ctx, c.metadata) } return ctx } -func (oc *grpcConnection) shutdown(ctx context.Context) error { - close(oc.stopCh) +func (c *connection) shutdown(ctx context.Context) error { + close(c.stopCh) // Ensure that the backgroundConnector returns select { - case <-oc.backgroundConnectionDoneCh: + case <-c.backgroundConnectionDoneCh: case <-ctx.Done(): return ctx.Err() } - oc.mu.Lock() - cc := oc.cc - oc.cc = nil - oc.mu.Unlock() + c.mu.Lock() + cc := c.cc + c.cc = nil + c.mu.Unlock() if cc != nil { return cc.Close() @@ -254,7 +254,7 @@ func (oc *grpcConnection) shutdown(ctx context.Context) error { return nil } -func (oc *grpcConnection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { +func (c *connection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { // Unify the parent context Done signal with the connection's // stop channel. ctx, cancel := context.WithCancel(ctx) @@ -263,7 +263,7 @@ func (oc *grpcConnection) contextWithStop(ctx context.Context) (context.Context, case <-ctx.Done(): // Nothing to do, either cancelled or deadline // happened. - case <-oc.stopCh: + case <-c.stopCh: cancel() } }(ctx, cancel) diff --git a/exporters/otlp/otlpgrpc/doc.go b/exporters/otlp/otlpgrpc/doc.go new file mode 100644 index 00000000000..68f53fb0aab --- /dev/null +++ b/exporters/otlp/otlpgrpc/doc.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package otlpgrpc provides an implementation of otlp.ProtocolDriver +that connects to the collector and sends traces and metrics using +gRPC. + +This package is currently in a pre-GA phase. Backwards incompatible +changes may be introduced in subsequent minor version releases as we +work to track the evolving OpenTelemetry specification and user +feedback. +*/ +package otlpgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" diff --git a/exporters/otlp/grpcdriver.go b/exporters/otlp/otlpgrpc/driver.go similarity index 64% rename from exporters/otlp/grpcdriver.go rename to exporters/otlp/otlpgrpc/driver.go index 7d865360f10..ee6f7399cbd 100644 --- a/exporters/otlp/grpcdriver.go +++ b/exporters/otlp/otlpgrpc/driver.go @@ -12,15 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package otlp // import "go.opentelemetry.io/otel/exporters/otlp" +package otlpgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" import ( "context" + "errors" "fmt" "sync" "google.golang.org/grpc" + "go.opentelemetry.io/otel/exporters/otlp" colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" @@ -30,28 +32,34 @@ import ( tracesdk "go.opentelemetry.io/otel/sdk/export/trace" ) -type grpcDriver struct { - connection *grpcConnection +type driver struct { + connection *connection lock sync.Mutex metricsClient colmetricpb.MetricsServiceClient tracesClient coltracepb.TraceServiceClient } -func NewGRPCDriver(opts ...GRPCConnectionOption) ProtocolDriver { - cfg := grpcConnectionConfig{ - collectorEndpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort), - grpcServiceConfig: DefaultGRPCServiceConfig, +var ( + errNoClient = errors.New("no client") + errDisconnected = errors.New("exporter disconnected") +) + +// NewDriver creates a new gRPC protocol driver. 
+func NewDriver(opts ...Option) otlp.ProtocolDriver { + cfg := config{ + collectorEndpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort), + serviceConfig: DefaultServiceConfig, } for _, opt := range opts { opt(&cfg) } - d := &grpcDriver{} - d.connection = newGRPCConnection(cfg, d.handleNewConnection) + d := &driver{} + d.connection = newConnection(cfg, d.handleNewConnection) return d } -func (d *grpcDriver) handleNewConnection(cc *grpc.ClientConn) { +func (d *driver) handleNewConnection(cc *grpc.ClientConn) { d.lock.Lock() defer d.lock.Unlock() if cc != nil { @@ -63,16 +71,22 @@ func (d *grpcDriver) handleNewConnection(cc *grpc.ClientConn) { } } -func (d *grpcDriver) Start(ctx context.Context) error { +// Start implements otlp.ProtocolDriver. It establishes a connection +// to the collector. +func (d *driver) Start(ctx context.Context) error { d.connection.startConnection(ctx) return nil } -func (d *grpcDriver) Stop(ctx context.Context) error { +// Stop implements otlp.ProtocolDriver. It shuts down the connection +// to the collector. +func (d *driver) Stop(ctx context.Context) error { return d.connection.shutdown(ctx) } -func (d *grpcDriver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { +// ExportMetrics implements otlp.ProtocolDriver. It transforms metrics +// to protobuf binary format and sends the result to the collector. 
+func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { if !d.connection.connected() { return errDisconnected } @@ -90,7 +104,7 @@ func (d *grpcDriver) ExportMetrics(ctx context.Context, cps metricsdk.Checkpoint return d.uploadMetrics(ctx, rms) } -func (d *grpcDriver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error { +func (d *driver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error { ctx = d.connection.contextWithMetadata(ctx) err := func() error { d.lock.Lock() @@ -109,7 +123,9 @@ func (d *grpcDriver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb return err } -func (d *grpcDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { +// ExportTraces implements otlp.ProtocolDriver. It transforms spans to +// protobuf binary format and sends the result to the collector. +func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { if !d.connection.connected() { return errDisconnected } @@ -124,7 +140,7 @@ func (d *grpcDriver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapsh return d.uploadTraces(ctx, protoSpans) } -func (d *grpcDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { +func (d *driver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { ctx = d.connection.contextWithMetadata(ctx) err := func() error { d.lock.Lock() diff --git a/exporters/otlp/example_test.go b/exporters/otlp/otlpgrpc/example_test.go similarity index 93% rename from exporters/otlp/example_test.go rename to exporters/otlp/otlpgrpc/example_test.go index c6cc4bfa115..d77e120eafd 100644 --- a/exporters/otlp/example_test.go +++ b/exporters/otlp/otlpgrpc/example_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package otlp_test +package otlpgrpc_test import ( "context" @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp" + "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/processor/basic" @@ -33,7 +34,7 @@ import ( func Example_insecure() { ctx := context.Background() - driver := otlp.NewGRPCDriver(otlp.WithInsecure()) + driver := otlpgrpc.NewDriver(otlpgrpc.WithInsecure()) exp, err := otlp.NewExporter(ctx, driver) if err != nil { log.Fatalf("Failed to create the collector exporter: %v", err) @@ -86,7 +87,7 @@ func Example_withTLS() { } ctx := context.Background() - driver := otlp.NewGRPCDriver(otlp.WithTLSCredentials(creds)) + driver := otlpgrpc.NewDriver(otlpgrpc.WithTLSCredentials(creds)) exp, err := otlp.NewExporter(ctx, driver) if err != nil { log.Fatalf("failed to create the collector exporter: %v", err) @@ -133,13 +134,13 @@ func Example_withTLS() { func Example_withDifferentSignalCollectors() { // Set different endpoints for the metrics and traces collectors - metricsDriver := otlp.NewGRPCDriver( - otlp.WithInsecure(), - otlp.WithEndpoint("localhost:30080"), + metricsDriver := otlpgrpc.NewDriver( + otlpgrpc.WithInsecure(), + otlpgrpc.WithEndpoint("localhost:30080"), ) - tracesDriver := otlp.NewGRPCDriver( - otlp.WithInsecure(), - otlp.WithEndpoint("localhost:30082"), + tracesDriver := otlpgrpc.NewDriver( + otlpgrpc.WithInsecure(), + otlpgrpc.WithEndpoint("localhost:30082"), ) splitCfg := otlp.SplitConfig{ ForMetrics: metricsDriver, diff --git a/exporters/otlp/mock_collector_test.go b/exporters/otlp/otlpgrpc/mock_collector_test.go similarity index 67% rename from exporters/otlp/mock_collector_test.go rename to exporters/otlp/otlpgrpc/mock_collector_test.go index 0c46375a62d..6e5bd5c33b7 100644 --- a/exporters/otlp/mock_collector_test.go +++ b/exporters/otlp/otlpgrpc/mock_collector_test.go @@ 
-12,13 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp_test +package otlpgrpc_test import ( "context" "fmt" "net" - "sort" "sync" "testing" "time" @@ -28,121 +27,73 @@ import ( collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" - commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" - resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1" tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" + "go.opentelemetry.io/otel/exporters/otlp/internal/otlptest" ) func makeMockCollector(t *testing.T) *mockCollector { return &mockCollector{ t: t, traceSvc: &mockTraceService{ - rsm: map[string]*tracepb.ResourceSpans{}, + storage: otlptest.NewSpansStorage(), + }, + metricSvc: &mockMetricService{ + storage: otlptest.NewMetricsStorage(), }, - metricSvc: &mockMetricService{}, } } type mockTraceService struct { mu sync.RWMutex - rsm map[string]*tracepb.ResourceSpans + storage otlptest.SpansStorage headers metadata.MD } func (mts *mockTraceService) getHeaders() metadata.MD { + mts.mu.RLock() + defer mts.mu.RUnlock() return mts.headers } func (mts *mockTraceService) getSpans() []*tracepb.Span { mts.mu.RLock() defer mts.mu.RUnlock() - spans := []*tracepb.Span{} - for _, rs := range mts.rsm { - spans = append(spans, rs.InstrumentationLibrarySpans[0].Spans...) 
- } - return spans + return mts.storage.GetSpans() } func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans { mts.mu.RLock() defer mts.mu.RUnlock() - rss := make([]*tracepb.ResourceSpans, 0, len(mts.rsm)) - for _, rs := range mts.rsm { - rss = append(rss, rs) - } - return rss + return mts.storage.GetResourceSpans() } func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.ExportTraceServiceRequest) (*collectortracepb.ExportTraceServiceResponse, error) { + reply := &collectortracepb.ExportTraceServiceResponse{} mts.mu.Lock() - mts.headers, _ = metadata.FromIncomingContext(ctx) defer mts.mu.Unlock() - rss := exp.GetResourceSpans() - for _, rs := range rss { - rstr := resourceString(rs.Resource) - existingRs, ok := mts.rsm[rstr] - if !ok { - mts.rsm[rstr] = rs - // TODO (rghetia): Add support for library Info. - if len(rs.InstrumentationLibrarySpans) == 0 { - rs.InstrumentationLibrarySpans = []*tracepb.InstrumentationLibrarySpans{ - { - Spans: []*tracepb.Span{}, - }, - } - } - } else { - if len(rs.InstrumentationLibrarySpans) > 0 { - existingRs.InstrumentationLibrarySpans[0].Spans = - append(existingRs.InstrumentationLibrarySpans[0].Spans, - rs.InstrumentationLibrarySpans[0].GetSpans()...) 
- } - } - } - return &collectortracepb.ExportTraceServiceResponse{}, nil -} - -func resourceString(res *resourcepb.Resource) string { - sAttrs := sortedAttributes(res.GetAttributes()) - rstr := "" - for _, attr := range sAttrs { - rstr = rstr + attr.String() - - } - return rstr -} - -func sortedAttributes(attrs []*commonpb.KeyValue) []*commonpb.KeyValue { - sort.Slice(attrs[:], func(i, j int) bool { - return attrs[i].Key < attrs[j].Key - }) - return attrs + mts.headers, _ = metadata.FromIncomingContext(ctx) + mts.storage.AddSpans(exp) + return reply, nil } type mockMetricService struct { mu sync.RWMutex - metrics []*metricpb.Metric + storage otlptest.MetricsStorage } func (mms *mockMetricService) getMetrics() []*metricpb.Metric { - // copy in order to not change. - m := make([]*metricpb.Metric, 0, len(mms.metrics)) mms.mu.RLock() defer mms.mu.RUnlock() - return append(m, mms.metrics...) + return mms.storage.GetMetrics() } func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) { + reply := &collectormetricpb.ExportMetricsServiceResponse{} mms.mu.Lock() - for _, rm := range exp.GetResourceMetrics() { - // TODO (rghetia) handle multiple resource and library info. - if len(rm.InstrumentationLibraryMetrics) > 0 { - mms.metrics = append(mms.metrics, rm.InstrumentationLibraryMetrics[0].Metrics...) 
- } - } - mms.mu.Unlock() - return &collectormetricpb.ExportMetricsServiceResponse{}, nil + defer mms.mu.Unlock() + mms.storage.AddMetrics(exp) + return reply, nil } type mockCollector struct { @@ -191,6 +142,10 @@ func (mc *mockCollector) stop() error { return err } +func (mc *mockCollector) Stop() error { + return mc.stop() +} + func (mc *mockCollector) getSpans() []*tracepb.Span { return mc.traceSvc.getSpans() } @@ -199,6 +154,10 @@ func (mc *mockCollector) getResourceSpans() []*tracepb.ResourceSpans { return mc.traceSvc.getResourceSpans() } +func (mc *mockCollector) GetResourceSpans() []*tracepb.ResourceSpans { + return mc.getResourceSpans() +} + func (mc *mockCollector) getHeaders() metadata.MD { return mc.traceSvc.getHeaders() } @@ -207,6 +166,10 @@ func (mc *mockCollector) getMetrics() []*metricpb.Metric { return mc.metricSvc.getMetrics() } +func (mc *mockCollector) GetMetrics() []*metricpb.Metric { + return mc.getMetrics() +} + // runMockCollector is a helper function to create a mock Collector func runMockCollector(t *testing.T) *mockCollector { return runMockCollectorAtEndpoint(t, "localhost:0") diff --git a/exporters/otlp/grpcoptions.go b/exporters/otlp/otlpgrpc/options.go similarity index 73% rename from exporters/otlp/grpcoptions.go rename to exporters/otlp/otlpgrpc/options.go index bc8d9973f20..1700b5b8ee5 100644 --- a/exporters/otlp/grpcoptions.go +++ b/exporters/otlp/otlpgrpc/options.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package otlp // import "go.opentelemetry.io/otel/exporters/otlp" +package otlpgrpc import ( "time" @@ -22,7 +22,7 @@ import ( ) const ( - // DefaultGRPCServiceConfig is the gRPC service config used if none is + // DefaultServiceConfig is the gRPC service config used if none is // provided by the user. // // For more info on gRPC service configs: @@ -34,7 +34,7 @@ const ( // Note: MaxAttempts > 5 are treated as 5. 
See // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy // for more details. - DefaultGRPCServiceConfig = `{ + DefaultServiceConfig = `{ "methodConfig":[{ "name":[ { "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" }, @@ -46,7 +46,6 @@ const ( "MaxBackoff":"5s", "BackoffMultiplier":2, "RetryableStatusCodes":[ - "UNAVAILABLE", "CANCELLED", "DEADLINE_EXCEEDED", "RESOURCE_EXHAUSTED", @@ -60,24 +59,25 @@ const ( }` ) -type grpcConnectionConfig struct { +type config struct { canDialInsecure bool collectorEndpoint string compressor string reconnectionPeriod time.Duration - grpcServiceConfig string - grpcDialOptions []grpc.DialOption + serviceConfig string + dialOptions []grpc.DialOption headers map[string]string clientCredentials credentials.TransportCredentials } -type GRPCConnectionOption func(cfg *grpcConnectionConfig) +// Option applies an option to the gRPC driver. +type Option func(cfg *config) // WithInsecure disables client transport security for the exporter's gRPC connection // just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure // does. Note, by default, client security is required unless WithInsecure is used. -func WithInsecure() GRPCConnectionOption { - return func(cfg *grpcConnectionConfig) { +func WithInsecure() Option { + return func(cfg *config) { cfg.canDialInsecure = true } } @@ -85,16 +85,16 @@ func WithInsecure() GRPCConnectionOption { // WithEndpoint allows one to set the endpoint that the exporter will // connect to the collector on. If unset, it will instead try to use // connect to DefaultCollectorHost:DefaultCollectorPort. 
-func WithEndpoint(endpoint string) GRPCConnectionOption { - return func(cfg *grpcConnectionConfig) { +func WithEndpoint(endpoint string) Option { + return func(cfg *config) { cfg.collectorEndpoint = endpoint } } // WithReconnectionPeriod allows one to set the delay between next connection attempt // after failing to connect with the collector. -func WithReconnectionPeriod(rp time.Duration) GRPCConnectionOption { - return func(cfg *grpcConnectionConfig) { +func WithReconnectionPeriod(rp time.Duration) Option { + return func(cfg *config) { cfg.reconnectionPeriod = rp } } @@ -104,15 +104,15 @@ func WithReconnectionPeriod(rp time.Duration) GRPCConnectionOption { // with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some // compressors auto-register on import, such as gzip, which can be registered by calling // `import _ "google.golang.org/grpc/encoding/gzip"` -func WithCompressor(compressor string) GRPCConnectionOption { - return func(cfg *grpcConnectionConfig) { +func WithCompressor(compressor string) Option { + return func(cfg *config) { cfg.compressor = compressor } } // WithHeaders will send the provided headers with gRPC requests -func WithHeaders(headers map[string]string) GRPCConnectionOption { - return func(cfg *grpcConnectionConfig) { +func WithHeaders(headers map[string]string) Option { + return func(cfg *config) { cfg.headers = headers } } @@ -122,24 +122,24 @@ func WithHeaders(headers map[string]string) GRPCConnectionOption { // of say a Certificate file or a tls.Certificate, because the retrieving // these credentials can be done in many ways e.g. plain file, in code tls.Config // or by certificate rotation, so it is up to the caller to decide what to use. 
-func WithTLSCredentials(creds credentials.TransportCredentials) GRPCConnectionOption { - return func(cfg *grpcConnectionConfig) { +func WithTLSCredentials(creds credentials.TransportCredentials) Option { + return func(cfg *config) { cfg.clientCredentials = creds } } -// WithGRPCServiceConfig defines the default gRPC service config used. -func WithGRPCServiceConfig(serviceConfig string) GRPCConnectionOption { - return func(cfg *grpcConnectionConfig) { - cfg.grpcServiceConfig = serviceConfig +// WithServiceConfig defines the default gRPC service config used. +func WithServiceConfig(serviceConfig string) Option { + return func(cfg *config) { + cfg.serviceConfig = serviceConfig } } -// WithGRPCDialOption opens support to any grpc.DialOption to be used. If it conflicts +// WithDialOption opens support to any grpc.DialOption to be used. If it conflicts // with some other configuration the GRPC specified via the collector the ones here will // take preference since they are set last. -func WithGRPCDialOption(opts ...grpc.DialOption) GRPCConnectionOption { - return func(cfg *grpcConnectionConfig) { - cfg.grpcDialOptions = opts +func WithDialOption(opts ...grpc.DialOption) Option { + return func(cfg *config) { + cfg.dialOptions = opts } } diff --git a/exporters/otlp/otlpgrpc/otlp_integration_test.go b/exporters/otlp/otlpgrpc/otlp_integration_test.go new file mode 100644 index 00000000000..5cd647c7726 --- /dev/null +++ b/exporters/otlp/otlpgrpc/otlp_integration_test.go @@ -0,0 +1,498 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlpgrpc_test + +import ( + "context" + "fmt" + "net" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + + "go.opentelemetry.io/otel/exporters/otlp" + commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1" + "go.opentelemetry.io/otel/exporters/otlp/internal/otlptest" + "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" + "go.opentelemetry.io/otel/label" + exporttrace "go.opentelemetry.io/otel/sdk/export/trace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +func TestNewExporter_endToEnd(t *testing.T) { + tests := []struct { + name string + additionalOpts []otlpgrpc.Option + }{ + { + name: "StandardExporter", + }, + { + name: "WithCompressor", + additionalOpts: []otlpgrpc.Option{ + otlpgrpc.WithCompressor(gzip.Name), + }, + }, + { + name: "WithServiceConfig", + additionalOpts: []otlpgrpc.Option{ + otlpgrpc.WithServiceConfig("{}"), + }, + }, + { + name: "WithDialOptions", + additionalOpts: []otlpgrpc.Option{ + otlpgrpc.WithDialOption(grpc.WithBlock()), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + newExporterEndToEndTest(t, test.additionalOpts) + }) + } +} + +func newGRPCExporter(t *testing.T, ctx context.Context, endpoint string, additionalOpts ...otlpgrpc.Option) *otlp.Exporter { + opts := []otlpgrpc.Option{ + otlpgrpc.WithInsecure(), + otlpgrpc.WithEndpoint(endpoint), + otlpgrpc.WithReconnectionPeriod(50 * 
time.Millisecond), + } + + opts = append(opts, additionalOpts...) + driver := otlpgrpc.NewDriver(opts...) + exp, err := otlp.NewExporter(ctx, driver) + if err != nil { + t.Fatalf("failed to create a new collector exporter: %v", err) + } + return exp +} + +func newExporterEndToEndTest(t *testing.T, additionalOpts []otlpgrpc.Option) { + mc := runMockCollectorAtEndpoint(t, "localhost:56561") + + defer func() { + _ = mc.stop() + }() + + <-time.After(5 * time.Millisecond) + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint, additionalOpts...) + defer func() { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := exp.Shutdown(ctx); err != nil { + panic(err) + } + }() + + otlptest.RunEndToEndTest(ctx, t, exp, mc, mc) +} + +func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { + mc := runMockCollector(t) + defer func() { + _ = mc.stop() + }() + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint) + defer func() { + if err := exp.Shutdown(ctx); err != nil { + panic(err) + } + }() + + // Invoke Start numerous times, should return errAlreadyStarted + for i := 0; i < 10; i++ { + if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") { + t.Fatalf("#%d unexpected Start error: %v", i, err) + } + } + + if err := exp.Shutdown(ctx); err != nil { + t.Fatalf("failed to Shutdown the exporter: %v", err) + } + // Invoke Shutdown numerous times + for i := 0; i < 10; i++ { + if err := exp.Shutdown(ctx); err != nil { + t.Fatalf(`#%d got error (%v) expected none`, i, err) + } + } +} + +func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { + mc := runMockCollector(t) + + reconnectionPeriod := 20 * time.Millisecond + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint, + otlpgrpc.WithReconnectionPeriod(reconnectionPeriod)) + defer func() { + _ = exp.Shutdown(ctx) + }() + + // We'll now stop the collector right away to simulate 
a connection + // dying in the midst of communication or even not existing before. + _ = mc.stop() + + // In the test below, we'll stop the collector many times, + // while exporting traces and test to ensure that we can + // reconnect. + for j := 0; j < 3; j++ { + + // No endpoint up. + require.Error( + t, + exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}), + "transport: Error while dialing dial tcp %s: connect: connection refused", + mc.endpoint, + ) + + // Now resurrect the collector by making a new one but reusing the + // old endpoint, and the collector should reconnect automatically. + nmc := runMockCollectorAtEndpoint(t, mc.endpoint) + + // Give the exporter some time to reconnect + <-time.After(reconnectionPeriod * 4) + + n := 10 + for i := 0; i < n; i++ { + require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "Resurrected"}})) + } + + nmaSpans := nmc.getSpans() + // Expecting 10 spans to have been received by the resurrected collector + if g, w := len(nmaSpans), n; g != w { + t.Fatalf("Round #%d: Connected collector: spans: got %d want %d", j, g, w) + } + + dSpans := mc.getSpans() + // Expecting 0 spans to have been received by the original but now dead collector + if g, w := len(dSpans), 0; g != w { + t.Fatalf("Round #%d: Disconnected collector: spans: got %d want %d", j, g, w) + } + _ = nmc.stop() + } +} + +// This test takes a long time to run: to skip it, run tests using: -short +func TestNewExporter_collectorOnBadConnection(t *testing.T) { + if testing.Short() { + t.Skipf("Skipping this long running test") + } + + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to grab an available port: %v", err) + } + // Firstly close the "collector's" channel: optimistically this endpoint won't get reused ASAP + // However, our goal of closing it is to simulate an unavailable connection + _ = ln.Close() + + _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) + + endpoint := 
fmt.Sprintf("localhost:%s", collectorPortStr) + ctx := context.Background() + exp := newGRPCExporter(t, ctx, endpoint) + _ = exp.Shutdown(ctx) +} + +func TestNewExporter_withEndpoint(t *testing.T) { + mc := runMockCollector(t) + defer func() { + _ = mc.stop() + }() + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint) + _ = exp.Shutdown(ctx) +} + +func TestNewExporter_withHeaders(t *testing.T) { + mc := runMockCollector(t) + defer func() { + _ = mc.stop() + }() + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint, + otlpgrpc.WithHeaders(map[string]string{"header1": "value1"})) + require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}})) + + defer func() { + _ = exp.Shutdown(ctx) + }() + + headers := mc.getHeaders() + require.Len(t, headers.Get("header1"), 1) + assert.Equal(t, "value1", headers.Get("header1")[0]) +} + +func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { + mc := runMockCollector(t) + + defer func() { + _ = mc.stop() + }() + + <-time.After(5 * time.Millisecond) + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint) + + defer func() { + _ = exp.Shutdown(ctx) + }() + + tp := sdktrace.NewTracerProvider( + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), + sdktrace.WithBatcher( + exp, + // add following two options to ensure flush + sdktrace.WithBatchTimeout(5), + sdktrace.WithMaxExportBatchSize(10), + ), + ) + defer func() { _ = tp.Shutdown(ctx) }() + + tr := tp.Tracer("test-tracer") + testKvs := []label.KeyValue{ + label.Int("Int", 1), + label.Int32("Int32", int32(2)), + label.Int64("Int64", int64(3)), + label.Float32("Float32", float32(1.11)), + label.Float64("Float64", 2.22), + label.Bool("Bool", true), + label.String("String", "test"), + } + _, span := tr.Start(ctx, "AlwaysSample") + span.SetAttributes(testKvs...) + span.End() + + // Flush and close. 
+ func() { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := tp.Shutdown(ctx); err != nil { + t.Fatalf("failed to shut down a tracer provider: %v", err) + } + }() + + // Wait >2 cycles. + <-time.After(40 * time.Millisecond) + + // Now shutdown the exporter + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := exp.Shutdown(ctx); err != nil { + t.Fatalf("failed to stop the exporter: %v", err) + } + + // Shutdown the collector too so that we can begin + // verification checks of expected data back. + _ = mc.stop() + + // Now verify that we only got one span + rss := mc.getSpans() + if got, want := len(rss), 1; got != want { + t.Fatalf("resource span count: got %d, want %d\n", got, want) + } + + expected := []*commonpb.KeyValue{ + { + Key: "Int", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_IntValue{ + IntValue: 1, + }, + }, + }, + { + Key: "Int32", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_IntValue{ + IntValue: 2, + }, + }, + }, + { + Key: "Int64", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_IntValue{ + IntValue: 3, + }, + }, + }, + { + Key: "Float32", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_DoubleValue{ + DoubleValue: 1.11, + }, + }, + }, + { + Key: "Float64", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_DoubleValue{ + DoubleValue: 2.22, + }, + }, + }, + { + Key: "Bool", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_BoolValue{ + BoolValue: true, + }, + }, + }, + { + Key: "String", + Value: &commonpb.AnyValue{ + Value: &commonpb.AnyValue_StringValue{ + StringValue: "test", + }, + }, + }, + } + + // Verify attributes + if !assert.Len(t, rss[0].Attributes, len(expected)) { + t.Fatalf("attributes count: got %d, want %d\n", len(rss[0].Attributes), len(expected)) + } + for i, actual := range rss[0].Attributes { + if a, ok := actual.Value.Value.(*commonpb.AnyValue_DoubleValue); ok { + e, ok := 
expected[i].Value.Value.(*commonpb.AnyValue_DoubleValue) + if !ok { + t.Errorf("expected AnyValue_DoubleValue, got %T", expected[i].Value.Value) + continue + } + if !assert.InDelta(t, e.DoubleValue, a.DoubleValue, 0.01) { + continue + } + e.DoubleValue = a.DoubleValue + } + assert.Equal(t, expected[i], actual) + } +} + +func TestDisconnected(t *testing.T) { + ctx := context.Background() + // The endpoint is whatever, we want to be disconnected. But we + // are setting a blocking connection, so dialing the invalid + // endpoint actually fails. + exp := newGRPCExporter(t, ctx, "invalid", + otlpgrpc.WithReconnectionPeriod(time.Hour), + otlpgrpc.WithDialOption( + grpc.WithBlock(), + grpc.FailOnNonTempDialError(true), + ), + ) + defer func() { + assert.NoError(t, exp.Shutdown(ctx)) + }() + + assert.Error(t, exp.Export(ctx, otlptest.OneRecordCheckpointSet{})) + assert.Error(t, exp.ExportSpans(ctx, otlptest.SingleSpanSnapshot())) +} + +func TestEmptyData(t *testing.T) { + mc := runMockCollectorAtEndpoint(t, "localhost:56561") + + defer func() { + _ = mc.stop() + }() + + <-time.After(5 * time.Millisecond) + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint) + defer func() { + assert.NoError(t, exp.Shutdown(ctx)) + }() + + assert.NoError(t, exp.ExportSpans(ctx, nil)) + assert.NoError(t, exp.Export(ctx, otlptest.EmptyCheckpointSet{})) +} + +func TestFailedMetricTransform(t *testing.T) { + mc := runMockCollectorAtEndpoint(t, "localhost:56561") + + defer func() { + _ = mc.stop() + }() + + <-time.After(5 * time.Millisecond) + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint) + defer func() { + assert.NoError(t, exp.Shutdown(ctx)) + }() + + assert.Error(t, exp.Export(ctx, otlptest.FailCheckpointSet{})) +} + +func TestMultiConnectionDriver(t *testing.T) { + mcTraces := runMockCollector(t) + mcMetrics := runMockCollector(t) + + defer func() { + _ = mcTraces.stop() + _ = mcMetrics.stop() + }() + + <-time.After(5 * time.Millisecond) 
+ + commonOpts := []otlpgrpc.Option{ + otlpgrpc.WithInsecure(), + otlpgrpc.WithReconnectionPeriod(50 * time.Millisecond), + otlpgrpc.WithDialOption(grpc.WithBlock()), + } + optsTraces := append([]otlpgrpc.Option{ + otlpgrpc.WithEndpoint(mcTraces.endpoint), + }, commonOpts...) + optsMetrics := append([]otlpgrpc.Option{ + otlpgrpc.WithEndpoint(mcMetrics.endpoint), + }, commonOpts...) + + tracesDriver := otlpgrpc.NewDriver(optsTraces...) + metricsDriver := otlpgrpc.NewDriver(optsMetrics...) + splitCfg := otlp.SplitConfig{ + ForMetrics: metricsDriver, + ForTraces: tracesDriver, + } + driver := otlp.NewSplitDriver(splitCfg) + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, driver) + if err != nil { + t.Fatalf("failed to create a new collector exporter: %v", err) + } + defer func() { + assert.NoError(t, exp.Shutdown(ctx)) + }() + otlptest.RunEndToEndTest(ctx, t, exp, mcTraces, mcMetrics) +} diff --git a/exporters/otlp/otlphttp/certificate_test.go b/exporters/otlp/otlphttp/certificate_test.go new file mode 100644 index 00000000000..04083232b0b --- /dev/null +++ b/exporters/otlp/otlphttp/certificate_test.go @@ -0,0 +1,92 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package otlphttp_test + +import ( + "bytes" + "crypto/ecdsa" + "crypto/elliptic" + cryptorand "crypto/rand" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + mathrand "math/rand" + "net" + "time" +) + +type mathRandReader struct{} + +func (mathRandReader) Read(p []byte) (n int, err error) { + return mathrand.Read(p) +} + +var randReader mathRandReader + +type pemCertificate struct { + Certificate []byte + PrivateKey []byte +} + +// Based on https://golang.org/src/crypto/tls/generate_cert.go, +// simplified and weakened. +func generateWeakCertificate() (*pemCertificate, error) { + priv, err := ecdsa.GenerateKey(elliptic.P256(), randReader) + if err != nil { + return nil, err + } + keyUsage := x509.KeyUsageDigitalSignature + notBefore := time.Now() + notAfter := notBefore.Add(time.Hour) + serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128) + serialNumber, err := cryptorand.Int(randReader, serialNumberLimit) + if err != nil { + return nil, err + } + template := x509.Certificate{ + SerialNumber: serialNumber, + Subject: pkix.Name{ + Organization: []string{"otel-go"}, + }, + NotBefore: notBefore, + NotAfter: notAfter, + KeyUsage: keyUsage, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + BasicConstraintsValid: true, + DNSNames: []string{"localhost"}, + IPAddresses: []net.IP{net.IPv6loopback, net.IPv4(127, 0, 0, 1)}, + } + derBytes, err := x509.CreateCertificate(randReader, &template, &template, &priv.PublicKey, priv) + if err != nil { + return nil, err + } + certificateBuffer := new(bytes.Buffer) + if err := pem.Encode(certificateBuffer, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil { + return nil, err + } + privDERBytes, err := x509.MarshalPKCS8PrivateKey(priv) + if err != nil { + return nil, err + } + privBuffer := new(bytes.Buffer) + if err := pem.Encode(privBuffer, &pem.Block{Type: "PRIVATE KEY", Bytes: privDERBytes}); err != nil { + return nil, err + } + return &pemCertificate{ + Certificate: 
certificateBuffer.Bytes(), + PrivateKey: privBuffer.Bytes(), + }, nil +} diff --git a/exporters/otlp/otlphttp/doc.go b/exporters/otlp/otlphttp/doc.go new file mode 100644 index 00000000000..ee72adaeeca --- /dev/null +++ b/exporters/otlp/otlphttp/doc.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package otlphttp implements a protocol driver that sends traces and +metrics to the collector using HTTP with binary protobuf payloads. + +This package is currently in a pre-GA phase. Backwards incompatible +changes may be introduced in subsequent minor version releases as we +work to track the evolving OpenTelemetry specification and user +feedback. +*/ +package otlphttp // import "go.opentelemetry.io/otel/exporters/otlp/otlphttp" diff --git a/exporters/otlp/otlphttp/driver.go b/exporters/otlp/otlphttp/driver.go new file mode 100644 index 00000000000..0de13bfd076 --- /dev/null +++ b/exporters/otlp/otlphttp/driver.go @@ -0,0 +1,291 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlphttp + +import ( + "bytes" + "compress/gzip" + "context" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net" + "net/http" + "path" + "strings" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp" + colmetricspb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" + coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" + "go.opentelemetry.io/otel/exporters/otlp/internal/transform" + metricsdk "go.opentelemetry.io/otel/sdk/export/metric" + tracesdk "go.opentelemetry.io/otel/sdk/export/trace" +) + +const contentType = "application/x-protobuf" + +// Keep it in sync with golang's DefaultTransport from net/http! We +// have our own copy to avoid handling a situation where the +// DefaultTransport is overwritten with some different implementation +// of http.RoundTripper or it's modified by other package. +var ourTransport *http.Transport = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, +} + +type driver struct { + client *http.Client + cfg config + + stopCh chan struct{} +} + +var _ otlp.ProtocolDriver = (*driver)(nil) + +// NewDriver creates a new HTTP driver. 
+func NewDriver(opts ...Option) otlp.ProtocolDriver { + cfg := config{ + endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort), + compression: NoCompression, + tracesURLPath: DefaultTracesPath, + metricsURLPath: DefaultMetricsPath, + maxAttempts: DefaultMaxAttempts, + backoff: DefaultBackoff, + } + for _, opt := range opts { + opt.Apply(&cfg) + } + for pathPtr, defaultPath := range map[*string]string{ + &cfg.tracesURLPath: DefaultTracesPath, + &cfg.metricsURLPath: DefaultMetricsPath, + } { + tmp := strings.TrimSpace(*pathPtr) + if tmp == "" { + tmp = defaultPath + } else { + tmp = path.Clean(tmp) + if !path.IsAbs(tmp) { + tmp = fmt.Sprintf("/%s", tmp) + } + } + *pathPtr = tmp + } + if cfg.maxAttempts <= 0 { + cfg.maxAttempts = DefaultMaxAttempts + } + if cfg.maxAttempts > DefaultMaxAttempts { + cfg.maxAttempts = DefaultMaxAttempts + } + if cfg.backoff <= 0 { + cfg.backoff = DefaultBackoff + } + client := &http.Client{ + Transport: ourTransport, + } + if cfg.tlsCfg != nil { + transport := ourTransport.Clone() + transport.TLSClientConfig = cfg.tlsCfg + client.Transport = transport + } + return &driver{ + client: client, + cfg: cfg, + stopCh: make(chan struct{}), + } +} + +// Start implements otlp.ProtocolDriver. +func (d *driver) Start(ctx context.Context) error { + // nothing to do + return nil +} + +// Stop implements otlp.ProtocolDriver. +func (d *driver) Stop(ctx context.Context) error { + close(d.stopCh) + return nil +} + +// ExportMetrics implements otlp.ProtocolDriver. 
+func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { + rms, err := transform.CheckpointSet(ctx, selector, cps, 1) + if err != nil { + return err + } + if len(rms) == 0 { + return nil + } + pbRequest := &colmetricspb.ExportMetricsServiceRequest{ + ResourceMetrics: rms, + } + rawRequest, err := pbRequest.Marshal() + if err != nil { + return err + } + return d.send(ctx, rawRequest, d.cfg.metricsURLPath) +} + +// ExportTraces implements otlp.ProtocolDriver. +func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { + protoSpans := transform.SpanData(ss) + if len(protoSpans) == 0 { + return nil + } + pbRequest := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: protoSpans, + } + rawRequest, err := pbRequest.Marshal() + if err != nil { + return err + } + return d.send(ctx, rawRequest, d.cfg.tracesURLPath) +} + +func (d *driver) send(ctx context.Context, rawRequest []byte, urlPath string) error { + address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, urlPath) + var cancel context.CancelFunc + ctx, cancel = d.contextWithStop(ctx) + defer cancel() + for i := 0; i < d.cfg.maxAttempts; i++ { + response, err := d.singleSend(ctx, rawRequest, address) + if err != nil { + return err + } + // We don't care about the body, so try to read it + // into /dev/null and close it immediately. The + // reading part is to facilitate connection reuse. 
+ _, _ = io.Copy(ioutil.Discard, response.Body) + _ = response.Body.Close() + switch response.StatusCode { + case http.StatusOK: + return nil + case http.StatusTooManyRequests: + fallthrough + case http.StatusServiceUnavailable: + select { + case <-time.After(getWaitDuration(d.cfg.backoff, i)): + continue + case <-ctx.Done(): + return ctx.Err() + } + default: + return fmt.Errorf("Failed with HTTP status %s", response.Status) + } + } + return fmt.Errorf("Failed to send data to %s after %d tries", address, d.cfg.maxAttempts) +} + +func (d *driver) getScheme() string { + if d.cfg.insecure { + return "http" + } + return "https" +} + +func getWaitDuration(backoff time.Duration, i int) time.Duration { + // Strategy: after nth failed attempt, attempt resending after + // k * initialBackoff + jitter, where k is a random number in + // range [0, 2^n-1), and jitter is a random percentage of + // initialBackoff from [-5%, 5%). + // + // Based on + // https://en.wikipedia.org/wiki/Exponential_backoff#Example_exponential_backoff_algorithm + // + // Jitter is our addition. + + // There won't be an overflow, since i is capped to + // DefaultMaxAttempts (5). + upperK := (int64)(1) << (i + 1) + jitterPercent := (rand.Float64() - 0.5) / 10. + jitter := jitterPercent * (float64)(backoff) + k := rand.Int63n(upperK) + return (time.Duration)(k)*backoff + (time.Duration)(jitter) +} + +func (d *driver) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { + // Unify the parent context Done signal with the driver's stop + // channel. + ctx, cancel := context.WithCancel(ctx) + go func(ctx context.Context, cancel context.CancelFunc) { + select { + case <-ctx.Done(): + // Nothing to do, either cancelled or deadline + // happened. 
+ case <-d.stopCh: + cancel() + } + }(ctx, cancel) + return ctx, cancel +} + +func (d *driver) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) { + request, err := http.NewRequestWithContext(ctx, http.MethodPost, address, nil) + if err != nil { + return nil, err + } + bodyReader, contentLength, headers := d.prepareBody(rawRequest) + // Not closing bodyReader through defer, the HTTP Client's + // Transport will do it for us + request.Body = bodyReader + request.ContentLength = contentLength + for key, values := range headers { + for _, value := range values { + request.Header.Add(key, value) + } + } + return d.client.Do(request) +} + +func (d *driver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) { + var bodyReader io.ReadCloser + headers := http.Header{} + for k, v := range d.cfg.headers { + headers.Set(k, v) + } + contentLength := (int64)(len(rawRequest)) + headers.Set("Content-Type", contentType) + requestReader := bytes.NewBuffer(rawRequest) + switch d.cfg.compression { + case NoCompression: + bodyReader = ioutil.NopCloser(requestReader) + case GzipCompression: + preader, pwriter := io.Pipe() + go func() { + defer pwriter.Close() + gzipper := gzip.NewWriter(pwriter) + defer gzipper.Close() + _, err := io.Copy(gzipper, requestReader) + if err != nil { + otel.Handle(fmt.Errorf("otlphttp: failed to gzip request: %v", err)) + } + }() + headers.Set("Content-Encoding", "gzip") + bodyReader = preader + contentLength = -1 + } + return bodyReader, contentLength, headers +} diff --git a/exporters/otlp/otlphttp/driver_test.go b/exporters/otlp/otlphttp/driver_test.go new file mode 100644 index 00000000000..8bcfa245960 --- /dev/null +++ b/exporters/otlp/otlphttp/driver_test.go @@ -0,0 +1,418 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlphttp_test + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/exporters/otlp" + "go.opentelemetry.io/otel/exporters/otlp/internal/otlptest" + "go.opentelemetry.io/otel/exporters/otlp/otlphttp" +) + +const ( + relOtherMetricsPath = "post/metrics/here" + relOtherTracesPath = "post/traces/here" + otherMetricsPath = "/post/metrics/here" + otherTracesPath = "/post/traces/here" +) + +var ( + testHeaders = map[string]string{ + "Otel-Go-Key-1": "somevalue", + "Otel-Go-Key-2": "someothervalue", + } +) + +func TestEndToEnd(t *testing.T) { + tests := []struct { + name string + opts []otlphttp.Option + mcCfg mockCollectorConfig + tls bool + }{ + { + name: "no extra options", + opts: nil, + }, + { + name: "with gzip compression", + opts: []otlphttp.Option{ + otlphttp.WithCompression(otlphttp.GzipCompression), + }, + }, + { + name: "with empty paths (forced to defaults)", + opts: []otlphttp.Option{ + otlphttp.WithMetricsURLPath(""), + otlphttp.WithTracesURLPath(""), + }, + }, + { + name: "with different paths", + opts: []otlphttp.Option{ + otlphttp.WithMetricsURLPath(otherMetricsPath), + otlphttp.WithTracesURLPath(otherTracesPath), + }, + mcCfg: mockCollectorConfig{ + MetricsURLPath: otherMetricsPath, + TracesURLPath: otherTracesPath, + }, + }, + { + name: "with relative paths", + opts: []otlphttp.Option{ + otlphttp.WithMetricsURLPath(relOtherMetricsPath), + otlphttp.WithTracesURLPath(relOtherTracesPath), + }, + mcCfg: 
mockCollectorConfig{ + MetricsURLPath: otherMetricsPath, + TracesURLPath: otherTracesPath, + }, + }, + { + name: "with TLS", + opts: nil, + mcCfg: mockCollectorConfig{ + WithTLS: true, + }, + tls: true, + }, + { + name: "with extra headers", + opts: []otlphttp.Option{ + otlphttp.WithHeaders(testHeaders), + }, + mcCfg: mockCollectorConfig{ + ExpectedHeaders: testHeaders, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mc := runMockCollector(t, tc.mcCfg) + defer mc.MustStop(t) + allOpts := []otlphttp.Option{ + otlphttp.WithEndpoint(mc.Endpoint()), + } + if tc.tls { + tlsConfig := mc.ClientTLSConfig() + require.NotNil(t, tlsConfig) + allOpts = append(allOpts, otlphttp.WithTLSClientConfig(tlsConfig)) + } else { + allOpts = append(allOpts, otlphttp.WithInsecure()) + } + allOpts = append(allOpts, tc.opts...) + driver := otlphttp.NewDriver(allOpts...) + ctx := context.Background() + exporter, err := otlp.NewExporter(ctx, driver) + if assert.NoError(t, err) { + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + otlptest.RunEndToEndTest(ctx, t, exporter, mc, mc) + } + }) + } +} + +func TestRetry(t *testing.T) { + statuses := []int{ + http.StatusTooManyRequests, + http.StatusServiceUnavailable, + } + mcCfg := mockCollectorConfig{ + InjectHTTPStatus: statuses, + } + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithInsecure(), + otlphttp.WithMaxAttempts(len(statuses)+1), + ) + ctx := context.Background() + exporter, err := otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot()) + assert.NoError(t, err) + assert.Len(t, mc.GetSpans(), 1) +} + +func TestRetryFailed(t *testing.T) { + statuses := []int{ + http.StatusTooManyRequests, + http.StatusServiceUnavailable, + } + mcCfg := mockCollectorConfig{ + 
InjectHTTPStatus: statuses, + } + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithInsecure(), + otlphttp.WithMaxAttempts(1), + ) + ctx := context.Background() + exporter, err := otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot()) + assert.Error(t, err) + assert.Empty(t, mc.GetSpans()) +} + +func TestNoRetry(t *testing.T) { + statuses := []int{ + http.StatusBadRequest, + } + mcCfg := mockCollectorConfig{ + InjectHTTPStatus: statuses, + } + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithInsecure(), + otlphttp.WithMaxAttempts(len(statuses)+1), + ) + ctx := context.Background() + exporter, err := otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot()) + assert.Error(t, err) + assert.Empty(t, mc.GetSpans()) +} + +func TestFailedCheckpoint(t *testing.T) { + mcCfg := mockCollectorConfig{} + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithInsecure(), + ) + ctx := context.Background() + exporter, err := otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + err = exporter.Export(ctx, otlptest.FailCheckpointSet{}) + assert.Error(t, err) + assert.Empty(t, mc.GetMetrics()) +} + +func TestEmptyData(t *testing.T) { + mcCfg := mockCollectorConfig{} + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithInsecure(), + ) + ctx := context.Background() + exporter, err := 
otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + err = exporter.Export(ctx, otlptest.EmptyCheckpointSet{}) + assert.NoError(t, err) + err = exporter.ExportSpans(ctx, nil) + assert.NoError(t, err) + assert.Empty(t, mc.GetMetrics()) + assert.Empty(t, mc.GetSpans()) +} + +func TestUnreasonableMaxAttempts(t *testing.T) { + // Max attempts is 5, we set collector to fail 7 times and try + // to configure max attempts to be either negative or too + // large. Since we set max attempts to 5 in such cases, + // exporting to the collector should fail. + type testcase struct { + name string + maxAttempts int + } + for _, tc := range []testcase{ + { + name: "negative max attempts", + maxAttempts: -3, + }, + { + name: "too large max attempts", + maxAttempts: 10, + }, + } { + t.Run(tc.name, func(t *testing.T) { + statuses := make([]int, 0, 7) + for i := 0; i < cap(statuses); i++ { + statuses = append(statuses, http.StatusTooManyRequests) + } + mcCfg := mockCollectorConfig{ + InjectHTTPStatus: statuses, + } + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithInsecure(), + otlphttp.WithMaxAttempts(tc.maxAttempts), + otlphttp.WithBackoff(time.Millisecond), + ) + ctx := context.Background() + exporter, err := otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot()) + assert.Error(t, err) + assert.Empty(t, mc.GetSpans()) + }) + } +} + +func TestUnreasonableBackoff(t *testing.T) { + // This sets backoff to negative value, which gets corrected + // to default backoff instead of being used. 
Default max + // attempts is 5, so we set the collector to fail 4 times, but + // we set the deadline to 3 times of the default backoff, so + // this should show that deadline is not met, meaning that the + // retries weren't immediate (as negative backoff could + // imply). + statuses := make([]int, 0, 4) + for i := 0; i < cap(statuses); i++ { + statuses = append(statuses, http.StatusTooManyRequests) + } + mcCfg := mockCollectorConfig{ + InjectHTTPStatus: statuses, + } + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithInsecure(), + otlphttp.WithBackoff(-time.Millisecond), + ) + ctx, cancel := context.WithTimeout(context.Background(), 3*otlphttp.DefaultBackoff) + defer cancel() + exporter, err := otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot()) + assert.Error(t, err) + assert.Empty(t, mc.GetSpans()) +} + +func TestCancelledContext(t *testing.T) { + mcCfg := mockCollectorConfig{} + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithInsecure(), + ) + ctx, cancel := context.WithCancel(context.Background()) + exporter, err := otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + cancel() + err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot()) + assert.Error(t, err) + assert.Empty(t, mc.GetSpans()) +} + +func TestDeadlineContext(t *testing.T) { + statuses := make([]int, 0, 5) + for i := 0; i < cap(statuses); i++ { + statuses = append(statuses, http.StatusTooManyRequests) + } + mcCfg := mockCollectorConfig{ + InjectHTTPStatus: statuses, + } + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + 
otlphttp.WithInsecure(), + otlphttp.WithBackoff(time.Minute), + ) + ctx := context.Background() + exporter, err := otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot()) + assert.Error(t, err) + assert.Empty(t, mc.GetSpans()) +} + +func TestStopWhileExporting(t *testing.T) { + statuses := make([]int, 0, 5) + for i := 0; i < cap(statuses); i++ { + statuses = append(statuses, http.StatusTooManyRequests) + } + mcCfg := mockCollectorConfig{ + InjectHTTPStatus: statuses, + } + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlphttp.NewDriver( + otlphttp.WithEndpoint(mc.Endpoint()), + otlphttp.WithInsecure(), + otlphttp.WithBackoff(time.Minute), + ) + ctx := context.Background() + exporter, err := otlp.NewExporter(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(ctx)) + }() + doneCh := make(chan struct{}) + go func() { + err := exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot()) + assert.Error(t, err) + assert.Empty(t, mc.GetSpans()) + close(doneCh) + }() + <-time.After(time.Second) + err = exporter.Shutdown(ctx) + assert.NoError(t, err) + <-doneCh +} diff --git a/exporters/otlp/otlphttp/mock_collector_test.go b/exporters/otlp/otlphttp/mock_collector_test.go new file mode 100644 index 00000000000..99c384e461c --- /dev/null +++ b/exporters/otlp/otlphttp/mock_collector_test.go @@ -0,0 +1,272 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlphttp_test + +import ( + "bytes" + "compress/gzip" + "context" + "crypto/tls" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" + collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" + metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" + tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1" + "go.opentelemetry.io/otel/exporters/otlp/internal/otlptest" + "go.opentelemetry.io/otel/exporters/otlp/otlphttp" +) + +type mockCollector struct { + endpoint string + server *http.Server + + spanLock sync.Mutex + spansStorage otlptest.SpansStorage + + metricLock sync.Mutex + metricsStorage otlptest.MetricsStorage + + injectHTTPStatus []int + injectContentType string + + clientTLSConfig *tls.Config + expectedHeaders map[string]string +} + +func (c *mockCollector) Stop() error { + return c.server.Shutdown(context.Background()) +} + +func (c *mockCollector) MustStop(t *testing.T) { + assert.NoError(t, c.server.Shutdown(context.Background())) +} + +func (c *mockCollector) GetSpans() []*tracepb.Span { + c.spanLock.Lock() + defer c.spanLock.Unlock() + return c.spansStorage.GetSpans() +} + +func (c *mockCollector) GetResourceSpans() []*tracepb.ResourceSpans { + c.spanLock.Lock() + defer 
c.spanLock.Unlock() + return c.spansStorage.GetResourceSpans() +} + +func (c *mockCollector) GetMetrics() []*metricpb.Metric { + c.metricLock.Lock() + defer c.metricLock.Unlock() + return c.metricsStorage.GetMetrics() +} + +func (c *mockCollector) Endpoint() string { + return c.endpoint +} + +func (c *mockCollector) ClientTLSConfig() *tls.Config { + return c.clientTLSConfig +} + +func (c *mockCollector) serveMetrics(w http.ResponseWriter, r *http.Request) { + if !c.checkHeaders(r) { + w.WriteHeader(http.StatusBadRequest) + return + } + response := collectormetricpb.ExportMetricsServiceResponse{} + rawResponse, err := response.Marshal() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 { + writeReply(w, rawResponse, injectedStatus, c.injectContentType) + return + } + rawRequest, err := readRequest(r) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + request := collectormetricpb.ExportMetricsServiceRequest{} + if err := request.Unmarshal(rawRequest); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + writeReply(w, rawResponse, 0, c.injectContentType) + c.metricLock.Lock() + defer c.metricLock.Unlock() + c.metricsStorage.AddMetrics(&request) +} + +func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) { + if !c.checkHeaders(r) { + w.WriteHeader(http.StatusBadRequest) + return + } + response := collectortracepb.ExportTraceServiceResponse{} + rawResponse, err := response.Marshal() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 { + writeReply(w, rawResponse, injectedStatus, c.injectContentType) + return + } + rawRequest, err := readRequest(r) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + request := collectortracepb.ExportTraceServiceRequest{} + if err := 
request.Unmarshal(rawRequest); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + writeReply(w, rawResponse, 0, c.injectContentType) + c.spanLock.Lock() + defer c.spanLock.Unlock() + c.spansStorage.AddSpans(&request) +} + +func (c *mockCollector) checkHeaders(r *http.Request) bool { + for k, v := range c.expectedHeaders { + got := r.Header.Get(k) + if got != v { + return false + } + } + return true +} + +func (c *mockCollector) getInjectHTTPStatus() int { + if len(c.injectHTTPStatus) == 0 { + return 0 + } + status := c.injectHTTPStatus[0] + c.injectHTTPStatus = c.injectHTTPStatus[1:] + if len(c.injectHTTPStatus) == 0 { + c.injectHTTPStatus = nil + } + return status +} + +func readRequest(r *http.Request) ([]byte, error) { + if r.Header.Get("Content-Encoding") == "gzip" { + return readGzipBody(r.Body) + } + return ioutil.ReadAll(r.Body) +} + +func readGzipBody(body io.Reader) ([]byte, error) { + rawRequest := bytes.Buffer{} + gunzipper, err := gzip.NewReader(body) + if err != nil { + return nil, err + } + defer gunzipper.Close() + _, err = io.Copy(&rawRequest, gunzipper) + if err != nil { + return nil, err + } + return rawRequest.Bytes(), nil +} + +func writeReply(w http.ResponseWriter, rawResponse []byte, injectHTTPStatus int, injectContentType string) { + status := http.StatusOK + if injectHTTPStatus != 0 { + status = injectHTTPStatus + } + contentType := "application/x-protobuf" + if injectContentType != "" { + contentType = injectContentType + } + w.Header().Set("Content-Type", contentType) + w.WriteHeader(status) + _, _ = w.Write(rawResponse) +} + +type mockCollectorConfig struct { + MetricsURLPath string + TracesURLPath string + Port int + InjectHTTPStatus []int + InjectContentType string + WithTLS bool + ExpectedHeaders map[string]string +} + +func (c *mockCollectorConfig) fillInDefaults() { + if c.MetricsURLPath == "" { + c.MetricsURLPath = otlphttp.DefaultMetricsPath + } + if c.TracesURLPath == "" { + c.TracesURLPath = 
otlphttp.DefaultTracesPath + } +} + +func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector { + cfg.fillInDefaults() + ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port)) + require.NoError(t, err) + _, portStr, err := net.SplitHostPort(ln.Addr().String()) + require.NoError(t, err) + m := &mockCollector{ + endpoint: fmt.Sprintf("localhost:%s", portStr), + spansStorage: otlptest.NewSpansStorage(), + metricsStorage: otlptest.NewMetricsStorage(), + injectHTTPStatus: cfg.InjectHTTPStatus, + injectContentType: cfg.InjectContentType, + expectedHeaders: cfg.ExpectedHeaders, + } + mux := http.NewServeMux() + mux.Handle(cfg.MetricsURLPath, http.HandlerFunc(m.serveMetrics)) + mux.Handle(cfg.TracesURLPath, http.HandlerFunc(m.serveTraces)) + server := &http.Server{ + Handler: mux, + } + if cfg.WithTLS { + pem, err := generateWeakCertificate() + require.NoError(t, err) + tlsCertificate, err := tls.X509KeyPair(pem.Certificate, pem.PrivateKey) + require.NoError(t, err) + server.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{tlsCertificate}, + } + + m.clientTLSConfig = &tls.Config{ + InsecureSkipVerify: true, + } + } + go func() { + if cfg.WithTLS { + _ = server.ServeTLS(ln, "", "") + } else { + _ = server.Serve(ln) + } + }() + m.server = server + return m +} diff --git a/exporters/otlp/otlphttp/options.go b/exporters/otlp/otlphttp/options.go new file mode 100644 index 00000000000..f80ee474874 --- /dev/null +++ b/exporters/otlp/otlphttp/options.go @@ -0,0 +1,180 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlphttp + +import ( + "crypto/tls" + "time" +) + +// Compression describes the compression used for payloads sent to the +// collector. +type Compression int + +const ( + // NoCompression tells the driver to send payloads without + // compression. + NoCompression Compression = iota + // GzipCompression tells the driver to send payloads after + // compressing them with gzip. + GzipCompression +) + +const ( + // DefaultMaxAttempts describes how many times the driver + // should retry the sending of the payload in case of a + // retryable error. + DefaultMaxAttempts int = 5 + // DefaultTracesPath is a default URL path for endpoint that + // receives spans. + DefaultTracesPath string = "/v1/traces" + // DefaultMetricsPath is a default URL path for endpoint that + // receives metrics. + DefaultMetricsPath string = "/v1/metrics" + // DefaultBackoff is a default base backoff time used in the + // exponential backoff strategy. + DefaultBackoff time.Duration = 300 * time.Millisecond +) + +type config struct { + endpoint string + compression Compression + tracesURLPath string + metricsURLPath string + maxAttempts int + backoff time.Duration + tlsCfg *tls.Config + insecure bool + headers map[string]string +} + +// Option applies an option to the HTTP driver. +type Option interface { + Apply(*config) +} + +type endpointOption string + +func (o endpointOption) Apply(cfg *config) { + cfg.endpoint = (string)(o) +} + +// WithEndpoint allows one to set the address of the collector +// endpoint that the driver will use to send metrics and spans. 
If +// unset, it will instead try to use +// DefaultCollectorHost:DefaultCollectorPort. Note that the endpoint +// must not contain any URL path. +func WithEndpoint(endpoint string) Option { + return (endpointOption)(endpoint) +} + +type compressionOption Compression + +func (o compressionOption) Apply(cfg *config) { + cfg.compression = (Compression)(o) +} + +// WithCompression tells the driver to compress the sent data. +func WithCompression(compression Compression) Option { + return (compressionOption)(compression) +} + +type tracesURLPathOption string + +func (o tracesURLPathOption) Apply(cfg *config) { + cfg.tracesURLPath = (string)(o) +} + +// WithTracesURLPath allows one to override the default URL path used +// for sending traces. If unset, DefaultTracesPath will be used. +func WithTracesURLPath(urlPath string) Option { + return (tracesURLPathOption)(urlPath) +} + +type metricsURLPathOption string + +func (o metricsURLPathOption) Apply(cfg *config) { + cfg.metricsURLPath = (string)(o) +} + +// WithMetricsURLPath allows one to override the default URL path used +// for sending metrics. If unset, DefaultMetricsPath will be used. +func WithMetricsURLPath(urlPath string) Option { + return (metricsURLPathOption)(urlPath) +} + +type maxAttemptsOption int + +func (o maxAttemptsOption) Apply(cfg *config) { + cfg.maxAttempts = (int)(o) +} + +// WithMaxAttempts allows one to override how many times the driver +// will try to send the payload in case of retryable errors. If unset, +// DefaultMaxAttempts will be used. +func WithMaxAttempts(maxAttempts int) Option { + return maxAttemptsOption(maxAttempts) +} + +type backoffOption time.Duration + +func (o backoffOption) Apply(cfg *config) { + cfg.backoff = (time.Duration)(o) +} + +// WithBackoff tells the driver to use the duration as a base of the +// exponential backoff strategy. If unset, DefaultBackoff will be +// used. 
+func WithBackoff(duration time.Duration) Option { + return (backoffOption)(duration) +} + +type tlsClientConfigOption tls.Config + +func (o *tlsClientConfigOption) Apply(cfg *config) { + cfg.tlsCfg = (*tls.Config)(o) +} + +// WithTLSClientConfig can be used to set up a custom TLS +// configuration for the client used to send payloads to the +// collector. Use it if you want to use a custom certificate. +func WithTLSClientConfig(tlsCfg *tls.Config) Option { + return (*tlsClientConfigOption)(tlsCfg) +} + +type insecureOption struct{} + +func (insecureOption) Apply(cfg *config) { + cfg.insecure = true +} + +// WithInsecure tells the driver to connect to the collector using the +// HTTP scheme, instead of HTTPS. +func WithInsecure() Option { + return insecureOption{} +} + +type headersOption map[string]string + +func (o headersOption) Apply(cfg *config) { + cfg.headers = (map[string]string)(o) +} + +// WithHeaders allows one to tell the driver to send additional HTTP +// headers with the payloads. Specifying headers like Content-Length, +// Content-Encoding and Content-Type may result in a broken driver. +func WithHeaders(headers map[string]string) Option { + return (headersOption)(headers) +}