diff --git a/cmd/collector/app/flags/flags.go b/cmd/collector/app/flags/flags.go index 8b3203b853c..c77814267d7 100644 --- a/cmd/collector/app/flags/flags.go +++ b/cmd/collector/app/flags/flags.go @@ -194,9 +194,9 @@ func AddFlags(flags *flag.FlagSet) { addGRPCFlags(flags, grpcServerFlagsCfg, ports.PortToHostPort(ports.CollectorGRPC)) flags.Bool(flagCollectorOTLPEnabled, true, "Enables OpenTelemetry OTLP receiver on dedicated HTTP and gRPC ports") - addHTTPFlags(flags, otlpServerFlagsCfg.HTTP, "") + addHTTPFlags(flags, otlpServerFlagsCfg.HTTP, ":4318") corsOTLPFlags.AddFlags(flags) - addGRPCFlags(flags, otlpServerFlagsCfg.GRPC, "") + addGRPCFlags(flags, otlpServerFlagsCfg.GRPC, ":4317") flags.String(flagZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)") flags.Bool(flagZipkinKeepAliveEnabled, true, "KeepAlive configures allow Keep-Alive for Zipkin HTTP server (enabled by default)") diff --git a/cmd/collector/app/server/grpc_test.go b/cmd/collector/app/server/grpc_test.go index 487c6bc0216..27855c3ee3e 100644 --- a/cmd/collector/app/server/grpc_test.go +++ b/cmd/collector/app/server/grpc_test.go @@ -129,7 +129,6 @@ func TestCollectorReflection(t *testing.T) { grpctest.ReflectionServiceValidator{ HostPort: params.HostPortActual, - Server: server, ExpectedServices: []string{ "jaeger.api_v2.CollectorService", "jaeger.api_v2.SamplingManager", diff --git a/cmd/jaeger/internal/extension/jaegerquery/config.go b/cmd/jaeger/internal/extension/jaegerquery/config.go index e5640c2710b..ec5de1bb69c 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/config.go +++ b/cmd/jaeger/internal/extension/jaegerquery/config.go @@ -6,6 +6,7 @@ package jaegerquery import ( "github.com/asaskevich/govalidator" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" queryApp "github.com/jaegertracing/jaeger/cmd/query/app" @@ -18,10 +19,13 @@ var _ component.ConfigValidator = (*Config)(nil) type Config struct { queryApp.QueryOptionsBase `mapstructure:",squash"` - TraceStoragePrimary string `valid:"required" mapstructure:"trace_storage"` - TraceStorageArchive string `valid:"optional" mapstructure:"trace_storage_archive"` - confighttp.ServerConfig `mapstructure:",squash"` - Tenancy tenancy.Options `mapstructure:"multi_tenancy"` + TraceStoragePrimary string `valid:"required" mapstructure:"trace_storage"` + TraceStorageArchive string `valid:"optional" mapstructure:"trace_storage_archive"` + + HTTP confighttp.ServerConfig `mapstructure:",squash"` + GRPC configgrpc.ServerConfig `mapstructure:",squash"` + + Tenancy tenancy.Options `mapstructure:"multi_tenancy"` } func (cfg *Config) Validate() error { diff --git a/cmd/jaeger/internal/extension/jaegerquery/factory.go b/cmd/jaeger/internal/extension/jaegerquery/factory.go index 93d35781e10..9d50bc55ea5 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/factory.go +++ b/cmd/jaeger/internal/extension/jaegerquery/factory.go @@ -7,7 +7,9 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/extension" "github.com/jaegertracing/jaeger/ports" @@ -25,9 +27,15 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - ServerConfig: confighttp.ServerConfig{ + HTTP: confighttp.ServerConfig{ Endpoint: ports.PortToHostPort(ports.QueryHTTP), }, + GRPC: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ports.PortToHostPort(ports.QueryGRPC), + Transport: confignet.TransportTypeTCP, + }, + }, } } diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index a512c8ed07f..d4dfa53715d 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -18,7 +18,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" - "github.com/jaegertracing/jaeger/ports" ) var ( @@ -127,9 +126,10 @@ func (s *server) makeQueryOptions() *queryApp.QueryOptions { return &queryApp.QueryOptions{ QueryOptionsBase: s.config.QueryOptionsBase, - // TODO expose via config - HTTPHostPort: ports.PortToHostPort(ports.QueryHTTP), - GRPCHostPort: ports.PortToHostPort(ports.QueryGRPC), + // TODO utilize OTEL helpers for creating HTTP/GRPC servers + HTTPHostPort: s.config.HTTP.Endpoint, + GRPCHostPort: s.config.GRPC.NetAddr.Endpoint, + // TODO handle TLS } } diff --git a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index 36cc4d82c4c..73d165ed491 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -6,7 +6,9 @@ package jaegerquery import ( "context" "fmt" + "net/http" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,6 +19,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" + "github.com/jaegertracing/jaeger/internal/grpctest" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/storage" @@ -160,14 +163,40 @@ func TestServerStart(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { telemetrySettings := component.TelemetrySettings{ - Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), + ReportStatus: func(*component.StatusEvent) {}, } + tt.config.HTTP.Endpoint = ":0" + tt.config.GRPC.NetAddr.Endpoint = ":0" server := newServer(tt.config, telemetrySettings) err := server.Start(context.Background(), host) - if tt.expectedErr == "" { require.NoError(t, err) defer server.Shutdown(context.Background()) + // We need to wait for servers to become available. + // Otherwise, we could call shutdown before the servers are even started, + // which could cause flaky code coverage by going through error cases. + require.Eventually(t, + func() bool { + resp, err := http.Get(fmt.Sprintf("http://%s/", server.server.HTTPAddr())) + if err != nil { + return false + } + defer resp.Body.Close() + return resp.StatusCode == http.StatusOK + }, + 10*time.Second, + 100*time.Millisecond, + "server not started") + grpctest.ReflectionServiceValidator{ + HostPort: server.server.GRPCAddr(), + ExpectedServices: []string{ + "jaeger.api_v2.QueryService", + "jaeger.api_v3.QueryService", + "jaeger.api_v2.metrics.MetricsQueryService", + "grpc.health.v1.Health", + }, + }.Execute(t) } else { require.ErrorContains(t, err, tt.expectedErr) } diff --git a/cmd/query/app/handler_archive_test.go b/cmd/query/app/handler_archive_test.go index 9b48941d0cb..82f0daa721c 100644 --- a/cmd/query/app/handler_archive_test.go +++ b/cmd/query/app/handler_archive_test.go @@ -48,7 +48,7 @@ func TestGetArchivedTrace_NotFound(t *testing.T) { } { tc := tc // capture loop var t.Run(tc.name, func(t *testing.T) { - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() var response structuredResponse @@ -66,7 +66,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) { mockReader := &spanstoremocks.Reader{} mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { // make main reader return NotFound ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() @@ -81,7 +81,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) { // Test failure in parsing trace ID. func TestArchiveTrace_BadTraceID(t *testing.T) { - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { var response structuredResponse err := postJSON(ts.server.URL+"/api/archive/badtraceid", []string{}, &response) require.Error(t, err) @@ -95,7 +95,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) { Return(nil, spanstore.ErrTraceNotFound).Once() mockWriter := &spanstoremocks.Writer{} // Not actually going to write the trace, so no need to define mockWriter action - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { // make main reader return NotFound ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() @@ -106,7 +106,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) { } func TestArchiveTrace_NoStorage(t *testing.T) { - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { var response structuredResponse err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response) require.EqualError(t, err, `500 error from server: {"data":null,"total":0,"limit":0,"offset":0,"errors":[{"code":500,"msg":"archive span storage was not configured"}]}`+"\n") @@ -117,7 +117,7 @@ func TestArchiveTrace_Success(t *testing.T) { mockWriter := &spanstoremocks.Writer{} mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). Return(nil).Times(2) - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() var response structuredResponse @@ -130,7 +130,7 @@ func TestArchiveTrace_WriteErrors(t *testing.T) { mockWriter := &spanstoremocks.Writer{} mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). Return(errors.New("cannot save")).Times(2) - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() var response structuredResponse diff --git a/cmd/query/app/handler_deps_test.go b/cmd/query/app/handler_deps_test.go index 182e149f94d..9d6e064855d 100644 --- a/cmd/query/app/handler_deps_test.go +++ b/cmd/query/app/handler_deps_test.go @@ -310,8 +310,7 @@ func TestFilterDependencies(t *testing.T) { } func TestGetDependenciesSuccess(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) expectedDependencies := []model.DependencyLink{{Parent: "killer", Child: "queen", CallCount: 12}} endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) ts.dependencyReader.On("GetDependencies", @@ -332,10 +331,12 @@ func TestGetDependenciesSuccess(t *testing.T) { } func TestGetDependenciesCassandraFailure(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) - ts.dependencyReader.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1) + ts.dependencyReader.On("GetDependencies", + mock.Anything, // context + endTs, + defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1) var response structuredResponse err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response) @@ -343,18 +344,14 @@ func TestGetDependenciesCassandraFailure(t *testing.T) { } func TestGetDependenciesEndTimeParsingFailure(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() - + ts := initializeTestServer(t) var response structuredResponse err := getJSON(ts.server.URL+"/api/dependencies?endTs=shazbot&service=testing", &response) require.Error(t, err) } func TestGetDependenciesLookbackParsingFailure(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() - + ts := initializeTestServer(t) var response structuredResponse err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing&lookback=shazbot", &response) require.Error(t, err) diff --git a/cmd/query/app/http_handler_test.go b/cmd/query/app/http_handler_test.go index 3034f6595d0..680420023eb 100644 --- a/cmd/query/app/http_handler_test.go +++ b/cmd/query/app/http_handler_test.go @@ -38,7 +38,8 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" - "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "go.uber.org/zap/zaptest/observer" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/model" @@ -102,8 +103,9 @@ type structuredTraceResponse struct { Errors []structuredError `json:"errors"` } -func initializeTestServerWithHandler(queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { +func initializeTestServerWithHandler(t *testing.T, queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { return initializeTestServerWithOptions( + t, &tenancy.Manager{}, queryOptions, append( @@ -119,23 +121,33 @@ func initializeTestServerWithHandler(queryOptions querysvc.QueryServiceOptions, ) } -func initializeTestServerWithOptions(tenancyMgr *tenancy.Manager, queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { +func initializeTestServerWithOptions( + t *testing.T, + tenancyMgr *tenancy.Manager, + queryOptions querysvc.QueryServiceOptions, + options ...HandlerOption, +) *testServer { + options = append(options, HandlerOptions.Logger(zaptest.NewLogger(t))) readStorage := &spanstoremocks.Reader{} dependencyStorage := &depsmocks.Reader{} qs := querysvc.NewQueryService(readStorage, dependencyStorage, queryOptions) r := NewRouter() handler := NewAPIHandler(qs, tenancyMgr, options...) handler.RegisterRoutes(r) - return &testServer{ + ts := &testServer{ server: httptest.NewServer(tenancy.ExtractTenantHTTPHandler(tenancyMgr, r)), spanReader: readStorage, dependencyReader: dependencyStorage, handler: handler, } + t.Cleanup(func() { + ts.server.Close() + }) + return ts } -func initializeTestServer(options ...HandlerOption) *testServer { - return initializeTestServerWithHandler(querysvc.QueryServiceOptions{}, options...) +func initializeTestServer(t *testing.T, options ...HandlerOption) *testServer { + return initializeTestServerWithHandler(t, querysvc.QueryServiceOptions{}, options...) } type testServer struct { @@ -145,15 +157,13 @@ type testServer struct { server *httptest.Server } -func withTestServer(doTest func(s *testServer), queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) { - ts := initializeTestServerWithOptions(&tenancy.Manager{}, queryOptions, options...) - defer ts.server.Close() +func withTestServer(t *testing.T, doTest func(s *testServer), queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) { + ts := initializeTestServerWithOptions(t, &tenancy.Manager{}, queryOptions, options...) doTest(ts) } func TestGetTraceSuccess(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() @@ -163,44 +173,20 @@ func TestGetTraceSuccess(t *testing.T) { assert.Empty(t, response.Errors) } -type logData struct { - e zapcore.Entry - f []zapcore.Field -} - -type testLogger struct { - logs *[]logData -} - -func (testLogger) Enabled(zapcore.Level) bool { return true } -func (l testLogger) With([]zapcore.Field) zapcore.Core { return l } -func (testLogger) Sync() error { return nil } -func (l testLogger) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { - return ce.AddCore(e, l) -} - -func (l testLogger) Write(e zapcore.Entry, f []zapcore.Field) error { - *l.logs = append(*l.logs, logData{e: e, f: f}) - return nil -} - func TestLogOnServerError(t *testing.T) { - l := &testLogger{ - logs: &[]logData{}, - } + zapCore, logs := observer.New(zap.InfoLevel) + logger := zap.New(zapCore) readStorage := &spanstoremocks.Reader{} dependencyStorage := &depsmocks.Reader{} qs := querysvc.NewQueryService(readStorage, dependencyStorage, querysvc.QueryServiceOptions{}) - apiHandlerOptions := []HandlerOption{ - HandlerOptions.Logger(zap.New(l)), - } - h := NewAPIHandler(qs, &tenancy.Manager{}, apiHandlerOptions...) + h := NewAPIHandler(qs, &tenancy.Manager{}, HandlerOptions.Logger(logger)) e := errors.New("test error") h.handleError(&httptest.ResponseRecorder{}, e, http.StatusInternalServerError) - require.Len(t, *l.logs, 1) - assert.Equal(t, "HTTP handler, Internal Server Error", (*l.logs)[0].e.Message) - assert.Len(t, (*l.logs)[0].f, 1) - assert.Equal(t, e, (*l.logs)[0].f[0].Interface) + require.Len(t, logs.All(), 1) + log := logs.All()[0] + assert.Equal(t, "HTTP handler, Internal Server Error", log.Message) + require.Len(t, log.Context, 1) + assert.Equal(t, e, log.Context[0].Interface) } // httpResponseErrWriter implements the http.ResponseWriter interface that returns an error on Write. @@ -321,8 +307,7 @@ func TestGetTrace(t *testing.T) { jTracer := jtracer.JTracer{OTEL: tracerProvider} defer tracerProvider.Shutdown(context.Background()) - ts := initializeTestServer(HandlerOptions.Tracer(jTracer.OTEL)) - defer ts.server.Close() + ts := initializeTestServer(t, HandlerOptions.Tracer(jTracer.OTEL)) ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), model.NewTraceID(0, 0x123456abc)). Return(makeMockTrace(t), nil).Once() @@ -343,8 +328,7 @@ func TestGetTrace(t *testing.T) { } func TestGetTraceDBFailure(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, errStorage).Once() @@ -354,8 +338,7 @@ func TestGetTraceDBFailure(t *testing.T) { } func TestGetTraceNotFound(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() @@ -366,13 +349,13 @@ func TestGetTraceNotFound(t *testing.T) { func TestGetTraceAdjustmentFailure(t *testing.T) { ts := initializeTestServerWithHandler( + t, querysvc.QueryServiceOptions{ Adjuster: adjuster.Func(func(trace *model.Trace) (*model.Trace, error) { return trace, errAdjustment }), }, ) - defer ts.server.Close() ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() @@ -384,8 +367,7 @@ func TestGetTraceAdjustmentFailure(t *testing.T) { } func TestGetTraceBadTraceID(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) var response structuredResponse err := getJSON(ts.server.URL+`/api/traces/chumbawumba`, &response) @@ -393,8 +375,7 @@ func TestGetTraceBadTraceID(t *testing.T) { } func TestSearchSuccess(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")). Return([]*model.Trace{mockTrace}, nil).Once() @@ -405,8 +386,7 @@ func TestSearchSuccess(t *testing.T) { } func TestSearchByTraceIDSuccess(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Twice() @@ -419,10 +399,9 @@ func TestSearchByTraceIDSuccess(t *testing.T) { func TestSearchByTraceIDSuccessWithArchive(t *testing.T) { archiveReadMock := &spanstoremocks.Reader{} - ts := initializeTestServerWithOptions(&tenancy.Manager{}, querysvc.QueryServiceOptions{ + ts := initializeTestServerWithOptions(t, &tenancy.Manager{}, querysvc.QueryServiceOptions{ ArchiveSpanReader: archiveReadMock, }) - defer ts.server.Close() ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Twice() archiveReadMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). @@ -436,8 +415,7 @@ func TestSearchByTraceIDSuccessWithArchive(t *testing.T) { } func TestSearchByTraceIDNotFound(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() @@ -449,8 +427,7 @@ func TestSearchByTraceIDNotFound(t *testing.T) { } func TestSearchByTraceIDFailure(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) whatsamattayou := "https://youtu.be/WrKFOCg13QQ" ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, fmt.Errorf(whatsamattayou)).Once() @@ -462,6 +439,7 @@ func TestSearchByTraceIDFailure(t *testing.T) { func TestSearchModelConversionFailure(t *testing.T) { ts := initializeTestServerWithOptions( + t, &tenancy.Manager{}, querysvc.QueryServiceOptions{ Adjuster: adjuster.Func(func(trace *model.Trace) (*model.Trace, error) { @@ -469,7 +447,6 @@ func TestSearchModelConversionFailure(t *testing.T) { }), }, ) - defer ts.server.Close() ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")). Return([]*model.Trace{mockTrace}, nil).Once() var response structuredResponse @@ -480,8 +457,7 @@ func TestSearchModelConversionFailure(t *testing.T) { } func TestSearchDBFailure(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")). Return(nil, fmt.Errorf("whatsamattayou")).Once() @@ -510,8 +486,7 @@ func TestSearchFailures(t *testing.T) { } func testIndividualSearchFailures(t *testing.T, urlStr, errMsg string) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On("Query", mock.AnythingOfType("spanstore.TraceQueryParameters")). Return([]*model.Trace{}, nil).Once() @@ -521,8 +496,7 @@ func testIndividualSearchFailures(t *testing.T, urlStr, errMsg string) { } func TestGetServicesSuccess(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) expectedServices := []string{"trifle", "bling"} ts.spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once() @@ -537,8 +511,7 @@ func TestGetServicesSuccess(t *testing.T) { } func TestGetServicesStorageFailure(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(nil, errStorage).Once() var response structuredResponse @@ -547,8 +520,7 @@ func TestGetServicesStorageFailure(t *testing.T) { } func TestGetOperationsSuccess(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) expectedOperations := []spanstore.Operation{{Name: ""}, {Name: "get", SpanKind: "server"}} ts.spanReader.On( "GetOperations", @@ -577,17 +549,14 @@ func TestGetOperationsSuccess(t *testing.T) { } func TestGetOperationsNoServiceName(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() - + ts := initializeTestServer(t) var response structuredResponse err := getJSON(ts.server.URL+"/api/operations", &response) require.Error(t, err) } func TestGetOperationsStorageFailure(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On( "GetOperations", mock.AnythingOfType("*context.valueCtx"), @@ -599,8 +568,7 @@ func TestGetOperationsStorageFailure(t *testing.T) { } func TestGetOperationsLegacySuccess(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) expectedOperationNames := []string{"", "get"} expectedOperations := []spanstore.Operation{ {Name: ""}, @@ -621,8 +589,7 @@ func TestGetOperationsLegacySuccess(t *testing.T) { } func TestGetOperationsLegacyStorageFailure(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) ts.spanReader.On( "GetOperations", mock.AnythingOfType("*context.valueCtx"), @@ -641,7 +608,7 @@ func TestTransformOTLPSuccess(t *testing.T) { require.NoError(t, err) return out } - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { inFile, err := os.Open("./fixture/otlp2jaeger-in.json") require.NoError(t, err) @@ -661,7 +628,7 @@ func TestTransformOTLPSuccess(t *testing.T) { } func TestTransformOTLPReadError(t *testing.T) { - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { bytesReader := &IoReaderMock{} bytesReader.On("Read", mock.AnythingOfType("[]uint8")).Return(0, errors.New("Mocked error")) _, err := ts.server.Client().Post(ts.server.URL+"/api/transform", "application/json", bytesReader) @@ -670,7 +637,7 @@ func TestTransformOTLPReadError(t *testing.T) { } func TestTransformOTLPBadPayload(t *testing.T) { - withTestServer(func(ts *testServer) { + withTestServer(t, func(ts *testServer) { response := new(any) request := "Bad Payload" err := postJSON(ts.server.URL+"/api/transform", request, response) @@ -683,8 +650,7 @@ func TestGetMetricsSuccess(t *testing.T) { apiHandlerOptions := []HandlerOption{ HandlerOptions.MetricsQueryService(mr), } - ts := initializeTestServer(apiHandlerOptions...) - defer ts.server.Close() + ts := initializeTestServer(t, apiHandlerOptions...) expectedLabel := &metrics.Label{ Name: "service_name", Value: "emailservice", @@ -766,11 +732,7 @@ func TestGetMetricsSuccess(t *testing.T) { func TestMetricsReaderError(t *testing.T) { metricsReader := &metricsmocks.Reader{} - apiHandlerOptions := []HandlerOption{ - HandlerOptions.MetricsQueryService(metricsReader), - } - ts := initializeTestServer(apiHandlerOptions...) - defer ts.server.Close() + ts := initializeTestServer(t, HandlerOptions.MetricsQueryService(metricsReader)) for _, tc := range []struct { name string @@ -818,10 +780,7 @@ func TestMetricsQueryDisabled(t *testing.T) { disabledReader, err := disabled.NewMetricsReader() require.NoError(t, err) - apiHandlerOptions := []HandlerOption{ - HandlerOptions.MetricsQueryService(disabledReader), - } - ts := initializeTestServer(apiHandlerOptions...) + ts := initializeTestServer(t, HandlerOptions.MetricsQueryService(disabledReader)) defer ts.server.Close() for _, tc := range []struct { @@ -854,10 +813,7 @@ func TestMetricsQueryDisabled(t *testing.T) { func TestGetMinStep(t *testing.T) { metricsReader := &metricsmocks.Reader{} - apiHandlerOptions := []HandlerOption{ - HandlerOptions.MetricsQueryService(metricsReader), - } - ts := initializeTestServer(apiHandlerOptions...) + ts := initializeTestServer(t, HandlerOptions.MetricsQueryService(metricsReader)) defer ts.server.Close() // Prepare metricsReader.On( @@ -946,10 +902,9 @@ func TestSearchTenancyHTTP(t *testing.T) { tenancyOptions := tenancy.Options{ Enabled: true, } - ts := initializeTestServerWithOptions( + ts := initializeTestServerWithOptions(t, tenancy.NewManager(&tenancyOptions), querysvc.QueryServiceOptions{}) - defer ts.server.Close() ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Twice() @@ -973,10 +928,7 @@ func TestSearchTenancyRejectionHTTP(t *testing.T) { tenancyOptions := tenancy.Options{ Enabled: true, } - ts := initializeTestServerWithOptions( - tenancy.NewManager(&tenancyOptions), - querysvc.QueryServiceOptions{}) - defer ts.server.Close() + ts := initializeTestServerWithOptions(t, tenancy.NewManager(&tenancyOptions), querysvc.QueryServiceOptions{}) ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Twice() @@ -1002,10 +954,7 @@ func TestSearchTenancyFlowTenantHTTP(t *testing.T) { tenancyOptions := tenancy.Options{ Enabled: true, } - ts := initializeTestServerWithOptions( - tenancy.NewManager(&tenancyOptions), - querysvc.QueryServiceOptions{}) - defer ts.server.Close() + ts := initializeTestServerWithOptions(t, tenancy.NewManager(&tenancyOptions), querysvc.QueryServiceOptions{}) ts.spanReader.On("GetTrace", mock.MatchedBy(func(v any) bool { ctx, ok := v.(context.Context) if !ok || tenancy.GetTenant(ctx) != "acme" { diff --git a/cmd/query/app/query_parser_test.go b/cmd/query/app/query_parser_test.go index e75d448a144..45fa6ea454c 100644 --- a/cmd/query/app/query_parser_test.go +++ b/cmd/query/app/query_parser_test.go @@ -256,8 +256,7 @@ func TestParseRepeatedSpanKinds(t *testing.T) { } func TestParameterErrors(t *testing.T) { - ts := initializeTestServer() - defer ts.server.Close() + ts := initializeTestServer(t) for _, tc := range []struct { name string diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 6e5261beadc..f44d685c470 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -78,8 +78,9 @@ func NewServer(querySvc *querysvc.QueryService, if err != nil { return nil, fmt.Errorf("invalid gRPC server host:port: %w", err) } + separatePorts := grpcPort != httpPort || grpcPort == "0" || httpPort == "0" - if (options.TLSHTTP.Enabled || options.TLSGRPC.Enabled) && (grpcPort == httpPort) { + if (options.TLSHTTP.Enabled || options.TLSGRPC.Enabled) && !separatePorts { return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead") } @@ -98,7 +99,7 @@ func NewServer(querySvc *querysvc.QueryService, queryOptions: options, grpcServer: grpcServer, httpServer: httpServer, - separatePorts: grpcPort != httpPort, + separatePorts: separatePorts, Setting: telset, }, nil } @@ -232,8 +233,8 @@ func (s *Server) initListener() (cmux.CMux, error) { } s.Logger.Info( "Query server started", - zap.String("http_addr", s.httpConn.Addr().String()), - zap.String("grpc_addr", s.grpcConn.Addr().String()), + zap.String("http_addr", s.HTTPAddr()), + zap.String("grpc_addr", s.GRPCAddr()), ) return nil, nil } @@ -346,6 +347,14 @@ func (s *Server) Start() error { return nil } +func (s *Server) HTTPAddr() string { + return s.httpConn.Addr().String() +} + +func (s *Server) GRPCAddr() string { + return s.grpcConn.Addr().String() +} + // Close stops HTTP, GRPC servers and closes the port listener. func (s *Server) Close() error { errs := []error{ diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index e2d5c839365..cbfa7f3c4df 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -353,8 +353,8 @@ func TestServerHTTPTLS(t *testing.T) { } serverOptions := &QueryOptions{ - GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""), - HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""), + GRPCHostPort: ":0", + HTTPHostPort: ":0", TLSHTTP: test.TLS, TLSGRPC: tlsGrpc, QueryOptionsBase: QueryOptionsBase{ @@ -385,14 +385,14 @@ func TestServerHTTPTLS(t *testing.T) { require.NoError(t, err0) dialer := &net.Dialer{Timeout: 2 * time.Second} - conn, err1 := tls.DialWithDialer(dialer, "tcp", "localhost:"+fmt.Sprintf("%d", ports.QueryHTTP), clientTLSCfg) + conn, err1 := tls.DialWithDialer(dialer, "tcp", server.HTTPAddr(), clientTLSCfg) clientError = err1 clientClose = nil if conn != nil { clientClose = conn.Close } } else { - conn, err1 := net.DialTimeout("tcp", "localhost:"+fmt.Sprintf("%d", ports.QueryHTTP), 2*time.Second) + conn, err1 := net.DialTimeout("tcp", server.HTTPAddr(), 2*time.Second) clientError = err1 clientClose = nil if conn != nil { @@ -417,7 +417,7 @@ func TestServerHTTPTLS(t *testing.T) { } querySvc.spanReader.On("FindTraces", mock.Anything, mock.Anything).Return([]*model.Trace{mockTrace}, nil).Once() queryString := "/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms" - req, err := http.NewRequest(http.MethodGet, "https://localhost:"+fmt.Sprintf("%d", ports.QueryHTTP)+queryString, nil) + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("https://%s/%s", server.HTTPAddr(), queryString), nil) require.NoError(t, err) req.Header.Add("Accept", "application/json") @@ -490,8 +490,8 @@ func TestServerGRPCTLS(t *testing.T) { tlsHttp = enabledTLSCfg } serverOptions := &QueryOptions{ - GRPCHostPort: ports.GetAddressFromCLIOptions(ports.QueryGRPC, ""), - HTTPHostPort: ports.GetAddressFromCLIOptions(ports.QueryHTTP, ""), + GRPCHostPort: ":0", + HTTPHostPort: ":0", TLSHTTP: tlsHttp, TLSGRPC: test.TLS, QueryOptionsBase: QueryOptionsBase{ @@ -518,9 +518,9 @@ func TestServerGRPCTLS(t *testing.T) { require.NoError(t, err0) defer test.clientTLS.Close() creds := credentials.NewTLS(clientTLSCfg) - client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), creds) + client = newGRPCClientWithTLS(t, server.GRPCAddr(), creds) } else { - client = newGRPCClientWithTLS(t, ports.PortToHostPort(ports.QueryGRPC), nil) + client = newGRPCClientWithTLS(t, server.GRPCAddr(), nil) } t.Cleanup(func() { require.NoError(t, client.conn.Close()) @@ -699,13 +699,8 @@ func TestServerHandlesPortZero(t *testing.T) { message := logs.FilterMessage("Query server started") assert.Equal(t, 1, message.Len(), "Expected 'Query server started' log message.") - onlyEntry := message.All()[0] - port := onlyEntry.ContextMap()["port"].(int64) - assert.Greater(t, port, int64(0)) - grpctest.ReflectionServiceValidator{ - HostPort: fmt.Sprintf(":%v", port), - Server: server.grpcServer, + HostPort: server.GRPCAddr(), ExpectedServices: []string{ "jaeger.api_v2.QueryService", "jaeger.api_v3.QueryService", diff --git a/cmd/remote-storage/app/server_test.go b/cmd/remote-storage/app/server_test.go index 29324de7d05..e6234b8facb 100644 --- a/cmd/remote-storage/app/server_test.go +++ b/cmd/remote-storage/app/server_test.go @@ -77,7 +77,7 @@ func TestNewServer_CreateStorageErrors(t *testing.T) { require.NoError(t, err) err = s.Start() require.NoError(t, err) - validateGRPCServer(t, s.grpcConn.Addr().String(), s.grpcServer) + validateGRPCServer(t, s.grpcConn.Addr().String()) s.grpcConn.Close() // causes logged error } @@ -398,17 +398,16 @@ func TestServerHandlesPortZero(t *testing.T) { onlyEntry := message.All()[0] hostPort := onlyEntry.ContextMap()["addr"].(string) - validateGRPCServer(t, hostPort, server.grpcServer) + validateGRPCServer(t, hostPort) server.Close() assert.Equal(t, healthcheck.Unavailable, flagsSvc.HC().Get()) } -func validateGRPCServer(t *testing.T, hostPort string, server *grpc.Server) { +func validateGRPCServer(t *testing.T, hostPort string) { grpctest.ReflectionServiceValidator{ HostPort: hostPort, - Server: server, ExpectedServices: []string{ "jaeger.storage.v1.SpanReaderPlugin", "jaeger.storage.v1.SpanWriterPlugin", @@ -417,7 +416,7 @@ func validateGRPCServer(t *testing.T, hostPort string, server *grpc.Server) { "jaeger.storage.v1.ArchiveSpanReaderPlugin", "jaeger.storage.v1.ArchiveSpanWriterPlugin", "jaeger.storage.v1.StreamingSpanWriterPlugin", - // "grpc.health.v1.Health", + "grpc.health.v1.Health", }, }.Execute(t) } diff --git a/examples/hotrod/docker-compose.yml b/examples/hotrod/docker-compose.yml index 8a0e2a90368..250f78f82b8 100644 --- a/examples/hotrod/docker-compose.yml +++ b/examples/hotrod/docker-compose.yml @@ -12,10 +12,6 @@ services: - "4318:4318" environment: - LOG_LEVEL=debug - # Since v0.105 the OTEL Collector components default hostname to 'localhost'. - # However, that does not work inside a Docker container, so we listen on all IPs. - - COLLECTOR_OTLP_GRPC_HOST_PORT=0.0.0.0:4317 - - COLLECTOR_OTLP_HTTP_HOST_PORT=0.0.0.0:4318 networks: - jaeger-example hotrod: diff --git a/internal/grpctest/reflection.go b/internal/grpctest/reflection.go index 8aba438531d..c1e6e6a0c47 100644 --- a/internal/grpctest/reflection.go +++ b/internal/grpctest/reflection.go @@ -27,7 +27,6 @@ import ( // ReflectionServiceValidator verifies that a gRPC service at a given address // supports reflection service. Called must invoke Execute func. type ReflectionServiceValidator struct { - Server *grpc.Server HostPort string ExpectedServices []string } diff --git a/internal/grpctest/reflection_test.go b/internal/grpctest/reflection_test.go index a814e5418e7..af6ebe7290b 100644 --- a/internal/grpctest/reflection_test.go +++ b/internal/grpctest/reflection_test.go @@ -41,7 +41,6 @@ func TestReflectionServiceValidator(t *testing.T) { ReflectionServiceValidator{ HostPort: listener.Addr().String(), - Server: server, ExpectedServices: []string{"grpc.reflection.v1alpha.ServerReflection"}, }.Execute(t) }