diff --git a/plugin/sampling/strategyprovider/adaptive/aggregator.go b/plugin/sampling/strategyprovider/adaptive/aggregator.go index a32cf8d1ef4..42ca182fbe3 100644 --- a/plugin/sampling/strategyprovider/adaptive/aggregator.go +++ b/plugin/sampling/strategyprovider/adaptive/aggregator.go @@ -22,6 +22,12 @@ const ( maxProbabilities = 10 ) +// aggregator is a kind of trace processor that watches for root spans +// and calculates how many traces per service / per endpoint are being +// produced. It periodically flushes these stats ("throughput") to storage. +// +// It also invokes PostAggregator which actually computes adaptive sampling +// probabilities based on the observed throughput. type aggregator struct { sync.Mutex diff --git a/plugin/sampling/strategyprovider/adaptive/processor.go b/plugin/sampling/strategyprovider/adaptive/post_aggregator.go similarity index 83% rename from plugin/sampling/strategyprovider/adaptive/processor.go rename to plugin/sampling/strategyprovider/adaptive/post_aggregator.go index 69122214486..407704892b2 100644 --- a/plugin/sampling/strategyprovider/adaptive/processor.go +++ b/plugin/sampling/strategyprovider/adaptive/post_aggregator.go @@ -4,7 +4,6 @@ package adaptive import ( - "context" "errors" "math" "math/rand" @@ -17,7 +16,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection" "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive/calculationstrategy" - "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/storage/samplingstore" ) @@ -58,7 +56,7 @@ type throughputBucket struct { endTime time.Time } -// PostAggregator retrieves service throughput over a look back interval and calculates sampling probabilities +// PostAggregator retrieves service throughput over a lookback interval and calculates sampling probabilities // per operation such that each operation is sampled at a specified target QPS. It achieves this by // retrieving discrete buckets of operation throughput and doing a weighted average of the throughput // and generating a probability to match the targetQPS. @@ -129,16 +127,6 @@ func newPostAggregator( }, nil } -// GetSamplingStrategy implements protobuf endpoint for retrieving sampling strategy for a service. -func (p *Provider) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error) { - p.RLock() - defer p.RUnlock() - if strategy, ok := p.strategyResponses[service]; ok { - return strategy, nil - } - return p.generateDefaultSamplingStrategyResponse(), nil -} - // Start initializes and starts the sampling postAggregator which regularly calculates sampling probabilities. func (p *PostAggregator) Start() error { p.logger.Info("starting adaptive sampling postAggregator") @@ -148,52 +136,10 @@ func (p *PostAggregator) Start() error { return nil } -func (p *Provider) loadProbabilities() { - // TODO GetLatestProbabilities API can be changed to return the latest measured qps for initialization - probabilities, err := p.storage.GetLatestProbabilities() - if err != nil { - p.logger.Warn("failed to initialize probabilities", zap.Error(err)) - return - } - p.Lock() - defer p.Unlock() - p.probabilities = probabilities -} - -// runUpdateProbabilitiesLoop is a loop that reads probabilities from storage. -// The follower updates its local cache with the latest probabilities and serves them. -func (p *Provider) runUpdateProbabilitiesLoop() { - select { - case <-time.After(addJitter(p.followerRefreshInterval)): - // continue after jitter delay - case <-p.shutdown: - return - } - - ticker := time.NewTicker(p.followerRefreshInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - // Only load probabilities if this strategy_store doesn't hold the leader lock - if !p.isLeader() { - p.loadProbabilities() - p.generateStrategyResponses() - } - case <-p.shutdown: - return - } - } -} - func (p *PostAggregator) isLeader() bool { return p.electionParticipant.IsLeader() } -func (p *Provider) isLeader() bool { - return p.electionParticipant.IsLeader() -} - // addJitter adds a random amount of time. Without jitter, if the host holding the leader // lock were to die, then all other collectors can potentially wait for a full cycle before // trying to acquire the lock. With jitter, we can reduce the average amount of time before a @@ -457,40 +403,3 @@ func (p *PostAggregator) isUsingAdaptiveSampling( } return false } - -// generateStrategyResponses generates and caches SamplingStrategyResponse from the calculated sampling probabilities. -func (p *Provider) generateStrategyResponses() { - p.RLock() - strategies := make(map[string]*api_v2.SamplingStrategyResponse) - for svc, opProbabilities := range p.probabilities { - opStrategies := make([]*api_v2.OperationSamplingStrategy, len(opProbabilities)) - var idx int - for op, probability := range opProbabilities { - opStrategies[idx] = &api_v2.OperationSamplingStrategy{ - Operation: op, - ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ - SamplingRate: probability, - }, - } - idx++ - } - strategy := p.generateDefaultSamplingStrategyResponse() - strategy.OperationSampling.PerOperationStrategies = opStrategies - strategies[svc] = strategy - } - p.RUnlock() - - p.Lock() - defer p.Unlock() - p.strategyResponses = strategies -} - -func (p *Provider) generateDefaultSamplingStrategyResponse() *api_v2.SamplingStrategyResponse { - return &api_v2.SamplingStrategyResponse{ - StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, - OperationSampling: &api_v2.PerOperationSamplingStrategies{ - DefaultSamplingProbability: p.InitialSamplingProbability, - DefaultLowerBoundTracesPerSecond: p.MinSamplesPerSecond, - }, - } -} diff --git a/plugin/sampling/strategyprovider/adaptive/processor_test.go b/plugin/sampling/strategyprovider/adaptive/post_aggregator_test.go similarity index 84% rename from plugin/sampling/strategyprovider/adaptive/processor_test.go rename to plugin/sampling/strategyprovider/adaptive/post_aggregator_test.go index 711e4e97619..e74147d9bb3 100644 --- a/plugin/sampling/strategyprovider/adaptive/processor_test.go +++ b/plugin/sampling/strategyprovider/adaptive/post_aggregator_test.go @@ -4,7 +4,6 @@ package adaptive import ( - "context" "errors" "testing" "time" @@ -20,7 +19,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks" "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive/calculationstrategy" - "github.com/jaegertracing/jaeger/proto-gen/api_v2" smocks "github.com/jaegertracing/jaeger/storage/samplingstore/mocks" ) @@ -405,113 +403,6 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) { require.NoError(t, agg.Close()) } -func TestLoadProbabilities(t *testing.T) { - mockStorage := &smocks.Store{} - mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil) - - p := &Provider{storage: mockStorage} - require.Nil(t, p.probabilities) - p.loadProbabilities() - require.NotNil(t, p.probabilities) -} - -func TestRunUpdateProbabilitiesLoop(t *testing.T) { - mockStorage := &smocks.Store{} - mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil) - mockEP := &epmocks.ElectionParticipant{} - mockEP.On("Start").Return(nil) - mockEP.On("Close").Return(nil) - mockEP.On("IsLeader").Return(false) - - p := &Provider{ - storage: mockStorage, - shutdown: make(chan struct{}), - followerRefreshInterval: time.Millisecond, - electionParticipant: mockEP, - } - defer close(p.shutdown) - require.Nil(t, p.probabilities) - require.Nil(t, p.strategyResponses) - go p.runUpdateProbabilitiesLoop() - - for i := 0; i < 1000; i++ { - p.RLock() - if p.probabilities != nil && p.strategyResponses != nil { - p.RUnlock() - break - } - p.RUnlock() - time.Sleep(time.Millisecond) - } - p.RLock() - assert.NotNil(t, p.probabilities) - assert.NotNil(t, p.strategyResponses) - p.RUnlock() -} - -func TestRealisticRunCalculationLoop(t *testing.T) { - t.Skip("Skipped realistic calculation loop test") - logger := zap.NewNop() - // NB: This is an extremely long test since it uses near realistic (1/6th scale) processor config values - testThroughputs := []*model.Throughput{ - {Service: "svcA", Operation: "GET", Count: 10}, - {Service: "svcA", Operation: "POST", Count: 9}, - {Service: "svcA", Operation: "PUT", Count: 5}, - {Service: "svcA", Operation: "DELETE", Count: 20}, - } - mockStorage := &smocks.Store{} - mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")). - Return(testThroughputs, nil) - mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil) - mockStorage.On("InsertProbabilitiesAndQPS", "host", mock.AnythingOfType("model.ServiceOperationProbabilities"), - mock.AnythingOfType("model.ServiceOperationQPS")).Return(nil) - mockEP := &epmocks.ElectionParticipant{} - mockEP.On("Start").Return(nil) - mockEP.On("Close").Return(nil) - mockEP.On("IsLeader").Return(true) - cfg := Options{ - TargetSamplesPerSecond: 1.0, - DeltaTolerance: 0.2, - InitialSamplingProbability: 0.001, - CalculationInterval: time.Second * 10, - AggregationBuckets: 1, - Delay: time.Second * 10, - } - s := NewProvider(cfg, logger, mockEP, mockStorage) - s.Start() - - for i := 0; i < 100; i++ { - strategy, _ := s.GetSamplingStrategy(context.Background(), "svcA") - if len(strategy.OperationSampling.PerOperationStrategies) != 0 { - break - } - time.Sleep(250 * time.Millisecond) - } - s.Close() - - strategy, err := s.GetSamplingStrategy(context.Background(), "svcA") - require.NoError(t, err) - require.Len(t, strategy.OperationSampling.PerOperationStrategies, 4) - strategies := strategy.OperationSampling.PerOperationStrategies - - for _, s := range strategies { - switch s.Operation { - case "GET": - assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate, - "Already at 1QPS, no probability change") - case "POST": - assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate, - "Within epsilon of 1QPS, no probability change") - case "PUT": - assert.InEpsilon(t, 0.002, s.ProbabilisticSampling.SamplingRate, 0.025, - "Under sampled, double probability") - case "DELETE": - assert.InEpsilon(t, 0.0005, s.ProbabilisticSampling.SamplingRate, 0.025, - "Over sampled, halve probability") - } - } -} - func TestPrependBucket(t *testing.T) { p := &PostAggregator{Options: Options{AggregationBuckets: 1}} p.prependThroughputBucket(&throughputBucket{interval: time.Minute}) @@ -547,41 +438,6 @@ func TestConstructorFailure(t *testing.T) { require.EqualError(t, err, "BucketsForCalculation cannot be less than 1") } -func TestGenerateStrategyResponses(t *testing.T) { - probabilities := model.ServiceOperationProbabilities{ - "svcA": map[string]float64{ - "GET": 0.5, - }, - } - p := &Provider{ - probabilities: probabilities, - Options: Options{ - InitialSamplingProbability: 0.001, - MinSamplesPerSecond: 0.0001, - }, - } - p.generateStrategyResponses() - - expectedResponse := map[string]*api_v2.SamplingStrategyResponse{ - "svcA": { - StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, - OperationSampling: &api_v2.PerOperationSamplingStrategies{ - DefaultSamplingProbability: 0.001, - DefaultLowerBoundTracesPerSecond: 0.0001, - PerOperationStrategies: []*api_v2.OperationSamplingStrategy{ - { - Operation: "GET", - ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ - SamplingRate: 0.5, - }, - }, - }, - }, - }, - } - assert.Equal(t, expectedResponse, p.strategyResponses) -} - func TestUsingAdaptiveSampling(t *testing.T) { p := &PostAggregator{} throughput := serviceOperationThroughput{ diff --git a/plugin/sampling/strategyprovider/adaptive/provider.go b/plugin/sampling/strategyprovider/adaptive/provider.go index 22d2538636f..dfc7ba29b18 100644 --- a/plugin/sampling/strategyprovider/adaptive/provider.go +++ b/plugin/sampling/strategyprovider/adaptive/provider.go @@ -4,6 +4,7 @@ package adaptive import ( + "context" "sync" "time" @@ -17,6 +18,11 @@ import ( const defaultFollowerProbabilityInterval = 20 * time.Second +// Provider is responsible for providing sampling strategies for services. +// It periodically loads sampling probabilities from storage and converts them +// into sampling strategies that are cached and served to clients. +// Provider relies on sampling probabilities being periodically updated by the +// aggregator & post-aggregator. type Provider struct { sync.RWMutex Options @@ -55,24 +61,113 @@ func NewProvider(options Options, logger *zap.Logger, participant leaderelection } // Start initializes and starts the sampling service which regularly loads sampling probabilities and generates strategies. -func (ss *Provider) Start() error { - ss.logger.Info("starting adaptive sampling service") - ss.loadProbabilities() - ss.generateStrategyResponses() +func (p *Provider) Start() error { + p.logger.Info("starting adaptive sampling service") + p.loadProbabilities() + p.generateStrategyResponses() - ss.bgFinished.Add(1) + p.bgFinished.Add(1) go func() { - ss.runUpdateProbabilitiesLoop() - ss.bgFinished.Done() + p.runUpdateProbabilitiesLoop() + p.bgFinished.Done() }() return nil } +func (p *Provider) loadProbabilities() { + // TODO GetLatestProbabilities API can be changed to return the latest measured qps for initialization + probabilities, err := p.storage.GetLatestProbabilities() + if err != nil { + p.logger.Warn("failed to initialize probabilities", zap.Error(err)) + return + } + p.Lock() + defer p.Unlock() + p.probabilities = probabilities +} + +// runUpdateProbabilitiesLoop is a loop that reads probabilities from storage. +// The follower updates its local cache with the latest probabilities and serves them. +func (p *Provider) runUpdateProbabilitiesLoop() { + select { + case <-time.After(addJitter(p.followerRefreshInterval)): + // continue after jitter delay + case <-p.shutdown: + return + } + + ticker := time.NewTicker(p.followerRefreshInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // Only load probabilities if this strategy_store doesn't hold the leader lock + if !p.isLeader() { + p.loadProbabilities() + p.generateStrategyResponses() + } + case <-p.shutdown: + return + } + } +} + +func (p *Provider) isLeader() bool { + return p.electionParticipant.IsLeader() +} + +// generateStrategyResponses generates and caches SamplingStrategyResponse from the calculated sampling probabilities. +func (p *Provider) generateStrategyResponses() { + p.RLock() + strategies := make(map[string]*api_v2.SamplingStrategyResponse) + for svc, opProbabilities := range p.probabilities { + opStrategies := make([]*api_v2.OperationSamplingStrategy, len(opProbabilities)) + var idx int + for op, probability := range opProbabilities { + opStrategies[idx] = &api_v2.OperationSamplingStrategy{ + Operation: op, + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: probability, + }, + } + idx++ + } + strategy := p.generateDefaultSamplingStrategyResponse() + strategy.OperationSampling.PerOperationStrategies = opStrategies + strategies[svc] = strategy + } + p.RUnlock() + + p.Lock() + defer p.Unlock() + p.strategyResponses = strategies +} + +func (p *Provider) generateDefaultSamplingStrategyResponse() *api_v2.SamplingStrategyResponse { + return &api_v2.SamplingStrategyResponse{ + StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, + OperationSampling: &api_v2.PerOperationSamplingStrategies{ + DefaultSamplingProbability: p.InitialSamplingProbability, + DefaultLowerBoundTracesPerSecond: p.MinSamplesPerSecond, + }, + } +} + +// GetSamplingStrategy implements protobuf endpoint for retrieving sampling strategy for a service. +func (p *Provider) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error) { + p.RLock() + defer p.RUnlock() + if strategy, ok := p.strategyResponses[service]; ok { + return strategy, nil + } + return p.generateDefaultSamplingStrategyResponse(), nil +} + // Close stops the service from loading probabilities and generating strategies. -func (ss *Provider) Close() error { - ss.logger.Info("stopping adaptive sampling service") - close(ss.shutdown) - ss.bgFinished.Wait() +func (p *Provider) Close() error { + p.logger.Info("stopping adaptive sampling service") + close(p.shutdown) + p.bgFinished.Wait() return nil } diff --git a/plugin/sampling/strategyprovider/adaptive/provider_test.go b/plugin/sampling/strategyprovider/adaptive/provider_test.go new file mode 100644 index 00000000000..89e0f37481e --- /dev/null +++ b/plugin/sampling/strategyprovider/adaptive/provider_test.go @@ -0,0 +1,162 @@ +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptive + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" + epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + smocks "github.com/jaegertracing/jaeger/storage/samplingstore/mocks" +) + +func TestProviderLoadProbabilities(t *testing.T) { + mockStorage := &smocks.Store{} + mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil) + + p := &Provider{storage: mockStorage} + require.Nil(t, p.probabilities) + p.loadProbabilities() + require.NotNil(t, p.probabilities) +} + +func TestProviderRunUpdateProbabilitiesLoop(t *testing.T) { + mockStorage := &smocks.Store{} + mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil) + mockEP := &epmocks.ElectionParticipant{} + mockEP.On("Start").Return(nil) + mockEP.On("Close").Return(nil) + mockEP.On("IsLeader").Return(false) + + p := &Provider{ + storage: mockStorage, + shutdown: make(chan struct{}), + followerRefreshInterval: time.Millisecond, + electionParticipant: mockEP, + } + defer close(p.shutdown) + require.Nil(t, p.probabilities) + require.Nil(t, p.strategyResponses) + go p.runUpdateProbabilitiesLoop() + + for i := 0; i < 1000; i++ { + p.RLock() + if p.probabilities != nil && p.strategyResponses != nil { + p.RUnlock() + break + } + p.RUnlock() + time.Sleep(time.Millisecond) + } + p.RLock() + assert.NotNil(t, p.probabilities) + assert.NotNil(t, p.strategyResponses) + p.RUnlock() +} + +func TestProviderRealisticRunCalculationLoop(t *testing.T) { + t.Skip("Skipped realistic calculation loop test") + logger := zap.NewNop() + // NB: This is an extremely long test since it uses near realistic (1/6th scale) processor config values + testThroughputs := []*model.Throughput{ + {Service: "svcA", Operation: "GET", Count: 10}, + {Service: "svcA", Operation: "POST", Count: 9}, + {Service: "svcA", Operation: "PUT", Count: 5}, + {Service: "svcA", Operation: "DELETE", Count: 20}, + } + mockStorage := &smocks.Store{} + mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")). + Return(testThroughputs, nil) + mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil) + mockStorage.On("InsertProbabilitiesAndQPS", "host", mock.AnythingOfType("model.ServiceOperationProbabilities"), + mock.AnythingOfType("model.ServiceOperationQPS")).Return(nil) + mockEP := &epmocks.ElectionParticipant{} + mockEP.On("Start").Return(nil) + mockEP.On("Close").Return(nil) + mockEP.On("IsLeader").Return(true) + cfg := Options{ + TargetSamplesPerSecond: 1.0, + DeltaTolerance: 0.2, + InitialSamplingProbability: 0.001, + CalculationInterval: time.Second * 10, + AggregationBuckets: 1, + Delay: time.Second * 10, + } + s := NewProvider(cfg, logger, mockEP, mockStorage) + s.Start() + + for i := 0; i < 100; i++ { + strategy, _ := s.GetSamplingStrategy(context.Background(), "svcA") + if len(strategy.OperationSampling.PerOperationStrategies) != 0 { + break + } + time.Sleep(250 * time.Millisecond) + } + s.Close() + + strategy, err := s.GetSamplingStrategy(context.Background(), "svcA") + require.NoError(t, err) + require.Len(t, strategy.OperationSampling.PerOperationStrategies, 4) + strategies := strategy.OperationSampling.PerOperationStrategies + + for _, s := range strategies { + switch s.Operation { + case "GET": + assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate, + "Already at 1QPS, no probability change") + case "POST": + assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate, + "Within epsilon of 1QPS, no probability change") + case "PUT": + assert.InEpsilon(t, 0.002, s.ProbabilisticSampling.SamplingRate, 0.025, + "Under sampled, double probability") + case "DELETE": + assert.InEpsilon(t, 0.0005, s.ProbabilisticSampling.SamplingRate, 0.025, + "Over sampled, halve probability") + } + } +} + +func TestProviderGenerateStrategyResponses(t *testing.T) { + probabilities := model.ServiceOperationProbabilities{ + "svcA": map[string]float64{ + "GET": 0.5, + }, + } + p := &Provider{ + probabilities: probabilities, + Options: Options{ + InitialSamplingProbability: 0.001, + MinSamplesPerSecond: 0.0001, + }, + } + p.generateStrategyResponses() + + expectedResponse := map[string]*api_v2.SamplingStrategyResponse{ + "svcA": { + StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, + OperationSampling: &api_v2.PerOperationSamplingStrategies{ + DefaultSamplingProbability: 0.001, + DefaultLowerBoundTracesPerSecond: 0.0001, + PerOperationStrategies: []*api_v2.OperationSamplingStrategy{ + { + Operation: "GET", + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: 0.5, + }, + }, + }, + }, + }, + } + assert.Equal(t, expectedResponse, p.strategyResponses) +}