Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create new grpc storage configuration to align with OTEL #5331

Merged
merged 30 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4da9754
add more grpc config options
akagami-harsh Apr 5, 2024
7371b30
Merge branch 'main' into grpc-config
akagami-harsh Apr 5, 2024
5c07802
Merge branch 'main' into grpc-config
akagami-harsh Apr 6, 2024
3986d8a
reuse otel's client config
akagami-harsh Apr 7, 2024
6cfb2dd
fix
akagami-harsh Apr 7, 2024
45467be
fix
akagami-harsh Apr 7, 2024
940f02c
log not supported when using auth
akagami-harsh Apr 8, 2024
9edf36a
fix
akagami-harsh Apr 8, 2024
3695b30
Merge branch 'main' into grpc-config
akagami-harsh Apr 8, 2024
b4ba240
fix
akagami-harsh Apr 8, 2024
e4dbf8e
Merge branch 'main' into grpc-config
akagami-harsh Apr 15, 2024
03b0f4e
Merge branch 'main' into grpc-config
akagami-harsh May 9, 2024
947501e
add config V2
akagami-harsh May 10, 2024
5eb9218
Merge branch 'main' into grpc-config
akagami-harsh May 12, 2024
e476ece
fix
akagami-harsh May 12, 2024
54736cc
fix
akagami-harsh May 13, 2024
e14a3f2
fix
akagami-harsh May 13, 2024
15be346
fix
akagami-harsh May 16, 2024
71baf51
fix
akagami-harsh May 16, 2024
2139a46
Merge branch 'main' into grpc-config
akagami-harsh May 16, 2024
c4a34b0
add ToOtelClientConfig()
akagami-harsh May 19, 2024
ba44b7f
fix
akagami-harsh May 19, 2024
a2143b3
add remote conn in *ClientPluginServices
akagami-harsh May 19, 2024
148000c
refactor
yurishkuro May 20, 2024
14f9c94
finish-tests
yurishkuro May 20, 2024
ff9ea18
cleanup
yurishkuro May 20, 2024
dd8701e
simplify
yurishkuro May 20, 2024
cd77995
add tests
akagami-harsh May 20, 2024
7c6e513
fix
akagami-harsh May 20, 2024
f3c2ccf
Merge branch 'main' into grpc-config
akagami-harsh May 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/jaeger/config-remote-storage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ extensions:
jaeger_storage:
grpc:
external-storage:
server: localhost:17271
connection-timeout: 5s
endpoint: localhost:17271
timeout: 5s
tls:
insecure: true

receivers:
otlp:
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
GRPC map[string]grpcCfg.ConfigV2 `mapstructure:"grpc"`
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandra.Options `mapstructure:"cassandra"`
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Badger,
builder: badger.NewFactoryWithConfig,
}
grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{
grpcStarter := &starter[grpcCfg.ConfigV2, *grpc.Factory]{
ext: s,
storageKind: "grpc",
cfg: s.config.GRPC,
Expand Down
18 changes: 18 additions & 0 deletions pkg/config/tlscfg/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"time"

"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -135,6 +136,23 @@ func (p Options) loadCertPool() (*x509.CertPool, error) {
return certPool, nil
}

func (o *Options) ToOtelClientConfig() configtls.ClientConfig {
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
return configtls.ClientConfig{
Insecure: !o.Enabled,
InsecureSkipVerify: o.SkipHostVerify,
ServerName: o.ServerName,
Config: configtls.Config{
CAFile: o.CAPath,
CertFile: o.CertPath,
KeyFile: o.KeyPath,
CipherSuites: o.CipherSuites,
MinVersion: o.MinVersion,
MaxVersion: o.MaxVersion,
ReloadInterval: o.ReloadInterval,
},
}
}

func addCertToPool(caPath string, certPool *x509.CertPool) error {
caPEM, err := os.ReadFile(filepath.Clean(caPath))
if err != nil {
Expand Down
80 changes: 45 additions & 35 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
package config

import (
"errors"
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand All @@ -37,72 +39,80 @@
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
TenancyOpts tenancy.Options
}

type ConfigV2 struct {
TenancyOpts tenancy.Options
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
configgrpc.ClientConfig `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
}

remoteConn *grpc.ClientConn
func (c *Configuration) TranslateToConfigV2() *ConfigV2 {
return &ConfigV2{
ClientConfig: configgrpc.ClientConfig{
Endpoint: c.RemoteServerAddr,
TLSSetting: c.RemoteTLS.ToOtelClientConfig(),
},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: c.RemoteConnectTimeout,
},
}
}

// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
remoteConn *grpc.ClientConn
}

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
// TODO this interface should be removed and the building capability moved to Factory.
type PluginBuilder interface {
Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error)
Close() error
}

// Build instantiates a PluginServices
func (c *Configuration) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
return c.buildRemote(logger, tracerProvider, grpc.NewClient)
// TODO move this to factory.go
func (c *ConfigV2) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
telset := component.TelemetrySettings{
Logger: logger,
TracerProvider: tracerProvider,
}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return c.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...)
}
return newRemoteStorage(c, telset, newClientFn)
}

type newClientFn func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)

func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.TracerProvider, newClient newClientFn) (*ClientPluginServices, error) {
func newRemoteStorage(c *ConfigV2, telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))),
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
if c.RemoteTLS.Enabled {
tlsCfg, err := c.RemoteTLS.Config(logger)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
creds := credentials.NewTLS(tlsCfg)
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")

Check warning on line 88 in plugin/storage/grpc/config/config.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/config/config.go#L88

Added line #L88 was not covered by tests
}

tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}
var err error
c.remoteConn, err = newClient(c.RemoteServerAddr, opts...)

remoteConn, err := newClient(opts...)
if err != nil {
return nil, fmt.Errorf("error creating remote storage client: %w", err)
}

grpcClient := shared.NewGRPCClient(c.remoteConn)
grpcClient := shared.NewGRPCClient(remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
},
Capabilities: grpcClient,
remoteConn: remoteConn,
}, nil
}

func (c *Configuration) Close() error {
var errs []error
func (c *ClientPluginServices) Close() error {
if c.remoteConn != nil {
errs = append(errs, c.remoteConn.Close())
return c.remoteConn.Close()
}
errs = append(errs, c.RemoteTLS.Close())
return errors.Join(errs...)
return nil

Check warning on line 117 in plugin/storage/grpc/config/config.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/config/config.go#L117

Added line #L117 was not covered by tests
}
9 changes: 5 additions & 4 deletions plugin/storage/grpc/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"google.golang.org/grpc"
)

func TestBuildRemoteNewClientError(t *testing.T) {
// this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params.
c := &Configuration{}
_, err := c.buildRemote(zap.NewNop(), nil, func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
c := &ConfigV2{}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return nil, errors.New("test error")
})
}
_, err := newRemoteStorage(c, component.TelemetrySettings{}, newClientFn)
require.Error(t, err)
require.Contains(t, err.Error(), "error creating remote storage client")
}
75 changes: 34 additions & 41 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"errors"
"flag"
"fmt"
"io"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -42,17 +42,14 @@ var ( // interface comformance checks

// Factory implements storage.Factory and creates storage components backed by a storage plugin.
type Factory struct {
options Options
metricsFactory metrics.Factory
logger *zap.Logger
tracerProvider trace.TracerProvider

builder config.PluginBuilder
configV1 config.Configuration
configV2 *config.ConfigV2

store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
streamingSpanWriter shared.StreamingSpanWriterPlugin
capabilities shared.PluginCapabilities
services *config.ClientPluginServices
}

// NewFactory creates a new Factory.
Expand All @@ -62,108 +59,104 @@ func NewFactory() *Factory {

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg config.Configuration,
cfg config.ConfigV2,
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f.configureFromOptions(Options{Configuration: cfg})
err := f.Initialize(metricsFactory, logger)
if err != nil {
f.configV2 = &cfg
if err := f.Initialize(metricsFactory, logger); err != nil {
return nil, err
}
return f, nil
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.options.AddFlags(flagSet)
v1AddFlags(flagSet)
}

// InitFromViper implements plugin.Configurable
func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
if err := f.options.InitFromViper(v); err != nil {
if err := v1InitFromViper(&f.configV1, v); err != nil {
logger.Fatal("unable to initialize gRPC storage factory", zap.Error(err))
}
f.builder = &f.options.Configuration
}

// configureFromOptions initializes factory from options
func (f *Factory) configureFromOptions(opts Options) {
f.options = opts
f.builder = &f.options.Configuration
}

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
f.tracerProvider = otel.GetTracerProvider()

services, err := f.builder.Build(logger, f.tracerProvider)
if f.configV2 == nil {
f.configV2 = f.configV1.TranslateToConfigV2()
}

var err error
f.services, err = f.configV2.Build(logger, f.tracerProvider)
if err != nil {
return fmt.Errorf("grpc storage builder failed to create a store: %w", err)
}

f.store = services.Store
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
f.streamingSpanWriter = services.StreamingSpanWriter
logger.Info("Remote storage configuration", zap.Any("configuration", f.options.Configuration))
logger.Info("Remote storage configuration", zap.Any("configuration", f.configV2))
return nil
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return f.store.SpanReader(), nil
return f.services.Store.SpanReader(), nil
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
if f.capabilities != nil && f.streamingSpanWriter != nil {
if capabilities, err := f.capabilities.Capabilities(); err == nil && capabilities.StreamingSpanWriter {
return f.streamingSpanWriter.StreamingSpanWriter(), nil
if f.services.Capabilities != nil && f.services.StreamingSpanWriter != nil {
if capabilities, err := f.services.Capabilities.Capabilities(); err == nil && capabilities.StreamingSpanWriter {
return f.services.StreamingSpanWriter.StreamingSpanWriter(), nil
}
}
return f.store.SpanWriter(), nil
return f.services.Store.SpanWriter(), nil
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store.DependencyReader(), nil
return f.services.Store.DependencyReader(), nil
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.capabilities == nil {
if f.services.Capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
capabilities, err := f.capabilities.Capabilities()
capabilities, err := f.services.Capabilities.Capabilities()
if err != nil {
return nil, err
}
if capabilities == nil || !capabilities.ArchiveSpanReader {
return nil, storage.ErrArchiveStorageNotSupported
}
return f.archiveStore.ArchiveSpanReader(), nil
return f.services.ArchiveStore.ArchiveSpanReader(), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.capabilities == nil {
if f.services.Capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
capabilities, err := f.capabilities.Capabilities()
capabilities, err := f.services.Capabilities.Capabilities()
if err != nil {
return nil, err
}
if capabilities == nil || !capabilities.ArchiveSpanWriter {
return nil, storage.ErrArchiveStorageNotSupported
}
return f.archiveStore.ArchiveSpanWriter(), nil
return f.services.ArchiveStore.ArchiveSpanWriter(), nil
}

// Close closes the resources held by the factory
func (f *Factory) Close() error {
// TODO Close should move into Services type, instead of being in the Config.
return f.builder.Close()
var errs []error
if f.services != nil {
errs = append(errs, f.services.Close())
}
errs = append(errs, f.configV1.RemoteTLS.Close())
return errors.Join(errs...)
}
Loading
Loading