Skip to content

Commit

Permalink
Storage factory framework
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
Yuri Shkuro committed Jan 1, 2018
1 parent f325b0a commit 32e84fa
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 8 deletions.
2 changes: 1 addition & 1 deletion cmd/collector/app/builder/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type CollectorOptions struct {
// NumWorkers is the number of internal workers in a collector
NumWorkers int
// WriteCacheTTL denotes how often to check and re-write a service or operation name
WriteCacheTTL time.Duration
WriteCacheTTL time.Duration // TODO deprecate in favor of cmd/flags/cassandra.Options
// CollectorPort is the port that the collector service listens in on for tchannel requests
CollectorPort int
// CollectorHTTPPort is the port that the collector service listens in on for http requests
Expand Down
33 changes: 26 additions & 7 deletions cmd/flags/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package cassandra
import (
"flag"
"strings"
"time"

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/cassandra/config"
)

const (
// session settings
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
Expand All @@ -40,17 +42,24 @@ const (
suffixCA = ".tls.ca"
suffixServerName = ".tls.server-name"
suffixVerifyHost = ".tls.verify-host"

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
suffixDepStoreDataFrequency = ".dependency-store-data-frequency"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)

// Options contains various type of Cassandra configs and provides the ability
// to bind them to command line flag and apply overlays, so that some configurations
// (e.g. archive) may be underspecified and infer the rest of its parameters from primary.
//
// TODO this can be moved to plugin/storage/cassandra.Factory
type Options struct {
primary *namespaceConfig

others map[string]*namespaceConfig
primary *namespaceConfig
others map[string]*namespaceConfig
SpanStoreWriteCacheTTL time.Duration
DepStoreDataFrequency time.Duration
}

// the Servers field in config.Configuration is a list, which we cannot represent with flags.
Expand Down Expand Up @@ -80,7 +89,9 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
servers: "127.0.0.1",
namespace: primaryNamespace,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
SpanStoreWriteCacheTTL: time.Hour * 12,
DepStoreDataFrequency: time.Hour * 24,
}

for _, namespace := range otherNamespaces {
Expand All @@ -96,6 +107,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
for _, cfg := range opt.others {
addFlags(flagSet, cfg)
}
flagSet.Duration(opt.primary.namespace+suffixSpanStoreWriteCacheTTL,
opt.SpanStoreWriteCacheTTL,
"The duration to wait before rewriting an existing service or operation name")
flagSet.Duration(opt.primary.namespace+suffixDepStoreDataFrequency,
opt.DepStoreDataFrequency,
"Frequency of service dependency calculations")
}

func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
Expand Down Expand Up @@ -167,13 +184,15 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
initFromViper(opt.primary, v)
initNamespaceFromViper(opt.primary, v)
for _, cfg := range opt.others {
initFromViper(cfg, v)
initNamespaceFromViper(cfg, v)
}
opt.SpanStoreWriteCacheTTL = v.GetDuration(opt.primary.namespace + suffixSpanStoreWriteCacheTTL)
opt.DepStoreDataFrequency = v.GetDuration(opt.primary.namespace + suffixDepStoreDataFrequency)
}

func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
func initNamespaceFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost)
cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
Expand Down
2 changes: 2 additions & 0 deletions cmd/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ func (flags *SharedFlags) NewLogger(conf zap.Config, options ...zap.Option) (*za
// SharedFlags holds flags configuration
type SharedFlags struct {
// SpanStorage defines common settings for Span Storage.
// TODO deprecate in favor of env var
SpanStorage spanStorage
// DependencyStorage defines common settings for Dependency Storage.
// TODO deprecate in favor of cmd/flags/cassandra.Options
DependencyStorage dependencyStorage
// Logging holds logging configuration
Logging logging
Expand Down
31 changes: 31 additions & 0 deletions plugin/configurable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package plugin

import (
"flag"

"github.com/spf13/viper"
)

// Configurable interface can be implemented by plugins that require external configuration,
// such as CLI flags, config files, or environment variables.
type Configurable interface {
// AddFlags adds CLI flags for configuring this component.
AddFlags(flagSet *flag.FlagSet)

// InitFromViper initializes this component with properties from spf13/viper.
InitFromViper(v *viper.Viper)
}
82 changes: 82 additions & 0 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"flag"

"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

cFlags "github.com/jaegertracing/jaeger/cmd/flags/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// Factory implements storage.Factory for Cassandra backend.
type Factory struct {
Options *cFlags.Options

primarySession cassandra.Session
// archiveSession cassandra.Session TODO
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
Options: cFlags.NewOptions("cassandra", "cassandra-archive"),
}
}

// AddFlags implements plugin.Configurable
func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
f.Options.AddFlags(flagSet)
}

// InitFromViper implements plugin.Configurable
func (f *Factory) InitFromViper(v *viper.Viper) {
f.Options.InitFromViper(v)
}

// Initialize implements storage.Factory
func (f *Factory) Initialize() error {
cfg := f.Options.GetPrimary()
primarySession, err := cfg.NewSession()
if err != nil {
return nil
}
f.primarySession = primarySession
// TODO init archive (cf. https://github.com/jaegertracing/jaeger/pull/604)
return nil
}

// SpanReader implements storage.Factory
func (f *Factory) SpanReader(metricsFactory metrics.Factory, logger *zap.Logger) (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, metricsFactory, logger), nil
}

// SpanWriter implements storage.Factory
func (f *Factory) SpanWriter(metricsFactory metrics.Factory, logger *zap.Logger) (spanstore.Writer, error) {
return cSpanStore.NewSpanWriter(f.primarySession, f.Options.SpanStoreWriteCacheTTL, metricsFactory, logger), nil
}

// DependencyReader implements storage.Factory
func (f *Factory) DependencyReader(metricsFactory metrics.Factory, logger *zap.Logger) (dependencystore.Reader, error) {
return cDepStore.NewDependencyStore(f.primarySession, f.Options.DepStoreDataFrequency, metricsFactory, logger), nil
}
27 changes: 27 additions & 0 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cassandra

import (
"testing"

"github.com/jaegertracing/jaeger/storage"
)

var _ storage.Factory = new(Factory)

func TestFactory(t *testing.T) {
NewFactory()
}
143 changes: 143 additions & 0 deletions plugin/storage/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"errors"
"flag"
"fmt"
"os"

"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
spanStorageEnvVar = "SPAN_STORAGE"
dependencyStorageEnvVar = "DEPENDENCY_STORAGE"

cassandraStorageType = "cassandra"
elasticsearchStorageType = "elasticsearch"
memoryStorageType = "memory"
)

type factory struct {
spanStoreType string
depStoreType string

factories map[string]storage.Factory
}

// NewFactory creates a meta-factory for storage components. It reads the desired types of storage backends
// from SPAN_STORAGE and DEPENDENCY_STORAGE environment variable. Allowed values:
// * `cassandra` - built-in
// * `elasticsearch` - built-in
// * `memory` - built-in
// * `plugin` - loads a dynamic plugin that implements storage.Factory interface (not supported at the moment)
func NewFactory(metricsFactory metrics.Factory, logger *zap.Logger) (storage.Factory, error) {
f := &factory{}
f.spanStoreType = os.Getenv(spanStorageEnvVar)
if f.spanStoreType == "" {
f.spanStoreType = cassandraStorageType
}
f.depStoreType = os.Getenv(dependencyStorageEnvVar)
if f.depStoreType == "" {
f.depStoreType = f.spanStoreType
}
types := map[string]struct{}{
f.spanStoreType: {},
f.depStoreType: {},
}
f.factories = make(map[string]storage.Factory)
for t := range types {
ff, err := f.getFactoryOfType(t)
if err != nil {
return nil, err
}
f.factories[t] = ff
}
return f, nil
}

func (f *factory) getFactoryOfType(factoryType string) (storage.Factory, error) {
switch factoryType {
case cassandraStorageType:
return cassandra.NewFactory(), nil
case elasticsearchStorageType:
return nil, errors.New("ElasticsearchStorageType not supported")
case memoryStorageType:
return nil, errors.New("MemoryStorageType not supported")
default:
return nil, fmt.Errorf("Unknown storage type %s", factoryType)
}
}

func (f *factory) Initialize() error {
for _, factory := range f.factories {
if err := factory.Initialize(); err != nil {
return err
}
}
return nil
}

func (f *factory) SpanReader(metricsFactory metrics.Factory, logger *zap.Logger) (spanstore.Reader, error) {
factory, ok := f.factories[f.spanStoreType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.spanStoreType)
}
return factory.SpanReader(metricsFactory, logger)
}

func (f *factory) SpanWriter(metricsFactory metrics.Factory, logger *zap.Logger) (spanstore.Writer, error) {
factory, ok := f.factories[f.spanStoreType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.spanStoreType)
}
return factory.SpanWriter(metricsFactory, logger)
}

func (f *factory) DependencyReader(metricsFactory metrics.Factory, logger *zap.Logger) (dependencystore.Reader, error) {
factory, ok := f.factories[f.spanStoreType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.spanStoreType)
}
return factory.DependencyReader(metricsFactory, logger)
}

// AddFlags implements plugin.Configurable
func (f *factory) AddFlags(flagSet *flag.FlagSet) {
for _, factory := range f.factories {
if conf, ok := factory.(plugin.Configurable); ok {
conf.AddFlags(flagSet)
}
}
}

// InitFromViper implements plugin.Configurable
func (f *factory) InitFromViper(v *viper.Viper) {
for _, factory := range f.factories {
if conf, ok := factory.(plugin.Configurable); ok {
conf.InitFromViper(v)
}
}
}
Loading

0 comments on commit 32e84fa

Please sign in to comment.