From e8546e3bc520b3cf20934cd92fc013d75bf87565 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 27 Mar 2020 14:06:48 -0700 Subject: [PATCH] Remove Labelset (#595) * Remove LabelSet frmo api/metric * SDK tests pass * Restore benchmarks * All tests pass * Remove all mentions of LabelSet * Test RecordBatch * Batch test * Improves benchmark (some) * Move the benchmark to match HEAD * Align labels for GOARCH=386 * Add alignment test * Disable the stress test fo GOARCH=386 * Fix bug * Move atomic fields into their own file * Add a TODO * Comments * Remove metric.Labels(...) * FTB Co-authored-by: Liz Fong-Jones --- api/global/internal/benchmark_test.go | 9 +- api/global/internal/meter.go | 52 +---- api/global/internal/meter_test.go | 84 ++++---- api/metric/api.go | 11 +- api/metric/api_test.go | 62 +++--- api/metric/common.go | 4 +- api/metric/counter.go | 8 +- api/metric/doc.go | 6 +- api/metric/measure.go | 8 +- api/metric/noop.go | 12 +- api/metric/observer.go | 6 +- api/metric/registry/registry.go | 9 +- api/metric/sdkhelpers.go | 41 ++-- example/basic/main.go | 6 +- example/prometheus/main.go | 17 +- exporters/metric/prometheus/prometheus.go | 6 +- exporters/metric/stdout/example_test.go | 5 +- exporters/metric/test/test.go | 4 +- exporters/otlp/otlp_test.go | 15 +- internal/metric/mock.go | 63 ++---- sdk/metric/alignment_test.go | 19 +- sdk/metric/atomicfields.go | 25 +++ sdk/metric/batcher/defaultkeys/defaultkeys.go | 2 +- sdk/metric/benchmark_test.go | 115 +++++----- sdk/metric/controller/push/push_test.go | 8 +- sdk/metric/correct_test.go | 69 ++++-- sdk/metric/example_test.go | 3 +- sdk/metric/sdk.go | 199 ++++++++++-------- sdk/metric/stress_test.go | 43 ++-- 29 files changed, 435 insertions(+), 476 deletions(-) create mode 100644 sdk/metric/atomicfields.go diff --git a/api/global/internal/benchmark_test.go b/api/global/internal/benchmark_test.go index 93935eceb0c..92a431facdd 100644 --- a/api/global/internal/benchmark_test.go +++ b/api/global/internal/benchmark_test.go @@ -19,6 +19,7 @@ import ( "strings" "testing" + "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/global/internal" "go.opentelemetry.io/otel/api/key" @@ -90,13 +91,13 @@ func BenchmarkGlobalInt64CounterAddNoSDK(b *testing.B) { internal.ResetForTest() ctx := context.Background() sdk := global.Meter("test") - labs := sdk.Labels(key.String("A", "B")) + labs := []core.KeyValue{key.String("A", "B")} cnt := Must(sdk).NewInt64Counter("int64.counter") b.ResetTimer() for i := 0; i < b.N; i++ { - cnt.Add(ctx, 1, labs) + cnt.Add(ctx, 1, labs...) } } @@ -109,13 +110,13 @@ func BenchmarkGlobalInt64CounterAddWithSDK(b *testing.B) { global.SetMeterProvider(fix) - labs := sdk.Labels(key.String("A", "B")) + labs := []core.KeyValue{key.String("A", "B")} cnt := Must(sdk).NewInt64Counter("int64.counter") b.ResetTimer() for i := 0; i < b.N; i++ { - cnt.Add(ctx, 1, labs) + cnt.Add(ctx, 1, labs...) } } diff --git a/api/global/internal/meter.go b/api/global/internal/meter.go index d432d7be672..d15e2b378fe 100644 --- a/api/global/internal/meter.go +++ b/api/global/internal/meter.go @@ -39,9 +39,6 @@ import ( // provider. Mutexes in the Provider and Meters ensure that each // instrument has a delegate before the global provider is set. // -// LabelSets are implemented by delegating to the Meter instance using -// the metric.LabelSetDelegator interface. -// // Bound instrument operations are implemented by delegating to the // instrument after it is registered, with a sync.Once initializer to // protect against races with Release(). @@ -100,28 +97,17 @@ type AsyncImpler interface { AsyncImpl() metric.AsyncImpl } -type labelSet struct { - delegate unsafe.Pointer // (* metric.LabelSet) - - meter *meter - value []core.KeyValue - - initialize sync.Once -} - type syncHandle struct { delegate unsafe.Pointer // (*metric.HandleImpl) inst *syncImpl - labels metric.LabelSet + labels []core.KeyValue initialize sync.Once } var _ metric.Provider = &meterProvider{} var _ metric.Meter = &meter{} -var _ metric.LabelSet = &labelSet{} -var _ metric.LabelSetDelegate = &labelSet{} var _ metric.InstrumentImpl = &syncImpl{} var _ metric.BoundSyncImpl = &syncHandle{} var _ metric.AsyncImpl = &asyncImpl{} @@ -254,7 +240,7 @@ func (inst *syncImpl) Implementation() interface{} { return inst } -func (inst *syncImpl) Bind(labels metric.LabelSet) metric.BoundSyncImpl { +func (inst *syncImpl) Bind(labels []core.KeyValue) metric.BoundSyncImpl { if implPtr := (*metric.SyncImpl)(atomic.LoadPointer(&inst.delegate)); implPtr != nil { return (*implPtr).Bind(labels) } @@ -340,13 +326,13 @@ func (obs *asyncImpl) setDelegate(d metric.Meter) { // Metric updates -func (m *meter) RecordBatch(ctx context.Context, labels metric.LabelSet, measurements ...metric.Measurement) { +func (m *meter) RecordBatch(ctx context.Context, labels []core.KeyValue, measurements ...metric.Measurement) { if delegatePtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); delegatePtr != nil { (*delegatePtr).RecordBatch(ctx, labels, measurements...) } } -func (inst *syncImpl) RecordOne(ctx context.Context, number core.Number, labels metric.LabelSet) { +func (inst *syncImpl) RecordOne(ctx context.Context, number core.Number, labels []core.KeyValue) { if instPtr := (*metric.SyncImpl)(atomic.LoadPointer(&inst.delegate)); instPtr != nil { (*instPtr).RecordOne(ctx, number, labels) } @@ -377,35 +363,6 @@ func (bound *syncHandle) RecordOne(ctx context.Context, number core.Number) { (*implPtr).RecordOne(ctx, number) } -// LabelSet initialization - -func (m *meter) Labels(labels ...core.KeyValue) metric.LabelSet { - return &labelSet{ - meter: m, - value: labels, - } -} - -func (labels *labelSet) Delegate() metric.LabelSet { - meterPtr := (*metric.Meter)(atomic.LoadPointer(&labels.meter.delegate)) - if meterPtr == nil { - // This is technically impossible, provided the global - // Meter is updated after the meters and instruments - // have been delegated. - return labels - } - var implPtr *metric.LabelSet - labels.initialize.Do(func() { - implPtr = new(metric.LabelSet) - *implPtr = (*meterPtr).Labels(labels.value...) - atomic.StorePointer(&labels.delegate, unsafe.Pointer(implPtr)) - }) - if implPtr == nil { - implPtr = (*metric.LabelSet)(atomic.LoadPointer(&labels.delegate)) - } - return (*implPtr) -} - // Constructors func (m *meter) withName(opts []metric.Option) []metric.Option { @@ -466,7 +423,6 @@ func AtomicFieldOffsets() map[string]uintptr { "meter.delegate": unsafe.Offsetof(meter{}.delegate), "syncImpl.delegate": unsafe.Offsetof(syncImpl{}.delegate), "asyncImpl.delegate": unsafe.Offsetof(asyncImpl{}.delegate), - "labelSet.delegate": unsafe.Offsetof(labelSet{}.delegate), "syncHandle.delegate": unsafe.Offsetof(syncHandle{}.delegate), } } diff --git a/api/global/internal/meter_test.go b/api/global/internal/meter_test.go index caff1d8c24f..b5acbce6f63 100644 --- a/api/global/internal/meter_test.go +++ b/api/global/internal/meter_test.go @@ -47,7 +47,7 @@ func asStructs(batches []metrictest.Batch) []measured { r = append(r, measured{ Name: m.Instrument.Descriptor().Name(), LibraryName: m.Instrument.Descriptor().LibraryName(), - Labels: batch.LabelSet.Labels, + Labels: asMap(batch.Labels...), Number: m.Number, }) } @@ -72,41 +72,38 @@ func TestDirect(t *testing.T) { ctx := context.Background() meter1 := global.Meter("test1") meter2 := global.Meter("test2") - lvals1 := key.String("A", "B") - labels1 := meter1.Labels(lvals1) - lvals2 := key.String("C", "D") - labels2 := meter1.Labels(lvals2) - lvals3 := key.String("E", "F") - labels3 := meter2.Labels(lvals3) + labels1 := []core.KeyValue{key.String("A", "B")} + labels2 := []core.KeyValue{key.String("C", "D")} + labels3 := []core.KeyValue{key.String("E", "F")} counter := Must(meter1).NewInt64Counter("test.counter") - counter.Add(ctx, 1, labels1) - counter.Add(ctx, 1, labels1) + counter.Add(ctx, 1, labels1...) + counter.Add(ctx, 1, labels1...) measure := Must(meter1).NewFloat64Measure("test.measure") - measure.Record(ctx, 1, labels1) - measure.Record(ctx, 2, labels1) + measure.Record(ctx, 1, labels1...) + measure.Record(ctx, 2, labels1...) _ = Must(meter1).RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) { - result.Observe(1., labels1) - result.Observe(2., labels2) + result.Observe(1., labels1...) + result.Observe(2., labels2...) }) _ = Must(meter1).RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) { - result.Observe(1, labels1) - result.Observe(2, labels2) + result.Observe(1, labels1...) + result.Observe(2, labels2...) }) second := Must(meter2).NewFloat64Measure("test.second") - second.Record(ctx, 1, labels3) - second.Record(ctx, 2, labels3) + second.Record(ctx, 1, labels3...) + second.Record(ctx, 2, labels3...) mock, provider := metrictest.NewProvider() global.SetMeterProvider(provider) - counter.Add(ctx, 1, labels1) - measure.Record(ctx, 3, labels1) - second.Record(ctx, 3, labels3) + counter.Add(ctx, 1, labels1...) + measure.Record(ctx, 3, labels1...) + second.Record(ctx, 3, labels3...) mock.RunAsyncInstruments() @@ -117,43 +114,43 @@ func TestDirect(t *testing.T) { { Name: "test.counter", LibraryName: "test1", - Labels: asMap(lvals1), + Labels: asMap(labels1...), Number: asInt(1), }, { Name: "test.measure", LibraryName: "test1", - Labels: asMap(lvals1), + Labels: asMap(labels1...), Number: asFloat(3), }, { Name: "test.second", LibraryName: "test2", - Labels: asMap(lvals3), + Labels: asMap(labels3...), Number: asFloat(3), }, { Name: "test.observer.float", LibraryName: "test1", - Labels: asMap(lvals1), + Labels: asMap(labels1...), Number: asFloat(1), }, { Name: "test.observer.float", LibraryName: "test1", - Labels: asMap(lvals2), + Labels: asMap(labels2...), Number: asFloat(2), }, { Name: "test.observer.int", LibraryName: "test1", - Labels: asMap(lvals1), + Labels: asMap(labels1...), Number: asInt(1), }, { Name: "test.observer.int", LibraryName: "test1", - Labels: asMap(lvals2), + Labels: asMap(labels2...), Number: asInt(2), }, }, @@ -168,16 +165,15 @@ func TestBound(t *testing.T) { // vs. the above, to cover all the instruments. ctx := context.Background() glob := global.Meter("test") - lvals1 := key.String("A", "B") - labels1 := glob.Labels(lvals1) + labels1 := []core.KeyValue{key.String("A", "B")} counter := Must(glob).NewFloat64Counter("test.counter") - boundC := counter.Bind(labels1) + boundC := counter.Bind(labels1...) boundC.Add(ctx, 1) boundC.Add(ctx, 1) measure := Must(glob).NewInt64Measure("test.measure") - boundM := measure.Bind(labels1) + boundM := measure.Bind(labels1...) boundM.Record(ctx, 1) boundM.Record(ctx, 2) @@ -192,13 +188,13 @@ func TestBound(t *testing.T) { { Name: "test.counter", LibraryName: "test", - Labels: asMap(lvals1), + Labels: asMap(labels1...), Number: asFloat(1), }, { Name: "test.measure", LibraryName: "test", - Labels: asMap(lvals1), + Labels: asMap(labels1...), Number: asInt(3), }, }, @@ -213,14 +209,13 @@ func TestUnbind(t *testing.T) { internal.ResetForTest() glob := global.Meter("test") - lvals1 := key.New("A").String("B") - labels1 := glob.Labels(lvals1) + labels1 := []core.KeyValue{key.String("A", "B")} counter := Must(glob).NewFloat64Counter("test.counter") - boundC := counter.Bind(labels1) + boundC := counter.Bind(labels1...) measure := Must(glob).NewInt64Measure("test.measure") - boundM := measure.Bind(labels1) + boundM := measure.Bind(labels1...) boundC.Unbind() boundM.Unbind() @@ -231,12 +226,11 @@ func TestDefaultSDK(t *testing.T) { ctx := context.Background() meter1 := global.Meter("builtin") - lvals1 := key.String("A", "B") - labels1 := meter1.Labels(lvals1) + labels1 := []core.KeyValue{key.String("A", "B")} counter := Must(meter1).NewInt64Counter("test.builtin") - counter.Add(ctx, 1, labels1) - counter.Add(ctx, 1, labels1) + counter.Add(ctx, 1, labels1...) + counter.Add(ctx, 1, labels1...) in, out := io.Pipe() pusher, err := stdout.InstallNewPipeline(stdout.Config{ @@ -247,7 +241,7 @@ func TestDefaultSDK(t *testing.T) { panic(err) } - counter.Add(ctx, 1, labels1) + counter.Add(ctx, 1, labels1...) ch := make(chan string) go func() { @@ -270,7 +264,7 @@ func TestUnbindThenRecordOne(t *testing.T) { meter := global.Meter("test") counter := Must(meter).NewInt64Counter("test.counter") - boundC := counter.Bind(meter.Labels()) + boundC := counter.Bind() global.SetMeterProvider(provider) boundC.Unbind() @@ -312,8 +306,8 @@ func TestErrorInDeferredConstructor(t *testing.T) { global.SetMeterProvider(sdk) }) - c1.Add(ctx, 1, meter.Labels()) - c2.Add(ctx, 2, meter.Labels()) + c1.Add(ctx, 1) + c2.Add(ctx, 2) } func TestImplementationIndirection(t *testing.T) { diff --git a/api/metric/api.go b/api/metric/api.go index 0cbef205fa4..9bc68191097 100644 --- a/api/metric/api.go +++ b/api/metric/api.go @@ -31,11 +31,6 @@ type Provider interface { Meter(name string) Meter } -// LabelSet is an implementation-level interface that represents a -// []core.KeyValue for use as pre-defined labels in the metrics API. -type LabelSet interface { -} - // Config contains some options for metrics of any kind. type Config struct { // Description is an optional field describing the metric @@ -161,12 +156,8 @@ func (d Descriptor) LibraryName() string { // Meter is an interface to the metrics portion of the OpenTelemetry SDK. type Meter interface { - // Labels returns a reference to a set of labels that cannot - // be read by the application. - Labels(...core.KeyValue) LabelSet - // RecordBatch atomically records a batch of measurements. - RecordBatch(context.Context, LabelSet, ...Measurement) + RecordBatch(context.Context, []core.KeyValue, ...Measurement) // All instrument constructors may return an error for // conditions such as: diff --git a/api/metric/api_test.go b/api/metric/api_test.go index 5961ebc92ea..5be4c3c834e 100644 --- a/api/metric/api_test.go +++ b/api/metric/api_test.go @@ -139,9 +139,9 @@ func TestCounter(t *testing.T) { mockSDK, meter := mockTest.NewMeter() c := Must(meter).NewFloat64Counter("test.counter.float") ctx := context.Background() - labels := meter.Labels() - c.Add(ctx, 42, labels) - boundInstrument := c.Bind(labels) + labels := []core.KeyValue{key.String("A", "B")} + c.Add(ctx, 42, labels...) + boundInstrument := c.Bind(labels...) boundInstrument.Add(ctx, 42) meter.RecordBatch(ctx, labels, c.Measurement(42)) t.Log("Testing float counter") @@ -151,9 +151,9 @@ func TestCounter(t *testing.T) { mockSDK, meter := mockTest.NewMeter() c := Must(meter).NewInt64Counter("test.counter.int") ctx := context.Background() - labels := meter.Labels() - c.Add(ctx, 42, labels) - boundInstrument := c.Bind(labels) + labels := []core.KeyValue{key.String("A", "B"), key.String("C", "D")} + c.Add(ctx, 42, labels...) + boundInstrument := c.Bind(labels...) boundInstrument.Add(ctx, 42) meter.RecordBatch(ctx, labels, c.Measurement(42)) t.Log("Testing int counter") @@ -166,9 +166,9 @@ func TestMeasure(t *testing.T) { mockSDK, meter := mockTest.NewMeter() m := Must(meter).NewFloat64Measure("test.measure.float") ctx := context.Background() - labels := meter.Labels() - m.Record(ctx, 42, labels) - boundInstrument := m.Bind(labels) + labels := []core.KeyValue{} + m.Record(ctx, 42, labels...) + boundInstrument := m.Bind(labels...) boundInstrument.Record(ctx, 42) meter.RecordBatch(ctx, labels, m.Measurement(42)) t.Log("Testing float measure") @@ -178,9 +178,9 @@ func TestMeasure(t *testing.T) { mockSDK, meter := mockTest.NewMeter() m := Must(meter).NewInt64Measure("test.measure.int") ctx := context.Background() - labels := meter.Labels() - m.Record(ctx, 42, labels) - boundInstrument := m.Bind(labels) + labels := []core.KeyValue{key.Int("I", 1)} + m.Record(ctx, 42, labels...) + boundInstrument := m.Bind(labels...) boundInstrument.Record(ctx, 42) meter.RecordBatch(ctx, labels, m.Measurement(42)) t.Log("Testing int measure") @@ -190,10 +190,10 @@ func TestMeasure(t *testing.T) { func TestObserver(t *testing.T) { { + labels := []core.KeyValue{key.String("O", "P")} mockSDK, meter := mockTest.NewMeter() - labels := meter.Labels() o := Must(meter).RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) { - result.Observe(42, labels) + result.Observe(42, labels...) }) t.Log("Testing float observer") @@ -201,10 +201,10 @@ func TestObserver(t *testing.T) { checkObserverBatch(t, labels, mockSDK, core.Float64NumberKind, o.AsyncImpl()) } { + labels := []core.KeyValue{} mockSDK, meter := mockTest.NewMeter() - labels := meter.Labels() o := Must(meter).RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) { - result.Observe(42, labels) + result.Observe(42, labels...) }) t.Log("Testing int observer") mockSDK.RunAsyncInstruments() @@ -212,30 +212,21 @@ func TestObserver(t *testing.T) { } } -func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, mock *mockTest.MeterImpl, kind core.NumberKind, instrument metric.InstrumentImpl) { +func checkBatches(t *testing.T, ctx context.Context, labels []core.KeyValue, mock *mockTest.MeterImpl, kind core.NumberKind, instrument metric.InstrumentImpl) { t.Helper() if len(mock.MeasurementBatches) != 3 { t.Errorf("Expected 3 recorded measurement batches, got %d", len(mock.MeasurementBatches)) } ourInstrument := instrument.Implementation().(*mockTest.Sync) - ourLabelSet := labels.(*mockTest.LabelSet) - minLen := 3 - if minLen > len(mock.MeasurementBatches) { - minLen = len(mock.MeasurementBatches) - } - for i := 0; i < minLen; i++ { - got := mock.MeasurementBatches[i] + for i, got := range mock.MeasurementBatches { if got.Ctx != ctx { d := func(c context.Context) string { return fmt.Sprintf("(ptr: %p, ctx %#v)", c, c) } t.Errorf("Wrong recorded context in batch %d, expected %s, got %s", i, d(ctx), d(got.Ctx)) } - if got.LabelSet != ourLabelSet { - d := func(l *mockTest.LabelSet) string { - return fmt.Sprintf("(ptr: %p, labels %#v)", l, l.Labels) - } - t.Errorf("Wrong recorded label set in batch %d, expected %s, got %s", i, d(ourLabelSet), d(got.LabelSet)) + if !assert.Equal(t, got.Labels, labels) { + t.Errorf("Wrong recorded label set in batch %d, expected %v, got %v", i, labels, got.Labels) } if len(got.Measurements) != 1 { t.Errorf("Expected 1 measurement in batch %d, got %d", i, len(got.Measurements)) @@ -261,7 +252,7 @@ func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, moc } } -func checkObserverBatch(t *testing.T, labels metric.LabelSet, mock *mockTest.MeterImpl, kind core.NumberKind, observer metric.AsyncImpl) { +func checkObserverBatch(t *testing.T, labels []core.KeyValue, mock *mockTest.MeterImpl, kind core.NumberKind, observer metric.AsyncImpl) { t.Helper() assert.Len(t, mock.MeasurementBatches, 1) if len(mock.MeasurementBatches) < 1 { @@ -271,9 +262,8 @@ func checkObserverBatch(t *testing.T, labels metric.LabelSet, mock *mockTest.Met if !assert.NotNil(t, o) { return } - ourLabelSet := labels.(*mockTest.LabelSet) got := mock.MeasurementBatches[0] - assert.Equal(t, ourLabelSet, got.LabelSet) + assert.Equal(t, labels, got.Labels) assert.Len(t, got.Measurements, 1) if len(got.Measurements) < 1 { return @@ -301,18 +291,14 @@ type testWrappedMeter struct { var _ metric.MeterImpl = testWrappedMeter{} -func (testWrappedMeter) Labels(...core.KeyValue) metric.LabelSet { - return nil -} - -func (testWrappedMeter) RecordBatch(context.Context, metric.LabelSet, ...metric.Measurement) { +func (testWrappedMeter) RecordBatch(context.Context, []core.KeyValue, ...metric.Measurement) { } func (testWrappedMeter) NewSyncInstrument(_ metric.Descriptor) (metric.SyncImpl, error) { return nil, nil } -func (testWrappedMeter) NewAsyncInstrument(_ metric.Descriptor, _ func(func(core.Number, metric.LabelSet))) (metric.AsyncImpl, error) { +func (testWrappedMeter) NewAsyncInstrument(_ metric.Descriptor, _ func(func(core.Number, []core.KeyValue))) (metric.AsyncImpl, error) { return nil, errors.New("Test wrap error") } diff --git a/api/metric/common.go b/api/metric/common.go index 40774edd07f..0c9c7368c83 100644 --- a/api/metric/common.go +++ b/api/metric/common.go @@ -35,7 +35,7 @@ type asyncInstrument struct { var ErrSDKReturnedNilImpl = errors.New("SDK returned a nil implementation") -func (s syncInstrument) bind(labels LabelSet) syncBoundInstrument { +func (s syncInstrument) bind(labels []core.KeyValue) syncBoundInstrument { return newSyncBoundInstrument(s.instrument.Bind(labels)) } @@ -47,7 +47,7 @@ func (s syncInstrument) int64Measurement(value int64) Measurement { return newMeasurement(s.instrument, core.NewInt64Number(value)) } -func (s syncInstrument) directRecord(ctx context.Context, number core.Number, labels LabelSet) { +func (s syncInstrument) directRecord(ctx context.Context, number core.Number, labels []core.KeyValue) { s.instrument.RecordOne(ctx, number, labels) } diff --git a/api/metric/counter.go b/api/metric/counter.go index d145d2294b1..9ae17abc48a 100644 --- a/api/metric/counter.go +++ b/api/metric/counter.go @@ -51,7 +51,7 @@ type BoundInt64Counter struct { // If the labels do not contain a value for the key specified in the // counter with the WithKeys option, then the missing value will be // treated as unspecified. -func (c Float64Counter) Bind(labels LabelSet) (h BoundFloat64Counter) { +func (c Float64Counter) Bind(labels ...core.KeyValue) (h BoundFloat64Counter) { h.syncBoundInstrument = c.bind(labels) return } @@ -63,7 +63,7 @@ func (c Float64Counter) Bind(labels LabelSet) (h BoundFloat64Counter) { // If the labels do not contain a value for the key specified in the // counter with the WithKeys option, then the missing value will be // treated as unspecified. -func (c Int64Counter) Bind(labels LabelSet) (h BoundInt64Counter) { +func (c Int64Counter) Bind(labels ...core.KeyValue) (h BoundInt64Counter) { h.syncBoundInstrument = c.bind(labels) return } @@ -87,7 +87,7 @@ func (c Int64Counter) Measurement(value int64) Measurement { // If the labels do not contain a value for the key specified in the // counter with the WithKeys option, then the missing value will be // treated as unspecified. -func (c Float64Counter) Add(ctx context.Context, value float64, labels LabelSet) { +func (c Float64Counter) Add(ctx context.Context, value float64, labels ...core.KeyValue) { c.directRecord(ctx, core.NewFloat64Number(value), labels) } @@ -98,7 +98,7 @@ func (c Float64Counter) Add(ctx context.Context, value float64, labels LabelSet) // If the labels do not contain a value for the key specified in the // counter with the WithKeys option, then the missing value will be // treated as unspecified. -func (c Int64Counter) Add(ctx context.Context, value int64, labels LabelSet) { +func (c Int64Counter) Add(ctx context.Context, value int64, labels ...core.KeyValue) { c.directRecord(ctx, core.NewInt64Number(value), labels) } diff --git a/api/metric/doc.go b/api/metric/doc.go index 996a312c8f4..127b33bd4dc 100644 --- a/api/metric/doc.go +++ b/api/metric/doc.go @@ -26,11 +26,7 @@ // The primary object that handles metrics is Meter. Meter can be // obtained from Provider. The implementations of the Meter and // Provider are provided by SDK. Normally, the Meter is used directly -// only for the instrument creation, LabelSet generation and batch -// recording. -// -// LabelSet is a set of keys and values that are in a suitable, -// optimized form to be used by Meter. +// only for the instrument creation and batch recording. // // Counters are instruments that are reporting a quantity or a sum. An // example could be bank account balance or bytes downloaded. Counters diff --git a/api/metric/measure.go b/api/metric/measure.go index 7fb829bb184..216848b182c 100644 --- a/api/metric/measure.go +++ b/api/metric/measure.go @@ -51,7 +51,7 @@ type BoundInt64Measure struct { // If the labels do not contain a value for the key specified in the // measure with the WithKeys option, then the missing value will be // treated as unspecified. -func (c Float64Measure) Bind(labels LabelSet) (h BoundFloat64Measure) { +func (c Float64Measure) Bind(labels ...core.KeyValue) (h BoundFloat64Measure) { h.syncBoundInstrument = c.bind(labels) return } @@ -63,7 +63,7 @@ func (c Float64Measure) Bind(labels LabelSet) (h BoundFloat64Measure) { // If the labels do not contain a value for the key specified in the // measure with the WithKeys option, then the missing value will be // treated as unspecified. -func (c Int64Measure) Bind(labels LabelSet) (h BoundInt64Measure) { +func (c Int64Measure) Bind(labels ...core.KeyValue) (h BoundInt64Measure) { h.syncBoundInstrument = c.bind(labels) return } @@ -87,7 +87,7 @@ func (c Int64Measure) Measurement(value int64) Measurement { // If the labels do not contain a value for the key specified in the // measure with the WithKeys option, then the missing value will be // treated as unspecified. -func (c Float64Measure) Record(ctx context.Context, value float64, labels LabelSet) { +func (c Float64Measure) Record(ctx context.Context, value float64, labels ...core.KeyValue) { c.directRecord(ctx, core.NewFloat64Number(value), labels) } @@ -98,7 +98,7 @@ func (c Float64Measure) Record(ctx context.Context, value float64, labels LabelS // If the labels do not contain a value for the key specified in the // measure with the WithKeys option, then the missing value will be // treated as unspecified. -func (c Int64Measure) Record(ctx context.Context, value int64, labels LabelSet) { +func (c Int64Measure) Record(ctx context.Context, value int64, labels ...core.KeyValue) { c.directRecord(ctx, core.NewInt64Number(value), labels) } diff --git a/api/metric/noop.go b/api/metric/noop.go index a8e947ba35a..c77e24a7e87 100644 --- a/api/metric/noop.go +++ b/api/metric/noop.go @@ -23,7 +23,6 @@ import ( type NoopProvider struct{} type NoopMeter struct{} -type noopLabelSet struct{} type noopInstrument struct{} type noopBoundInstrument struct{} type NoopSync struct{ noopInstrument } @@ -33,7 +32,6 @@ var _ Provider = NoopProvider{} var _ Meter = NoopMeter{} var _ SyncImpl = NoopSync{} var _ BoundSyncImpl = noopBoundInstrument{} -var _ LabelSet = noopLabelSet{} var _ AsyncImpl = NoopAsync{} func (NoopProvider) Meter(name string) Meter { @@ -54,18 +52,14 @@ func (noopBoundInstrument) RecordOne(context.Context, core.Number) { func (noopBoundInstrument) Unbind() { } -func (NoopSync) Bind(LabelSet) BoundSyncImpl { +func (NoopSync) Bind([]core.KeyValue) BoundSyncImpl { return noopBoundInstrument{} } -func (NoopSync) RecordOne(context.Context, core.Number, LabelSet) { +func (NoopSync) RecordOne(context.Context, core.Number, []core.KeyValue) { } -func (NoopMeter) Labels(...core.KeyValue) LabelSet { - return noopLabelSet{} -} - -func (NoopMeter) RecordBatch(context.Context, LabelSet, ...Measurement) { +func (NoopMeter) RecordBatch(context.Context, []core.KeyValue, ...Measurement) { } func (NoopMeter) NewInt64Counter(string, ...Option) (Int64Counter, error) { diff --git a/api/metric/observer.go b/api/metric/observer.go index b75741f124a..0ad9421a418 100644 --- a/api/metric/observer.go +++ b/api/metric/observer.go @@ -14,16 +14,18 @@ package metric +import "go.opentelemetry.io/otel/api/core" + // Int64ObserverResult is an interface for reporting integral // observations. type Int64ObserverResult interface { - Observe(value int64, labels LabelSet) + Observe(value int64, labels ...core.KeyValue) } // Float64ObserverResult is an interface for reporting floating point // observations. type Float64ObserverResult interface { - Observe(value float64, labels LabelSet) + Observe(value float64, labels ...core.KeyValue) } // Int64ObserverCallback is a type of callback that integral diff --git a/api/metric/registry/registry.go b/api/metric/registry/registry.go index 01d441c52bc..cfc4d7ee962 100644 --- a/api/metric/registry/registry.go +++ b/api/metric/registry/registry.go @@ -53,13 +53,8 @@ func NewUniqueInstrumentMeterImpl(impl metric.MeterImpl) metric.MeterImpl { } } -// Labels implements metric.MeterImpl. -func (u *uniqueInstrumentMeterImpl) Labels(kvs ...core.KeyValue) metric.LabelSet { - return u.impl.Labels(kvs...) -} - // RecordBatch implements metric.MeterImpl. -func (u *uniqueInstrumentMeterImpl) RecordBatch(ctx context.Context, labels metric.LabelSet, ms ...metric.Measurement) { +func (u *uniqueInstrumentMeterImpl) RecordBatch(ctx context.Context, labels []core.KeyValue, ms ...metric.Measurement) { u.impl.RecordBatch(ctx, labels, ms...) } @@ -130,7 +125,7 @@ func (u *uniqueInstrumentMeterImpl) NewSyncInstrument(descriptor metric.Descript // NewAsyncInstrument implements metric.MeterImpl. func (u *uniqueInstrumentMeterImpl) NewAsyncInstrument( descriptor metric.Descriptor, - callback func(func(core.Number, metric.LabelSet)), + callback func(func(core.Number, []core.KeyValue)), ) (metric.AsyncImpl, error) { u.lock.Lock() defer u.lock.Unlock() diff --git a/api/metric/sdkhelpers.go b/api/metric/sdkhelpers.go index ad5f2d6035c..3292bf4a0a6 100644 --- a/api/metric/sdkhelpers.go +++ b/api/metric/sdkhelpers.go @@ -26,12 +26,8 @@ import ( // re-implement the API's type-safe interfaces. Helpers provided in // this package will construct a `Meter` given a `MeterImpl`. type MeterImpl interface { - // Labels returns a reference to a set of labels that cannot - // be read by the application. - Labels(...core.KeyValue) LabelSet - // RecordBatch atomically records a batch of measurements. - RecordBatch(context.Context, LabelSet, ...Measurement) + RecordBatch(context.Context, []core.KeyValue, ...Measurement) // NewSyncInstrument returns a newly constructed // synchronous instrument implementation or an error, should @@ -43,19 +39,10 @@ type MeterImpl interface { // one occur. NewAsyncInstrument( descriptor Descriptor, - callback func(func(core.Number, LabelSet)), + callback func(func(core.Number, []core.KeyValue)), ) (AsyncImpl, error) } -// LabelSetDelegate is a general-purpose delegating implementation of -// the LabelSet interface. This is implemented by the default -// Provider returned by api/global.SetMeterProvider(), and should be -// tested for by implementations before converting a LabelSet to their -// private concrete type. -type LabelSetDelegate interface { - Delegate() LabelSet -} - // InstrumentImpl is a common interface for synchronous and // asynchronous instruments. type InstrumentImpl interface { @@ -75,10 +62,10 @@ type SyncImpl interface { // Bind creates an implementation-level bound instrument, // binding a label set with this instrument implementation. - Bind(labels LabelSet) BoundSyncImpl + Bind(labels []core.KeyValue) BoundSyncImpl // RecordOne captures a single synchronous metric event. - RecordOne(ctx context.Context, number core.Number, labels LabelSet) + RecordOne(ctx context.Context, number core.Number, labels []core.KeyValue) } // BoundSyncImpl is the implementation-level interface to a @@ -111,13 +98,13 @@ type wrappedMeterImpl struct { // int64ObserverResult is an adapter for int64-valued asynchronous // callbacks. type int64ObserverResult struct { - observe func(core.Number, LabelSet) + observe func(core.Number, []core.KeyValue) } // float64ObserverResult is an adapter for float64-valued asynchronous // callbacks. type float64ObserverResult struct { - observe func(core.Number, LabelSet) + observe func(core.Number, []core.KeyValue) } var ( @@ -167,11 +154,7 @@ func WrapMeterImpl(impl MeterImpl, libraryName string) Meter { } } -func (m *wrappedMeterImpl) Labels(labels ...core.KeyValue) LabelSet { - return m.impl.Labels(labels...) -} - -func (m *wrappedMeterImpl) RecordBatch(ctx context.Context, ls LabelSet, ms ...Measurement) { +func (m *wrappedMeterImpl) RecordBatch(ctx context.Context, ls []core.KeyValue, ms ...Measurement) { m.impl.RecordBatch(ctx, ls, ms...) } @@ -238,7 +221,7 @@ func WrapFloat64MeasureInstrument(syncInst SyncImpl, err error) (Float64Measure, return Float64Measure{syncInstrument: common}, err } -func (m *wrappedMeterImpl) newAsync(name string, mkind Kind, nkind core.NumberKind, opts []Option, callback func(func(core.Number, LabelSet))) (AsyncImpl, error) { +func (m *wrappedMeterImpl) newAsync(name string, mkind Kind, nkind core.NumberKind, opts []Option, callback func(func(core.Number, []core.KeyValue))) (AsyncImpl, error) { opts = insertResource(m.impl, opts) desc := NewDescriptor(name, mkind, nkind, opts...) desc.config.LibraryName = m.libraryName @@ -251,7 +234,7 @@ func (m *wrappedMeterImpl) RegisterInt64Observer(name string, callback Int64Obse } return WrapInt64ObserverInstrument( m.newAsync(name, ObserverKind, core.Int64NumberKind, opts, - func(observe func(core.Number, LabelSet)) { + func(observe func(core.Number, []core.KeyValue)) { // Note: this memory allocation could be avoided by // using a pointer to this object and mutating it // on each collection interval. @@ -274,7 +257,7 @@ func (m *wrappedMeterImpl) RegisterFloat64Observer(name string, callback Float64 } return WrapFloat64ObserverInstrument( m.newAsync(name, ObserverKind, core.Float64NumberKind, opts, - func(observe func(core.Number, LabelSet)) { + func(observe func(core.Number, []core.KeyValue)) { callback(float64ObserverResult{observe}) })) } @@ -288,10 +271,10 @@ func WrapFloat64ObserverInstrument(asyncInst AsyncImpl, err error) (Float64Obser return Float64Observer{asyncInstrument: common}, err } -func (io int64ObserverResult) Observe(value int64, labels LabelSet) { +func (io int64ObserverResult) Observe(value int64, labels ...core.KeyValue) { io.observe(core.NewInt64Number(value), labels) } -func (fo float64ObserverResult) Observe(value float64, labels LabelSet) { +func (fo float64ObserverResult) Observe(value float64, labels ...core.KeyValue) { fo.observe(core.NewFloat64Number(value), labels) } diff --git a/example/basic/main.go b/example/basic/main.go index 5188264f928..56d3b0facbb 100644 --- a/example/basic/main.go +++ b/example/basic/main.go @@ -72,10 +72,10 @@ func main() { tracer := global.Tracer("ex.com/basic") meter := global.Meter("ex.com/basic") - commonLabels := meter.Labels(lemonsKey.Int(10), key.String("A", "1"), key.String("B", "2"), key.String("C", "3")) + commonLabels := []core.KeyValue{lemonsKey.Int(10), key.String("A", "1"), key.String("B", "2"), key.String("C", "3")} oneMetricCB := func(result metric.Float64ObserverResult) { - result.Observe(1, commonLabels) + result.Observe(1, commonLabels...) } _ = metric.Must(meter).RegisterFloat64Observer("ex.com.one", oneMetricCB, metric.WithKeys(fooKey, barKey, lemonsKey), @@ -91,7 +91,7 @@ func main() { barKey.String("bar1"), ) - measure := measureTwo.Bind(commonLabels) + measure := measureTwo.Bind(commonLabels...) defer measure.Unbind() err := tracer.WithSpan(ctx, "operation", func(ctx context.Context) error { diff --git a/example/prometheus/main.go b/example/prometheus/main.go index 2d61c44ef43..9fb567b0bfe 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" @@ -53,13 +54,13 @@ func main() { meter := global.Meter("ex.com/basic") observerLock := new(sync.RWMutex) observerValueToReport := new(float64) - observerLabelSetToReport := new(metric.LabelSet) + observerLabelsToReport := new([]core.KeyValue) cb := func(result metric.Float64ObserverResult) { (*observerLock).RLock() value := *observerValueToReport - labelset := *observerLabelSetToReport + labels := *observerLabelsToReport (*observerLock).RUnlock() - result.Observe(value, labelset) + result.Observe(value, labels...) } _ = metric.Must(meter).RegisterFloat64Observer("ex.com.one", cb, metric.WithKeys(fooKey, barKey, lemonsKey), @@ -69,14 +70,14 @@ func main() { measureTwo := metric.Must(meter).NewFloat64Measure("ex.com.two", metric.WithKeys(key.New("A"))) measureThree := metric.Must(meter).NewFloat64Counter("ex.com.three") - commonLabels := meter.Labels(lemonsKey.Int(10), key.String("A", "1"), key.String("B", "2"), key.String("C", "3")) - notSoCommonLabels := meter.Labels(lemonsKey.Int(13)) + commonLabels := []core.KeyValue{lemonsKey.Int(10), key.String("A", "1"), key.String("B", "2"), key.String("C", "3")} + notSoCommonLabels := []core.KeyValue{lemonsKey.Int(13)} ctx := context.Background() (*observerLock).Lock() *observerValueToReport = 1.0 - *observerLabelSetToReport = &commonLabels + *observerLabelsToReport = commonLabels (*observerLock).Unlock() meter.RecordBatch( ctx, @@ -89,7 +90,7 @@ func main() { (*observerLock).Lock() *observerValueToReport = 1.0 - *observerLabelSetToReport = ¬SoCommonLabels + *observerLabelsToReport = notSoCommonLabels (*observerLock).Unlock() meter.RecordBatch( ctx, @@ -102,7 +103,7 @@ func main() { (*observerLock).Lock() *observerValueToReport = 13.0 - *observerLabelSetToReport = &commonLabels + *observerLabelsToReport = commonLabels (*observerLock).Unlock() meter.RecordBatch( ctx, diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 8263607381c..38740b881ed 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -287,9 +287,9 @@ func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.D ch <- m } -func (c *collector) toDesc(metric *export.Record) *prometheus.Desc { - desc := metric.Descriptor() - labels := labelsKeys(metric.Labels()) +func (c *collector) toDesc(record *export.Record) *prometheus.Desc { + desc := record.Descriptor() + labels := labelsKeys(record.Labels()) return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labels, nil) } diff --git a/exporters/metric/stdout/example_test.go b/exporters/metric/stdout/example_test.go index 38dd15d2cbf..df61db0b5bf 100644 --- a/exporters/metric/stdout/example_test.go +++ b/exporters/metric/stdout/example_test.go @@ -19,6 +19,7 @@ import ( "log" "time" + "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/stdout" @@ -42,9 +43,9 @@ func ExampleNewExportPipeline() { // Create and update a single counter: counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key)) - labels := meter.Labels(key.String("value")) + labels := []core.KeyValue{key.String("value")} - counter.Add(ctx, 100, labels) + counter.Add(ctx, 100, labels...) // Output: // { diff --git a/exporters/metric/test/test.go b/exporters/metric/test/test.go index 6e3d1127b8d..ae2994449f6 100644 --- a/exporters/metric/test/test.go +++ b/exporters/metric/test/test.go @@ -34,7 +34,7 @@ type CheckpointSet struct { } // NewCheckpointSet returns a test CheckpointSet that new records could be added. -// Records are grouped by their LabelSet. +// Records are grouped by their encoded labels. func NewCheckpointSet(encoder export.LabelEncoder) *CheckpointSet { return &CheckpointSet{ encoder: encoder, @@ -49,7 +49,7 @@ func (p *CheckpointSet) Reset() { // Add a new descriptor to a Checkpoint. // -// If there is an existing record with the same descriptor and LabelSet +// If there is an existing record with the same descriptor and labels, // the stored aggregator will be returned and should be merged. func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, labels ...core.KeyValue) (agg export.Aggregator, added bool) { elabels := export.NewSimpleLabels(p.encoder, labels...) diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index 3c0b46e09a5..81c5b8540cb 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -27,6 +27,7 @@ import ( metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1" "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" metricapi "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/otlp" @@ -117,7 +118,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) ctx := context.Background() meter := pusher.Meter("test-meter") - labels := meter.Labels(core.Key("test").Bool(true)) + labels := []core.KeyValue{key.Bool("test", true)} type data struct { iKind metric.Kind @@ -137,18 +138,18 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) case metric.CounterKind: switch data.nKind { case core.Int64NumberKind: - metricapi.Must(meter).NewInt64Counter(name).Add(ctx, data.val, labels) + metricapi.Must(meter).NewInt64Counter(name).Add(ctx, data.val, labels...) case core.Float64NumberKind: - metricapi.Must(meter).NewFloat64Counter(name).Add(ctx, float64(data.val), labels) + metricapi.Must(meter).NewFloat64Counter(name).Add(ctx, float64(data.val), labels...) default: assert.Failf(t, "unsupported number testing kind", data.nKind.String()) } case metric.MeasureKind: switch data.nKind { case core.Int64NumberKind: - metricapi.Must(meter).NewInt64Measure(name).Record(ctx, data.val, labels) + metricapi.Must(meter).NewInt64Measure(name).Record(ctx, data.val, labels...) case core.Float64NumberKind: - metricapi.Must(meter).NewFloat64Measure(name).Record(ctx, float64(data.val), labels) + metricapi.Must(meter).NewFloat64Measure(name).Record(ctx, float64(data.val), labels...) default: assert.Failf(t, "unsupported number testing kind", data.nKind.String()) } @@ -156,12 +157,12 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) switch data.nKind { case core.Int64NumberKind: callback := func(v int64) metricapi.Int64ObserverCallback { - return metricapi.Int64ObserverCallback(func(result metricapi.Int64ObserverResult) { result.Observe(v, labels) }) + return metricapi.Int64ObserverCallback(func(result metricapi.Int64ObserverResult) { result.Observe(v, labels...) }) }(data.val) metricapi.Must(meter).RegisterInt64Observer(name, callback) case core.Float64NumberKind: callback := func(v float64) metricapi.Float64ObserverCallback { - return metricapi.Float64ObserverCallback(func(result metricapi.Float64ObserverResult) { result.Observe(v, labels) }) + return metricapi.Float64ObserverCallback(func(result metricapi.Float64ObserverResult) { result.Observe(v, labels...) }) }(float64(data.val)) metricapi.Must(meter).RegisterFloat64Observer(name, callback) default: diff --git a/internal/metric/mock.go b/internal/metric/mock.go index 06488e7727a..2793c19ac0e 100644 --- a/internal/metric/mock.go +++ b/internal/metric/mock.go @@ -27,19 +27,14 @@ import ( type ( Handle struct { Instrument *Sync - LabelSet *LabelSet - } - - LabelSet struct { - Impl *MeterImpl - Labels map[core.Key]core.Value + Labels []core.KeyValue } Batch struct { // Measurement needs to be aligned for 64-bit atomic operations. Measurements []Measurement Ctx context.Context - LabelSet *LabelSet + Labels []core.KeyValue LibraryName string } @@ -69,7 +64,7 @@ type ( Async struct { Instrument - callback func(func(core.Number, apimetric.LabelSet)) + callback func(func(core.Number, []core.KeyValue)) } Sync struct { @@ -80,7 +75,6 @@ type ( var ( _ apimetric.SyncImpl = &Sync{} _ apimetric.BoundSyncImpl = &Handle{} - _ apimetric.LabelSet = &LabelSet{} _ apimetric.MeterImpl = &MeterImpl{} _ apimetric.AsyncImpl = &Async{} ) @@ -97,32 +91,26 @@ func (s *Sync) Implementation() interface{} { return s } -func (s *Sync) Bind(labels apimetric.LabelSet) apimetric.BoundSyncImpl { - if ld, ok := labels.(apimetric.LabelSetDelegate); ok { - labels = ld.Delegate() - } +func (s *Sync) Bind(labels []core.KeyValue) apimetric.BoundSyncImpl { return &Handle{ Instrument: s, - LabelSet: labels.(*LabelSet), + Labels: labels, } } -func (s *Sync) RecordOne(ctx context.Context, number core.Number, labels apimetric.LabelSet) { - if ld, ok := labels.(apimetric.LabelSetDelegate); ok { - labels = ld.Delegate() - } - s.meter.doRecordSingle(ctx, labels.(*LabelSet), s, number) +func (s *Sync) RecordOne(ctx context.Context, number core.Number, labels []core.KeyValue) { + s.meter.doRecordSingle(ctx, labels, s, number) } func (h *Handle) RecordOne(ctx context.Context, number core.Number) { - h.Instrument.meter.doRecordSingle(ctx, h.LabelSet, h.Instrument, number) + h.Instrument.meter.doRecordSingle(ctx, h.Labels, h.Instrument, number) } func (h *Handle) Unbind() { } -func (m *MeterImpl) doRecordSingle(ctx context.Context, labelSet *LabelSet, instrument apimetric.InstrumentImpl, number core.Number) { - m.recordMockBatch(ctx, labelSet, Measurement{ +func (m *MeterImpl) doRecordSingle(ctx context.Context, labels []core.KeyValue, instrument apimetric.InstrumentImpl, number core.Number) { + m.recordMockBatch(ctx, labels, Measurement{ Instrument: instrument, Number: number, }) @@ -155,17 +143,6 @@ func NewMeter() (*MeterImpl, apimetric.Meter) { return impl, p.Meter("mock") } -func (m *MeterImpl) Labels(labels ...core.KeyValue) apimetric.LabelSet { - ul := make(map[core.Key]core.Value) - for _, kv := range labels { - ul[kv.Key] = kv.Value - } - return &LabelSet{ - Impl: m, - Labels: ul, - } -} - func (m *MeterImpl) NewSyncInstrument(descriptor metric.Descriptor) (apimetric.SyncImpl, error) { return &Sync{ Instrument{ @@ -175,7 +152,7 @@ func (m *MeterImpl) NewSyncInstrument(descriptor metric.Descriptor) (apimetric.S }, nil } -func (m *MeterImpl) NewAsyncInstrument(descriptor metric.Descriptor, callback func(func(core.Number, apimetric.LabelSet))) (apimetric.AsyncImpl, error) { +func (m *MeterImpl) NewAsyncInstrument(descriptor metric.Descriptor, callback func(func(core.Number, []core.KeyValue))) (apimetric.AsyncImpl, error) { a := &Async{ Instrument: Instrument{ descriptor: descriptor, @@ -187,8 +164,7 @@ func (m *MeterImpl) NewAsyncInstrument(descriptor metric.Descriptor, callback fu return a, nil } -func (m *MeterImpl) RecordBatch(ctx context.Context, labels apimetric.LabelSet, measurements ...apimetric.Measurement) { - ourLabelSet := labels.(*LabelSet) +func (m *MeterImpl) RecordBatch(ctx context.Context, labels []core.KeyValue, measurements ...apimetric.Measurement) { mm := make([]Measurement, len(measurements)) for i := 0; i < len(measurements); i++ { m := measurements[i] @@ -197,26 +173,21 @@ func (m *MeterImpl) RecordBatch(ctx context.Context, labels apimetric.LabelSet, Number: m.Number(), } } - m.recordMockBatch(ctx, ourLabelSet, mm...) + m.recordMockBatch(ctx, labels, mm...) } -func (m *MeterImpl) recordMockBatch(ctx context.Context, labelSet *LabelSet, measurements ...Measurement) { +func (m *MeterImpl) recordMockBatch(ctx context.Context, labels []core.KeyValue, measurements ...Measurement) { m.MeasurementBatches = append(m.MeasurementBatches, Batch{ Ctx: ctx, - LabelSet: labelSet, + Labels: labels, Measurements: measurements, }) } func (m *MeterImpl) RunAsyncInstruments() { for _, observer := range m.AsyncInstruments { - observer.callback(func(n core.Number, labels apimetric.LabelSet) { - - if ld, ok := labels.(apimetric.LabelSetDelegate); ok { - labels = ld.Delegate() - } - - m.doRecordSingle(context.Background(), labels.(*LabelSet), observer, n) + observer.callback(func(n core.Number, labels []core.KeyValue) { + m.doRecordSingle(context.Background(), labels, observer, n) }) } } diff --git a/sdk/metric/alignment_test.go b/sdk/metric/alignment_test.go index 4fea7bdf0a7..a260ca02fdd 100644 --- a/sdk/metric/alignment_test.go +++ b/sdk/metric/alignment_test.go @@ -17,24 +17,21 @@ package metric import ( "os" "testing" - "unsafe" ottest "go.opentelemetry.io/otel/internal/testing" ) // Ensure struct alignment prior to running tests. func TestMain(m *testing.M) { - fields := []ottest.FieldOffset{ - { - Name: "record.refMapped.value", - Offset: unsafe.Offsetof(record{}.refMapped.value), - }, - { - Name: "record.modified", - Offset: unsafe.Offsetof(record{}.modified), - }, + offsets := AtomicFieldOffsets() + var r []ottest.FieldOffset + for name, offset := range offsets { + r = append(r, ottest.FieldOffset{ + Name: name, + Offset: offset, + }) } - if !ottest.Aligned8Byte(fields, os.Stderr) { + if !ottest.Aligned8Byte(r, os.Stderr) { os.Exit(1) } diff --git a/sdk/metric/atomicfields.go b/sdk/metric/atomicfields.go new file mode 100644 index 00000000000..ed6ca0700c7 --- /dev/null +++ b/sdk/metric/atomicfields.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import "unsafe" + +func AtomicFieldOffsets() map[string]uintptr { + return map[string]uintptr{ + "record.refMapped.value": unsafe.Offsetof(record{}.refMapped.value), + "record.modified": unsafe.Offsetof(record{}.modified), + "record.labels.cachedEncoderID": unsafe.Offsetof(record{}.labels.cachedEncoded), + } +} diff --git a/sdk/metric/batcher/defaultkeys/defaultkeys.go b/sdk/metric/batcher/defaultkeys/defaultkeys.go index 521a52e4eab..29f0f138f0c 100644 --- a/sdk/metric/batcher/defaultkeys/defaultkeys.go +++ b/sdk/metric/batcher/defaultkeys/defaultkeys.go @@ -97,7 +97,7 @@ func (b *Batcher) Process(_ context.Context, record export.Record) error { // Note also the possibility to speed this computation of // "encoded" via "outputLabels" in the form of a (Descriptor, - // LabelSet)->(Labels, Encoded) cache. + // Labels)->(Labels, Encoded) cache. iter := record.Labels().Iter() for iter.Next() { kv := iter.Label() diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index c083f3aa9e7..12cc5bf6e4d 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -89,7 +89,7 @@ func (*benchFixture) CheckpointSet() export.CheckpointSet { func (*benchFixture) FinishedCollection() { } -func makeLabelSets(n int) [][]core.KeyValue { +func makeManyLabels(n int) [][]core.KeyValue { r := make([][]core.KeyValue, n) for i := 0; i < n; i++ { @@ -117,89 +117,82 @@ func makeLabels(n int) []core.KeyValue { } func benchmarkLabels(b *testing.B, n int) { + ctx := context.Background() fix := newFixture(b) labs := makeLabels(n) + cnt := fix.meter.NewInt64Counter("int64.counter") b.ResetTimer() for i := 0; i < b.N; i++ { - fix.sdk.Labels(labs...) + cnt.Add(ctx, 1, labs...) } } -func BenchmarkLabels_1(b *testing.B) { +func BenchmarkInt64CounterAddWithLabels_1(b *testing.B) { benchmarkLabels(b, 1) } -func BenchmarkLabels_2(b *testing.B) { +func BenchmarkInt64CounterAddWithLabels_2(b *testing.B) { benchmarkLabels(b, 2) } -func BenchmarkLabels_4(b *testing.B) { +func BenchmarkInt64CounterAddWithLabels_4(b *testing.B) { benchmarkLabels(b, 4) } -func BenchmarkLabels_8(b *testing.B) { +func BenchmarkInt64CounterAddWithLabels_8(b *testing.B) { benchmarkLabels(b, 8) } -func BenchmarkLabels_16(b *testing.B) { +func BenchmarkInt64CounterAddWithLabels_16(b *testing.B) { benchmarkLabels(b, 16) } // Note: performance does not depend on label set size for the -// benchmarks below. +// benchmarks below--all are benchmarked for a single label. func BenchmarkAcquireNewHandle(b *testing.B) { fix := newFixture(b) - labelSets := makeLabelSets(b.N) + labelSets := makeManyLabels(b.N) cnt := fix.meter.NewInt64Counter("int64.counter") - labels := make([]metric.LabelSet, b.N) - - for i := 0; i < b.N; i++ { - labels[i] = fix.sdk.Labels(labelSets[i]...) - } b.ResetTimer() for i := 0; i < b.N; i++ { - cnt.Bind(labels[i]) + cnt.Bind(labelSets[i]...) } } func BenchmarkAcquireExistingHandle(b *testing.B) { fix := newFixture(b) - labelSets := makeLabelSets(b.N) + labelSets := makeManyLabels(b.N) cnt := fix.meter.NewInt64Counter("int64.counter") - labels := make([]metric.LabelSet, b.N) for i := 0; i < b.N; i++ { - labels[i] = fix.sdk.Labels(labelSets[i]...) - cnt.Bind(labels[i]).Unbind() + cnt.Bind(labelSets[i]...).Unbind() } b.ResetTimer() for i := 0; i < b.N; i++ { - cnt.Bind(labels[i]) + cnt.Bind(labelSets[i]...) } } func BenchmarkAcquireReleaseExistingHandle(b *testing.B) { fix := newFixture(b) - labelSets := makeLabelSets(b.N) + labelSets := makeManyLabels(b.N) cnt := fix.meter.NewInt64Counter("int64.counter") - labels := make([]metric.LabelSet, b.N) for i := 0; i < b.N; i++ { - labels[i] = fix.sdk.Labels(labelSets[i]...) - cnt.Bind(labels[i]).Unbind() + cnt.Bind(labelSets[i]...).Unbind() } b.ResetTimer() for i := 0; i < b.N; i++ { - cnt.Bind(labels[i]).Unbind() + cnt.Bind(labelSets[i]...).Unbind() } } @@ -224,12 +217,10 @@ func benchmarkIterator(b *testing.B, n int) { benchmarkIteratorVar = kv return nil }) - labs := fix.sdk.Labels(makeLabels(n)...) cnt := fix.meter.NewInt64Counter("int64.counter") ctx := context.Background() - cnt.Add(ctx, 1, labs) + cnt.Add(ctx, 1, makeLabels(n)...) - b.StopTimer() b.ResetTimer() fix.sdk.Collect(ctx) } @@ -263,22 +254,22 @@ func BenchmarkIterator_16(b *testing.B) { func BenchmarkInt64CounterAdd(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) cnt := fix.meter.NewInt64Counter("int64.counter") b.ResetTimer() for i := 0; i < b.N; i++ { - cnt.Add(ctx, 1, labs) + cnt.Add(ctx, 1, labs...) } } func BenchmarkInt64CounterHandleAdd(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) cnt := fix.meter.NewInt64Counter("int64.counter") - handle := cnt.Bind(labs) + handle := cnt.Bind(labs...) b.ResetTimer() @@ -290,22 +281,22 @@ func BenchmarkInt64CounterHandleAdd(b *testing.B) { func BenchmarkFloat64CounterAdd(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) cnt := fix.meter.NewFloat64Counter("float64.counter") b.ResetTimer() for i := 0; i < b.N; i++ { - cnt.Add(ctx, 1.1, labs) + cnt.Add(ctx, 1.1, labs...) } } func BenchmarkFloat64CounterHandleAdd(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) cnt := fix.meter.NewFloat64Counter("float64.counter") - handle := cnt.Bind(labs) + handle := cnt.Bind(labs...) b.ResetTimer() @@ -319,22 +310,22 @@ func BenchmarkFloat64CounterHandleAdd(b *testing.B) { func BenchmarkInt64LastValueAdd(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) mea := fix.meter.NewInt64Measure("int64.lastvalue") b.ResetTimer() for i := 0; i < b.N; i++ { - mea.Record(ctx, int64(i), labs) + mea.Record(ctx, int64(i), labs...) } } func BenchmarkInt64LastValueHandleAdd(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) mea := fix.meter.NewInt64Measure("int64.lastvalue") - handle := mea.Bind(labs) + handle := mea.Bind(labs...) b.ResetTimer() @@ -346,22 +337,22 @@ func BenchmarkInt64LastValueHandleAdd(b *testing.B) { func BenchmarkFloat64LastValueAdd(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) mea := fix.meter.NewFloat64Measure("float64.lastvalue") b.ResetTimer() for i := 0; i < b.N; i++ { - mea.Record(ctx, float64(i), labs) + mea.Record(ctx, float64(i), labs...) } } func BenchmarkFloat64LastValueHandleAdd(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) mea := fix.meter.NewFloat64Measure("float64.lastvalue") - handle := mea.Bind(labs) + handle := mea.Bind(labs...) b.ResetTimer() @@ -375,22 +366,22 @@ func BenchmarkFloat64LastValueHandleAdd(b *testing.B) { func benchmarkInt64MeasureAdd(b *testing.B, name string) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) mea := fix.meter.NewInt64Measure(name) b.ResetTimer() for i := 0; i < b.N; i++ { - mea.Record(ctx, int64(i), labs) + mea.Record(ctx, int64(i), labs...) } } func benchmarkInt64MeasureHandleAdd(b *testing.B, name string) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) mea := fix.meter.NewInt64Measure(name) - handle := mea.Bind(labs) + handle := mea.Bind(labs...) b.ResetTimer() @@ -402,22 +393,22 @@ func benchmarkInt64MeasureHandleAdd(b *testing.B, name string) { func benchmarkFloat64MeasureAdd(b *testing.B, name string) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) mea := fix.meter.NewFloat64Measure(name) b.ResetTimer() for i := 0; i < b.N; i++ { - mea.Record(ctx, float64(i), labs) + mea.Record(ctx, float64(i), labs...) } } func benchmarkFloat64MeasureHandleAdd(b *testing.B, name string) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) mea := fix.meter.NewFloat64Measure(name) - handle := mea.Bind(labs) + handle := mea.Bind(labs...) b.ResetTimer() @@ -446,32 +437,30 @@ func BenchmarkObserverRegistration(b *testing.B) { func BenchmarkObserverObservationInt64(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) _ = fix.meter.RegisterInt64Observer("test.observer", func(result metric.Int64ObserverResult) { - b.StartTimer() - defer b.StopTimer() for i := 0; i < b.N; i++ { - result.Observe((int64)(i), labs) + result.Observe((int64)(i), labs...) } }) - b.StopTimer() + b.ResetTimer() + fix.sdk.Collect(ctx) } func BenchmarkObserverObservationFloat64(b *testing.B) { ctx := context.Background() fix := newFixture(b) - labs := fix.sdk.Labels(makeLabels(1)...) + labs := makeLabels(1) _ = fix.meter.RegisterFloat64Observer("test.observer", func(result metric.Float64ObserverResult) { - b.StartTimer() - defer b.StopTimer() for i := 0; i < b.N; i++ { - result.Observe((float64)(i), labs) + result.Observe((float64)(i), labs...) } }) - b.StopTimer() + b.ResetTimer() + fix.sdk.Collect(ctx) } @@ -546,7 +535,7 @@ func benchmarkBatchRecord8Labels(b *testing.B, numInst int) { b.ResetTimer() for i := 0; i < b.N; i++ { - fix.sdk.RecordBatch(ctx, fix.sdk.Labels(labs...), meas...) + fix.sdk.RecordBatch(ctx, labs, meas...) } } diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index a5378d17d11..f98dba8e0f9 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -193,7 +193,7 @@ func TestPushTicker(t *testing.T) { p.Start() - counter.Add(ctx, 3, meter.Labels()) + counter.Add(ctx, 3) records, exports := fix.exporter.resetRecords() checkpoints, finishes := fix.batcher.getCounts() @@ -219,7 +219,7 @@ func TestPushTicker(t *testing.T) { fix.checkpointSet.Reset() - counter.Add(ctx, 7, meter.Labels()) + counter.Add(ctx, 7) mock.Add(time.Second) runtime.Gosched() @@ -286,8 +286,8 @@ func TestPushExportError(t *testing.T) { p.Start() runtime.Gosched() - counter1.Add(ctx, 3, meter.Labels()) - counter2.Add(ctx, 5, meter.Labels()) + counter1.Add(ctx, 3) + counter2.Add(ctx, 5) require.Equal(t, 0, fix.exporter.exports) require.Nil(t, err) diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 1b5f842f719..0faaec39db9 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -82,7 +82,7 @@ func TestInputRangeTestCounter(t *testing.T) { counter := Must(meter).NewInt64Counter("name.counter") - counter.Add(ctx, -1, sdk.Labels()) + counter.Add(ctx, -1) require.Equal(t, aggregator.ErrNegativeInput, sdkErr) sdkErr = nil @@ -93,7 +93,7 @@ func TestInputRangeTestCounter(t *testing.T) { require.Nil(t, err) batcher.records = nil - counter.Add(ctx, 1, sdk.Labels()) + counter.Add(ctx, 1) checkpointed = sdk.Collect(ctx) sum, err = batcher.records[0].Aggregator().(aggregator.Sum).Sum() require.Equal(t, int64(1), sum.AsInt64()) @@ -117,7 +117,7 @@ func TestInputRangeTestMeasure(t *testing.T) { measure := Must(meter).NewFloat64Measure("name.measure") - measure.Record(ctx, math.NaN(), sdk.Labels()) + measure.Record(ctx, math.NaN()) require.Equal(t, aggregator.ErrNaNInput, sdkErr) sdkErr = nil @@ -127,8 +127,8 @@ func TestInputRangeTestMeasure(t *testing.T) { require.Equal(t, 1, checkpointed) require.Nil(t, err) - measure.Record(ctx, 1, sdk.Labels()) - measure.Record(ctx, 2, sdk.Labels()) + measure.Record(ctx, 1) + measure.Record(ctx, 2) batcher.records = nil checkpointed = sdk.Collect(ctx) @@ -150,7 +150,7 @@ func TestDisabledInstrument(t *testing.T) { measure := Must(meter).NewFloat64Measure("name.disabled") - measure.Record(ctx, -1, sdk.Labels()) + measure.Record(ctx, -1) checkpointed := sdk.Collect(ctx) require.Equal(t, 0, checkpointed) @@ -173,7 +173,7 @@ func TestRecordNaN(t *testing.T) { c := Must(meter).NewFloat64Counter("sum.name") require.Nil(t, sdkErr) - c.Add(ctx, math.NaN(), sdk.Labels()) + c.Add(ctx, math.NaN()) require.Error(t, sdkErr) } @@ -219,14 +219,14 @@ func TestSDKLabelsDeduplication(t *testing.T) { expectB = append(expectB, keysB[i].Int(repeats-1)) } - counter.Add(ctx, 1, sdk.Labels(kvsA...)) - counter.Add(ctx, 1, sdk.Labels(kvsA...)) + counter.Add(ctx, 1, kvsA...) + counter.Add(ctx, 1, kvsA...) allExpect = append(allExpect, expectA) if numKeys != 0 { // In this case A and B sets are the same. - counter.Add(ctx, 1, sdk.Labels(kvsB...)) - counter.Add(ctx, 1, sdk.Labels(kvsB...)) + counter.Add(ctx, 1, kvsB...) + counter.Add(ctx, 1, kvsB...) allExpect = append(allExpect, expectB) } @@ -290,12 +290,12 @@ func TestObserverCollection(t *testing.T) { // following line we get 1-1==0 instead of -1: // result.Observe(1, meter.Labels(key.String("A", "B"))) - result.Observe(-1, meter.Labels(key.String("A", "B"))) - result.Observe(-1, meter.Labels(key.String("C", "D"))) + result.Observe(-1, key.String("A", "B")) + result.Observe(-1, key.String("C", "D")) }) _ = Must(meter).RegisterInt64Observer("int.observer", func(result metric.Int64ObserverResult) { - result.Observe(1, meter.Labels(key.String("A", "B"))) - result.Observe(1, meter.Labels()) + result.Observe(1, key.String("A", "B")) + result.Observe(1) }) _ = Must(meter).RegisterInt64Observer("empty.observer", func(result metric.Int64ObserverResult) { }) @@ -315,5 +315,44 @@ func TestObserverCollection(t *testing.T) { "int.observer/": 1, "int.observer/A=B": 1, }, out.Map) +} + +func TestRecordBatch(t *testing.T) { + ctx := context.Background() + batcher := &correctnessBatcher{ + t: t, + } + sdk := metricsdk.New(batcher) + meter := metric.WrapMeterImpl(sdk, "test") + + counter1 := Must(meter).NewInt64Counter("int64.counter") + counter2 := Must(meter).NewFloat64Counter("float64.counter") + measure1 := Must(meter).NewInt64Measure("int64.measure") + measure2 := Must(meter).NewFloat64Measure("float64.measure") + + sdk.RecordBatch( + ctx, + []core.KeyValue{ + key.String("A", "B"), + key.String("C", "D"), + }, + counter1.Measurement(1), + counter2.Measurement(2), + measure1.Measurement(3), + measure2.Measurement(4), + ) + + sdk.Collect(ctx) + + out := batchTest.NewOutput(export.NewDefaultLabelEncoder()) + for _, rec := range batcher.records { + _ = out.AddTo(rec) + } + require.EqualValues(t, map[string]float64{ + "int64.counter/A=B,C=D": 1, + "float64.counter/A=B,C=D": 2, + "int64.measure/A=B,C=D": 3, + "float64.measure/A=B,C=D": 4, + }, out.Map) } diff --git a/sdk/metric/example_test.go b/sdk/metric/example_test.go index 0f265cdaf63..629775587f9 100644 --- a/sdk/metric/example_test.go +++ b/sdk/metric/example_test.go @@ -40,9 +40,8 @@ func ExampleNew() { meter := pusher.Meter("example") counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key)) - labels := meter.Labels(key.String("value")) - counter.Add(ctx, 100, labels) + counter.Add(ctx, 100, key.String("value")) // Output: // { diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 8cc10335be9..fa299cf3958 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -49,10 +49,6 @@ type ( // `*asyncInstrument` instances asyncInstruments sync.Map - // empty is the (singleton) result of Labels() - // w/ zero arguments. - empty labels - // currentEpoch is the current epoch number. It is // incremented in `Collect()`. currentEpoch int64 @@ -68,6 +64,11 @@ type ( // resource represents the entity producing telemetry. resource resource.Resource + + // asyncSortSlice has a single purpose - as a temporary + // place for sorting during labels creation to avoid + // allocation. It is cleared after use. + asyncSortSlice sortedLabels } syncInstrument struct { @@ -78,9 +79,8 @@ type ( // suitable for use as a map key. orderedLabels interface{} - // labels implements the OpenTelemetry LabelSet API, - // represents an internalized set of labels that may be used - // repeatedly. + // labels represents an internalized set of labels that have been + // sorted and deduplicated. labels struct { // cachedEncoderID needs to be aligned for atomic access cachedEncoderID int64 @@ -88,23 +88,18 @@ type ( // labels cachedEncoded string - meter *SDK // ordered is the output of sorting and deduplicating // the labels, copied into an array of the correct // size for use as a map key. ordered orderedLabels - // sortSlice has a single purpose - as a temporary - // place for sorting during labels creation to avoid - // allocation - sortSlice sortedLabels // cachedValue contains a `reflect.Value` of the `ordered` // member cachedValue reflect.Value } // mapkey uniquely describes a metric instrument in terms of - // its InstrumentID and the encoded form of its LabelSet. + // its InstrumentID and the encoded form of its labels. mapkey struct { descriptor *metric.Descriptor ordered orderedLabels @@ -125,8 +120,15 @@ type ( // modified has to be aligned for 64-bit atomic operations. modified int64 - // labels is the LabelSet passed by the user. - labels *labels + // labels is the processed label set for this record. + // + // labels has to be aligned for 64-bit atomic operations. + labels labels + + // sortSlice has a single purpose - as a temporary + // place for sorting during labels creation to avoid + // allocation. + sortSlice sortedLabels // inst is a pointer to the corresponding instrument. inst *syncInstrument @@ -148,13 +150,13 @@ type ( // labelset and recorder recorders map[orderedLabels]labeledRecorder - callback func(func(core.Number, api.LabelSet)) + callback func(func(core.Number, []core.KeyValue)) } labeledRecorder struct { - recorder export.Aggregator - labels *labels modifiedEpoch int64 + labels labels + recorder export.Aggregator } ErrorHandler func(error) @@ -162,7 +164,6 @@ type ( var ( _ api.MeterImpl = &SDK{} - _ api.LabelSet = &labels{} _ api.AsyncImpl = &asyncInstrument{} _ api.SyncImpl = &syncInstrument{} _ api.BoundSyncImpl = &record{} @@ -171,6 +172,11 @@ var ( _ export.Labels = &labels{} kvType = reflect.TypeOf(core.KeyValue{}) + + emptyLabels = labels{ + ordered: [0]core.KeyValue{}, + cachedValue: reflect.ValueOf([0]core.KeyValue{}), + } ) func (inst *instrument) Descriptor() api.Descriptor { @@ -185,12 +191,12 @@ func (s *syncInstrument) Implementation() interface{} { return s } -func (a *asyncInstrument) observe(number core.Number, ls api.LabelSet) { +func (a *asyncInstrument) observe(number core.Number, labels []core.KeyValue) { if err := aggregator.RangeTest(number, &a.descriptor); err != nil { a.meter.errorHandler(err) return } - recorder := a.getRecorder(ls) + recorder := a.getRecorder(labels) if recorder == nil { // The instrument is disabled according to the // AggregationSelector. @@ -202,8 +208,12 @@ func (a *asyncInstrument) observe(number core.Number, ls api.LabelSet) { } } -func (a *asyncInstrument) getRecorder(ls api.LabelSet) export.Aggregator { - labels := a.meter.labsFor(ls) +func (a *asyncInstrument) getRecorder(kvs []core.KeyValue) export.Aggregator { + // We are in a single-threaded context. Note: this assumption + // could be violated if the user added concurrency within + // their callback. + labels := a.meter.makeLabels(kvs, &a.meter.asyncSortSlice) + lrec, ok := a.recorders[labels.ordered] if ok { lrec.modifiedEpoch = a.meter.currentEpoch @@ -229,32 +239,50 @@ func (m *SDK) SetErrorHandler(f ErrorHandler) { m.errorHandler = f } -func (s *syncInstrument) acquireHandle(ls *labels) *record { - // Create lookup key for sync.Map (one allocation) +// acquireHandle gets or creates a `*record` corresponding to `kvs`, +// the input labels. The second argument `labels` is passed in to +// support re-use of the orderedLabels computed by a previous +// measurement in the same batch. This performs two allocations +// in the common case. +func (s *syncInstrument) acquireHandle(kvs []core.KeyValue, lptr *labels) *record { + var rec *record + var labels labels + + if lptr == nil || lptr.ordered == nil { + // This memory allocation may not be used, but it's + // needed for the `sortSlice` field, to avoid an + // allocation while sorting. + rec = &record{} + labels = s.meter.makeLabels(kvs, &rec.sortSlice) + } else { + labels = *lptr + } + + // Create lookup key for sync.Map (one allocation, as this + // passes through an interface{}) mk := mapkey{ descriptor: &s.descriptor, - ordered: ls.ordered, + ordered: labels.ordered, } if actual, ok := s.meter.current.Load(mk); ok { - // Existing record case, only one allocation so far. - rec := actual.(*record) - if rec.refMapped.ref() { + // Existing record case. + existingRec := actual.(*record) + if existingRec.refMapped.ref() { // At this moment it is guaranteed that the entry is in // the map and will not be removed. - return rec + return existingRec } // This entry is no longer mapped, try to add a new entry. } - // There's a memory allocation here. - rec := &record{ - labels: ls, - inst: s, - refMapped: refcountMapped{value: 2}, - modified: 0, - recorder: s.meter.batcher.AggregatorFor(&s.descriptor), + if rec == nil { + rec = &record{} } + rec.refMapped = refcountMapped{value: 2} + rec.labels = labels + rec.inst = s + rec.recorder = s.meter.batcher.AggregatorFor(&s.descriptor) for { // Load/Store: there's a memory allocation to place `mk` into @@ -286,14 +314,12 @@ func (s *syncInstrument) acquireHandle(ls *labels) *record { } } -func (s *syncInstrument) Bind(ls api.LabelSet) api.BoundSyncImpl { - labs := s.meter.labsFor(ls) - return s.acquireHandle(labs) +func (s *syncInstrument) Bind(kvs []core.KeyValue) api.BoundSyncImpl { + return s.acquireHandle(kvs, nil) } -func (s *syncInstrument) RecordOne(ctx context.Context, number core.Number, ls api.LabelSet) { - ourLs := s.meter.labsFor(ls) - h := s.acquireHandle(ourLs) +func (s *syncInstrument) RecordOne(ctx context.Context, number core.Number, kvs []core.KeyValue) { + h := s.acquireHandle(kvs, nil) defer h.Unbind() h.RecordOne(ctx, number) } @@ -313,43 +339,36 @@ func New(batcher export.Batcher, opts ...Option) *SDK { opt.Apply(c) } - m := &SDK{ - empty: labels{ - ordered: [0]core.KeyValue{}, - }, + return &SDK{ batcher: batcher, errorHandler: c.ErrorHandler, resource: c.Resource, } - m.empty.meter = m - m.empty.cachedValue = reflect.ValueOf(m.empty.ordered) - return m } func DefaultErrorHandler(err error) { fmt.Fprintln(os.Stderr, "Metrics SDK error:", err) } -// Labels returns a LabelSet corresponding to the arguments. Passed labels +// makeLabels returns a `labels` corresponding to the arguments. Labels // are sorted and de-duplicated, with last-value-wins semantics. Note that // sorting and deduplicating happens in-place to avoid allocation, so the -// passed slice will be modified. -func (m *SDK) Labels(kvs ...core.KeyValue) api.LabelSet { +// passed slice will be modified. The `sortSlice` argument refers to a memory +// location used temporarily while sorting the slice, to avoid a memory +// allocation. +func (m *SDK) makeLabels(kvs []core.KeyValue, sortSlice *sortedLabels) labels { // Check for empty set. if len(kvs) == 0 { - return &m.empty + return emptyLabels } - ls := &labels{ // allocation - meter: m, - sortSlice: kvs, - } + *sortSlice = kvs - // Sort and de-duplicate. Note: this use of `ls.sortSlice` - // avoids an allocation by using the address-able field rather - // than `kvs`. - sort.Stable(&ls.sortSlice) - ls.sortSlice = nil + // Sort and de-duplicate. Note: this use of `sortSlice` + // avoids an allocation because it is a pointer. + sort.Stable(sortSlice) + + *sortSlice = nil oi := 1 for i := 1; i < len(kvs); i++ { @@ -362,8 +381,7 @@ func (m *SDK) Labels(kvs ...core.KeyValue) api.LabelSet { oi++ } kvs = kvs[0:oi] - ls.computeOrdered(kvs) - return ls + return computeOrderedLabels(kvs) } // NumLabels is a part of an implementation of the export.LabelStorage @@ -429,12 +447,14 @@ func (ls *labels) Encoded(encoder export.LabelEncoder) string { return encoded } -func (ls *labels) computeOrdered(kvs []core.KeyValue) { +func computeOrderedLabels(kvs []core.KeyValue) labels { + var ls labels ls.ordered = computeOrderedFixed(kvs) if ls.ordered == nil { ls.ordered = computeOrderedReflect(kvs) } ls.cachedValue = reflect.ValueOf(ls.ordered) + return ls } func computeOrderedFixed(kvs []core.KeyValue) orderedLabels { @@ -492,18 +512,6 @@ func computeOrderedReflect(kvs []core.KeyValue) interface{} { return at.Interface() } -// labsFor sanitizes the input LabelSet. The input will be rejected -// if it was created by another Meter instance, for example. -func (m *SDK) labsFor(ls api.LabelSet) *labels { - if del, ok := ls.(api.LabelSetDelegate); ok { - ls = del.Delegate() - } - if l, _ := ls.(*labels); l != nil && l.meter == m { - return l - } - return &m.empty -} - func (m *SDK) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) { return &syncInstrument{ instrument: instrument{ @@ -513,7 +521,7 @@ func (m *SDK) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) }, nil } -func (m *SDK) NewAsyncInstrument(descriptor api.Descriptor, callback func(func(core.Number, api.LabelSet))) (api.AsyncImpl, error) { +func (m *SDK) NewAsyncInstrument(descriptor api.Descriptor, callback func(func(core.Number, []core.KeyValue))) (api.AsyncImpl, error) { a := &asyncInstrument{ instrument: instrument{ descriptor: descriptor, @@ -551,6 +559,9 @@ func (m *SDK) collectRecords(ctx context.Context) int { unmapped := inuse.refMapped.tryUnmap() // If able to unmap then remove the record from the current Map. if unmapped { + // TODO: Consider leaving the record in the map for one + // collection interval? Since creating records is relatively + // expensive, this would optimize common cases of ongoing use. m.current.Delete(inuse.mapkey()) } @@ -583,7 +594,7 @@ func (m *SDK) collectAsync(ctx context.Context) int { } func (m *SDK) checkpointRecord(ctx context.Context, r *record) int { - return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, r.labels) + return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, &r.labels) } func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int { @@ -592,9 +603,10 @@ func (m *SDK) checkpointAsync(ctx context.Context, a *asyncInstrument) int { } checkpointed := 0 for encodedLabels, lrec := range a.recorders { + lrec := lrec epochDiff := m.currentEpoch - lrec.modifiedEpoch if epochDiff == 0 { - checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, lrec.labels) + checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, &lrec.labels) } else if epochDiff > 1 { // This is second collection cycle with no // observations for this labelset. Remove the @@ -633,9 +645,24 @@ func (m *SDK) Resource() resource.Resource { } // RecordBatch enters a batch of metric events. -func (m *SDK) RecordBatch(ctx context.Context, ls api.LabelSet, measurements ...api.Measurement) { - for _, meas := range measurements { - meas.SyncImpl().RecordOne(ctx, meas.Number(), ls) +func (m *SDK) RecordBatch(ctx context.Context, kvs []core.KeyValue, measurements ...api.Measurement) { + // Labels will be computed the first time acquireHandle is + // called. Subsequent calls to acquireHandle will re-use the + // previously computed value instead of recomputing the + // ordered labels. + var labels labels + for i, meas := range measurements { + s := meas.SyncImpl().(*syncInstrument) + + h := s.acquireHandle(kvs, &labels) + + // Re-use labels for the next measurement. + if i == 0 { + labels = h.labels + } + + defer h.Unbind() + h.RecordOne(ctx, meas.Number()) } } @@ -645,11 +672,11 @@ func (r *record) RecordOne(ctx context.Context, number core.Number) { return } if err := aggregator.RangeTest(number, &r.inst.descriptor); err != nil { - r.labels.meter.errorHandler(err) + r.inst.meter.errorHandler(err) return } if err := r.recorder.Update(ctx, number, &r.inst.descriptor); err != nil { - r.labels.meter.errorHandler(err) + r.inst.meter.errorHandler(err) return } } diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index 8159ac932bf..3da8a32e888 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -44,7 +44,7 @@ import ( const ( concurrencyPerCPU = 100 reclaimPeriod = time.Millisecond * 100 - testRun = time.Second + testRun = 5 * time.Second epsilon = 1e-10 ) @@ -75,7 +75,7 @@ type ( testImpl struct { newInstrument func(meter api.Meter, name string) SyncImpler getUpdateValue func() core.Number - operate func(interface{}, context.Context, core.Number, api.LabelSet) + operate func(interface{}, context.Context, core.Number, []core.KeyValue) newStore func() interface{} // storeCollect and storeExpect are the same for @@ -167,7 +167,6 @@ func (f *testFixture) startWorker(impl *SDK, meter api.Meter, wg *sync.WaitGroup } kvs := f.someLabels() clabs := canonicalizeLabels(kvs) - labs := meter.Labels(kvs...) dur := getPeriod() key := testKey{ labels: clabs, @@ -177,7 +176,7 @@ func (f *testFixture) startWorker(impl *SDK, meter api.Meter, wg *sync.WaitGroup sleep := time.Duration(rand.ExpFloat64() * float64(dur)) time.Sleep(sleep) value := f.impl.getUpdateValue() - f.impl.operate(instrument, ctx, value, labs) + f.impl.operate(instrument, ctx, value, kvs) actual, _ := f.expected.LoadOrStore(key, f.impl.newStore()) @@ -191,6 +190,7 @@ func (f *testFixture) startWorker(impl *SDK, meter api.Meter, wg *sync.WaitGroup } func (f *testFixture) assertTest(numCollect int) { + var allErrs []func() csize := 0 f.received.Range(func(key, gstore interface{}) bool { csize++ @@ -198,13 +198,18 @@ func (f *testFixture) assertTest(numCollect int) { estore, loaded := f.expected.Load(key) if !loaded { - f.T.Error("Could not locate expected key: ", key) + allErrs = append(allErrs, func() { + f.T.Error("Could not locate expected key: ", key) + }) + return true } evalue := f.impl.readStore(estore) if !f.impl.equalValues(evalue, gvalue) { - f.T.Error("Expected value mismatch: ", - evalue, "!=", gvalue, " for ", key) + allErrs = append(allErrs, func() { + f.T.Error("Expected value mismatch: ", + evalue, "!=", gvalue, " for ", key) + }) } return true }) @@ -212,7 +217,9 @@ func (f *testFixture) assertTest(numCollect int) { f.expected.Range(func(key, value interface{}) bool { rsize++ if _, loaded := f.received.Load(key); !loaded { - f.T.Error("Did not receive expected key: ", key) + allErrs = append(allErrs, func() { + f.T.Error("Did not receive expected key: ", key) + }) } return true }) @@ -220,6 +227,10 @@ func (f *testFixture) assertTest(numCollect int) { f.T.Error("Did not receive the correct set of metrics: Received != Expected", rsize, csize) } + for _, anErr := range allErrs { + anErr() + } + // Note: It's useful to know the test triggers this condition, // but we can't assert it. Infrequently no duplicates are // found, and we can't really force a race to happen. @@ -353,9 +364,9 @@ func intCounterTestImpl() testImpl { } } }, - operate: func(inst interface{}, ctx context.Context, value core.Number, labels api.LabelSet) { + operate: func(inst interface{}, ctx context.Context, value core.Number, labels []core.KeyValue) { counter := inst.(api.Int64Counter) - counter.Add(ctx, value.AsInt64(), labels) + counter.Add(ctx, value.AsInt64(), labels...) }, newStore: func() interface{} { n := core.NewInt64Number(0) @@ -391,9 +402,9 @@ func floatCounterTestImpl() testImpl { } } }, - operate: func(inst interface{}, ctx context.Context, value core.Number, labels api.LabelSet) { + operate: func(inst interface{}, ctx context.Context, value core.Number, labels []core.KeyValue) { counter := inst.(api.Float64Counter) - counter.Add(ctx, value.AsFloat64(), labels) + counter.Add(ctx, value.AsFloat64(), labels...) }, newStore: func() interface{} { n := core.NewFloat64Number(0.0) @@ -427,9 +438,9 @@ func intLastValueTestImpl() testImpl { r1 := rand.Int63() return core.NewInt64Number(rand.Int63() - r1) }, - operate: func(inst interface{}, ctx context.Context, value core.Number, labels api.LabelSet) { + operate: func(inst interface{}, ctx context.Context, value core.Number, labels []core.KeyValue) { measure := inst.(api.Int64Measure) - measure.Record(ctx, value.AsInt64(), labels) + measure.Record(ctx, value.AsInt64(), labels...) }, newStore: func() interface{} { return &lastValueState{ @@ -468,9 +479,9 @@ func floatLastValueTestImpl() testImpl { getUpdateValue: func() core.Number { return core.NewFloat64Number((-0.5 + rand.Float64()) * 100000) }, - operate: func(inst interface{}, ctx context.Context, value core.Number, labels api.LabelSet) { + operate: func(inst interface{}, ctx context.Context, value core.Number, labels []core.KeyValue) { measure := inst.(api.Float64Measure) - measure.Record(ctx, value.AsFloat64(), labels) + measure.Record(ctx, value.AsFloat64(), labels...) }, newStore: func() interface{} { return &lastValueState{