Support graceful shutdown in grpc plugin #3249

Merged 3 commits on Sep 8, 2021
1 change: 1 addition & 0 deletions Makefile
@@ -415,6 +415,7 @@ install-mockery:
generate-mocks: install-mockery
	$(MOCKERY) -all -dir ./pkg/es/ -output ./pkg/es/mocks && rm pkg/es/mocks/ClientBuilder.go
	$(MOCKERY) -all -dir ./storage/spanstore/ -output ./storage/spanstore/mocks
	$(MOCKERY) -all -dir ./proto-gen/storage_v1/ -output ./proto-gen/storage_v1/mocks

.PHONY: echo-version
echo-version:
9 changes: 9 additions & 0 deletions plugin/storage/grpc/proto/storage.proto
@@ -58,6 +58,14 @@ message WriteSpanResponse {

}

// empty; extensible in the future
message CloseWriterRequest {
}

// empty; extensible in the future
message CloseWriterResponse {
}

message GetTraceRequest {
bytes trace_id = 1 [
(gogoproto.nullable) = false,
@@ -135,6 +143,7 @@ message FindTraceIDsResponse {
service SpanWriterPlugin {
// spanstore/Writer
rpc WriteSpan(WriteSpanRequest) returns (WriteSpanResponse);
rpc Close(CloseWriterRequest) returns (CloseWriterResponse);
Member:

This looks like a backwards-incompatible change. It would work for plugins that are based on grpc_server, but that's not a requirement for all plugins, some of them could even be written in a different language.

Aren't there other mechanisms available for the plugin to do a graceful shutdown? I am not sure how hashicorp/plugin manages the child process, maybe there's a way to add a hook on the OS level, like trapping the kill signal?
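
To make the signal-trapping alternative raised above concrete, here is a minimal Go sketch of a plugin process that runs cleanup when it receives an interrupt or termination signal. The names `runUntilSignal` and `simulateShutdown` are hypothetical, the cleanup body is a placeholder, and the thread does not establish whether the plugin host actually delivers a trappable signal to the child process (SIGKILL, for one, cannot be trapped).

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// runUntilSignal blocks until a signal arrives on sigs, then runs cleanup
// and returns its error. Taking the channel as a parameter keeps it testable.
func runUntilSignal(sigs <-chan os.Signal, cleanup func() error) error {
	sig := <-sigs
	fmt.Printf("received %v, running cleanup before exit\n", sig)
	return cleanup()
}

// simulateShutdown registers for SIGINT/SIGTERM and then injects a SIGTERM
// into the channel by hand so that this sketch terminates on its own.
// A real plugin would simply block until the OS delivers a signal.
func simulateShutdown() (cleaned bool, err error) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigs)

	sigs <- syscall.SIGTERM // simulated delivery, for illustration only

	err = runUntilSignal(sigs, func() error {
		// Hypothetical cleanup: flush buffered spans here.
		cleaned = true
		return nil
	})
	return cleaned, err
}

func main() {
	cleaned, err := simulateShutdown()
	if err != nil {
		fmt.Fprintln(os.Stderr, "cleanup failed:", err)
		os.Exit(1)
	}
	fmt.Println("cleanup ran:", cleaned)
}
```

The approach only helps if the host sends a catchable signal and waits for the child to exit, which is the open question in the comment above.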

Contributor Author:

Are you sure that is an incompatible change?

I expect all gRPC implementations to follow the spec and return codes.Unimplemented for methods that the server does not implement. So we will perform the RPC, get an error, and just ignore it.

Member:

If that's the case then it might be OK. Do you have a reference to the gRPC spec that documents that (or is it the protobuf spec)?

It would be good to validate this on the test memory-plugin that we have, i.e. compile it with the existing code, then run it with an upgraded client.

Contributor Author:

Here is the reference: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md

Here you can see the Go implementation: https://github.com/grpc/grpc-go/blob/c361e9ea1646283baf7b23a5d060c45fce9a1dea/server.go#L1616-L1626

I checked the core C implementation just in case. It looks like it does not handle service/method lookup and just passes raw messages to a single user-provided handler. So in theory a plugin written in C might break, but I'm not sure that case is worth handling.

Validated by building memstore from master and the collector from the branch (with an additional debug Printf).

prime@bee ~/C/jaeger (master)> go build ./examples/memstore-plugin/
prime@bee ~/C/jaeger (master)> git checkout grpc_writer_graceful_shutdown 
Switched to branch 'grpc_writer_graceful_shutdown'
prime@bee ~/C/jaeger (grpc_writer_graceful_shutdown)> go build ./cmd/collector/
prime@bee ~/C/jaeger (grpc_writer_graceful_shutdown)> env SPAN_STORAGE_TYPE=grpc-plugin ./collector --grpc-storage-plugin.binary=./memstore-plugin
2021/09/07 20:37:05 maxprocs: Leaving GOMAXPROCS=8: CPU quota undefined
{"level":"info","ts":1631036225.8372736,"caller":"flags/service.go:117","msg":"Mounting metrics handler on admin server","route":"/metrics"}
{"level":"info","ts":1631036225.8373,"caller":"flags/service.go:123","msg":"Mounting expvar handler on admin server","route":"/debug/vars"}
{"level":"info","ts":1631036225.837396,"caller":"flags/admin.go:104","msg":"Mounting health check on admin server","route":"/"}
{"level":"info","ts":1631036225.8374236,"caller":"flags/admin.go:115","msg":"Starting admin HTTP server","http-addr":":14269"}
{"level":"info","ts":1631036225.8374321,"caller":"flags/admin.go:96","msg":"Admin server started","http.host-port":"[::]:14269","health-status":"unavailable"}
{"level":"info","ts":1631036225.8412266,"caller":"channelz/logging.go:50","msg":"[core]parsed scheme: \"\"","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.841276,"caller":"channelz/logging.go:50","msg":"[core]scheme \"\" not registered, fallback to default scheme","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.8412967,"caller":"channelz/logging.go:50","msg":"[core]ccResolverWrapper: sending update to cc: {[{unused  <nil> 0 <nil>}] <nil> <nil>}","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.8413036,"caller":"channelz/logging.go:50","msg":"[core]ClientConn switching balancer to \"pick_first\"","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.841308,"caller":"channelz/logging.go:50","msg":"[core]Channel switches to new LB policy \"pick_first\"","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.8413296,"caller":"channelz/logging.go:50","msg":"[core]Subchannel Connectivity change to CONNECTING","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.8413532,"caller":"grpclog/component.go:71","msg":"[core]blockingPicker: the picked transport is not ready, loop back to repick","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.841393,"caller":"grpclog/component.go:71","msg":"[core]blockingPicker: the picked transport is not ready, loop back to repick","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.8413951,"caller":"channelz/logging.go:50","msg":"[core]Subchannel picks a new address \"unused\" to connect","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.8414173,"caller":"grpclog/component.go:71","msg":"[core]pickfirstBalancer: UpdateSubConnState: 0xc00052c9a0, {CONNECTING <nil>}","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.841442,"caller":"channelz/logging.go:50","msg":"[core]Channel Connectivity change to CONNECTING","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.841611,"caller":"channelz/logging.go:50","msg":"[core]Subchannel Connectivity change to READY","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.8416324,"caller":"grpclog/component.go:71","msg":"[core]pickfirstBalancer: UpdateSubConnState: 0xc00052c9a0, {READY <nil>}","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.8416374,"caller":"channelz/logging.go:50","msg":"[core]Channel Connectivity change to READY","system":"grpc","grpc_log":true}
{"level":"info","ts":1631036225.8417265,"caller":"grpc/factory.go:79","msg":"External plugin storage configuration","configuration":{"PluginBinary":"./memstore-plugin","PluginConfigurationFile":"","PluginLogLevel":"warn"}}
{"level":"info","ts":1631036225.8419108,"caller":"static/strategy_store.go:203","msg":"No sampling strategies provided or URL is unavailable, using defaults"}
{"level":"info","ts":1631036225.8441026,"caller":"server/grpc.go:82","msg":"Starting jaeger-collector gRPC server","grpc.host-port":":14250"}
{"level":"info","ts":1631036225.8441298,"caller":"server/http.go:48","msg":"Starting jaeger-collector HTTP server","http host-port":":14268"}
{"level":"info","ts":1631036225.844246,"caller":"server/zipkin.go:49","msg":"Not listening for Zipkin HTTP traffic, port not configured"}
{"level":"info","ts":1631036225.844262,"caller":"healthcheck/handler.go:129","msg":"Health Check state change","status":"ready"}
^C{"level":"info","ts":1631036226.242645,"caller":"flags/service.go:154","msg":"Shutting down"}
{"level":"info","ts":1631036226.2427242,"caller":"healthcheck/handler.go:129","msg":"Health Check state change","status":"unavailable"}
{"level":"info","ts":1631036226.2428875,"caller":"healthcheck/handler.go:129","msg":"Health Check state change","status":"unavailable"}
rpc error: code = Unimplemented desc = unknown method Close for service jaeger.storage.v1.SpanWriterPlugin
{"level":"info","ts":1631036226.2463157,"caller":"flags/service.go:162","msg":"Shutdown complete"}
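
The behavior seen in the log above (an old plugin rejects the unknown method, and the client carries on) can be modeled with a toy dispatcher. This is a sketch under stated assumptions, not gRPC itself: `errUnimplemented` is a hypothetical sentinel standing in for the codes.Unimplemented status, and `server`, `oldPlugin`, and `newPlugin` are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
)

// errUnimplemented is a hypothetical stand-in for gRPC's Unimplemented status.
var errUnimplemented = errors.New("unimplemented")

// errFlushFailed is a hypothetical failure returned by a plugin's Close.
var errFlushFailed = errors.New("flush failed")

// server is a toy dispatcher mapping method names to handlers.
type server struct {
	handlers map[string]func() error
}

// call dispatches by method name and reports unknown methods as unimplemented.
func (s *server) call(method string) error {
	h, ok := s.handlers[method]
	if !ok {
		return fmt.Errorf("unknown method %s: %w", method, errUnimplemented)
	}
	return h()
}

// closeWriter calls Close and treats "unimplemented" as success,
// so that plugins built before Close existed keep working.
func closeWriter(s *server) error {
	err := s.call("Close")
	if err != nil && !errors.Is(err, errUnimplemented) {
		return fmt.Errorf("plugin error: %w", err)
	}
	return nil
}

// oldPlugin only knows WriteSpan.
func oldPlugin() *server {
	return &server{handlers: map[string]func() error{
		"WriteSpan": func() error { return nil },
	}}
}

// newPlugin also implements Close, returning closeErr from it.
func newPlugin(closeErr error) *server {
	p := oldPlugin()
	p.handlers["Close"] = func() error { return closeErr }
	return p
}

func main() {
	fmt.Println("old plugin:", closeWriter(oldPlugin()))
	fmt.Println("new plugin:", closeWriter(newPlugin(nil)))
	fmt.Println("failing plugin:", closeWriter(newPlugin(errFlushFailed)))
}
```

Only the "unimplemented" case is swallowed; a genuine failure from a plugin that does implement Close still reaches the caller.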

}

service SpanReaderPlugin {
10 changes: 10 additions & 0 deletions plugin/storage/grpc/shared/grpc_client.go
@@ -222,13 +222,23 @@ func (c *grpcClient) WriteSpan(ctx context.Context, span *model.Span) error {
	_, err := c.writerClient.WriteSpan(ctx, &storage_v1.WriteSpanRequest{
		Span: span,
	})

	if err != nil {
		return fmt.Errorf("plugin error: %w", err)
	}

	return nil
}

func (c *grpcClient) Close() error {
	_, err := c.writerClient.Close(context.Background(), &storage_v1.CloseWriterRequest{})
Member:

nit: does it make sense to define a timeout/deadline for the context in this case?

Contributor Author:

  1. The in-process writer currently has no timeout on shutdown.
  2. Plugin RPC works over a Unix socket, so a network timeout can't happen. If the child process dies, we will receive an error right away.
  3. In my use case, the deployment system will impose an external timeout on instance shutdown.

So I think using context.Background is OK.
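
For comparison with the decision above, this is roughly what the reviewer's suggestion of a deadline could look like. It is a sketch of the alternative, not what the PR does (the PR keeps context.Background). `closeWithTimeout`, `slowClose`, and `runDemo` are hypothetical names, and `slowClose` only simulates a plugin that takes a while to flush.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// closeFunc stands in for an RPC such as the writer's Close call.
type closeFunc func(ctx context.Context) error

// closeWithTimeout bounds a close call with a deadline derived from parent.
func closeWithTimeout(parent context.Context, timeout time.Duration, closeFn closeFunc) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	return closeFn(ctx)
}

// slowClose simulates a plugin that needs d to flush and honors cancellation.
func slowClose(d time.Duration) closeFunc {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// runDemo runs one close that finishes in time and one that exceeds its deadline.
func runDemo() (fastErr error, slowTimedOut bool) {
	fastErr = closeWithTimeout(context.Background(), time.Second, slowClose(10*time.Millisecond))
	slowErr := closeWithTimeout(context.Background(), 20*time.Millisecond, slowClose(time.Second))
	return fastErr, errors.Is(slowErr, context.DeadlineExceeded)
}

func main() {
	fastErr, slowTimedOut := runDemo()
	fmt.Println("fast close error:", fastErr)
	fmt.Println("slow close hit the deadline:", slowTimedOut)
}
```

The trade-off is the one discussed in the thread: a deadline guards against a hung plugin, but it can also cut off a flush that an external shutdown timeout would have allowed to finish.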

	if err != nil && status.Code(err) != codes.Unimplemented {
		return fmt.Errorf("plugin error: %w", err)
	}

	return nil
}

// GetDependencies returns all interservice dependencies
func (c *grpcClient) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
resp, err := c.depsReaderClient.GetDependencies(ctx, &storage_v1.GetDependenciesRequest{
19 changes: 19 additions & 0 deletions plugin/storage/grpc/shared/grpc_client_test.go
@@ -298,6 +298,25 @@ func TestGRPCClientWriteSpan(t *testing.T) {
})
}

func TestGRPCClientCloseWriter(t *testing.T) {
	withGRPCClient(func(r *grpcClientTest) {
		r.spanWriter.On("Close", mock.Anything, &storage_v1.CloseWriterRequest{}).Return(&storage_v1.CloseWriterResponse{}, nil)

		err := r.client.Close()
		assert.NoError(t, err)
	})
}

func TestGRPCClientCloseNotSupported(t *testing.T) {
	withGRPCClient(func(r *grpcClientTest) {
		r.spanWriter.On("Close", mock.Anything, &storage_v1.CloseWriterRequest{}).Return(
			nil, status.Errorf(codes.Unimplemented, "method not implemented"))

		err := r.client.Close()
		assert.NoError(t, err)
	})
}

func TestGRPCClientGetDependencies(t *testing.T) {
withGRPCClient(func(r *grpcClientTest) {
lookback := time.Duration(1 * time.Second)
13 changes: 13 additions & 0 deletions plugin/storage/grpc/shared/grpc_server.go
@@ -17,6 +17,7 @@ package shared
import (
"context"
"fmt"
"io"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -54,6 +55,18 @@ func (s *grpcServer) WriteSpan(ctx context.Context, r *storage_v1.WriteSpanReque
return &storage_v1.WriteSpanResponse{}, nil
}

func (s *grpcServer) Close(ctx context.Context, r *storage_v1.CloseWriterRequest) (*storage_v1.CloseWriterResponse, error) {
	if closer, ok := s.Impl.SpanWriter().(io.Closer); ok {
		if err := closer.Close(); err != nil {
			return nil, err
		}

		return &storage_v1.CloseWriterResponse{}, nil
	} else {
		return nil, status.Error(codes.Unimplemented, "span writer does not support graceful shutdown")
	}
}
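
The server-side handler above relies on Go's optional-interface pattern: a type assertion to io.Closer decides at run time whether the writer supports shutdown. A self-contained sketch of that pattern follows. The `spanWriter` interface here is a reduced stand-in, the two writer types are hypothetical, and `errCloseUnsupported` takes the place of the Unimplemented status so the example needs only the standard library.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errCloseUnsupported is a hypothetical sentinel standing in for codes.Unimplemented.
var errCloseUnsupported = errors.New("span writer does not support graceful shutdown")

// spanWriter is a reduced stand-in for a span writer interface; it has no Close.
type spanWriter interface {
	WriteSpan(span string) error
}

// plainWriter writes spans immediately and has nothing to flush.
type plainWriter struct{}

func (plainWriter) WriteSpan(string) error { return nil }

// bufferedWriter buffers spans and flushes them on Close,
// so it additionally satisfies io.Closer.
type bufferedWriter struct {
	pending []string
	flushed int
}

func (w *bufferedWriter) WriteSpan(span string) error {
	w.pending = append(w.pending, span)
	return nil
}

func (w *bufferedWriter) Close() error {
	w.flushed += len(w.pending)
	w.pending = nil
	return nil
}

// closeWriter closes w only if it opts in by implementing io.Closer.
func closeWriter(w spanWriter) error {
	closer, ok := w.(io.Closer)
	if !ok {
		return errCloseUnsupported
	}
	return closer.Close()
}

func main() {
	bw := &bufferedWriter{}
	_ = bw.WriteSpan("span-1")
	fmt.Println("buffered writer:", closeWriter(bw), "flushed:", bw.flushed)
	fmt.Println("plain writer:", closeWriter(plainWriter{}))
}
```

Because the capability is discovered by assertion, existing writers need no changes; only writers that buffer data have to add a Close method.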

// GetTrace takes a traceID and streams a Trace associated with that traceID
func (s *grpcServer) GetTrace(r *storage_v1.GetTraceRequest, stream storage_v1.SpanReaderPlugin_GetTraceServer) error {
trace, err := s.Impl.SpanReader().GetTrace(stream.Context(), r.TraceID)
13 changes: 9 additions & 4 deletions proto-gen/storage_v1/mocks/DependenciesReaderPluginClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions proto-gen/storage_v1/mocks/DependenciesReaderPluginServer.go
13 changes: 9 additions & 4 deletions proto-gen/storage_v1/mocks/SpanReaderPluginClient.go
9 changes: 6 additions & 3 deletions proto-gen/storage_v1/mocks/SpanReaderPluginServer.go
12 changes: 8 additions & 4 deletions proto-gen/storage_v1/mocks/SpanReaderPlugin_FindTracesClient.go
12 changes: 8 additions & 4 deletions proto-gen/storage_v1/mocks/SpanReaderPlugin_FindTracesServer.go
12 changes: 8 additions & 4 deletions proto-gen/storage_v1/mocks/SpanReaderPlugin_GetTraceClient.go
12 changes: 8 additions & 4 deletions proto-gen/storage_v1/mocks/SpanReaderPlugin_GetTraceServer.go
43 changes: 39 additions & 4 deletions proto-gen/storage_v1/mocks/SpanWriterPluginClient.go
32 changes: 29 additions & 3 deletions proto-gen/storage_v1/mocks/SpanWriterPluginServer.go