Skip to content

Commit

Permalink
Add factory and update config for tail sampling processor
Browse files Browse the repository at this point in the history
  • Loading branch information
songy23 committed Jul 26, 2019
1 parent 11a0a2a commit 10d8c64
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 38 deletions.
58 changes: 58 additions & 0 deletions internal/collector/processor/tailsampling/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tailsampling

import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/config/configerror"
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/service/builder"
)

const (
// The value of "type" Tail Sampling in configuration.
typeStr = "tail-sampling"
)

// Factory is the factory for Tail Sampling processor.
type Factory struct {
}

// Type gets the type of the config created by this factory.
func (f *Factory) Type() string {
return typeStr
}

// CreateTraceProcessor creates a trace processor based on this config.
func (f *Factory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
tCfg := cfg.(*builder.TailBasedCfg)
return NewTraceProcessor(logger, nextConsumer, *tCfg)
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *Factory) CreateMetricsProcessor(
logger *zap.Logger,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
41 changes: 41 additions & 0 deletions internal/collector/processor/tailsampling/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tailsampling

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-service/service/builder"
)

func TestCreateProcessor(t *testing.T) {
factory := &Factory{}
require.NotNil(t, factory)

cfg := builder.NewDefaultTailBasedCfg()

tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
assert.NotNil(t, tp)
assert.NoError(t, err, "cannot create trace processor")

mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
assert.Nil(t, mp)
assert.Error(t, err, "should not be able to create metric processor")
}
50 changes: 27 additions & 23 deletions internal/collector/processor/tailsampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tailsampling

import (
"context"
"errors"
"runtime"
"sync"
"sync/atomic"
Expand All @@ -30,7 +31,8 @@ import (
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
"github.com/open-telemetry/opentelemetry-service/observability"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/service/builder"
)

// Policy combines a sampling policy evaluator with the destinations to be
Expand All @@ -54,6 +56,7 @@ type traceKey string
// policies to sample traces.
type tailSamplingSpanProcessor struct {
ctx context.Context
nextConsumer consumer.TraceConsumer
start sync.Once
maxNumTraces uint64
policies []*Policy
Expand All @@ -69,40 +72,41 @@ const (
sourceFormat = "tail-sampling"
)

var _ consumer.TraceConsumer = (*tailSamplingSpanProcessor)(nil)
var _ processor.TraceProcessor = (*tailSamplingSpanProcessor)(nil)

// NewTailSamplingSpanProcessor creates a TailSamplingSpanProcessor with the given policies.
// It will keep maxNumTraces on memory and will attempt to wait until decisionWait before evaluating if
// a trace should be sampled or not. Providing expectedNewTracesPerSec helps with allocating data structures
// with closer to actual usage size.
func NewTailSamplingSpanProcessor(
policies []*Policy,
maxNumTraces, expectedNewTracesPerSec uint64,
decisionWait time.Duration,
logger *zap.Logger) (consumer.TraceConsumer, error) {
// NewTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given
// configuration.
func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg builder.TailBasedCfg) (processor.TraceProcessor, error) {
if nextConsumer == nil {
return nil, errors.New("nextConsumer is nil")
}

numDecisionBatches := uint64(decisionWait.Seconds())
inBatcher, err := idbatcher.New(numDecisionBatches, expectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
numDecisionBatches := uint64(cfg.DecisionWait.Seconds())
inBatcher, err := idbatcher.New(numDecisionBatches, cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU()))
if err != nil {
return nil, err
}

tsp := &tailSamplingSpanProcessor{
ctx: context.Background(),
maxNumTraces: maxNumTraces,
policies: policies,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
logger: logger,
decisionBatcher: inBatcher,
// policies: policies,
}

for _, policy := range policies {
policyCtx, err := tag.New(tsp.ctx, tag.Upsert(tagPolicyKey, policy.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat))
if err != nil {
return nil, err
}
policy.ctx = policyCtx
}
// TODO(#146): add policies to TailBasedCfg
// for _, policy := range policies {
// policyCtx, err := tag.New(tsp.ctx, tag.Upsert(tagPolicyKey, policy.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat))
// if err != nil {
// return nil, err
// }
// policy.ctx = policyCtx
// }
tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick}
tsp.deleteChan = make(chan traceKey, maxNumTraces)
tsp.deleteChan = make(chan traceKey, cfg.NumTraces)

return tsp, nil
}

Expand Down
50 changes: 35 additions & 15 deletions internal/collector/processor/tailsampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ import (
"testing"
"time"

tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
"github.com/open-telemetry/opentelemetry-service/service/builder"
tracetranslator "github.com/open-telemetry/opentelemetry-service/translator/trace"

tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"go.uber.org/zap"
)

const (
Expand All @@ -36,7 +38,12 @@ const (

func TestSequentialTraceArrival(t *testing.T) {
traceIds, batches := generateIdsAndBatches(128)
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(2*len(traceIds)), 64, defaultTestDecisionWait, zap.NewNop())
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(2 * len(traceIds)),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)
for _, batch := range batches {
tsp.ConsumeTraceData(context.Background(), batch)
Expand All @@ -57,7 +64,12 @@ func TestConcurrentTraceArrival(t *testing.T) {
traceIds, batches := generateIdsAndBatches(128)

var wg sync.WaitGroup
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(2*len(traceIds)), 64, defaultTestDecisionWait, zap.NewNop())
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(2 * len(traceIds)),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)
for _, batch := range batches {
// Add the same traceId twice.
Expand Down Expand Up @@ -90,7 +102,12 @@ func TestConcurrentTraceArrival(t *testing.T) {
func TestSequentialTraceMapSize(t *testing.T) {
traceIds, batches := generateIdsAndBatches(210)
const maxSize = 100
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(maxSize), 64, defaultTestDecisionWait, zap.NewNop())
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(maxSize),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)
for _, batch := range batches {
tsp.ConsumeTraceData(context.Background(), batch)
Expand All @@ -108,7 +125,12 @@ func TestConcurrentTraceMapSize(t *testing.T) {
_, batches := generateIdsAndBatches(210)
const maxSize = 100
var wg sync.WaitGroup
sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(maxSize), 64, defaultTestDecisionWait, zap.NewNop())
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(maxSize),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)
for _, batch := range batches {
wg.Add(1)
Expand All @@ -133,19 +155,17 @@ func TestConcurrentTraceMapSize(t *testing.T) {
}

func TestSamplingPolicyTypicalPath(t *testing.T) {
t.Skip("TODO(#146): add policies to TailBasedCfg and fix this test")
const maxSize = 100
const decisionWaitSeconds = 5
decisionWait := time.Second * decisionWaitSeconds
msp := &mockSpanProcessor{}
mpe := &mockPolicyEvaluator{}
testPolicy := []*Policy{
{
Name: "test",
Evaluator: mpe,
Destination: msp,
},
cfg := builder.TailBasedCfg{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(maxSize),
ExpectedNewTracesPerSec: 64,
}
sp, _ := NewTailSamplingSpanProcessor(testPolicy, maxSize, 64, decisionWait, zap.NewNop())
sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg)
tsp := sp.(*tailSamplingSpanProcessor)

// For this test explicitly control the timer calls and batcher.
Expand Down
6 changes: 6 additions & 0 deletions service/builder/sampling_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"time"

"github.com/spf13/viper"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
)

const (
Expand Down Expand Up @@ -159,12 +161,16 @@ func (sCfg *SamplingCfg) InitFromViper(v *viper.Viper) *SamplingCfg {

// TailBasedCfg holds the configuration for tail-based sampling.
type TailBasedCfg struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
// DecisionWait is the desired wait time from the arrival of the first span of
// trace until the decision about sampling it or not is evaluated.
DecisionWait time.Duration `mapstructure:"decision-wait"`
// NumTraces is the number of traces kept on memory. Typically most of the data
// of a trace is released after a sampling decision is taken.
NumTraces uint64 `mapstructure:"num-traces"`
// ExpectedNewTracesPerSec sets the expected number of new traces sending to the tail sampling processor
// per second. This helps with allocating data structures with closer to actual usage size.
ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"`
}

// NewDefaultTailBasedCfg creates a TailBasedCfg with the default values.
Expand Down

0 comments on commit 10d8c64

Please sign in to comment.