Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics Enricher API #1271

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
468688c
add metrics processor
hstan Oct 20, 2020
fc0b575
remove unused method
hstan Oct 20, 2020
4e641ac
move interface and add comments
hstan Oct 21, 2020
fe2c130
add comments for MetricsProcessor interface
hstan Oct 21, 2020
dbe97fe
Merge branch 'master' into add-metrics-processor
hstan Oct 21, 2020
b297e94
add changelog
hstan Oct 28, 2020
ee0f8fd
Merge branch 'master' of github.com:open-telemetry/opentelemetry-go i…
hstan Oct 28, 2020
21c833f
make the option api accept variadic signature for metrics processors
hstan Oct 29, 2020
7b11574
split changelog into two separate lines
hstan Oct 29, 2020
015599a
update `MetricsProcessor` documentation
hstan Oct 30, 2020
adf34f1
Merge branch 'master' of github.com:hstan/opentelemetry-go into add-m…
hstan Nov 11, 2020
e6b051e
use a function type instead of interface and rename it to a more mean…
hstan Nov 11, 2020
afa378d
update changelog
hstan Nov 11, 2020
8ea1616
add metrics labels enricher to pull controller
hstan Nov 11, 2020
f73d433
add test for pull with metrics labels enricher
hstan Nov 11, 2020
208a049
add test for push with metrics labels enricher
hstan Nov 11, 2020
4f91f17
sort import order
hstan Nov 11, 2020
4739df6
update changelog
hstan Nov 11, 2020
85faa2e
handle error returned from the enricher function
hstan Nov 11, 2020
b94b328
Merge branch 'master' of github.com:open-telemetry/opentelemetry-go i…
hstan Nov 13, 2020
b3caafa
clean up
hstan Dec 14, 2020
a22e258
Merge branch 'master' of github.com:open-telemetry/opentelemetry-go i…
hstan Dec 14, 2020
6dab82e
fix tests
hstan Dec 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `EventOption` and the related `NewEventConfig` function are added to the `go.opentelemetry.io/otel` package to configure Span events. (#1254)
- A `TextMapPropagator` and associated `TextMapCarrier` are added to the `go.opentelemetry.io/otel/oteltest` package to test TextMap type propagators and their use. (#1259)
- `SpanContextFromContext` returns `SpanContext` from context. (#1255)
- `MetricsLabelsEnricher` type is added to `go.opentelemetry.io/otel/sdk/metric` package. (#1271)
- `WithMetricsLabelsEnricher` config option is added to `go.opentelemetry.io/otel/sdk/push` package to allow providing a function to enrich metrics labels based on context. (#1271)

### Changed

Expand Down
38 changes: 38 additions & 0 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric

// Config contains configuration for an SDK.
type Config struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I recently removed a similar struct from this package, arguing that Accumulators are not generally built from options, but from one of the controllers. That's why Resource is an explicit argument, and I would be just as happy if this were an third argument to the function, as opposed to a new option. This would be the first optional setting, and maybe we can introduce a Config struct an Option interface when there are more than one.

FWIW, I'm planning to consolidate the push and pull controllers, which would simplify this PR. See open-telemetry/opentelemetry-specification#1207.

// If provided, MetricsLabelsEnricher is executed each time a metric is recorded
// by the Accumulator's sync instrument implementation
MetricsLabelsEnricher MetricsLabelsEnricher
}

// Option is the interface that applies the value to a configuration option.
type Option interface {
// Apply sets the Option value of a Config.
Apply(*Config)
}

func WithMetricsLabelsEnricher(e MetricsLabelsEnricher) Option {
return metricsLabelsEnricherOption(e)
}

type metricsLabelsEnricherOption MetricsLabelsEnricher

func (e metricsLabelsEnricherOption) Apply(config *Config) {
config.MetricsLabelsEnricher = MetricsLabelsEnricher(e)
}
15 changes: 15 additions & 0 deletions sdk/metric/controller/pull/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pull // import "go.opentelemetry.io/otel/sdk/metric/controller/pull"
import (
"time"

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand All @@ -33,6 +34,10 @@ type Config struct {
// If the period is zero, caching of the result is disabled.
// The default value is 10 seconds.
CachePeriod time.Duration

// MetricsLabelsEnricher is a function that enriches metrics labels based
// on kvs stored in context when metrics are recorded.
MetricsLabelsEnricher metric.MetricsLabelsEnricher
}

// Option is the interface that applies the value to a configuration option.
Expand Down Expand Up @@ -62,3 +67,13 @@ type cachePeriodOption time.Duration
func (o cachePeriodOption) Apply(config *Config) {
config.CachePeriod = time.Duration(o)
}

func WithMetricsLabelsEnricher(e metric.MetricsLabelsEnricher) Option {
return metricsLabelsEnricherOption(e)
}

type metricsLabelsEnricherOption metric.MetricsLabelsEnricher

func (e metricsLabelsEnricherOption) Apply(config *Config) {
config.MetricsLabelsEnricher = metric.MetricsLabelsEnricher(e)
}
1 change: 1 addition & 0 deletions sdk/metric/controller/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func New(checkpointer export.Checkpointer, options ...Option) *Controller {
accum := sdk.NewAccumulator(
checkpointer,
config.Resource,
sdk.WithMetricsLabelsEnricher(config.MetricsLabelsEnricher),
)
return &Controller{
accumulator: accum,
Expand Down
32 changes: 32 additions & 0 deletions sdk/metric/controller/pull/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,35 @@ func TestPullWithCache(t *testing.T) {
}, records.Map())

}

func TestPullWithMetricsLabelEnricher(t *testing.T) {
metricsLabelsEnricher := func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) {
baggage := otel.Baggage(ctx)
kvs = append(baggage.ToSlice(), kvs...)
return kvs, nil
}

puller := pull.New(
basic.New(
selector.NewWithExactDistribution(),
export.CumulativeExporter,
basic.WithMemory(true),
),
pull.WithCachePeriod(0),
pull.WithMetricsLabelsEnricher(metricsLabelsEnricher),
)

ctx := otel.ContextWithBaggageValues(context.Background(), label.String("A", "B"))
meter := puller.MeterProvider().Meter("withLabelEnricher")
counter := otel.Must(meter).NewInt64Counter("counter.sum")

counter.Add(ctx, 10)

require.NoError(t, puller.Collect(context.Background()))
records := processortest.NewOutput(label.DefaultEncoder())
require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord))

require.EqualValues(t, map[string]float64{
"counter.sum/A=B/": 10,
}, records.Map())
}
15 changes: 15 additions & 0 deletions sdk/metric/controller/push/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package push // import "go.opentelemetry.io/otel/sdk/metric/controller/push"
import (
"time"

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand All @@ -33,6 +34,10 @@ type Config struct {
// integrate, and export) can last before it is canceled. Defaults to
// the controller push period.
Timeout time.Duration

// MetricsLabelsEnricher is a function that enriches metrics labels based
// on kvs stored in context when metrics are recorded.
MetricsLabelsEnricher metric.MetricsLabelsEnricher
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Repeat: open-telemetry/opentelemetry-specification#1207 says we can and should consolidate push and pull controllers, to avoid this repetition.)

}

// Option is the interface that applies the value to a configuration option.
Expand Down Expand Up @@ -73,3 +78,13 @@ type timeoutOption time.Duration
func (o timeoutOption) Apply(config *Config) {
config.Timeout = time.Duration(o)
}

func WithMetricsLabelsEnricher(e metric.MetricsLabelsEnricher) Option {
return metricsLabelsEnricherOption(e)
}

type metricsLabelsEnricherOption metric.MetricsLabelsEnricher

func (e metricsLabelsEnricherOption) Apply(config *Config) {
config.MetricsLabelsEnricher = metric.MetricsLabelsEnricher(e)
}
1 change: 1 addition & 0 deletions sdk/metric/controller/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Opt
impl := sdk.NewAccumulator(
hstan marked this conversation as resolved.
Show resolved Hide resolved
checkpointer,
c.Resource,
sdk.WithMetricsLabelsEnricher(c.MetricsLabelsEnricher),
)
return &Controller{
provider: registry.NewMeterProvider(impl),
Expand Down
36 changes: 36 additions & 0 deletions sdk/metric/controller/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,39 @@ func TestPushExportError(t *testing.T) {
})
}
}

func TestWithMetricsLabelsEnricher(t *testing.T) {
exporter := newExporter()
checkpointer := newCheckpointer()
metricsLabelsEnricher := func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) {
baggage := otel.Baggage(ctx)
kvs = append(baggage.ToSlice(), kvs...)
return kvs, nil
}
p := push.New(
checkpointer,
exporter,
push.WithPeriod(time.Second),
push.WithMetricsLabelsEnricher(metricsLabelsEnricher),
)
meter := p.MeterProvider().Meter("name")

mock := controllertest.NewMockClock()
p.SetClock(mock)

counter := otel.Must(meter).NewInt64Counter("counter.sum")

p.Start()

ctx := otel.ContextWithBaggageValues(context.Background(), label.String("A", "B"))
counter.Add(ctx, 1)

require.EqualValues(t, map[string]float64{}, exporter.Values())

mock.Add(time.Second)
runtime.Gosched()

require.EqualValues(t, map[string]float64{
"counter.sum/A=B/": 1,
}, exporter.Values())
}
24 changes: 20 additions & 4 deletions sdk/metric/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type (

// resource is applied to all records in this Accumulator.
resource *resource.Resource

// metricsLabelsEnricher is applied to all records in this Accumulator
metricsLabelsEnricher MetricsLabelsEnricher
}

syncInstrument struct {
Expand Down Expand Up @@ -139,6 +142,10 @@ type (
labels *label.Set
observed export.Aggregator
}

// MetricsLabelsEnricher can be provided as a config option to enrich metrics labels based on
// the context when the metrics are recorded
MetricsLabelsEnricher func(context.Context, []label.KeyValue) ([]label.KeyValue, error)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MrAlias Does this make sense?

jmacd marked this conversation as resolved.
Show resolved Hide resolved
)

var (
Expand Down Expand Up @@ -291,6 +298,9 @@ func (s *syncInstrument) Bind(kvs []label.KeyValue) api.BoundSyncImpl {
}

func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs []label.KeyValue) {
if s.meter.metricsLabelsEnricher != nil {
kvs, _ = s.meter.metricsLabelsEnricher(ctx, kvs)
}
h := s.acquireHandle(kvs, nil)
defer h.Unbind()
h.RecordOne(ctx, number)
Expand All @@ -305,11 +315,17 @@ func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs [
// processor will call Collect() when it receives a request to scrape
// current metric values. A push-based processor should configure its
// own periodic collection.
func NewAccumulator(processor export.Processor, resource *resource.Resource) *Accumulator {
func NewAccumulator(processor export.Processor, resource *resource.Resource, opts ...Option) *Accumulator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment above. I'd be happier to leave this a three-argument function until we have more than one optional setting.

c := &Config{}
for _, opt := range opts {
opt.Apply(c)
}

return &Accumulator{
processor: processor,
asyncInstruments: internal.NewAsyncInstrumentState(),
resource: resource,
processor: processor,
asyncInstruments: internal.NewAsyncInstrumentState(),
resource: resource,
metricsLabelsEnricher: c.MetricsLabelsEnricher,
}
}

Expand Down