Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote gRPC option for storage plugin #3383

Merged
merged 11 commits into from
Dec 10, 2021
33 changes: 2 additions & 31 deletions examples/memstore-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ import (
googleGRPC "google.golang.org/grpc"

"github.com/jaegertracing/jaeger/plugin/storage/grpc"
grpcMemory "github.com/jaegertracing/jaeger/plugin/storage/grpc/memory"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
Expand Down Expand Up @@ -71,10 +70,7 @@ func main() {
defer closer.Close()
opentracing.SetGlobalTracer(tracer)

memStorePlugin := &memoryStorePlugin{
store: memory.NewStore(),
archiveStore: memory.NewStore(),
}
memStorePlugin := grpcMemory.NewStoragePlugin(memory.NewStore(), memory.NewStore())
grpc.ServeWithGRPCServer(&shared.PluginServices{
Store: memStorePlugin,
ArchiveStore: memStorePlugin,
Expand All @@ -85,28 +81,3 @@ func main() {
})
})
}

type memoryStorePlugin struct {
store *memory.Store
archiveStore *memory.Store
}

func (ns *memoryStorePlugin) DependencyReader() dependencystore.Reader {
return ns.store
}

func (ns *memoryStorePlugin) SpanReader() spanstore.Reader {
return ns.store
}

func (ns *memoryStorePlugin) SpanWriter() spanstore.Writer {
return ns.store
}

func (ns *memoryStorePlugin) ArchiveSpanReader() spanstore.Reader {
return ns.archiveStore
}

func (ns *memoryStorePlugin) ArchiveSpanWriter() spanstore.Writer {
return ns.archiveStore
}
62 changes: 60 additions & 2 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
package config

import (
"context"
"fmt"
"os/exec"
"runtime"
"time"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)

Expand All @@ -33,6 +38,9 @@ type Configuration struct {
PluginBinary string `yaml:"binary" mapstructure:"binary"`
PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"`
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
}

// ClientPluginServices defines services plugin can expose and its capabilities
Expand All @@ -43,11 +51,61 @@ type ClientPluginServices struct {

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
type PluginBuilder interface {
Build() (*ClientPluginServices, error)
Build(logger *zap.Logger) (*ClientPluginServices, error)
Close() error
}

// Build instantiates a PluginServices
func (c *Configuration) Build() (*ClientPluginServices, error) {
func (c *Configuration) Build(logger *zap.Logger) (*ClientPluginServices, error) {
if c.PluginBinary != "" {
return c.buildPlugin()
} else {
return c.buildRemote(logger)
}
}

func (c *Configuration) Close() error {
if c.PluginBinary == "" {
return c.RemoteTLS.Close()
}
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if c.PluginBinary == "" {
return c.RemoteTLS.Close()
}
return nil
return c.RemoteTLS.Close()

the check is not necessary, TLSConfig.Close() is safe to call

}

func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())),
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())),
grpc.WithBlock(),
}
if c.RemoteTLS.Enabled {
tlsCfg, err := c.RemoteTLS.Config(logger)
if err != nil {
return nil, err
}
creds := credentials.NewTLS(tlsCfg)
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}

ctx, cancel := context.WithTimeout(context.Background(), c.RemoteConnectTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
if err != nil {
return nil, fmt.Errorf("error connecting to remote storage: %w", err)
}

grpcClient := shared.NewGRPCClient(conn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
ArchiveStore: grpcClient,
},
Capabilities: grpcClient,
}, nil
}

func (c *Configuration) buildPlugin() (*ClientPluginServices, error) {
// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)

Expand Down
7 changes: 6 additions & 1 deletion plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (f *Factory) InitFromOptions(opts Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

services, err := f.builder.Build()
services, err := f.builder.Build(logger)
if err != nil {
return fmt.Errorf("grpc-plugin builder failed to create a store: %w", err)
}
Expand Down Expand Up @@ -124,3 +124,8 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
}
return f.archiveStore.ArchiveSpanWriter(), nil
}

// Close closes the resources held by the factory
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
func (f *Factory) Close() error {
return f.builder.Close()
}
6 changes: 5 additions & 1 deletion plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type mockPluginBuilder struct {
err error
}

func (b *mockPluginBuilder) Build() (*grpcConfig.ClientPluginServices, error) {
func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginServices, error) {
if b.err != nil {
return nil, b.err
}
Expand All @@ -60,6 +60,10 @@ func (b *mockPluginBuilder) Build() (*grpcConfig.ClientPluginServices, error) {
return services, nil
}

func (b *mockPluginBuilder) Close() error {
return nil
}

type mockPlugin struct {
spanReader spanstore.Reader
spanWriter spanstore.Writer
Expand Down
50 changes: 50 additions & 0 deletions plugin/storage/grpc/memory/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type storagePlugin struct {
store *memory.Store
archiveStore *memory.Store
}

func NewStoragePlugin(mainStore *memory.Store, archiveStore *memory.Store) *storagePlugin {
return &storagePlugin{store: mainStore, archiveStore: archiveStore}
}

func (ns *storagePlugin) DependencyReader() dependencystore.Reader {
return ns.store
}

func (ns *storagePlugin) SpanReader() spanstore.Reader {
return ns.store
}

func (ns *storagePlugin) SpanWriter() spanstore.Writer {
return ns.store
}

func (ns *storagePlugin) ArchiveSpanReader() spanstore.Reader {
return ns.archiveStore
}

func (ns *storagePlugin) ArchiveSpanWriter() spanstore.Writer {
return ns.archiveStore
}
28 changes: 24 additions & 4 deletions plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@ package grpc

import (
"flag"
"time"

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
)

const (
pluginBinary = "grpc-storage-plugin.binary"
pluginConfigurationFile = "grpc-storage-plugin.configuration-file"
pluginLogLevel = "grpc-storage-plugin.log-level"
defaultPluginLogLevel = "warn"
pluginBinary = "grpc-storage-plugin.binary"
pluginConfigurationFile = "grpc-storage-plugin.configuration-file"
pluginLogLevel = "grpc-storage-plugin.log-level"
remotePrefix = "grpc-storage"
remoteServer = remotePrefix + ".server"
remoteConnectionTimeout = remotePrefix + ".connection-timeout"
defaultPluginLogLevel = "warn"
defaultConnectionTimeout = time.Duration(5 * time.Second)
)

// Options contains GRPC plugins configs and provides the ability
Expand All @@ -35,16 +41,30 @@ type Options struct {
Configuration config.Configuration `mapstructure:",squash"`
}

func tlsFlagsConfig() tlscfg.ClientFlagsConfig {
return tlscfg.ClientFlagsConfig{
Prefix: remotePrefix,
}
}

// AddFlags adds flags for Options
func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
tlsFlagsConfig().AddFlags(flagSet)

flagSet.String(pluginBinary, "", "The location of the plugin binary")
flagSet.String(pluginConfigurationFile, "", "A path pointing to the plugin's configuration file, made available to the plugin with the --config arg")
flagSet.String(pluginLogLevel, defaultPluginLogLevel, "Set the log level of the plugin's logger")
flagSet.String(remoteServer, "", "The remote storage gRPC server address")
flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The remote storage gRPC server connection timeout")

}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.Configuration.PluginBinary = v.GetString(pluginBinary)
opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile)
opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel)
opt.Configuration.RemoteServerAddr = v.GetString(remoteServer)
opt.Configuration.RemoteTLS = tlsFlagsConfig().InitFromViper(v)
opt.Configuration.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout)
}
35 changes: 35 additions & 0 deletions plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package grpc

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -37,3 +38,37 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, opts.Configuration.PluginConfigurationFile, "config.json")
assert.Equal(t, opts.Configuration.PluginLogLevel, "debug")
}

func TestRemoteOptionsWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
err := command.ParseFlags([]string{
"--grpc-storage.server=localhost:2001",
"--grpc-storage.tls.enabled=true",
"--grpc-storage.connection-timeout=60s",
})
assert.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, opts.Configuration.PluginBinary, "")
assert.Equal(t, opts.Configuration.RemoteServerAddr, "localhost:2001")
assert.Equal(t, opts.Configuration.RemoteTLS.Enabled, true)
assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second)
}

func TestRemoteOptionsNoTLSWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
err := command.ParseFlags([]string{
"--grpc-storage.server=localhost:2001",
"--grpc-storage.tls.enabled=false",
"--grpc-storage.connection-timeout=60s",
})
assert.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, opts.Configuration.PluginBinary, "")
assert.Equal(t, opts.Configuration.RemoteServerAddr, "localhost:2001")
assert.Equal(t, opts.Configuration.RemoteTLS.Enabled, false)
assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second)
}
12 changes: 12 additions & 0 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -53,6 +54,17 @@ type grpcClient struct {
depsReaderClient storage_v1.DependenciesReaderPluginClient
}

func NewGRPCClient(c *grpc.ClientConn) *grpcClient {
return &grpcClient{
readerClient: storage_v1.NewSpanReaderPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(c),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
}
}

// ContextUpgradeFunc is a functional type that can be composed to upgrade context
type ContextUpgradeFunc func(ctx context.Context) context.Context

Expand Down
Loading