Skip to content

Commit

Permalink
Add remote gRPC option for storage plugin
Browse files Browse the repository at this point in the history
Adds the option to host a gRPC storage API on a remote endpoint
using regular gRPC. Previously the plugin system only supported
local socket connections through the go-hashicorp plugin system.

Signed-off-by: Matvey Arye <mat@timescale.com>
  • Loading branch information
cevian committed Nov 10, 2021
1 parent eac737b commit 292d7a4
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 15 deletions.
59 changes: 56 additions & 3 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,32 @@
package config

import (
"context"
"fmt"
"os/exec"
"runtime"
"time"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)

// Configuration describes the options to customize the storage behavior.
type Configuration struct {
PluginBinary string `yaml:"binary" mapstructure:"binary"`
PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"`
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
PluginBinary string `yaml:"binary" mapstructure:"binary"`
PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"`
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS bool `yaml:"tls" mapstructure:"tls"`
RemoteCAFile string `yaml:"cafile" mapstructure:"cafile"`
RemoteServerHostOverride string `yaml:"server-host-override" mapstructure:"server-host-override"`
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
}

// ClientPluginServices defines services plugin can expose and its capabilities
Expand All @@ -48,6 +56,51 @@ type PluginBuilder interface {

// Build instantiates a PluginServices
func (c *Configuration) Build() (*ClientPluginServices, error) {
if c.PluginBinary != "" {
return c.buildPlugin()
} else {
return c.BuildRemote()
}
}

func (c *Configuration) BuildRemote() (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())),
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())),
grpc.WithBlock(),
}
var err error
if c.RemoteTLS {
if c.RemoteCAFile == "" {
return nil, fmt.Errorf("ca file is required with TLS")
}
creds, err := credentials.NewClientTLSFromFile(c.RemoteCAFile, c.RemoteServerHostOverride)
if err != nil {
return nil, fmt.Errorf("failed to create TLS credentials %w", err)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}

ctx, cancel := context.WithTimeout(context.Background(), c.RemoteConnectTimeout)
defer cancel()
conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
if err != nil {
return nil, fmt.Errorf("error connecting to Promscale GRPC server: %w", err)
}

grpcClient := shared.NewGRPCClient(conn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
ArchiveStore: grpcClient,
},
Capabilities: grpcClient,
}, nil
}

func (c *Configuration) buildPlugin() (*ClientPluginServices, error) {
// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)

Expand Down
27 changes: 23 additions & 4 deletions plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@ package grpc

import (
"flag"
"time"

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
)

const (
pluginBinary = "grpc-storage-plugin.binary"
pluginConfigurationFile = "grpc-storage-plugin.configuration-file"
pluginLogLevel = "grpc-storage-plugin.log-level"
defaultPluginLogLevel = "warn"
pluginBinary = "grpc-storage-plugin.binary"
pluginConfigurationFile = "grpc-storage-plugin.configuration-file"
pluginLogLevel = "grpc-storage-plugin.log-level"
pluginServer = "grpc-storage-plugin.server"
pluginTLS = "grpc-storage-plugin.tls"
pluginCAFile = "grpc-storage-plugin.ca-file"
pluginServerHostOverride = "grpc-storage-plugin.server-host-override"
pluginConnectionTimeout = "grpc-storage-plugin.connection-timeout"
defaultPluginLogLevel = "warn"
defaultConnectionTimeout = time.Duration(5 * time.Second)
)

// Options contains GRPC plugins configs and provides the ability
Expand All @@ -40,11 +47,23 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.String(pluginBinary, "", "The location of the plugin binary")
flagSet.String(pluginConfigurationFile, "", "A path pointing to the plugin's configuration file, made available to the plugin with the --config arg")
flagSet.String(pluginLogLevel, defaultPluginLogLevel, "Set the log level of the plugin's logger")
flagSet.String(pluginServer, "", "The server address for the remote gRPC server")
flagSet.Bool(pluginTLS, false, "Whether to use TLS for the remote connection")
flagSet.String(pluginCAFile, "", "The CA file for the remote connection")
flagSet.String(pluginServerHostOverride, "", "The server host override for the remote connection")
flagSet.Duration(pluginConnectionTimeout, defaultConnectionTimeout, "The connection timeout for connecting to the remote server")

}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.Configuration.PluginBinary = v.GetString(pluginBinary)
opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile)
opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel)
opt.Configuration.RemoteServerAddr = v.GetString(pluginServer)
opt.Configuration.RemoteTLS = v.GetBool(pluginTLS)
opt.Configuration.RemoteCAFile = v.GetString(pluginCAFile)
opt.Configuration.RemoteServerHostOverride = v.GetString(pluginServerHostOverride)
opt.Configuration.RemoteConnectTimeout = v.GetDuration(pluginConnectionTimeout)

}
22 changes: 22 additions & 0 deletions plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package grpc

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -37,3 +38,24 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, opts.Configuration.PluginConfigurationFile, "config.json")
assert.Equal(t, opts.Configuration.PluginLogLevel, "debug")
}

func TestRemoteOptionsWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
err := command.ParseFlags([]string{
"--grpc-storage-plugin.server=localhost:2001",
"--grpc-storage-plugin.tls=true",
"--grpc-storage-plugin.ca-file=cafile",
"--grpc-storage-plugin.server-host-override=example.com",
"--grpc-storage-plugin.connection-timeout=60s",
})
assert.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, opts.Configuration.PluginBinary, "")
assert.Equal(t, opts.Configuration.RemoteServerAddr, "localhost:2001")
assert.Equal(t, opts.Configuration.RemoteTLS, true)
assert.Equal(t, opts.Configuration.RemoteCAFile, "cafile")
assert.Equal(t, opts.Configuration.RemoteServerHostOverride, "example.com")
assert.Equal(t, opts.Configuration.RemoteConnectTimeout, 60*time.Second)
}
12 changes: 12 additions & 0 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -53,6 +54,17 @@ type grpcClient struct {
depsReaderClient storage_v1.DependenciesReaderPluginClient
}

func NewGRPCClient(c *grpc.ClientConn) *grpcClient {
return &grpcClient{
readerClient: storage_v1.NewSpanReaderPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(c),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
}
}

// ContextUpgradeFunc is a functional type that can be composed to upgrade context
type ContextUpgradeFunc func(ctx context.Context) context.Context

Expand Down
9 changes: 1 addition & 8 deletions plugin/storage/grpc/shared/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,5 @@ func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server

// GRPCClient implements plugin.GRPCPlugin. It is used by go-plugin to create a grpc plugin client.
func (*StorageGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &grpcClient{
readerClient: storage_v1.NewSpanReaderPluginClient(c),
writerClient: storage_v1.NewSpanWriterPluginClient(c),
archiveReaderClient: storage_v1.NewArchiveSpanReaderPluginClient(c),
archiveWriterClient: storage_v1.NewArchiveSpanWriterPluginClient(c),
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(c),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c),
}, nil
return NewGRPCClient(c), nil
}

0 comments on commit 292d7a4

Please sign in to comment.