From 1f750ff27e751e3cf9840e01cc1bb7723fb3be68 Mon Sep 17 00:00:00 2001 From: James Ryans Date: Sun, 19 May 2024 20:45:54 +0700 Subject: [PATCH 1/4] add healthcheck to grpc server Signed-off-by: James Ryans --- cmd/remote-storage/app/server.go | 4 +++- plugin/storage/grpc/shared/grpc_handler.go | 14 +++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/cmd/remote-storage/app/server.go b/cmd/remote-storage/app/server.go index ecc29f0ff9b..a79187ca471 100644 --- a/cmd/remote-storage/app/server.go +++ b/cmd/remote-storage/app/server.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" "google.golang.org/grpc/reflection" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" @@ -115,8 +116,9 @@ func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHa } server := grpc.NewServer(grpcOpts...) + healthServer := health.NewServer() reflection.Register(server) - handler.Register(server) + handler.Register(server, healthServer) return server, nil } diff --git a/plugin/storage/grpc/shared/grpc_handler.go b/plugin/storage/grpc/shared/grpc_handler.go index e19484b09e4..794bbf7e6e8 100644 --- a/plugin/storage/grpc/shared/grpc_handler.go +++ b/plugin/storage/grpc/shared/grpc_handler.go @@ -22,6 +22,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" "github.com/jaegertracing/jaeger/model" @@ -84,7 +86,7 @@ func NewGRPCHandlerWithPlugins( } // Register registers the server as gRPC methods handler. 
-func (s *GRPCHandler) Register(ss *grpc.Server) error { +func (s *GRPCHandler) Register(ss *grpc.Server, hs *health.Server) error { storage_v1.RegisterSpanReaderPluginServer(ss, s) storage_v1.RegisterSpanWriterPluginServer(ss, s) storage_v1.RegisterArchiveSpanReaderPluginServer(ss, s) @@ -92,6 +94,16 @@ func (s *GRPCHandler) Register(ss *grpc.Server) error { storage_v1.RegisterPluginCapabilitiesServer(ss, s) storage_v1.RegisterDependenciesReaderPluginServer(ss, s) storage_v1.RegisterStreamingSpanWriterPluginServer(ss, s) + + hs.SetServingStatus("jaeger.storage.v1.SpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING) + hs.SetServingStatus("jaeger.storage.v1.SpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING) + hs.SetServingStatus("jaeger.storage.v1.ArchiveSpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING) + hs.SetServingStatus("jaeger.storage.v1.ArchiveSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING) + hs.SetServingStatus("jaeger.storage.v1.PluginCapabilities", grpc_health_v1.HealthCheckResponse_SERVING) + hs.SetServingStatus("jaeger.storage.v1.DependenciesReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING) + hs.SetServingStatus("jaeger.storage.v1.StreamingSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(ss, hs) + return nil } From fb5ebb7fd4edca0c72ba0c10fa191370bba20f34 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 19 May 2024 14:42:46 -0400 Subject: [PATCH 2/4] log caller Signed-off-by: Yuri Shkuro --- cmd/jaeger/internal/extension/jaegerquery/server_test.go | 4 ++-- cmd/jaeger/internal/integration/e2e_integration.go | 2 +- pkg/testutils/logger.go | 2 +- plugin/storage/integration/badgerstore_test.go | 2 +- plugin/storage/integration/cassandra_test.go | 2 +- plugin/storage/integration/elasticsearch_test.go | 2 +- plugin/storage/integration/grpc_test.go | 3 ++- plugin/storage/integration/kafka_test.go | 2 +- 
plugin/storage/integration/remote_memory_storage.go | 3 ++- 9 files changed, 12 insertions(+), 10 deletions(-) diff --git a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index e6800a79aa8..91a8360fd5d 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -101,7 +101,7 @@ func (storageHost) GetExporters() map[component.DataType]map[component.ID]compon func TestServerDependencies(t *testing.T) { expectedDependencies := []component.ID{jaegerstorage.ID} telemetrySettings := component.TelemetrySettings{ - Logger: zaptest.NewLogger(t), + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), } server := newServer(createDefaultConfig().(*Config), telemetrySettings) @@ -160,7 +160,7 @@ func TestServerStart(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { telemetrySettings := component.TelemetrySettings{ - Logger: zaptest.NewLogger(t), + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), } server := newServer(tt.config, telemetrySettings) err := server.Start(context.Background(), host) diff --git a/cmd/jaeger/internal/integration/e2e_integration.go b/cmd/jaeger/internal/integration/e2e_integration.go index 07f0c6de2b2..0f50046ced8 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -45,7 +45,7 @@ type E2EStorageIntegration struct { // it also initialize the SpanWriter and SpanReader below. // This function should be called before any of the tests start. 
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { - logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)) + logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) configFile := createStorageCleanerConfig(t, s.ConfigFile, storage) t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile) diff --git a/pkg/testutils/logger.go b/pkg/testutils/logger.go index 4ebef3b9dd3..da320f54f52 100644 --- a/pkg/testutils/logger.go +++ b/pkg/testutils/logger.go @@ -49,7 +49,7 @@ func newRecordingCore() (zapcore.Core, *Buffer) { // NewEchoLogger is similar to NewLogger, but the logs are also echoed to t.Log. func NewEchoLogger(t *testing.T) (*zap.Logger, *Buffer) { core, buf := newRecordingCore() - echo := zaptest.NewLogger(t).Core() + echo := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())).Core() logger := zap.New(zapcore.NewTee(core, echo)) return logger, buf } diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 6ce061a74db..0ddb3a023c3 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -35,7 +35,7 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) { s.factory = badger.NewFactory() s.factory.Options.Primary.Ephemeral = false - logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)) + logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) err := s.factory.Initialize(metrics.NullFactory, logger) require.NoError(t, err) t.Cleanup(func() { diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index 00271f114fa..1e442d8d8ba 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -51,7 +51,7 @@ func (s *CassandraStorageIntegration) cleanUp(t *testing.T) { } func (s *CassandraStorageIntegration) initializeCassandraFactory(t *testing.T, flags 
[]string) *cassandra.Factory { - logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)) + logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) f := cassandra.NewFactory() v, command := config.Viperize(f.AddFlags) require.NoError(t, command.ParseFlags(flags)) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index c1d71e3f1b3..e8534fc7dee 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -108,7 +108,7 @@ func (s *ESStorageIntegration) esCleanUp(t *testing.T) { } func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory { - logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)) + logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) f := es.NewFactory() v, command := config.Viperize(f.AddFlags) args := []string{ diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 8b6c24c9f6d..d5fa17d83ab 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger/pkg/config" @@ -34,7 +35,7 @@ type GRPCStorageIntegrationTestSuite struct { } func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) { - logger := zaptest.NewLogger(t) + logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) s.remoteStorage = StartNewRemoteMemoryStorage(t) f := grpc.NewFactory() diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index 90f1e9775ab..f333821f3c8 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -43,7 +43,7 @@ type KafkaIntegrationTestSuite struct { } func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) { - 
logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel)) + logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) const encoding = "json" const groupID = "kafka-integration-test" const clientID = "kafka-integration-test" diff --git a/plugin/storage/integration/remote_memory_storage.go b/plugin/storage/integration/remote_memory_storage.go index 3ce5bf3d4d8..eba282edc28 100644 --- a/plugin/storage/integration/remote_memory_storage.go +++ b/plugin/storage/integration/remote_memory_storage.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger/cmd/remote-storage/app" @@ -25,7 +26,7 @@ type RemoteMemoryStorage struct { } func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage { - logger := zaptest.NewLogger(t) + logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) opts := &app.Options{ GRPCHostPort: ports.PortToHostPort(ports.RemoteStorageGRPC), Tenancy: tenancy.Options{ From b3a750b7523fe77c98c6f80fa2c6c9783011ec7a Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 19 May 2024 15:13:58 -0400 Subject: [PATCH 3/4] ping health check Signed-off-by: Yuri Shkuro --- .../internal/integration/e2e_integration.go | 6 ++--- .../internal/integration/span_reader.go | 2 +- pkg/testutils/logger.go | 2 +- .../integration/remote_memory_storage.go | 25 +++++++++++++++++++ 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/cmd/jaeger/internal/integration/e2e_integration.go b/cmd/jaeger/internal/integration/e2e_integration.go index 0f50046ced8..be8c9a9c00a 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -98,14 +98,14 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { // A Github Actions special annotation to create a foldable section // in the Github runner output. 
// https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#grouping-log-lines - fmt.Println("::group::Jaeger-v2 binary logs") + fmt.Println("::group::🚧 🚧 🚧 Jaeger-v2 binary logs") outLogs, err := os.ReadFile(outFile.Name()) require.NoError(t, err) - fmt.Printf("Jaeger-v2 output logs:\n%s", outLogs) + fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs) errLogs, err := os.ReadFile(errFile.Name()) require.NoError(t, err) - fmt.Printf("Jaeger-v2 error logs:\n%s", errLogs) + fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs) // End of Github Actions foldable section annotation. fmt.Println("::endgroup::") } diff --git a/cmd/jaeger/internal/integration/span_reader.go b/cmd/jaeger/internal/integration/span_reader.go index 3c4d1fd1717..eb878836133 100644 --- a/cmd/jaeger/internal/integration/span_reader.go +++ b/cmd/jaeger/internal/integration/span_reader.go @@ -82,7 +82,7 @@ func (r *spanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*mode for i := range received.Spans { spans = append(spans, &received.Spans[i]) } - r.logger.Info(fmt.Sprintf("GetTrace received %d spans (total %d)", len(received.Spans), len(spans))) + // r.logger.Info(fmt.Sprintf("GetTrace received %d spans (total %d)", len(received.Spans), len(spans))) } r.logger.Info(fmt.Sprintf("GetTraces received a total of %d spans", len(spans))) diff --git a/pkg/testutils/logger.go b/pkg/testutils/logger.go index da320f54f52..4ebef3b9dd3 100644 --- a/pkg/testutils/logger.go +++ b/pkg/testutils/logger.go @@ -49,7 +49,7 @@ func newRecordingCore() (zapcore.Core, *Buffer) { // NewEchoLogger is similar to NewLogger, but the logs are also echoed to t.Log. 
func NewEchoLogger(t *testing.T) (*zap.Logger, *Buffer) { core, buf := newRecordingCore() - echo := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())).Core() + echo := zaptest.NewLogger(t).Core() logger := zap.New(zapcore.NewTee(core, echo)) return logger, buf } diff --git a/plugin/storage/integration/remote_memory_storage.go b/plugin/storage/integration/remote_memory_storage.go index eba282edc28..1bd2dd7b3e5 100644 --- a/plugin/storage/integration/remote_memory_storage.go +++ b/plugin/storage/integration/remote_memory_storage.go @@ -4,12 +4,17 @@ package integration import ( + "context" "os" "testing" + "time" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health/grpc_health_v1" "github.com/jaegertracing/jaeger/cmd/remote-storage/app" "github.com/jaegertracing/jaeger/pkg/config" @@ -46,6 +51,26 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage { require.NoError(t, err) require.NoError(t, server.Start()) + conn, err := grpc.NewClient( + opts.GRPCHostPort, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + healthClient := grpc_health_v1.NewHealthClient(conn) + require.Eventually(t, func() bool { + req := &grpc_health_v1.HealthCheckRequest{} + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + defer cancel() + resp, err := healthClient.Check(ctx, req) + if err != nil { + t.Logf("remote storage server is not ready: err=%v", err) + return false + } + t.Logf("remote storage server status: %v", resp.Status) + return resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING + }, 30*time.Second, time.Second, "failed to ensure remote storage server is ready") + return &RemoteMemoryStorage{ server: server, storageFactory: storageFactory, From 4adedea9b8ec43838ac3250080d912a570943819 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro 
Date: Sun, 19 May 2024 15:38:56 -0400 Subject: [PATCH 4/4] dedupe-spans Signed-off-by: Yuri Shkuro --- plugin/storage/integration/integration.go | 2 +- plugin/storage/integration/trace_compare.go | 22 ++++++++++++++ .../storage/integration/trace_compare_test.go | 30 +++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 plugin/storage/integration/trace_compare_test.go diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 224a491df2a..d85616987ca 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -206,7 +206,7 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { found := s.waitForCondition(t, func(t *testing.T) bool { var err error actual, err = s.SpanReader.GetTrace(context.Background(), expectedTraceID) - return err == nil && len(actual.Spans) == len(expected.Spans) + return err == nil && len(actual.Spans) >= len(expected.Spans) }) if !assert.True(t, found) { CompareTraces(t, expected, actual) diff --git a/plugin/storage/integration/trace_compare.go b/plugin/storage/integration/trace_compare.go index 7fed434a409..bb8c9ca83ba 100644 --- a/plugin/storage/integration/trace_compare.go +++ b/plugin/storage/integration/trace_compare.go @@ -47,6 +47,20 @@ func CompareSliceOfTraces(t *testing.T, expected []*model.Trace, actual []*model } } +// trace.Spans may contain spans with the same SpanID. Remove duplicates +// and keep the first one. Use a map to keep track of the spans we've seen. 
+func dedupeSpans(trace *model.Trace) { + seen := make(map[model.SpanID]bool) + var newSpans []*model.Span + for _, span := range trace.Spans { + if !seen[span.SpanID] { + seen[span.SpanID] = true + newSpans = append(newSpans, span) + } + } + trace.Spans = newSpans +} + // CompareTraces compares two traces func CompareTraces(t *testing.T, expected *model.Trace, actual *model.Trace) { if expected.Spans == nil { @@ -55,6 +69,14 @@ func CompareTraces(t *testing.T, expected *model.Trace, actual *model.Trace) { } require.NotNil(t, actual) require.NotNil(t, actual.Spans) + + // Some storage implementations may retry writing spans and end up with duplicates. + countBefore := len(actual.Spans) + dedupeSpans(actual) + if countAfter := len(actual.Spans); countAfter != countBefore { + t.Logf("Removed spans with duplicate span IDs; before=%d, after=%d", countBefore, countAfter) + } + model.SortTrace(expected) model.SortTrace(actual) checkSize(t, expected, actual) diff --git a/plugin/storage/integration/trace_compare_test.go b/plugin/storage/integration/trace_compare_test.go new file mode 100644 index 00000000000..a419d624ea8 --- /dev/null +++ b/plugin/storage/integration/trace_compare_test.go @@ -0,0 +1,30 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/model" +) + +func TestDedupeSpans(t *testing.T) { + trace := &model.Trace{ + Spans: []*model.Span{ + { + SpanID: 1, + }, + { + SpanID: 1, + }, + { + SpanID: 2, + }, + }, + } + dedupeSpans(trace) + assert.Len(t, trace.Spans, 2) +}