Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[remote-storage] Add healthcheck to grpc server #5461

Merged
merged 4 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (storageHost) GetExporters() map[component.DataType]map[component.ID]compon
func TestServerDependencies(t *testing.T) {
expectedDependencies := []component.ID{jaegerstorage.ID}
telemetrySettings := component.TelemetrySettings{
Logger: zaptest.NewLogger(t),
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
}

server := newServer(createDefaultConfig().(*Config), telemetrySettings)
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestServerStart(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
telemetrySettings := component.TelemetrySettings{
Logger: zaptest.NewLogger(t),
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
}
server := newServer(tt.config, telemetrySettings)
err := server.Start(context.Background(), host)
Expand Down
8 changes: 4 additions & 4 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type E2EStorageIntegration struct {
// it also initialize the SpanWriter and SpanReader below.
// This function should be called before any of the tests start.
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
configFile := createStorageCleanerConfig(t, s.ConfigFile, storage)
t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)

Expand Down Expand Up @@ -98,14 +98,14 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
// A Github Actions special annotation to create a foldable section
// in the Github runner output.
// https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#grouping-log-lines
fmt.Println("::group::Jaeger-v2 binary logs")
fmt.Println("::group::🚧 🚧 🚧 Jaeger-v2 binary logs")
outLogs, err := os.ReadFile(outFile.Name())
require.NoError(t, err)
fmt.Printf("Jaeger-v2 output logs:\n%s", outLogs)
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs)

errLogs, err := os.ReadFile(errFile.Name())
require.NoError(t, err)
fmt.Printf("Jaeger-v2 error logs:\n%s", errLogs)
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs)
// End of Github Actions foldable section annotation.
fmt.Println("::endgroup::")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/span_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *spanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*mode
for i := range received.Spans {
spans = append(spans, &received.Spans[i])
}
r.logger.Info(fmt.Sprintf("GetTrace received %d spans (total %d)", len(received.Spans), len(spans)))
// r.logger.Info(fmt.Sprintf("GetTrace received %d spans (total %d)", len(received.Spans), len(spans)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@james-ryans are you sure we need this log? It's very noisy, I don't think it adds much value other than a form of progress reporting (which I would do with much lighter approach, maybe logging every 1000 spans)

}
r.logger.Info(fmt.Sprintf("GetTraces received a total of %d spans", len(spans)))

Expand Down
4 changes: 3 additions & 1 deletion cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
Expand Down Expand Up @@ -115,8 +116,9 @@ func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHa
}

server := grpc.NewServer(grpcOpts...)
healthServer := health.NewServer()
reflection.Register(server)
handler.Register(server)
handler.Register(server, healthServer)

return server, nil
}
Expand Down
14 changes: 13 additions & 1 deletion plugin/storage/grpc/shared/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -84,14 +86,24 @@ func NewGRPCHandlerWithPlugins(
}

// Register registers the server as gRPC methods handler.
func (s *GRPCHandler) Register(ss *grpc.Server) error {
func (s *GRPCHandler) Register(ss *grpc.Server, hs *health.Server) error {
storage_v1.RegisterSpanReaderPluginServer(ss, s)
storage_v1.RegisterSpanWriterPluginServer(ss, s)
storage_v1.RegisterArchiveSpanReaderPluginServer(ss, s)
storage_v1.RegisterArchiveSpanWriterPluginServer(ss, s)
storage_v1.RegisterPluginCapabilitiesServer(ss, s)
storage_v1.RegisterDependenciesReaderPluginServer(ss, s)
storage_v1.RegisterStreamingSpanWriterPluginServer(ss, s)

hs.SetServingStatus("jaeger.storage.v1.SpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.SpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.ArchiveSpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.ArchiveSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.PluginCapabilities", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.DependenciesReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.StreamingSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(ss, hs)

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
s.factory = badger.NewFactory()
s.factory.Options.Primary.Ephemeral = false

logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
err := s.factory.Initialize(metrics.NullFactory, logger)
require.NoError(t, err)
t.Cleanup(func() {
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *CassandraStorageIntegration) cleanUp(t *testing.T) {
}

func (s *CassandraStorageIntegration) initializeCassandraFactory(t *testing.T, flags []string) *cassandra.Factory {
logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
f := cassandra.NewFactory()
v, command := config.Viperize(f.AddFlags)
require.NoError(t, command.ParseFlags(flags))
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *ESStorageIntegration) esCleanUp(t *testing.T) {
}

func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory {
logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
f := es.NewFactory()
v, command := config.Viperize(f.AddFlags)
args := []string{
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/pkg/config"
Expand All @@ -34,7 +35,7 @@ type GRPCStorageIntegrationTestSuite struct {
}

func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
logger := zaptest.NewLogger(t)
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
s.remoteStorage = StartNewRemoteMemoryStorage(t)

f := grpc.NewFactory()
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type KafkaIntegrationTestSuite struct {
}

func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) {
logger := zaptest.NewLogger(t, zaptest.Level(zap.DebugLevel))
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
const encoding = "json"
const groupID = "kafka-integration-test"
const clientID = "kafka-integration-test"
Expand Down
28 changes: 27 additions & 1 deletion plugin/storage/integration/remote_memory_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
package integration

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/jaegertracing/jaeger/cmd/remote-storage/app"
"github.com/jaegertracing/jaeger/pkg/config"
Expand All @@ -25,7 +31,7 @@ type RemoteMemoryStorage struct {
}

func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage {
logger := zaptest.NewLogger(t)
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
opts := &app.Options{
GRPCHostPort: ports.PortToHostPort(ports.RemoteStorageGRPC),
Tenancy: tenancy.Options{
Expand All @@ -45,6 +51,26 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage {
require.NoError(t, err)
require.NoError(t, server.Start())

conn, err := grpc.NewClient(
opts.GRPCHostPort,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
defer conn.Close()
healthClient := grpc_health_v1.NewHealthClient(conn)
require.Eventually(t, func() bool {
req := &grpc_health_v1.HealthCheckRequest{}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
resp, err := healthClient.Check(ctx, req)
if err != nil {
t.Logf("remote storage server is not ready: err=%v", err)
return false
}
t.Logf("remote storage server status: %v", resp.Status)
return resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING
}, 30*time.Second, time.Second, "failed to ensure remote storage server is ready")

return &RemoteMemoryStorage{
server: server,
storageFactory: storageFactory,
Expand Down
Loading