Skip to content

Commit

Permalink
Inject trace context to grpc metadata (#2870)
Browse files Browse the repository at this point in the history
* inject trace context to grpc metadata

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* remove unused vars

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* run make fmt

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* fix year in header

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* refactor with ContextUpgradeFunc

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* refactor another usage in archive.go

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* eliminate metadata copy

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* retry unit tests

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* update hashicorp/go-plugin to 1.4.2

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* tidy

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* remove context upgrade

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* add tracing interceptor

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* remove test cases in grpc_plugin_test

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* directly depend on grpc-ot

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* run make fmt

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* add example

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* add docs

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>
  • Loading branch information
lujiajing1126 committed Jun 4, 2021
1 parent 2687983 commit ff32436
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 17 deletions.
38 changes: 34 additions & 4 deletions examples/memstore-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ import (
"path"
"strings"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/hashicorp/go-plugin"
"github.com/opentracing/opentracing-go"
"github.com/spf13/viper"
jaegerClientConfig "github.com/uber/jaeger-client-go/config"
googleGRPC "google.golang.org/grpc"

"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
Expand All @@ -28,6 +33,10 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
serviceName = "mem-store"
)

var configPath string

func main() {
Expand All @@ -46,13 +55,34 @@ func main() {
opts := memory.Options{}
opts.InitFromViper(v)

plugin := &memoryStorePlugin{
traceCfg := &jaegerClientConfig.Configuration{
ServiceName: serviceName,
Sampler: &jaegerClientConfig.SamplerConfig{
Type: "const",
Param: 1.0,
},
RPCMetrics: true,
}

tracer, closer, err := traceCfg.NewTracer()
if err != nil {
panic("Failed to initialize tracer")
}
defer closer.Close()
opentracing.SetGlobalTracer(tracer)

memStorePlugin := &memoryStorePlugin{
store: memory.NewStore(),
archiveStore: memory.NewStore(),
}
grpc.Serve(&shared.PluginServices{
Store: plugin,
ArchiveStore: plugin,
grpc.ServeWithGRPCServer(&shared.PluginServices{
Store: memStorePlugin,
ArchiveStore: memStorePlugin,
}, func(options []googleGRPC.ServerOption) *googleGRPC.Server {
return plugin.DefaultGRPCServer([]googleGRPC.ServerOption{
googleGRPC.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
googleGRPC.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)),
})
})
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ require (
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/go-hclog v0.16.1
github.com/hashicorp/go-plugin v1.4.1
github.com/hashicorp/go-plugin v1.4.2
github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d // indirect
github.com/kr/pretty v0.2.1
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
Expand All @@ -363,8 +364,8 @@ github.com/hashicorp/go-hclog v0.16.1/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-plugin v1.4.1 h1:6UltRQlLN9iZO513VveELp5xyaFxVD2+1OVylE+2E+w=
github.com/hashicorp/go-plugin v1.4.1/go.mod h1:5fGEH17QVwTTcR0zV7yhDPLLmFX9YSZ38b18Udy6vYQ=
github.com/hashicorp/go-plugin v1.4.2 h1:yFvG3ufXXpqiMiZx9HLcaK3XbIqQ1WJFR/F1a2CuVw0=
github.com/hashicorp/go-plugin v1.4.2/go.mod h1:5fGEH17QVwTTcR0zV7yhDPLLmFX9YSZ38b18Udy6vYQ=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
Expand Down
22 changes: 22 additions & 0 deletions plugin/storage/grpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,28 @@ There are more logger options that can be used with `hclog` listed on [godoc](ht
Note: Setting the `Output` option to `os.Stdout` can confuse the `go-plugin` framework and lead it to consider the plugin
errored.

Tracing
-------

When `grpc-plugin` is used, it will be running as a separated process, thus context propagation is necessary for inter-process scenarios.

In order to get complete traces containing both `jaeger-component`(s) and the `grpc-plugin`, developers should enable tracing at server-side.
Thus, we can leverage gRPC interceptors,

```golang
grpc.ServeWithGRPCServer(&shared.PluginServices{
Store: memStorePlugin,
ArchiveStore: memStorePlugin,
}, func(options []googleGRPC.ServerOption) *googleGRPC.Server {
return plugin.DefaultGRPCServer([]googleGRPC.ServerOption{
googleGRPC.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
googleGRPC.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)),
})
})
```

Refer to `example/memstore-plugin` for more details.

Bearer token propagation from the UI
------------------------------------
When using `--query.bearer-token-propagation=true`, the bearer token will be properly passed on to the gRPC plugin server. To get access to the bearer token in your plugin, use a method similar to:
Expand Down
7 changes: 7 additions & 0 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import (
"os/exec"
"runtime"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)
Expand Down Expand Up @@ -58,6 +61,10 @@ func (c *Configuration) Build() (*ClientPluginServices, error) {
Logger: hclog.New(&hclog.LoggerOptions{
Level: hclog.LevelFromString(c.PluginLogLevel),
}),
GRPCDialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())),
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())),
},
})

runtime.SetFinalizer(client, func(c *plugin.Client) {
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type archiveWriter struct {

// GetTrace takes a traceID and returns a Trace associated with that traceID from Archive Storage
func (r *archiveReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
stream, err := r.client.GetArchiveTrace(upgradeContextWithBearerToken(ctx), &storage_v1.GetTraceRequest{
stream, err := r.client.GetArchiveTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{
TraceID: traceID,
})
if status.Code(err) == codes.NotFound {
Expand Down
37 changes: 28 additions & 9 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ var (
_ StoragePlugin = (*grpcClient)(nil)
_ ArchiveStoragePlugin = (*grpcClient)(nil)
_ PluginCapabilities = (*grpcClient)(nil)

// upgradeContext composites several steps of upgrading context
upgradeContext = composeContextUpgradeFuncs(upgradeContextWithBearerToken)
)

// grpcClient implements shared.StoragePlugin and reads/writes spans and dependencies
Expand All @@ -46,16 +49,32 @@ type grpcClient struct {
depsReaderClient storage_v1.DependenciesReaderPluginClient
}

// ContextUpgradeFunc is a functional type that can be composed to upgrade context
type ContextUpgradeFunc func(ctx context.Context) context.Context

// composeContextUpgradeFuncs composes ContextUpgradeFunc and returns a composed function
// to run the given func in strict order.
func composeContextUpgradeFuncs(funcs ...ContextUpgradeFunc) ContextUpgradeFunc {
return func(ctx context.Context) context.Context {
for _, fun := range funcs {
ctx = fun(ctx)
}
return ctx
}
}

// upgradeContextWithBearerToken turns the context into a gRPC outgoing context with bearer token
// in the request metadata, if the original context has bearer token attached.
// Otherwise returns original context.
func upgradeContextWithBearerToken(ctx context.Context) context.Context {
bearerToken, hasToken := spanstore.GetBearerToken(ctx)
if hasToken {
requestMetadata := metadata.New(map[string]string{
spanstore.BearerTokenKey: bearerToken,
})
return metadata.NewOutgoingContext(ctx, requestMetadata)
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
}
md.Set(spanstore.BearerTokenKey, bearerToken)
return metadata.NewOutgoingContext(ctx, md)
}
return ctx
}
Expand Down Expand Up @@ -85,7 +104,7 @@ func (c *grpcClient) ArchiveSpanWriter() spanstore.Writer {

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
stream, err := c.readerClient.GetTrace(upgradeContextWithBearerToken(ctx), &storage_v1.GetTraceRequest{
stream, err := c.readerClient.GetTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{
TraceID: traceID,
})
if status.Code(err) == codes.NotFound {
Expand All @@ -100,7 +119,7 @@ func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*mode

// GetServices returns a list of all known services
func (c *grpcClient) GetServices(ctx context.Context) ([]string, error) {
resp, err := c.readerClient.GetServices(upgradeContextWithBearerToken(ctx), &storage_v1.GetServicesRequest{})
resp, err := c.readerClient.GetServices(upgradeContext(ctx), &storage_v1.GetServicesRequest{})
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
}
Expand All @@ -113,7 +132,7 @@ func (c *grpcClient) GetOperations(
ctx context.Context,
query spanstore.OperationQueryParameters,
) ([]spanstore.Operation, error) {
resp, err := c.readerClient.GetOperations(upgradeContextWithBearerToken(ctx), &storage_v1.GetOperationsRequest{
resp, err := c.readerClient.GetOperations(upgradeContext(ctx), &storage_v1.GetOperationsRequest{
Service: query.ServiceName,
SpanKind: query.SpanKind,
})
Expand Down Expand Up @@ -141,7 +160,7 @@ func (c *grpcClient) GetOperations(

// FindTraces retrieves traces that match the traceQuery
func (c *grpcClient) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
stream, err := c.readerClient.FindTraces(upgradeContextWithBearerToken(ctx), &storage_v1.FindTracesRequest{
stream, err := c.readerClient.FindTraces(upgradeContext(ctx), &storage_v1.FindTracesRequest{
Query: &storage_v1.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Expand Down Expand Up @@ -179,7 +198,7 @@ func (c *grpcClient) FindTraces(ctx context.Context, query *spanstore.TraceQuery

// FindTraceIDs retrieves traceIDs that match the traceQuery
func (c *grpcClient) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
resp, err := c.readerClient.FindTraceIDs(upgradeContextWithBearerToken(ctx), &storage_v1.FindTraceIDsRequest{
resp, err := c.readerClient.FindTraceIDs(upgradeContext(ctx), &storage_v1.FindTraceIDsRequest{
Query: &storage_v1.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Expand Down

0 comments on commit ff32436

Please sign in to comment.