Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[adaptive sampling] Clean-up after previous refactoring #5954

Merged
merged 2 commits into from
Sep 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugin/sampling/strategyprovider/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ const (
maxProbabilities = 10
)

// aggregator is a kind of trace processor that watches for root spans
// and calculates how many traces per service / per endpoint are being
// produced. It periodically flushes these stats ("throughput") to storage.
//
// It also invokes PostAggregator which actually computes adaptive sampling
// probabilities based on the observed throughput.
type aggregator struct {
sync.Mutex

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package adaptive

import (
"context"
"errors"
"math"
"math/rand"
Expand All @@ -17,7 +16,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive/calculationstrategy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

Expand Down Expand Up @@ -58,7 +56,7 @@ type throughputBucket struct {
endTime time.Time
}

// PostAggregator retrieves service throughput over a look back interval and calculates sampling probabilities
// PostAggregator retrieves service throughput over a lookback interval and calculates sampling probabilities
// per operation such that each operation is sampled at a specified target QPS. It achieves this by
// retrieving discrete buckets of operation throughput and doing a weighted average of the throughput
// and generating a probability to match the targetQPS.
Expand Down Expand Up @@ -129,16 +127,6 @@ func newPostAggregator(
}, nil
}

// GetSamplingStrategy implements protobuf endpoint for retrieving sampling strategy for a service.
// It serves the cached response for the service if one exists, otherwise a default strategy.
func (p *Provider) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error) {
	p.RLock()
	defer p.RUnlock()
	strategy, ok := p.strategyResponses[service]
	if !ok {
		return p.generateDefaultSamplingStrategyResponse(), nil
	}
	return strategy, nil
}

// Start initializes and starts the sampling postAggregator which regularly calculates sampling probabilities.
func (p *PostAggregator) Start() error {
p.logger.Info("starting adaptive sampling postAggregator")
Expand All @@ -148,52 +136,10 @@ func (p *PostAggregator) Start() error {
return nil
}

// loadProbabilities refreshes the local probabilities cache from storage.
// On a storage error the current cache is left untouched.
func (p *Provider) loadProbabilities() {
	// TODO GetLatestProbabilities API can be changed to return the latest measured qps for initialization
	probs, err := p.storage.GetLatestProbabilities()
	if err != nil {
		p.logger.Warn("failed to initialize probabilities", zap.Error(err))
		return
	}
	p.Lock()
	p.probabilities = probs
	p.Unlock()
}

// runUpdateProbabilitiesLoop is a loop that reads probabilities from storage.
// The follower updates its local cache with the latest probabilities and serves them.
func (p *Provider) runUpdateProbabilitiesLoop() {
	// Delay the first refresh by a random jitter so followers don't all
	// hit storage at the same instant; bail out early on shutdown.
	select {
	case <-p.shutdown:
		return
	case <-time.After(addJitter(p.followerRefreshInterval)):
	}

	ticker := time.NewTicker(p.followerRefreshInterval)
	defer ticker.Stop()
	for {
		select {
		case <-p.shutdown:
			return
		case <-ticker.C:
			// Only load probabilities if this strategy_store doesn't hold the leader lock
			if p.isLeader() {
				continue
			}
			p.loadProbabilities()
			p.generateStrategyResponses()
		}
	}
}

// isLeader reports whether this instance currently holds the leader
// election lock, as determined by the election participant.
func (p *PostAggregator) isLeader() bool {
	return p.electionParticipant.IsLeader()
}

// isLeader reports whether this Provider instance currently holds the
// leader election lock, as determined by the election participant.
func (p *Provider) isLeader() bool {
	return p.electionParticipant.IsLeader()
}

// addJitter adds a random amount of time. Without jitter, if the host holding the leader
// lock were to die, then all other collectors can potentially wait for a full cycle before
// trying to acquire the lock. With jitter, we can reduce the average amount of time before a
Expand Down Expand Up @@ -457,40 +403,3 @@ func (p *PostAggregator) isUsingAdaptiveSampling(
}
return false
}

// generateStrategyResponses generates and caches SamplingStrategyResponse from the calculated sampling probabilities.
// It builds the full map under the read lock, then swaps it in under the write lock.
func (p *Provider) generateStrategyResponses() {
	p.RLock()
	newResponses := make(map[string]*api_v2.SamplingStrategyResponse)
	for svc, opProbabilities := range p.probabilities {
		perOp := make([]*api_v2.OperationSamplingStrategy, 0, len(opProbabilities))
		for op, probability := range opProbabilities {
			perOp = append(perOp, &api_v2.OperationSamplingStrategy{
				Operation: op,
				ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
					SamplingRate: probability,
				},
			})
		}
		resp := p.generateDefaultSamplingStrategyResponse()
		resp.OperationSampling.PerOperationStrategies = perOp
		newResponses[svc] = resp
	}
	p.RUnlock()

	p.Lock()
	p.strategyResponses = newResponses
	p.Unlock()
}

// generateDefaultSamplingStrategyResponse builds a probabilistic strategy
// response populated with the configured initial probability and lower bound.
func (p *Provider) generateDefaultSamplingStrategyResponse() *api_v2.SamplingStrategyResponse {
	opSampling := &api_v2.PerOperationSamplingStrategies{
		DefaultSamplingProbability:       p.InitialSamplingProbability,
		DefaultLowerBoundTracesPerSecond: p.MinSamplesPerSecond,
	}
	return &api_v2.SamplingStrategyResponse{
		StrategyType:      api_v2.SamplingStrategyType_PROBABILISTIC,
		OperationSampling: opSampling,
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package adaptive

import (
"context"
"errors"
"testing"
"time"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive/calculationstrategy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
smocks "github.com/jaegertracing/jaeger/storage/samplingstore/mocks"
)

Expand Down Expand Up @@ -405,113 +403,6 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
require.NoError(t, agg.Close())
}

// TestLoadProbabilities verifies that loadProbabilities populates the
// provider's cache from the storage mock.
func TestLoadProbabilities(t *testing.T) {
	store := &smocks.Store{}
	store.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)

	provider := &Provider{storage: store}
	require.Nil(t, provider.probabilities)
	provider.loadProbabilities()
	require.NotNil(t, provider.probabilities)
}

// TestRunUpdateProbabilitiesLoop verifies that a follower (non-leader)
// provider periodically loads probabilities and regenerates strategy responses.
func TestRunUpdateProbabilitiesLoop(t *testing.T) {
	store := &smocks.Store{}
	store.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)
	participant := &epmocks.ElectionParticipant{}
	participant.On("Start").Return(nil)
	participant.On("Close").Return(nil)
	participant.On("IsLeader").Return(false)

	provider := &Provider{
		storage:                 store,
		shutdown:                make(chan struct{}),
		followerRefreshInterval: time.Millisecond,
		electionParticipant:     participant,
	}
	defer close(provider.shutdown)
	require.Nil(t, provider.probabilities)
	require.Nil(t, provider.strategyResponses)
	go provider.runUpdateProbabilitiesLoop()

	// Poll until the follower loop has populated both caches, giving up
	// after ~1s and letting the assertions below report the failure.
	for i := 0; i < 1000; i++ {
		provider.RLock()
		done := provider.probabilities != nil && provider.strategyResponses != nil
		provider.RUnlock()
		if done {
			break
		}
		time.Sleep(time.Millisecond)
	}
	provider.RLock()
	assert.NotNil(t, provider.probabilities)
	assert.NotNil(t, provider.strategyResponses)
	provider.RUnlock()
}

// TestRealisticRunCalculationLoop runs the full provider against near-realistic
// (1/6th scale) config values and checks that per-operation sampling rates
// converge toward the 1 QPS target. Skipped by default because of its length.
func TestRealisticRunCalculationLoop(t *testing.T) {
	t.Skip("Skipped realistic calculation loop test")
	logger := zap.NewNop()
	// NB: This is an extremely long test since it uses near realistic (1/6th scale) processor config values
	testThroughputs := []*model.Throughput{
		{Service: "svcA", Operation: "GET", Count: 10},
		{Service: "svcA", Operation: "POST", Count: 9},
		{Service: "svcA", Operation: "PUT", Count: 5},
		{Service: "svcA", Operation: "DELETE", Count: 20},
	}
	// Storage always returns the same throughput snapshot and accepts any
	// probabilities/QPS written back by the calculation loop.
	mockStorage := &smocks.Store{}
	mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
		Return(testThroughputs, nil)
	mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)
	mockStorage.On("InsertProbabilitiesAndQPS", "host", mock.AnythingOfType("model.ServiceOperationProbabilities"),
		mock.AnythingOfType("model.ServiceOperationQPS")).Return(nil)
	// This instance is the leader, so it computes probabilities itself.
	mockEP := &epmocks.ElectionParticipant{}
	mockEP.On("Start").Return(nil)
	mockEP.On("Close").Return(nil)
	mockEP.On("IsLeader").Return(true)
	cfg := Options{
		TargetSamplesPerSecond:     1.0,
		DeltaTolerance:             0.2,
		InitialSamplingProbability: 0.001,
		CalculationInterval:        time.Second * 10,
		AggregationBuckets:         1,
		Delay:                      time.Second * 10,
	}
	s := NewProvider(cfg, logger, mockEP, mockStorage)
	s.Start()

	// Poll until the calculation loop has produced per-operation strategies
	// (or give up after ~25s and let the assertions below fail).
	for i := 0; i < 100; i++ {
		strategy, _ := s.GetSamplingStrategy(context.Background(), "svcA")
		if len(strategy.OperationSampling.PerOperationStrategies) != 0 {
			break
		}
		time.Sleep(250 * time.Millisecond)
	}
	s.Close()

	strategy, err := s.GetSamplingStrategy(context.Background(), "svcA")
	require.NoError(t, err)
	require.Len(t, strategy.OperationSampling.PerOperationStrategies, 4)
	strategies := strategy.OperationSampling.PerOperationStrategies

	// Each operation's expected rate follows from its observed count
	// relative to the 1 QPS target (GET is exactly on target).
	for _, s := range strategies {
		switch s.Operation {
		case "GET":
			assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate,
				"Already at 1QPS, no probability change")
		case "POST":
			assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate,
				"Within epsilon of 1QPS, no probability change")
		case "PUT":
			assert.InEpsilon(t, 0.002, s.ProbabilisticSampling.SamplingRate, 0.025,
				"Under sampled, double probability")
		case "DELETE":
			assert.InEpsilon(t, 0.0005, s.ProbabilisticSampling.SamplingRate, 0.025,
				"Over sampled, halve probability")
		}
	}
}

func TestPrependBucket(t *testing.T) {
p := &PostAggregator{Options: Options{AggregationBuckets: 1}}
p.prependThroughputBucket(&throughputBucket{interval: time.Minute})
Expand Down Expand Up @@ -547,41 +438,6 @@ func TestConstructorFailure(t *testing.T) {
require.EqualError(t, err, "BucketsForCalculation cannot be less than 1")
}

// TestGenerateStrategyResponses verifies that cached strategy responses are
// built from the provider's probabilities plus its configured defaults.
func TestGenerateStrategyResponses(t *testing.T) {
	probs := model.ServiceOperationProbabilities{
		"svcA": map[string]float64{
			"GET": 0.5,
		},
	}
	provider := &Provider{
		probabilities: probs,
		Options: Options{
			InitialSamplingProbability: 0.001,
			MinSamplesPerSecond:        0.0001,
		},
	}
	provider.generateStrategyResponses()

	want := map[string]*api_v2.SamplingStrategyResponse{
		"svcA": {
			StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
			OperationSampling: &api_v2.PerOperationSamplingStrategies{
				DefaultSamplingProbability:       0.001,
				DefaultLowerBoundTracesPerSecond: 0.0001,
				PerOperationStrategies: []*api_v2.OperationSamplingStrategy{
					{
						Operation: "GET",
						ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
							SamplingRate: 0.5,
						},
					},
				},
			},
		},
	}
	assert.Equal(t, want, provider.strategyResponses)
}

func TestUsingAdaptiveSampling(t *testing.T) {
p := &PostAggregator{}
throughput := serviceOperationThroughput{
Expand Down
Loading
Loading