diff --git a/.chloggen/otelarrow-exporttimeout.yaml b/.chloggen/otelarrow-exporttimeout.yaml new file mode 100644 index 000000000000..ea97db44729e --- /dev/null +++ b/.chloggen/otelarrow-exporttimeout.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add gRPC timeout propagation. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34733] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index cdabdd632afe..c97896352518 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -499,3 +499,5 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/cfgardenobserver => ../../extension/observer/cfgardenobserver - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter => ../../exporter/rabbitmqexporter - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver => ../../receiver/githubreceiver + - github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil + diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index f552de113f09..0ef5d911e0ad 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -641,6 +641,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker v0.108.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.108.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.108.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.108.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.108.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.108.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.108.0 // indirect @@ -671,7 +672,7 @@ require ( github.com/open-telemetry/otel-arrow v0.25.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect - github.com/opencontainers/runc v1.1.14 // indirect + github.com/opencontainers/runc v1.1.13 // indirect github.com/opencontainers/runtime-spec v1.2.0 // indirect github.com/opencontainers/selinux v1.10.0 // indirect github.com/opensearch-project/opensearch-go/v2 v2.3.0 // indirect @@ -1371,3 +1372,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/obse replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/rabbitmqexporter => ../../exporter/rabbitmqexporter replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver => ../../receiver/githubreceiver + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index fc2a006cc9af..191722910524 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -1961,8 +1961,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= -github.com/opencontainers/runc v1.1.14 h1:rgSuzbmgz5DUJjeSnw337TxDbRuqjs6iqQck/2weR6w= -github.com/opencontainers/runc v1.1.14/go.mod h1:E4C2z+7BxR7GHXp0hAY53mek+x49X1LjPNeMTfRGvOA= +github.com/opencontainers/runc v1.1.13 h1:98S2srgG9vw0zWcDpFMn5TRrh8kLxa/5OFUstuUhmRs= +github.com/opencontainers/runc v1.1.13/go.mod h1:R016aXacfp/gwQBYw2FDGa9m+n6atbLWrYY8hNMT/sA= github.com/opencontainers/runtime-spec v1.2.0 h1:z97+pHb3uELt/yiAWD691HNHQIF07bE7dzrbT927iTk= github.com/opencontainers/runtime-spec v1.2.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/selinux v1.10.0 h1:rAiKF8hTcgLI3w0DHm6i0ylVVcOrlgR1kK99DRLDhyU= diff --git a/exporter/otelarrowexporter/go.mod b/exporter/otelarrowexporter/go.mod index b342b39f77af..3fbcf9990075 100644 --- a/exporter/otelarrowexporter/go.mod +++ b/exporter/otelarrowexporter/go.mod @@ -4,6 +4,7 @@ go 1.22.0 require ( github.com/apache/arrow/go/v16 v16.1.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.108.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow v0.108.0 github.com/open-telemetry/otel-arrow v0.25.0 github.com/stretchr/testify v1.9.0 @@ -105,3 +106,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/otela replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver => ../../receiver/otelarrowreceiver replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil diff --git a/exporter/otelarrowexporter/internal/arrow/exporter.go b/exporter/otelarrowexporter/internal/arrow/exporter.go index 8903e707a549..e42205af197a 100644 --- a/exporter/otelarrowexporter/internal/arrow/exporter.go +++ b/exporter/otelarrowexporter/internal/arrow/exporter.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" ) @@ -310,6 +311,10 @@ func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) { } md["otlp-pdata-size"] = strconv.Itoa(uncompSize) + if dead, ok := ctx.Deadline(); ok { + md["grpc-timeout"] = grpcutil.EncodeTimeout(time.Until(dead)) + } + wri := writeItem{ records: data, md: md, diff --git a/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 4f488af53e87..659c5c92d926 100644 --- a/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" ) @@ -576,65 +577,94 @@ func TestArrowExporterStreaming(t *testing.T) { // TestArrowExporterHeaders tests a mix of outgoing context headers. func TestArrowExporterHeaders(t *testing.T) { - tc := newSingleStreamMetadataTestCase(t) - channel := newHealthyTestChannel() + for _, withDeadline := range []bool{true, false} { + t.Run(fmt.Sprint("with_deadline=", withDeadline), func(t *testing.T) { - tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel)) + tc := newSingleStreamMetadataTestCase(t) + channel := newHealthyTestChannel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - require.NoError(t, tc.exporter.Start(ctx)) + tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel)) - var expectOutput []metadata.MD - var actualOutput []metadata.MD + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - md := metadata.MD{} - hpd := hpack.NewDecoder(4096, func(f hpack.HeaderField) { - md[f.Name] = append(md[f.Name], f.Value) - }) - for data := range channel.sendChannel() { - if len(data.Headers) == 0 { - actualOutput = append(actualOutput, nil) - } else { - _, err := hpd.Write(data.Headers) + require.NoError(t, tc.exporter.Start(ctx)) + + var expectOutput []metadata.MD + var actualOutput []metadata.MD + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + md := metadata.MD{} + hpd := hpack.NewDecoder(4096, func(f hpack.HeaderField) { + md[f.Name] = append(md[f.Name], f.Value) + }) + for data := range channel.sendChannel() { + if len(data.Headers) == 0 { + actualOutput = append(actualOutput, nil) + } else { + _, err := hpd.Write(data.Headers) + require.NoError(t, err) + actualOutput = append(actualOutput, md) + md = metadata.MD{} + } + channel.recv <- statusOKFor(data.BatchId) + } + }() + + for times := 0; times < 10; times++ { + input := testdata.GenerateTraces(2) + + if times%2 == 1 { + md := metadata.MD{ + "expected1": []string{"metadata1"}, + "expected2": []string{fmt.Sprint(times)}, + "otlp-pdata-size": []string{"329"}, + } + expectOutput = append(expectOutput, md) + } else { + expectOutput = append(expectOutput, metadata.MD{ + "otlp-pdata-size": []string{"329"}, + }) + } + + sendCtx := ctx + if withDeadline { + var sendCancel context.CancelFunc + sendCtx, sendCancel = context.WithTimeout(sendCtx, time.Second) + defer sendCancel() + } + + sent, err := tc.exporter.SendAndWait(sendCtx, input) require.NoError(t, err) - actualOutput = append(actualOutput, md) - md = metadata.MD{} + require.True(t, sent) } - channel.recv <- statusOKFor(data.BatchId) - } - }() - - for times := 0; times < 10; times++ { - input := testdata.GenerateTraces(2) + // Stop the test conduit started above. + cancel() + wg.Wait() - if times%2 == 1 { - md := metadata.MD{ - "expected1": []string{"metadata1"}, - "expected2": []string{fmt.Sprint(times)}, - "otlp-pdata-size": []string{"329"}, + // Manual check for proper deadline propagation. Since the test + // is timed we don't expect an exact match. + if withDeadline { + for _, out := range actualOutput { + dead := out.Get("grpc-timeout") + require.Len(t, dead, 1) + require.NotEmpty(t, dead[0]) + to, err := grpcutil.DecodeTimeout(dead[0]) + require.NoError(t, err) + // Allow the test to lapse for 0.5s. + require.Less(t, time.Second/2, to) + require.GreaterOrEqual(t, time.Second, to) + out.Delete("grpc-timeout") + } } - expectOutput = append(expectOutput, md) - } else { - expectOutput = append(expectOutput, metadata.MD{ - "otlp-pdata-size": []string{"329"}, - }) - } - sent, err := tc.exporter.SendAndWait(context.Background(), input) - require.NoError(t, err) - require.True(t, sent) + require.Equal(t, expectOutput, actualOutput) + require.NoError(t, tc.exporter.Shutdown(ctx)) + }) } - // Stop the test conduit started above. - cancel() - wg.Wait() - - require.Equal(t, expectOutput, actualOutput) - require.NoError(t, tc.exporter.Shutdown(ctx)) } // TestArrowExporterIsTraced tests whether trace and span ID are diff --git a/internal/otelarrow/go.mod b/internal/otelarrow/go.mod index d449b9df33d5..8c6792439ffc 100644 --- a/internal/otelarrow/go.mod +++ b/internal/otelarrow/go.mod @@ -64,6 +64,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.2.3 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.108.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.108.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -110,3 +111,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otela replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter => ../../exporter/otelarrowexporter replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../sharedcomponent + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../grpcutil diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index ed374f7926ac..748d9bdb7ba0 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -106,3 +106,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/otela replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter => ../../exporter/otelarrowexporter + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil