Skip to content

Commit

Permalink
Instrument MetricsReader with metrics (jaegertracing#3667)
Browse files Browse the repository at this point in the history
* Add metrics instrumentation for SPM (MetricsReader)

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>

* Add metrics for all-in-one and new prometheus reader

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>

* Remove new prometheus metrics instrumentation

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>

* Add license

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>

* Revert unused metrics for prometheus factory

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>

* Long params to multiline and inline return

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>

* make fmt

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>

* Group logger and metrics next to each other

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>

* Remove copied comment

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>

* Remove responses metric

Signed-off-by: Albert Teoh <see.kwang.teoh@gmail.com>
  • Loading branch information
albertteoh committed Jul 13, 2022
1 parent 58b480d commit 79d79e6
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 12 deletions.
23 changes: 18 additions & 5 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
metricsstoreMetrics "github.com/jaegertracing/jaeger/storage/metricsstore/metrics"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)
Expand Down Expand Up @@ -120,7 +121,7 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

metricsQueryService, err := createMetricsQueryService(metricsReaderFactory, v, logger)
metricsQueryService, err := createMetricsQueryService(metricsReaderFactory, v, logger, metricsFactory)
if err != nil {
logger.Fatal("Failed to create metrics reader", zap.Error(err))
}
Expand Down Expand Up @@ -307,12 +308,24 @@ func initTracer(metricsFactory metrics.Factory, logger *zap.Logger) io.Closer {
return closer
}

func createMetricsQueryService(factory *metricsPlugin.Factory, v *viper.Viper, logger *zap.Logger) (querysvc.MetricsQueryService, error) {
if err := factory.Initialize(logger); err != nil {
func createMetricsQueryService(
metricsReaderFactory *metricsPlugin.Factory,
v *viper.Viper,
logger *zap.Logger,
metricsReaderMetricsFactory metrics.Factory,
) (querysvc.MetricsQueryService, error) {

if err := metricsReaderFactory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
factory.InitFromViper(v, logger)
return factory.CreateMetricsReader()
metricsReaderFactory.InitFromViper(v, logger)
reader, err := metricsReaderFactory.CreateMetricsReader()
if err != nil {
return nil, fmt.Errorf("failed to create metrics reader: %w", err)
}

// Decorate the metrics reader with metrics instrumentation.
return metricsstoreMetrics.NewReadMetricsDecorator(reader, metricsReaderMetricsFactory), nil
}
27 changes: 20 additions & 7 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ import (
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
metricsstoreMetrics "github.com/jaegertracing/jaeger/storage/metricsstore/metrics"
spanstoreMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

func main() {
Expand Down Expand Up @@ -107,13 +108,13 @@ func main() {
if err != nil {
logger.Fatal("Failed to create span reader", zap.Error(err))
}
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, metricsFactory)
spanReader = spanstoreMetrics.NewReadMetricsDecorator(spanReader, metricsFactory)
dependencyReader, err := storageFactory.CreateDependencyReader()
if err != nil {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

metricsQueryService, err := createMetricsQueryService(metricsReaderFactory, v, logger)
metricsQueryService, err := createMetricsQueryService(metricsReaderFactory, v, logger, metricsFactory)
if err != nil {
logger.Fatal("Failed to create metrics query service", zap.Error(err))
}
Expand Down Expand Up @@ -167,12 +168,24 @@ func main() {
}
}

func createMetricsQueryService(factory *metricsPlugin.Factory, v *viper.Viper, logger *zap.Logger) (querysvc.MetricsQueryService, error) {
if err := factory.Initialize(logger); err != nil {
func createMetricsQueryService(
metricsReaderFactory *metricsPlugin.Factory,
v *viper.Viper,
logger *zap.Logger,
metricsReaderMetricsFactory metrics.Factory,
) (querysvc.MetricsQueryService, error) {

if err := metricsReaderFactory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
factory.InitFromViper(v, logger)
return factory.CreateMetricsReader()
metricsReaderFactory.InitFromViper(v, logger)
reader, err := metricsReaderFactory.CreateMetricsReader()
if err != nil {
return nil, fmt.Errorf("failed to create metrics reader: %w", err)
}

// Decorate the metrics reader with metrics instrumentation.
return metricsstoreMetrics.NewReadMetricsDecorator(reader, metricsReaderMetricsFactory), nil
}
101 changes: 101 additions & 0 deletions storage/metricsstore/metrics/decorator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"context"
"time"

"github.com/uber/jaeger-lib/metrics"

protometrics "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
)

// ReadMetricsDecorator wraps a metricsstore.Reader and collects metrics around each read operation.
type ReadMetricsDecorator struct {
reader metricsstore.Reader
getLatenciesMetrics *queryMetrics
getCallRatesMetrics *queryMetrics
getErrorRatesMetrics *queryMetrics
getMinStepDurationMetrics *queryMetrics
}

type queryMetrics struct {
Errors metrics.Counter `metric:"requests" tags:"result=err"`
Successes metrics.Counter `metric:"requests" tags:"result=ok"`
ErrLatency metrics.Timer `metric:"latency" tags:"result=err"`
OKLatency metrics.Timer `metric:"latency" tags:"result=ok"`
}

func (q *queryMetrics) emit(err error, latency time.Duration) {
if err != nil {
q.Errors.Inc(1)
q.ErrLatency.Record(latency)
} else {
q.Successes.Inc(1)
q.OKLatency.Record(latency)
}
}

// NewReadMetricsDecorator returns a new ReadMetricsDecorator.
func NewReadMetricsDecorator(reader metricsstore.Reader, metricsFactory metrics.Factory) *ReadMetricsDecorator {
return &ReadMetricsDecorator{
reader: reader,
getLatenciesMetrics: buildQueryMetrics("get_latencies", metricsFactory),
getCallRatesMetrics: buildQueryMetrics("get_call_rates", metricsFactory),
getErrorRatesMetrics: buildQueryMetrics("get_error_rates", metricsFactory),
getMinStepDurationMetrics: buildQueryMetrics("get_min_step_duration", metricsFactory),
}
}

func buildQueryMetrics(operation string, metricsFactory metrics.Factory) *queryMetrics {
qMetrics := &queryMetrics{}
scoped := metricsFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"operation": operation}})
metrics.Init(qMetrics, scoped, nil)
return qMetrics
}

// GetLatencies implements metricsstore.Reader#GetLatencies
func (m *ReadMetricsDecorator) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*protometrics.MetricFamily, error) {
start := time.Now()
retMe, err := m.reader.GetLatencies(ctx, params)
m.getLatenciesMetrics.emit(err, time.Since(start))
return retMe, err
}

// GetCallRates implements metricsstore.Reader#GetCallRates
func (m *ReadMetricsDecorator) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*protometrics.MetricFamily, error) {
start := time.Now()
retMe, err := m.reader.GetCallRates(ctx, params)
m.getCallRatesMetrics.emit(err, time.Since(start))
return retMe, err
}

// GetErrorRates implements metricsstore.Reader#GetErrorRates
func (m *ReadMetricsDecorator) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*protometrics.MetricFamily, error) {
start := time.Now()
retMe, err := m.reader.GetErrorRates(ctx, params)
m.getErrorRatesMetrics.emit(err, time.Since(start))
return retMe, err
}

// GetMinStepDuration implements metricsstore.Reader#GetMinStepDuration
func (m *ReadMetricsDecorator) GetMinStepDuration(ctx context.Context, params *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) {
start := time.Now()
retMe, err := m.reader.GetMinStepDuration(ctx, params)
m.getMinStepDurationMetrics.emit(err, time.Since(start))
return retMe, err
}
153 changes: 153 additions & 0 deletions storage/metricsstore/metrics/decorator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) 2022 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics_test

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/uber/jaeger-lib/metrics/metricstest"

protometrics "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
. "github.com/jaegertracing/jaeger/storage/metricsstore/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore/mocks"
)

func TestSuccessfulUnderlyingCalls(t *testing.T) {
mf := metricstest.NewFactory(0)

mockReader := mocks.Reader{}
mrs := NewReadMetricsDecorator(&mockReader, mf)
glParams := &metricsstore.LatenciesQueryParameters{}
mockReader.On("GetLatencies", context.Background(), glParams).
Return(&protometrics.MetricFamily{}, nil)
mrs.GetLatencies(context.Background(), glParams)

gcrParams := &metricsstore.CallRateQueryParameters{}
mockReader.On("GetCallRates", context.Background(), gcrParams).
Return(&protometrics.MetricFamily{}, nil)
mrs.GetCallRates(context.Background(), gcrParams)

gerParams := &metricsstore.ErrorRateQueryParameters{}
mockReader.On("GetErrorRates", context.Background(), gerParams).
Return(&protometrics.MetricFamily{}, nil)
mrs.GetErrorRates(context.Background(), gerParams)

msdParams := &metricsstore.MinStepDurationQueryParameters{}
mockReader.On("GetMinStepDuration", context.Background(), msdParams).
Return(time.Second, nil)
mrs.GetMinStepDuration(context.Background(), msdParams)

counters, gauges := mf.Snapshot()
wantCounts := map[string]int64{
"requests|operation=get_latencies|result=ok": 1,
"requests|operation=get_latencies|result=err": 0,
"requests|operation=get_call_rates|result=ok": 1,
"requests|operation=get_call_rates|result=err": 0,
"requests|operation=get_error_rates|result=ok": 1,
"requests|operation=get_error_rates|result=err": 0,
"requests|operation=get_min_step_duration|result=ok": 1,
"requests|operation=get_min_step_duration|result=err": 0,
}

// This is not exhaustive.
wantExistingKeys := []string{
"latency|operation=get_latencies|result=ok.P50",
"latency|operation=get_error_rates|result=ok.P50",
}

// This is not exhaustive.
wantNonExistentKeys := []string{
"latency|operation=get_latencies|result=err.P50",
}

checkExpectedExistingAndNonExistentCounters(t, counters, wantCounts, gauges, wantExistingKeys, wantNonExistentKeys)
}

func checkExpectedExistingAndNonExistentCounters(t *testing.T,
actualCounters,
expectedCounters,
actualGauges map[string]int64,
existingKeys,
nonExistentKeys []string) {
for k, v := range expectedCounters {
assert.EqualValues(t, v, actualCounters[k], k)
}

for _, k := range existingKeys {
_, ok := actualGauges[k]
assert.True(t, ok)
}

for _, k := range nonExistentKeys {
_, ok := actualGauges[k]
assert.False(t, ok)
}
}

func TestFailingUnderlyingCalls(t *testing.T) {
mf := metricstest.NewFactory(0)

mockReader := mocks.Reader{}
mrs := NewReadMetricsDecorator(&mockReader, mf)
glParams := &metricsstore.LatenciesQueryParameters{}
mockReader.On("GetLatencies", context.Background(), glParams).
Return(&protometrics.MetricFamily{}, errors.New("failure"))
mrs.GetLatencies(context.Background(), glParams)

gcrParams := &metricsstore.CallRateQueryParameters{}
mockReader.On("GetCallRates", context.Background(), gcrParams).
Return(&protometrics.MetricFamily{}, errors.New("failure"))
mrs.GetCallRates(context.Background(), gcrParams)

gerParams := &metricsstore.ErrorRateQueryParameters{}
mockReader.On("GetErrorRates", context.Background(), gerParams).
Return(&protometrics.MetricFamily{}, errors.New("failure"))
mrs.GetErrorRates(context.Background(), gerParams)

msdParams := &metricsstore.MinStepDurationQueryParameters{}
mockReader.On("GetMinStepDuration", context.Background(), msdParams).
Return(time.Second, errors.New("failure"))
mrs.GetMinStepDuration(context.Background(), msdParams)

counters, gauges := mf.Snapshot()
wantCounts := map[string]int64{
"requests|operation=get_latencies|result=ok": 0,
"requests|operation=get_latencies|result=err": 1,
"requests|operation=get_call_rates|result=ok": 0,
"requests|operation=get_call_rates|result=err": 1,
"requests|operation=get_error_rates|result=ok": 0,
"requests|operation=get_error_rates|result=err": 1,
"requests|operation=get_min_step_duration|result=ok": 0,
"requests|operation=get_min_step_duration|result=err": 1,
}

// This is not exhaustive.
wantExistingKeys := []string{
"latency|operation=get_latencies|result=err.P50",
}

// This is not exhaustive.
wantNonExistentKeys := []string{
"latency|operation=get_latencies|result=ok.P50",
"latency|operation=get_error_rates|result=ok.P50",
}

checkExpectedExistingAndNonExistentCounters(t, counters, wantCounts, gauges, wantExistingKeys, wantNonExistentKeys)
}

0 comments on commit 79d79e6

Please sign in to comment.