diff --git a/plugin/sampling/strategystore/adaptive/factory.go b/plugin/sampling/strategystore/adaptive/factory.go index 4adfdc5214e..d10e3208efb 100644 --- a/plugin/sampling/strategystore/adaptive/factory.go +++ b/plugin/sampling/strategystore/adaptive/factory.go @@ -71,7 +71,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S if err != nil { return err } - f.store, err = ssFactory.CreateSamplingStore() + f.store, err = ssFactory.CreateSamplingStore(f.options.AggregationBuckets) if err != nil { return err } diff --git a/plugin/sampling/strategystore/adaptive/factory_test.go b/plugin/sampling/strategystore/adaptive/factory_test.go index bf757e13aaf..c739c5a2dcd 100644 --- a/plugin/sampling/strategystore/adaptive/factory_test.go +++ b/plugin/sampling/strategystore/adaptive/factory_test.go @@ -123,7 +123,7 @@ func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) { return mockLock, nil } -func (m *mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) { +func (m *mockSamplingStoreFactory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { if m.storeFailsWith != nil { return nil, m.storeFailsWith } diff --git a/plugin/sampling/strategystore/factory_test.go b/plugin/sampling/strategystore/factory_test.go index db8380ed7bd..5be3087d3b8 100644 --- a/plugin/sampling/strategystore/factory_test.go +++ b/plugin/sampling/strategystore/factory_test.go @@ -153,6 +153,6 @@ type mockSamplingStoreFactory struct{} func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) { return nil, nil } -func (m *mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) { +func (m *mockSamplingStoreFactory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { return nil, nil } diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 7cca3b13c7c..9f3c288be46 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -164,7 +164,7 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) { } // CreateSamplingStore implements storage.SamplingStoreFactory -func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) { +func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil } diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index bba78f9e2f6..b27ee56ad05 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -103,7 +103,7 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateLock() assert.NoError(t, err) - _, err = f.CreateSamplingStore() + _, err = f.CreateSamplingStore(0) assert.NoError(t, err) assert.NoError(t, f.Close()) diff --git a/plugin/storage/memory/factory.go b/plugin/storage/memory/factory.go index c94fbfa6c50..6aa2c3d41ec 100644 --- a/plugin/storage/memory/factory.go +++ b/plugin/storage/memory/factory.go @@ -22,7 +22,9 @@ import ( "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/pkg/distributedlock" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -79,6 +81,16 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { return f.store, nil } +// CreateSamplingStore implements storage.SamplingStoreFactory +func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { + return NewSamplingStore(maxBuckets), nil +} + +// CreateLock implements storage.SamplingStoreFactory +func (f *Factory) CreateLock() (distributedlock.Lock, error) { + return &lock{}, nil +} + func (f *Factory) publishOpts() { internalFactory := f.metricsFactory.Namespace(metrics.NSOptions{Name: "internal"}) internalFactory.Gauge(metrics.Options{Name: limit}). diff --git a/plugin/storage/memory/factory_test.go b/plugin/storage/memory/factory_test.go index 729c05ff0fd..2a604e6a919 100644 --- a/plugin/storage/memory/factory_test.go +++ b/plugin/storage/memory/factory_test.go @@ -44,6 +44,12 @@ func TestMemoryStorageFactory(t *testing.T) { depReader, err := f.CreateDependencyReader() assert.NoError(t, err) assert.Equal(t, f.store, depReader) + samplingStore, err := f.CreateSamplingStore(2) + assert.NoError(t, err) + assert.Equal(t, 2, samplingStore.(*SamplingStore).maxBuckets) + lock, err := f.CreateLock() + assert.NoError(t, err) + assert.NotNil(t, lock) } func TestWithConfiguration(t *testing.T) { diff --git a/plugin/storage/memory/lock.go b/plugin/storage/memory/lock.go new file mode 100644 index 00000000000..66eaef1a193 --- /dev/null +++ b/plugin/storage/memory/lock.go @@ -0,0 +1,29 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import "time" + +type lock struct{} + +// Acquire always returns true for memory storage because it's a single-node +func (l *lock) Acquire(resource string, ttl time.Duration) (bool, error) { + return true, nil +} + +// Forfeit always returns true for memory storage +func (l *lock) Forfeit(resource string) (bool, error) { + return true, nil +} diff --git a/plugin/storage/memory/lock_test.go b/plugin/storage/memory/lock_test.go new file mode 100644 index 00000000000..fa036a99f0d --- /dev/null +++ b/plugin/storage/memory/lock_test.go @@ -0,0 +1,36 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "testing" + "time" + + "github.com/crossdock/crossdock-go/assert" +) + +func TestAcquire(t *testing.T) { + l := &lock{} + ok, err := l.Acquire("resource", time.Duration(1)) + assert.True(t, ok) + assert.NoError(t, err) +} + +func TestForfeit(t *testing.T) { + l := &lock{} + ok, err := l.Forfeit("resource") + assert.True(t, ok) + assert.NoError(t, err) +} diff --git a/plugin/storage/memory/sampling.go b/plugin/storage/memory/sampling.go new file mode 100644 index 00000000000..ec44f4398ce --- /dev/null +++ b/plugin/storage/memory/sampling.go @@ -0,0 +1,98 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "sync" + "time" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" +) + +// SamplingStroe is an in-memory store for sampling data +type SamplingStore struct { + sync.RWMutex + throughputs []*storedThroughput + probabilitiesAndQPS *storedServiceOperationProbabilitiesAndQPS + maxBuckets int +} + +type storedThroughput struct { + throughput []*model.Throughput + time time.Time +} + +type storedServiceOperationProbabilitiesAndQPS struct { + hostname string + probabilities model.ServiceOperationProbabilities + qps model.ServiceOperationQPS + time time.Time +} + +// NewSamplingStore creates an in-memory sampling store. +func NewSamplingStore(maxBuckets int) *SamplingStore { + return &SamplingStore{maxBuckets: maxBuckets} +} + +// InsertThroughput implements samplingstore.Store#InsertThroughput. +func (ss *SamplingStore) InsertThroughput(throughput []*model.Throughput) error { + ss.Lock() + defer ss.Unlock() + now := time.Now() + ss.preprendThroughput(&storedThroughput{throughput, now}) + return nil +} + +// GetThroughput implements samplingstore.Store#GetThroughput. +func (ss *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) { + ss.Lock() + defer ss.Unlock() + var retSlice []*model.Throughput + for _, t := range ss.throughputs { + if t.time.After(start) && (t.time.Before(end) || t.time.Equal(end)) { + retSlice = append(retSlice, t.throughput...) + } + } + return retSlice, nil +} + +// InsertProbabilitiesAndQPS implements samplingstore.Store#InsertProbabilitiesAndQPS. +func (ss *SamplingStore) InsertProbabilitiesAndQPS( + hostname string, + probabilities model.ServiceOperationProbabilities, + qps model.ServiceOperationQPS, +) error { + ss.Lock() + defer ss.Unlock() + ss.probabilitiesAndQPS = &storedServiceOperationProbabilitiesAndQPS{hostname, probabilities, qps, time.Now()} + return nil +} + +// GetLatestProbabilities implements samplingstore.Store#GetLatestProbabilities. +func (ss *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) { + ss.Lock() + defer ss.Unlock() + if ss.probabilitiesAndQPS != nil { + return ss.probabilitiesAndQPS.probabilities, nil + } + return model.ServiceOperationProbabilities{}, nil +} + +func (ss *SamplingStore) preprendThroughput(throughput *storedThroughput) { + ss.throughputs = append([]*storedThroughput{throughput}, ss.throughputs...) + if len(ss.throughputs) > ss.maxBuckets { + ss.throughputs = ss.throughputs[0:ss.maxBuckets] + } +} diff --git a/plugin/storage/memory/sampling_test.go b/plugin/storage/memory/sampling_test.go new file mode 100644 index 00000000000..8f6e1dd5ff9 --- /dev/null +++ b/plugin/storage/memory/sampling_test.go @@ -0,0 +1,107 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memory + +import ( + "fmt" + "testing" + "time" + + "github.com/crossdock/crossdock-go/assert" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" +) + +func withPopulatedSamplingStore(f func(samplingStore *SamplingStore)) { + now := time.Now() + millisAfter := now.Add(time.Millisecond * time.Duration(100)) + secondsAfter := now.Add(time.Second * time.Duration(2)) + throughputs := []*storedThroughput{ + {[]*model.Throughput{{Service: "svc-1", Operation: "op-1", Count: 1}}, now}, + {[]*model.Throughput{{Service: "svc-1", Operation: "op-2", Count: 1}}, millisAfter}, + {[]*model.Throughput{{Service: "svc-2", Operation: "op-3", Count: 1}}, secondsAfter}, + } + pQPS := &storedServiceOperationProbabilitiesAndQPS{ + hostname: "guntur38ab8928", probabilities: model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}}, qps: model.ServiceOperationQPS{"svc-1": {"op-1": 10.0}}, time: now} + samplingStore := &SamplingStore{throughputs: throughputs, probabilitiesAndQPS: pQPS} + f(samplingStore) +} + +func withMemorySamplingStore(f func(samplingStore *SamplingStore)) { + f(NewSamplingStore(5)) +} + +func TestInsertThroughtput(t *testing.T) { + withMemorySamplingStore(func(samplingStore *SamplingStore) { + start := time.Now() + throughputs := []*model.Throughput{ + {Service: "my-svc", Operation: "op"}, + {Service: "our-svc", Operation: "op2"}, + } + assert.NoError(t, samplingStore.InsertThroughput(throughputs)) + ret, _ := samplingStore.GetThroughput(start, start.Add(time.Second*time.Duration(1))) + assert.Equal(t, 2, len(ret)) + + for i := 0; i < 10; i++ { + in := []*model.Throughput{ + {Service: fmt.Sprint("svc-", i), Operation: fmt.Sprint("op-", i)}, + } + samplingStore.InsertThroughput(in) + } + assert.Equal(t, 5, len(samplingStore.throughputs)) + }) +} + +func TestGetThroughput(t *testing.T) { + withPopulatedSamplingStore(func(samplingStore *SamplingStore) { + start := time.Now() + ret, err := samplingStore.GetThroughput(start, start.Add(time.Second*time.Duration(1))) + assert.NoError(t, err) + assert.Equal(t, 1, len(ret)) + ret1, _ := samplingStore.GetThroughput(start, start) + assert.Equal(t, 0, len(ret1)) + ret2, _ := samplingStore.GetThroughput(start, start.Add(time.Hour*time.Duration(1))) + assert.Equal(t, 2, len(ret2)) + }) +} + +func TestInsertProbabilitiesAndQPS(t *testing.T) { + withMemorySamplingStore(func(samplingStore *SamplingStore) { + assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("dell11eg843d", model.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, model.ServiceOperationQPS{"new-srv": {"op": 4}})) + assert.NotEmpty(t, 1, samplingStore.probabilitiesAndQPS) + // Only latest one is kept in memory + assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("lncol73", model.ServiceOperationProbabilities{"my-app": {"hello": 0.3}}, model.ServiceOperationQPS{"new-srv": {"op": 7}})) + assert.Equal(t, 0.3, samplingStore.probabilitiesAndQPS.probabilities["my-app"]["hello"]) + }) +} + +func TestGetLatestProbability(t *testing.T) { + withMemorySamplingStore(func(samplingStore *SamplingStore) { + // No priod data + ret, err := samplingStore.GetLatestProbabilities() + assert.NoError(t, err) + assert.Empty(t, ret) + }) + + withPopulatedSamplingStore(func(samplingStore *SamplingStore) { + // With some pregenerated data + ret, err := samplingStore.GetLatestProbabilities() + assert.NoError(t, err) + assert.Equal(t, ret, model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}}) + assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("utfhyolf", model.ServiceOperationProbabilities{"another-service": {"hello": 0.009}}, model.ServiceOperationQPS{"new-srv": {"op": 5}})) + ret, _ = samplingStore.GetLatestProbabilities() + assert.NotEqual(t, ret, model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}}) + }) +} diff --git a/storage/factory.go b/storage/factory.go index 8c415aea48f..dacdafce344 100644 --- a/storage/factory.go +++ b/storage/factory.go @@ -55,7 +55,7 @@ type SamplingStoreFactory interface { // CreateLock creates a distributed lock. CreateLock() (distributedlock.Lock, error) // CreateSamplingStore creates a sampling store. - CreateSamplingStore() (samplingstore.Store, error) + CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) } var (