Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create new grpc storage configuration to align with OTEL #5331

Merged
merged 30 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4da9754
add more grpc config options
akagami-harsh Apr 5, 2024
7371b30
Merge branch 'main' into grpc-config
akagami-harsh Apr 5, 2024
5c07802
Merge branch 'main' into grpc-config
akagami-harsh Apr 6, 2024
3986d8a
reuse otel's client config
akagami-harsh Apr 7, 2024
6cfb2dd
fix
akagami-harsh Apr 7, 2024
45467be
fix
akagami-harsh Apr 7, 2024
940f02c
log not supported when using auth
akagami-harsh Apr 8, 2024
9edf36a
fix
akagami-harsh Apr 8, 2024
3695b30
Merge branch 'main' into grpc-config
akagami-harsh Apr 8, 2024
b4ba240
fix
akagami-harsh Apr 8, 2024
e4dbf8e
Merge branch 'main' into grpc-config
akagami-harsh Apr 15, 2024
03b0f4e
Merge branch 'main' into grpc-config
akagami-harsh May 9, 2024
947501e
add config V2
akagami-harsh May 10, 2024
5eb9218
Merge branch 'main' into grpc-config
akagami-harsh May 12, 2024
e476ece
fix
akagami-harsh May 12, 2024
54736cc
fix
akagami-harsh May 13, 2024
e14a3f2
fix
akagami-harsh May 13, 2024
15be346
fix
akagami-harsh May 16, 2024
71baf51
fix
akagami-harsh May 16, 2024
2139a46
Merge branch 'main' into grpc-config
akagami-harsh May 16, 2024
c4a34b0
add ToOtelClientConfig()
akagami-harsh May 19, 2024
ba44b7f
fix
akagami-harsh May 19, 2024
a2143b3
add remote conn in *ClientPluginServices
akagami-harsh May 19, 2024
148000c
refactor
yurishkuro May 20, 2024
14f9c94
finish-tests
yurishkuro May 20, 2024
ff9ea18
cleanup
yurishkuro May 20, 2024
dd8701e
simplify
yurishkuro May 20, 2024
cd77995
add tests
akagami-harsh May 20, 2024
7c6e513
fix
akagami-harsh May 20, 2024
f3c2ccf
Merge branch 'main' into grpc-config
akagami-harsh May 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/jaeger/config-remote-storage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ extensions:
jaeger_storage:
grpc:
external-storage:
server: localhost:17271
connection-timeout: 5s
endpoint: localhost:17271
timeout: 5s
tls:
insecure: true

receivers:
otlp:
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
GRPC map[string]grpcCfg.ConfigV2 `mapstructure:"grpc"`
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandra.Options `mapstructure:"cassandra"`
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Badger,
builder: badger.NewFactoryWithConfig,
}
grpcStarter := &starter[grpcCfg.Configuration, *grpc.Factory]{
grpcStarter := &starter[grpcCfg.ConfigV2, *grpc.Factory]{
ext: s,
storageKind: "grpc",
cfg: s.config.GRPC,
Expand Down
18 changes: 18 additions & 0 deletions pkg/config/tlscfg/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"time"

"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -135,6 +136,23 @@ func (p Options) loadCertPool() (*x509.CertPool, error) {
return certPool, nil
}

func (o *Options) ToOtelClientConfig() configtls.ClientConfig {
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
return configtls.ClientConfig{
Insecure: !o.Enabled,
InsecureSkipVerify: o.SkipHostVerify,
ServerName: o.ServerName,
Config: configtls.Config{
CAFile: o.CAPath,
CertFile: o.CertPath,
KeyFile: o.KeyPath,
CipherSuites: o.CipherSuites,
MinVersion: o.MinVersion,
MaxVersion: o.MaxVersion,
ReloadInterval: o.ReloadInterval,
},
}
}

func addCertToPool(caPath string, certPool *x509.CertPool) error {
caPEM, err := os.ReadFile(filepath.Clean(caPath))
if err != nil {
Expand Down
56 changes: 56 additions & 0 deletions pkg/config/tlscfg/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"fmt"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -186,3 +188,57 @@ func TestOptionsToConfig(t *testing.T) {
})
}
}

func TestToOtelClientConfig(t *testing.T) {
testCases := []struct {
name string
options Options
expected configtls.ClientConfig
}{
{
name: "insecure",
options: Options{
Enabled: false,
},
expected: configtls.ClientConfig{
Insecure: true,
},
},
{
name: "secure with skip host verify",
options: Options{
Enabled: true,
SkipHostVerify: true,
ServerName: "example.com",
CAPath: "path/to/ca.pem",
CertPath: "path/to/cert.pem",
KeyPath: "path/to/key.pem",
CipherSuites: []string{"TLS_RSA_WITH_AES_128_CBC_SHA"},
MinVersion: "1.2",
MaxVersion: "1.3",
ReloadInterval: 24 * time.Hour,
},
expected: configtls.ClientConfig{
Insecure: false,
InsecureSkipVerify: true,
ServerName: "example.com",
Config: configtls.Config{
CAFile: "path/to/ca.pem",
CertFile: "path/to/cert.pem",
KeyFile: "path/to/key.pem",
CipherSuites: []string{"TLS_RSA_WITH_AES_128_CBC_SHA"},
MinVersion: "1.2",
MaxVersion: "1.3",
ReloadInterval: 24 * time.Hour,
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := tc.options.ToOtelClientConfig()
assert.Equal(t, tc.expected, actual)
})
}
}
82 changes: 46 additions & 36 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
package config

import (
"errors"
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand All @@ -37,72 +39,80 @@
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
TenancyOpts tenancy.Options
}

type ConfigV2 struct {
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
configgrpc.ClientConfig `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
}

remoteConn *grpc.ClientConn
func (c *Configuration) TranslateToConfigV2() *ConfigV2 {
return &ConfigV2{
ClientConfig: configgrpc.ClientConfig{
Endpoint: c.RemoteServerAddr,
TLSSetting: c.RemoteTLS.ToOtelClientConfig(),
},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: c.RemoteConnectTimeout,
},
}
}

// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
remoteConn *grpc.ClientConn
}

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
// TODO this interface should be removed and the building capability moved to Factory.
type PluginBuilder interface {
Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error)
Close() error
}

// Build instantiates a PluginServices
func (c *Configuration) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
return c.buildRemote(logger, tracerProvider, grpc.NewClient)
// TODO move this to factory.go
func (c *ConfigV2) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
akagami-harsh marked this conversation as resolved.
Show resolved Hide resolved
telset := component.TelemetrySettings{
Logger: logger,
TracerProvider: tracerProvider,
}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return c.ToClientConn(context.Background(), componenttest.NewNopHost(), telset, opts...)
}
return newRemoteStorage(c, telset, newClientFn)
}

type newClientFn func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error)
type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)

func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.TracerProvider, newClient newClientFn) (*ClientPluginServices, error) {
func newRemoteStorage(c *ConfigV2, telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))),
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
if c.RemoteTLS.Enabled {
tlsCfg, err := c.RemoteTLS.Config(logger)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
creds := credentials.NewTLS(tlsCfg)
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")

Check warning on line 88 in plugin/storage/grpc/config/config.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/config/config.go#L88

Added line #L88 was not covered by tests
}

tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
tenancyMgr := tenancy.NewManager(&c.Tenancy)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}
var err error
c.remoteConn, err = newClient(c.RemoteServerAddr, opts...)

remoteConn, err := newClient(opts...)
if err != nil {
return nil, fmt.Errorf("error creating remote storage client: %w", err)
}

grpcClient := shared.NewGRPCClient(c.remoteConn)
grpcClient := shared.NewGRPCClient(remoteConn)
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
},
Capabilities: grpcClient,
remoteConn: remoteConn,
}, nil
}

func (c *Configuration) Close() error {
var errs []error
func (c *ClientPluginServices) Close() error {
if c.remoteConn != nil {
errs = append(errs, c.remoteConn.Close())
return c.remoteConn.Close()
}
errs = append(errs, c.RemoteTLS.Close())
return errors.Join(errs...)
return nil

Check warning on line 117 in plugin/storage/grpc/config/config.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/config/config.go#L117

Added line #L117 was not covered by tests
}
9 changes: 5 additions & 4 deletions plugin/storage/grpc/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"google.golang.org/grpc"
)

func TestBuildRemoteNewClientError(t *testing.T) {
// this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params.
c := &Configuration{}
_, err := c.buildRemote(zap.NewNop(), nil, func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
c := &ConfigV2{}
newClientFn := func(opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
return nil, errors.New("test error")
})
}
_, err := newRemoteStorage(c, component.TelemetrySettings{}, newClientFn)
require.Error(t, err)
require.Contains(t, err.Error(), "error creating remote storage client")
}
Loading
Loading