From c86a3de3f6678f682efea0f9efa68d0d46d5286f Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Wed, 10 Nov 2021 08:46:41 -0500 Subject: [PATCH 1/9] Add remote gRPC option for storage plugin Adds the option to host a gRPC storage API on a remote endpoint using regular gRPC. Previously the plugin system only supported local socket connections through the go-hashicorp plugin system. Signed-off-by: Matvey Arye --- plugin/storage/grpc/config/config.go | 59 +++++++++++++++++++++-- plugin/storage/grpc/options.go | 27 +++++++++-- plugin/storage/grpc/options_test.go | 22 +++++++++ plugin/storage/grpc/shared/grpc_client.go | 12 +++++ plugin/storage/grpc/shared/plugin.go | 9 +--- 5 files changed, 114 insertions(+), 15 deletions(-) diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 72a04a422ee..9424dfcd455 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -15,24 +15,32 @@ package config import ( + "context" "fmt" "os/exec" "runtime" + "time" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "github.com/opentracing/opentracing-go" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" ) // Configuration describes the options to customize the storage behavior. type Configuration struct { - PluginBinary string `yaml:"binary" mapstructure:"binary"` - PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"` - PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"` + PluginBinary string `yaml:"binary" mapstructure:"binary"` + PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"` + PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"` + RemoteServerAddr string `yaml:"server" mapstructure:"server"` + RemoteTLS bool `yaml:"tls" mapstructure:"tls"` + RemoteCAFile string `yaml:"cafile" mapstructure:"cafile"` + RemoteServerHostOverride string `yaml:"server-host-override" mapstructure:"server-host-override"` + RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"` } // ClientPluginServices defines services plugin can expose and its capabilities @@ -48,6 +56,51 @@ type PluginBuilder interface { // Build instantiates a PluginServices func (c *Configuration) Build() (*ClientPluginServices, error) { + if c.PluginBinary != "" { + return c.buildPlugin() + } else { + return c.BuildRemote() + } +} + +func (c *Configuration) BuildRemote() (*ClientPluginServices, error) { + opts := []grpc.DialOption{ + grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())), + grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())), + grpc.WithBlock(), + } + var err error + if c.RemoteTLS { + if c.RemoteCAFile == "" { + return nil, fmt.Errorf("ca file is required with TLS") + } + creds, err := credentials.NewClientTLSFromFile(c.RemoteCAFile, c.RemoteServerHostOverride) + if err != nil { + return nil, fmt.Errorf("failed to create TLS credentials %w", err) + } + opts = append(opts, grpc.WithTransportCredentials(creds)) + } else { + opts = append(opts, grpc.WithInsecure()) + } + + ctx, cancel := context.WithTimeout(context.Background(), c.RemoteConnectTimeout) + defer cancel() + conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...) + if err != nil { + return nil, fmt.Errorf("error connecting to Promscale GRPC server: %w", err) + } + + grpcClient := shared.NewGRPCClient(conn) + return &ClientPluginServices{ + PluginServices: shared.PluginServices{ + Store: grpcClient, + ArchiveStore: grpcClient, + }, + Capabilities: grpcClient, + }, nil +} + +func (c *Configuration) buildPlugin() (*ClientPluginServices, error) { // #nosec G204 cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile) diff --git a/plugin/storage/grpc/options.go b/plugin/storage/grpc/options.go index 7938a9dde9c..47803cb6963 100644 --- a/plugin/storage/grpc/options.go +++ b/plugin/storage/grpc/options.go @@ -16,6 +16,7 @@ package grpc import ( "flag" + "time" "github.com/spf13/viper" @@ -23,10 +24,16 @@ import ( ) const ( - pluginBinary = "grpc-storage-plugin.binary" - pluginConfigurationFile = "grpc-storage-plugin.configuration-file" - pluginLogLevel = "grpc-storage-plugin.log-level" - defaultPluginLogLevel = "warn" + pluginBinary = "grpc-storage-plugin.binary" + pluginConfigurationFile = "grpc-storage-plugin.configuration-file" + pluginLogLevel = "grpc-storage-plugin.log-level" + pluginServer = "grpc-storage-plugin.server" + pluginTLS = "grpc-storage-plugin.tls" + pluginCAFile = "grpc-storage-plugin.ca-file" + pluginServerHostOverride = "grpc-storage-plugin.server-host-override" + pluginConnectionTimeout = "grpc-storage-plugin.connection-timeout" + defaultPluginLogLevel = "warn" + defaultConnectionTimeout = time.Duration(5 * time.Second) ) // Options contains GRPC plugins configs and provides the ability @@ -40,6 +47,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { flagSet.String(pluginBinary, "", "The location of the plugin binary") flagSet.String(pluginConfigurationFile, "", "A path pointing to the plugin's configuration file, made available to the plugin with the --config arg") flagSet.String(pluginLogLevel, defaultPluginLogLevel, "Set the log level of the plugin's logger") + flagSet.String(pluginServer, "", "The server address for the remote gRPC server") + flagSet.Bool(pluginTLS, false, "Whether to use TLS for the remote connection") + flagSet.String(pluginCAFile, "", "The CA file for the remote connection") + flagSet.String(pluginServerHostOverride, "", "The server host override for the remote connection") + flagSet.Duration(pluginConnectionTimeout, defaultConnectionTimeout, "The connection timeout for connecting to the remote server") + } // InitFromViper initializes Options with properties from viper @@ -47,4 +60,10 @@ func (opt *Options) InitFromViper(v *viper.Viper) { opt.Configuration.PluginBinary = v.GetString(pluginBinary) opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile) opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel) + opt.Configuration.RemoteServerAddr = v.GetString(pluginServer) + opt.Configuration.RemoteTLS = v.GetBool(pluginTLS) + opt.Configuration.RemoteCAFile = v.GetString(pluginCAFile) + opt.Configuration.RemoteServerHostOverride = v.GetString(pluginServerHostOverride) + opt.Configuration.RemoteConnectTimeout = v.GetDuration(pluginConnectionTimeout) + } diff --git a/plugin/storage/grpc/options_test.go b/plugin/storage/grpc/options_test.go index 5e563c5c15c..bc2046dd362 100644 --- a/plugin/storage/grpc/options_test.go +++ b/plugin/storage/grpc/options_test.go @@ -16,6 +16,7 @@ package grpc import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -37,3 +38,24 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, opts.Configuration.PluginConfigurationFile, "config.json") assert.Equal(t, opts.Configuration.PluginLogLevel, "debug") } + +func TestRemoteOptionsWithFlags(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + err := command.ParseFlags([]string{ + "--grpc-storage-plugin.server=localhost:2001", + "--grpc-storage-plugin.tls=true", + "--grpc-storage-plugin.ca-file=cafile", + "--grpc-storage-plugin.server-host-override=example.com", + "--grpc-storage-plugin.connection-timeout=60s", + }) + assert.NoError(t, err) + opts.InitFromViper(v) + + assert.Equal(t, opts.Configuration.PluginBinary, "") + assert.Equal(t, opts.Configuration.RemoteServerAddr, "localhost:2001") + assert.Equal(t, opts.Configuration.RemoteTLS, true) + assert.Equal(t, opts.Configuration.RemoteCAFile, "cafile") + assert.Equal(t, opts.Configuration.RemoteServerHostOverride, "example.com") + assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second) +} diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index 33d166de717..2e1af1a246c 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -20,6 +20,7 @@ import ( "io" "time" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -53,6 +54,17 @@ type grpcClient struct { depsReaderClient storage_v1.DependenciesReaderPluginClient } +func NewGRPCClient(c *grpc.ClientConn) *grpcClient { + return &grpcClient{ + readerClient: storage_v1.NewSpanReaderPluginClient(c), + writerClient: storage_v1.NewSpanWriterPluginClient(c), + archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c), + archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c), + capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c), + depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c), + } +} + // ContextUpgradeFunc is a functional type that can be composed to upgrade context type ContextUpgradeFunc func(ctx context.Context) context.Context diff --git a/plugin/storage/grpc/shared/plugin.go b/plugin/storage/grpc/shared/plugin.go index 6ac3c2f8c6e..2027c3fe380 100644 --- a/plugin/storage/grpc/shared/plugin.go +++ b/plugin/storage/grpc/shared/plugin.go @@ -53,12 +53,5 @@ func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server // GRPCClient implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin client. func (*StorageGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { - return &grpcClient{ - readerClient: storage_v1.NewSpanReaderPluginClient(c), - writerClient: storage_v1.NewSpanWriterPluginClient(c), - archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c), - archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c), - capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c), - depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c), - }, nil + return NewGRPCClient(c), nil } From d75927e0d4a0b4e8b1e76560639c541e93e2fa72 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Wed, 10 Nov 2021 18:54:06 -0500 Subject: [PATCH 2/9] Change remote gRPC configuration parameters Use the tlscfg package for the TLS configuration and change the cli option prefix from `grpc-storage-plugin` to `grpc-storage`. Signed-off-by: Matvey Arye --- plugin/storage/grpc/config/config.go | 42 ++++++++++++++++------------ plugin/storage/grpc/factory.go | 7 ++++- plugin/storage/grpc/factory_test.go | 6 +++- plugin/storage/grpc/options.go | 27 ++++++++++-------- plugin/storage/grpc/options_test.go | 12 +++----- 5 files changed, 54 insertions(+), 40 deletions(-) diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 9424dfcd455..416ddb7ca67 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -25,22 +25,22 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "github.com/opentracing/opentracing-go" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" ) // Configuration describes the options to customize the storage behavior. type Configuration struct { - PluginBinary string `yaml:"binary" mapstructure:"binary"` - PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"` - PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"` - RemoteServerAddr string `yaml:"server" mapstructure:"server"` - RemoteTLS bool `yaml:"tls" mapstructure:"tls"` - RemoteCAFile string `yaml:"cafile" mapstructure:"cafile"` - RemoteServerHostOverride string `yaml:"server-host-override" mapstructure:"server-host-override"` - RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"` + PluginBinary string `yaml:"binary" mapstructure:"binary"` + PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"` + PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"` + RemoteServerAddr string `yaml:"server" mapstructure:"server"` + RemoteTLS tlscfg.Options + RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"` } // ClientPluginServices defines services plugin can expose and its capabilities @@ -51,33 +51,39 @@ type ClientPluginServices struct { // PluginBuilder is used to create storage plugins. Implemented by Configuration. type PluginBuilder interface { - Build() (*ClientPluginServices, error) + Build(logger *zap.Logger) (*ClientPluginServices, error) + Close() error } // Build instantiates a PluginServices -func (c *Configuration) Build() (*ClientPluginServices, error) { +func (c *Configuration) Build(logger *zap.Logger) (*ClientPluginServices, error) { if c.PluginBinary != "" { return c.buildPlugin() } else { - return c.BuildRemote() + return c.buildRemote(logger) } } -func (c *Configuration) BuildRemote() (*ClientPluginServices, error) { +func (c *Configuration) Close() error { + if c.PluginBinary == "" { + return c.RemoteTLS.Close() + } + return nil +} + +func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices, error) { opts := []grpc.DialOption{ grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())), grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())), grpc.WithBlock(), } var err error - if c.RemoteTLS { - if c.RemoteCAFile == "" { - return nil, fmt.Errorf("ca file is required with TLS") - } - creds, err := credentials.NewClientTLSFromFile(c.RemoteCAFile, c.RemoteServerHostOverride) + if c.RemoteTLS.Enabled { + tlsCfg, err := c.RemoteTLS.Config(logger) if err != nil { - return nil, fmt.Errorf("failed to create TLS credentials %w", err) + return nil, err } + creds := credentials.NewTLS(tlsCfg) opts = append(opts, grpc.WithTransportCredentials(creds)) } else { opts = append(opts, grpc.WithInsecure()) diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index bb5e3a89e3a..b90fcec7f86 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -68,7 +68,7 @@ func (f *Factory) InitFromOptions(opts Options) { func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - services, err := f.builder.Build() + services, err := f.builder.Build(logger) if err != nil { return fmt.Errorf("grpc-plugin builder failed to create a store: %w", err) } @@ -124,3 +124,8 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { } return f.archiveStore.ArchiveSpanWriter(), nil } + +// Close closes the resources held by the factory +func (f *Factory) Close() error { + return f.builder.Close() +} diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index 5ed7a6b9e2e..ce4e52ff460 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -42,7 +42,7 @@ type mockPluginBuilder struct { err error } -func (b *mockPluginBuilder) Build() (*grpcConfig.ClientPluginServices, error) { +func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginServices, error) { if b.err != nil { return nil, b.err } @@ -60,6 +60,10 @@ func (b *mockPluginBuilder) Build() (*grpcConfig.ClientPluginServices, error) { return services, nil } +func (b *mockPluginBuilder) Close() error { + return nil +} + type mockPlugin struct { spanReader spanstore.Reader spanWriter spanstore.Writer diff --git a/plugin/storage/grpc/options.go b/plugin/storage/grpc/options.go index 47803cb6963..0f3a9a41a34 100644 --- a/plugin/storage/grpc/options.go +++ b/plugin/storage/grpc/options.go @@ -20,6 +20,7 @@ import ( "github.com/spf13/viper" + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" ) @@ -27,11 +28,9 @@ const ( pluginBinary = "grpc-storage-plugin.binary" pluginConfigurationFile = "grpc-storage-plugin.configuration-file" pluginLogLevel = "grpc-storage-plugin.log-level" - pluginServer = "grpc-storage-plugin.server" - pluginTLS = "grpc-storage-plugin.tls" - pluginCAFile = "grpc-storage-plugin.ca-file" - pluginServerHostOverride = "grpc-storage-plugin.server-host-override" - pluginConnectionTimeout = "grpc-storage-plugin.connection-timeout" + remotePrefix = "grpc-storage" + pluginServer = remotePrefix + ".server" + pluginConnectionTimeout = remotePrefix + ".connection-timeout" defaultPluginLogLevel = "warn" defaultConnectionTimeout = time.Duration(5 * time.Second) ) @@ -42,28 +41,32 @@ type Options struct { Configuration config.Configuration `mapstructure:",squash"` } +func tlsFlagsConfig() tlscfg.ClientFlagsConfig { + return tlscfg.ClientFlagsConfig{ + Prefix: remotePrefix, + } +} + // AddFlags adds flags for Options func (opt *Options) AddFlags(flagSet *flag.FlagSet) { + var tlsFlagsConfig = tlsFlagsConfig() + tlsFlagsConfig.AddFlags(flagSet) + flagSet.String(pluginBinary, "", "The location of the plugin binary") flagSet.String(pluginConfigurationFile, "", "A path pointing to the plugin's configuration file, made available to the plugin with the --config arg") flagSet.String(pluginLogLevel, defaultPluginLogLevel, "Set the log level of the plugin's logger") flagSet.String(pluginServer, "", "The server address for the remote gRPC server") - flagSet.Bool(pluginTLS, false, "Whether to use TLS for the remote connection") - flagSet.String(pluginCAFile, "", "The CA file for the remote connection") - flagSet.String(pluginServerHostOverride, "", "The server host override for the remote connection") flagSet.Duration(pluginConnectionTimeout, defaultConnectionTimeout, "The connection timeout for connecting to the remote server") } // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { + var tlsFlagsConfig = tlsFlagsConfig() opt.Configuration.PluginBinary = v.GetString(pluginBinary) opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile) opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel) opt.Configuration.RemoteServerAddr = v.GetString(pluginServer) - opt.Configuration.RemoteTLS = v.GetBool(pluginTLS) - opt.Configuration.RemoteCAFile = v.GetString(pluginCAFile) - opt.Configuration.RemoteServerHostOverride = v.GetString(pluginServerHostOverride) + opt.Configuration.RemoteTLS = tlsFlagsConfig.InitFromViper(v) opt.Configuration.RemoteConnectTimeout = v.GetDuration(pluginConnectionTimeout) - } diff --git a/plugin/storage/grpc/options_test.go b/plugin/storage/grpc/options_test.go index bc2046dd362..0f4833ebe83 100644 --- a/plugin/storage/grpc/options_test.go +++ b/plugin/storage/grpc/options_test.go @@ -43,19 +43,15 @@ func TestRemoteOptionsWithFlags(t *testing.T) { opts := &Options{} v, command := config.Viperize(opts.AddFlags) err := command.ParseFlags([]string{ - "--grpc-storage-plugin.server=localhost:2001", - "--grpc-storage-plugin.tls=true", - "--grpc-storage-plugin.ca-file=cafile", - "--grpc-storage-plugin.server-host-override=example.com", - "--grpc-storage-plugin.connection-timeout=60s", + "--grpc-storage.server=localhost:2001", + "--grpc-storage.tls.enabled=true", + "--grpc-storage.connection-timeout=60s", }) assert.NoError(t, err) opts.InitFromViper(v) assert.Equal(t, opts.Configuration.PluginBinary, "") assert.Equal(t, opts.Configuration.RemoteServerAddr, "localhost:2001") - assert.Equal(t, opts.Configuration.RemoteTLS, true) - assert.Equal(t, opts.Configuration.RemoteCAFile, "cafile") - assert.Equal(t, opts.Configuration.RemoteServerHostOverride, "example.com") + assert.Equal(t, opts.Configuration.RemoteTLS.Enabled, true) assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second) } From 3714ebe4c022c0fb08f8ff7fdf44a636a414e7a6 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Fri, 12 Nov 2021 17:43:41 -0500 Subject: [PATCH 3/9] Add tests for remote gRPC We also created a RegisterServer method on StorageGRPCPlugin to register be able to easily create remote GRPC servers. Signed-off-by: Matvey Arye --- plugin/storage/grpc/options_test.go | 17 +++ plugin/storage/grpc/shared/plugin.go | 9 +- plugin/storage/integration/grpc_test.go | 145 +++++++++++++++++++++--- 3 files changed, 151 insertions(+), 20 deletions(-) diff --git a/plugin/storage/grpc/options_test.go b/plugin/storage/grpc/options_test.go index 0f4833ebe83..7f83a874498 100644 --- a/plugin/storage/grpc/options_test.go +++ b/plugin/storage/grpc/options_test.go @@ -55,3 +55,20 @@ func TestRemoteOptionsWithFlags(t *testing.T) { assert.Equal(t, opts.Configuration.RemoteTLS.Enabled, true) assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second) } + +func TestRemoteOptionsNoTLSWithFlags(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + err := command.ParseFlags([]string{ + "--grpc-storage.server=localhost:2001", + "--grpc-storage.tls.enabled=false", + "--grpc-storage.connection-timeout=60s", + }) + assert.NoError(t, err) + opts.InitFromViper(v) + + assert.Equal(t, opts.Configuration.PluginBinary, "") + assert.Equal(t, opts.Configuration.RemoteServerAddr, "localhost:2001") + assert.Equal(t, opts.Configuration.RemoteTLS.Enabled, false) + assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second) +} diff --git a/plugin/storage/grpc/shared/plugin.go b/plugin/storage/grpc/shared/plugin.go index 2027c3fe380..b1cb9eef009 100644 --- a/plugin/storage/grpc/shared/plugin.go +++ b/plugin/storage/grpc/shared/plugin.go @@ -36,8 +36,8 @@ type StorageGRPCPlugin struct { ArchiveImpl ArchiveStoragePlugin } -// GRPCServer implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin server. -func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { +// RegisterServer registers the plugin with the server +func (p *StorageGRPCPlugin) RegisterServer(s *grpc.Server) error { server := &grpcServer{ Impl: p.Impl, ArchiveImpl: p.ArchiveImpl, @@ -51,6 +51,11 @@ func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server return nil } +// GRPCServer implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin server. +func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + return p.RegisterServer(s) +} + // GRPCClient implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin client. func (*StorageGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { return NewGRPCClient(c), nil diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 2e16b6ec960..186078f05a7 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -18,45 +18,103 @@ package integration import ( + "net" "os" + "sync" "testing" + "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" + "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + googleGRPC "google.golang.org/grpc" + "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/grpc" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" ) const defaultPluginBinaryPath = "../../../examples/memstore-plugin/memstore-plugin" +type gRPCServer struct { + errChan chan error + server *googleGRPC.Server + wg sync.WaitGroup +} + +func newgRPCServer() (*gRPCServer, error) { + return &gRPCServer{errChan: make(chan error, 1)}, nil +} + +func (s *gRPCServer) Restart() error { + //stop the server if one already exists + if s.server != nil { + s.server.GracefulStop() + s.wg.Wait() + select { + case err := <-s.errChan: + return err + default: + } + } + + memStorePlugin := &memoryStorePlugin{ + store: memory.NewStore(), + archiveStore: memory.NewStore(), + } + + s.server = googleGRPC.NewServer() + queryPlugin := shared.StorageGRPCPlugin{ + Impl: memStorePlugin, + ArchiveImpl: memStorePlugin, + } + + err := queryPlugin.RegisterServer(s.server) + if err != nil { + return err + } + + listener, err := net.Listen("tcp", "localhost:2001") + if err != nil { + return err + } + s.wg.Add(1) + go func() { + defer s.wg.Done() + err = s.server.Serve(listener) + if err != nil { + select { + case s.errChan <- err: + default: + } + } + }() + return nil +} + type GRPCStorageIntegrationTestSuite struct { StorageIntegration - logger *zap.Logger - pluginBinaryPath string - pluginConfigPath string + logger *zap.Logger + flags []string + server *gRPCServer } func (s *GRPCStorageIntegrationTestSuite) initialize() error { s.logger, _ = testutils.NewLogger() + if s.server != nil { + if err := s.server.Restart(); err != nil { + return err + } + } + f := grpc.NewFactory() v, command := config.Viperize(f.AddFlags) - flags := []string{ - "--grpc-storage-plugin.binary", - s.pluginBinaryPath, - "--grpc-storage-plugin.log-level", - "debug", - } - if s.pluginConfigPath != "" { - flags = append(flags, - "--grpc-storage-plugin.configuration-file", - s.pluginConfigPath, - ) - } - err := command.ParseFlags(flags) + err := command.ParseFlags(s.flags) if err != nil { return err } @@ -87,6 +145,31 @@ func (s *GRPCStorageIntegrationTestSuite) cleanUp() error { return s.initialize() } +type memoryStorePlugin struct { + store *memory.Store + archiveStore *memory.Store +} + +func (ns *memoryStorePlugin) DependencyReader() dependencystore.Reader { + return ns.store +} + +func (ns *memoryStorePlugin) SpanReader() spanstore.Reader { + return ns.store +} + +func (ns *memoryStorePlugin) SpanWriter() spanstore.Writer { + return ns.store +} + +func (ns *memoryStorePlugin) ArchiveSpanReader() spanstore.Reader { + return ns.archiveStore +} + +func (ns *memoryStorePlugin) ArchiveSpanWriter() spanstore.Writer { + return ns.archiveStore +} + func TestGRPCStorage(t *testing.T) { binaryPath := os.Getenv("PLUGIN_BINARY_PATH") if binaryPath == "" { @@ -97,9 +180,35 @@ func TestGRPCStorage(t *testing.T) { if configPath == "" { t.Log("PLUGIN_CONFIG_PATH env var not set") } + + flags := []string{ + "--grpc-storage-plugin.binary", binaryPath, + "--grpc-storage-plugin.log-level", "debug", + } + if configPath != "" { + flags = append(flags, + "--grpc-storage-plugin.configuration-file", configPath, + ) + } + + s := &GRPCStorageIntegrationTestSuite{ + flags: flags, + } + require.NoError(t, s.initialize()) + s.IntegrationTestAll(t) +} + +func TestGRPCRemoteStorage(t *testing.T) { + flags := []string{ + "--grpc-storage.server=localhost:2001", + "--grpc-storage.tls.enabled=false", + } + server, err := newgRPCServer() + require.NoError(t, err) + s := &GRPCStorageIntegrationTestSuite{ - pluginBinaryPath: binaryPath, - pluginConfigPath: configPath, + flags: flags, + server: server, } require.NoError(t, s.initialize()) s.IntegrationTestAll(t) From fb3e2bc6722c0a71fe0f212502857d907ed428f7 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Sat, 13 Nov 2021 12:57:31 -0500 Subject: [PATCH 4/9] Fixing naming brought up in PR review Also ran `make fmt` Signed-off-by: Matvey Arye --- plugin/storage/grpc/config/config.go | 2 +- plugin/storage/grpc/options.go | 12 ++++++------ plugin/storage/integration/grpc_test.go | 5 ++--- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 416ddb7ca67..6da97164487 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -93,7 +93,7 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices, defer cancel() conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...) if err != nil { - return nil, fmt.Errorf("error connecting to Promscale GRPC server: %w", err) + return nil, fmt.Errorf("error connecting to remote storage: %w", err) } grpcClient := shared.NewGRPCClient(conn) diff --git a/plugin/storage/grpc/options.go b/plugin/storage/grpc/options.go index 0f3a9a41a34..5236f954fd6 100644 --- a/plugin/storage/grpc/options.go +++ b/plugin/storage/grpc/options.go @@ -29,8 +29,8 @@ const ( pluginConfigurationFile = "grpc-storage-plugin.configuration-file" pluginLogLevel = "grpc-storage-plugin.log-level" remotePrefix = "grpc-storage" - pluginServer = remotePrefix + ".server" - pluginConnectionTimeout = remotePrefix + ".connection-timeout" + remoteServer = remotePrefix + ".server" + remoteConnectionTimeout = remotePrefix + ".connection-timeout" defaultPluginLogLevel = "warn" defaultConnectionTimeout = time.Duration(5 * time.Second) ) @@ -55,8 +55,8 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { flagSet.String(pluginBinary, "", "The location of the plugin binary") flagSet.String(pluginConfigurationFile, "", "A path pointing to the plugin's configuration file, made available to the plugin with the --config arg") flagSet.String(pluginLogLevel, defaultPluginLogLevel, "Set the log level of the plugin's logger") - flagSet.String(pluginServer, "", "The server address for the remote gRPC server") - flagSet.Duration(pluginConnectionTimeout, defaultConnectionTimeout, "The connection timeout for connecting to the remote server") + flagSet.String(remoteServer, "", "The server address for the remote gRPC server") + flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The connection timeout for connecting to the remote server") } @@ -66,7 +66,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { opt.Configuration.PluginBinary = v.GetString(pluginBinary) opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile) opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel) - opt.Configuration.RemoteServerAddr = v.GetString(pluginServer) + opt.Configuration.RemoteServerAddr = v.GetString(remoteServer) opt.Configuration.RemoteTLS = tlsFlagsConfig.InitFromViper(v) - opt.Configuration.RemoteConnectTimeout = v.GetDuration(pluginConnectionTimeout) + opt.Configuration.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout) } diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 186078f05a7..6c308b81026 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -23,17 +23,16 @@ import ( "sync" "testing" - "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" - "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" - googleGRPC "google.golang.org/grpc" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/grpc" + "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" + "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" ) From 44a7ad69c1a44e7f972331930fa8fd0a1d915ed1 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Mon, 15 Nov 2021 11:33:39 -0500 Subject: [PATCH 5/9] Fixing code nits for GRPC remote storage Signed-off-by: Matvey Arye --- plugin/storage/grpc/config/config.go | 1 - plugin/storage/grpc/options.go | 10 ++++------ plugin/storage/grpc/shared/plugin.go | 6 +++--- plugin/storage/integration/grpc_test.go | 14 +++++--------- 4 files changed, 12 insertions(+), 19 deletions(-) diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 6da97164487..dbe958863fe 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -77,7 +77,6 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices, grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())), grpc.WithBlock(), } - var err error if c.RemoteTLS.Enabled { tlsCfg, err := c.RemoteTLS.Config(logger) if err != nil { diff --git a/plugin/storage/grpc/options.go b/plugin/storage/grpc/options.go index 5236f954fd6..07daacf4b85 100644 --- a/plugin/storage/grpc/options.go +++ b/plugin/storage/grpc/options.go @@ -49,24 +49,22 @@ func tlsFlagsConfig() tlscfg.ClientFlagsConfig { // AddFlags adds flags for Options func (opt *Options) AddFlags(flagSet *flag.FlagSet) { - var tlsFlagsConfig = tlsFlagsConfig() - tlsFlagsConfig.AddFlags(flagSet) + tlsFlagsConfig().AddFlags(flagSet) flagSet.String(pluginBinary, "", "The location of the plugin binary") flagSet.String(pluginConfigurationFile, "", "A path pointing to the plugin's configuration file, made available to the plugin with the --config arg") flagSet.String(pluginLogLevel, defaultPluginLogLevel, "Set the log level of the plugin's logger") - flagSet.String(remoteServer, "", "The server address for the remote gRPC server") - flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The connection timeout for connecting to the remote server") + flagSet.String(remoteServer, "", "The remote storage gRPC server address") + flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The remote storage gRPC server connection timeout") } // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { - var tlsFlagsConfig = tlsFlagsConfig() opt.Configuration.PluginBinary = v.GetString(pluginBinary) opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile) opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel) opt.Configuration.RemoteServerAddr = v.GetString(remoteServer) - opt.Configuration.RemoteTLS = tlsFlagsConfig.InitFromViper(v) + opt.Configuration.RemoteTLS = tlsFlagsConfig().InitFromViper(v) opt.Configuration.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout) } diff --git a/plugin/storage/grpc/shared/plugin.go b/plugin/storage/grpc/shared/plugin.go index b1cb9eef009..44884745d1d 100644 --- a/plugin/storage/grpc/shared/plugin.go +++ b/plugin/storage/grpc/shared/plugin.go @@ -36,8 +36,8 @@ type StorageGRPCPlugin struct { ArchiveImpl ArchiveStoragePlugin } -// RegisterServer registers the plugin with the server -func (p *StorageGRPCPlugin) RegisterServer(s *grpc.Server) error { +// RegisterHandlers registers the plugin with the server +func (p *StorageGRPCPlugin) RegisterHandlers(s *grpc.Server) error { server := &grpcServer{ Impl: p.Impl, ArchiveImpl: p.ArchiveImpl, @@ -53,7 +53,7 @@ func (p *StorageGRPCPlugin) RegisterServer(s *grpc.Server) error { // GRPCServer implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin server. func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { - return p.RegisterServer(s) + return p.RegisterHandlers(s) } // GRPCClient implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin client. diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 6c308b81026..5f2a0801c00 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -72,8 +72,7 @@ func (s *gRPCServer) Restart() error { ArchiveImpl: memStorePlugin, } - err := queryPlugin.RegisterServer(s.server) - if err != nil { + if err := queryPlugin.RegisterHandlers(s.server); err != nil { return err } @@ -84,8 +83,7 @@ func (s *gRPCServer) Restart() error { s.wg.Add(1) go func() { defer s.wg.Done() - err = s.server.Serve(listener) - if err != nil { + if err = s.server.Serve(listener); err != nil { select { case s.errChan <- err: default: @@ -184,11 +182,9 @@ func TestGRPCStorage(t *testing.T) { "--grpc-storage-plugin.binary", binaryPath, "--grpc-storage-plugin.log-level", "debug", } - if configPath != "" { - flags = append(flags, - "--grpc-storage-plugin.configuration-file", configPath, - ) - } + flags = append(flags, + "--grpc-storage-plugin.configuration-file", configPath, + ) s := &GRPCStorageIntegrationTestSuite{ flags: flags, From 903fe1a94457b364a24823d1300a9a5770d65e6a Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Mon, 15 Nov 2021 11:48:58 -0500 Subject: [PATCH 6/9] Refactor to combine the memory storage plugin Signed-off-by: Matvey Arye --- examples/memstore-plugin/main.go | 33 +--------------- plugin/storage/grpc/memory/plugin.go | 50 +++++++++++++++++++++++++ plugin/storage/integration/grpc_test.go | 6 +-- 3 files changed, 54 insertions(+), 35 deletions(-) create mode 100644 plugin/storage/grpc/memory/plugin.go diff --git a/examples/memstore-plugin/main.go b/examples/memstore-plugin/main.go index 03ea1db8abf..155a52adc84 100644 --- a/examples/memstore-plugin/main.go +++ b/examples/memstore-plugin/main.go @@ -27,10 +27,9 @@ import ( googleGRPC "google.golang.org/grpc" "github.com/jaegertracing/jaeger/plugin/storage/grpc" + grpcMemory "github.com/jaegertracing/jaeger/plugin/storage/grpc/memory" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/plugin/storage/memory" - "github.com/jaegertracing/jaeger/storage/dependencystore" - "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( @@ -71,10 +70,7 @@ func main() { defer closer.Close() opentracing.SetGlobalTracer(tracer) - memStorePlugin := &memoryStorePlugin{ - store: memory.NewStore(), - archiveStore: memory.NewStore(), - } + memStorePlugin := grpcMemory.NewStoragePlugin(memory.NewStore(), memory.NewStore()) grpc.ServeWithGRPCServer(&shared.PluginServices{ Store: memStorePlugin, ArchiveStore: memStorePlugin, @@ -85,28 +81,3 @@ func main() { }) }) } - -type memoryStorePlugin struct { - store *memory.Store - archiveStore *memory.Store -} - -func (ns *memoryStorePlugin) DependencyReader() dependencystore.Reader { - return ns.store -} - -func (ns *memoryStorePlugin) SpanReader() spanstore.Reader { - return ns.store -} - -func (ns *memoryStorePlugin) SpanWriter() spanstore.Writer { - return ns.store -} - -func (ns *memoryStorePlugin) ArchiveSpanReader() spanstore.Reader { - return ns.archiveStore -} - -func (ns *memoryStorePlugin) ArchiveSpanWriter() spanstore.Writer { - return ns.archiveStore -} diff --git a/plugin/storage/grpc/memory/plugin.go b/plugin/storage/grpc/memory/plugin.go new file mode 100644 index 00000000000..56f605b099d --- /dev/null +++ b/plugin/storage/grpc/memory/plugin.go @@ -0,0 +1,50 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +type storagePlugin struct { + store *memory.Store + archiveStore *memory.Store +} + +func NewStoragePlugin(mainStore *memory.Store, archiveStore *memory.Store) *storagePlugin { + return &storagePlugin{store: mainStore, archiveStore: archiveStore} +} + +func (ns *storagePlugin) DependencyReader() dependencystore.Reader { + return ns.store +} + +func (ns *storagePlugin) SpanReader() spanstore.Reader { + return ns.store +} + +func (ns *storagePlugin) SpanWriter() spanstore.Writer { + return ns.store +} + +func (ns *storagePlugin) ArchiveSpanReader() spanstore.Reader { + return ns.archiveStore +} + +func (ns *storagePlugin) ArchiveSpanWriter() spanstore.Writer { + return ns.archiveStore +} diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 5f2a0801c00..837e192969b 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -31,6 +31,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/grpc" + grpcMemory "github.com/jaegertracing/jaeger/plugin/storage/grpc/memory" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage/dependencystore" @@ -61,10 +62,7 @@ func (s *gRPCServer) Restart() error { } } - memStorePlugin := &memoryStorePlugin{ - store: memory.NewStore(), - archiveStore: memory.NewStore(), - } + memStorePlugin := grpcMemory.NewStoragePlugin(memory.NewStore(), memory.NewStore()) s.server = googleGRPC.NewServer() queryPlugin := shared.StorageGRPCPlugin{ From 539dbca2566f63a4a75930efb7422eb6e5f3d37a Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Thu, 9 Dec 2021 18:40:21 -0500 Subject: [PATCH 7/9] Fix nits in PR review Signed-off-by: Matvey Arye --- plugin/storage/grpc/config/config.go | 5 +---- plugin/storage/grpc/factory.go | 3 +++ plugin/storage/integration/grpc_test.go | 27 ------------------------- 3 files changed, 4 insertions(+), 31 deletions(-) diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index dbe958863fe..00f7dce3b5a 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -65,10 +65,7 @@ func (c *Configuration) Build(logger *zap.Logger) (*ClientPluginServices, error) } func (c *Configuration) Close() error { - if c.PluginBinary == "" { - return c.RemoteTLS.Close() - } - return nil + return c.RemoteTLS.Close() } func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices, error) { diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index b90fcec7f86..0967f382307 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -17,6 +17,7 @@ package grpc import ( "flag" "fmt" + "io" "github.com/spf13/viper" "github.com/uber/jaeger-lib/metrics" @@ -42,6 +43,8 @@ type Factory struct { capabilities shared.PluginCapabilities } +var _ io.Closer = (*Factory)(nil) + // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{} diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 837e192969b..53b7ebd8f1b 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -34,8 +34,6 @@ import ( grpcMemory "github.com/jaegertracing/jaeger/plugin/storage/grpc/memory" "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/plugin/storage/memory" - "github.com/jaegertracing/jaeger/storage/dependencystore" - "github.com/jaegertracing/jaeger/storage/spanstore" ) const defaultPluginBinaryPath = "../../../examples/memstore-plugin/memstore-plugin" @@ -140,31 +138,6 @@ func (s *GRPCStorageIntegrationTestSuite) cleanUp() error { return s.initialize() } -type memoryStorePlugin struct { - store *memory.Store - archiveStore *memory.Store -} - -func (ns *memoryStorePlugin) DependencyReader() dependencystore.Reader { - return ns.store -} - -func (ns *memoryStorePlugin) SpanReader() spanstore.Reader { - return ns.store -} - -func (ns *memoryStorePlugin) SpanWriter() spanstore.Writer { - return ns.store -} - -func (ns *memoryStorePlugin) ArchiveSpanReader() spanstore.Reader { - return ns.archiveStore -} - -func (ns *memoryStorePlugin) ArchiveSpanWriter() spanstore.Writer { - return ns.archiveStore -} - func TestGRPCStorage(t *testing.T) { binaryPath := os.Getenv("PLUGIN_BINARY_PATH") if binaryPath == "" { From c4a1b01130155a79eae31accb637c6cabd2fabdc Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Thu, 9 Dec 2021 19:52:13 -0500 Subject: [PATCH 8/9] Add test for grpc memory plugin Signed-off-by: Matvey Arye --- plugin/storage/grpc/memory/plugin_test.go | 37 +++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 plugin/storage/grpc/memory/plugin_test.go diff --git a/plugin/storage/grpc/memory/plugin_test.go b/plugin/storage/grpc/memory/plugin_test.go new file mode 100644 index 00000000000..11cc961eb74 --- /dev/null +++ b/plugin/storage/grpc/memory/plugin_test.go @@ -0,0 +1,37 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/plugin/storage/memory" +) + +func TestPluginUsesMemoryStorage(t *testing.T) { + mainStorage := memory.NewStore() + archiveStorage := memory.NewStore() + + memStorePlugin := NewStoragePlugin(mainStorage, archiveStorage) + + assert.Equal(t, mainStorage, memStorePlugin.DependencyReader()) + assert.Equal(t, mainStorage, memStorePlugin.SpanReader()) + assert.Equal(t, mainStorage, memStorePlugin.SpanWriter()) + assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanReader()) + assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanWriter()) + +} From 5c24f6e605331c4f8596f701bf44ece136c3d092 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 10 Dec 2021 11:44:55 -0500 Subject: [PATCH 9/9] Add test for Close Signed-off-by: Yuri Shkuro --- plugin/storage/grpc/factory_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index ce4e52ff460..f2d6ea10c2c 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -258,6 +258,7 @@ func TestWithConfiguration(t *testing.T) { assert.Equal(t, f.options.Configuration.PluginBinary, "noop-grpc-plugin") assert.Equal(t, f.options.Configuration.PluginConfigurationFile, "config.json") assert.Equal(t, f.options.Configuration.PluginLogLevel, "debug") + assert.NoError(t, f.Close()) } func TestInitFromOptions(t *testing.T) {