Skip to content

Commit

Permalink
SetMeterProvider might miss the delegation for instruments and regi…
Browse files Browse the repository at this point in the history
…stries (#5780)

Closes #5757

This PR fixes an issue where `SetMeterProvider` might miss the
delegation for instruments and registries. This bug brings a concurrent
issue that could possibly make instruments and registries unable to
operate correctly, such as recording, after using the `SetMeterProvider`
method. The data put on these instruments and registries might be lost.
  • Loading branch information
XSAM committed Sep 6, 2024
1 parent 9e1b015 commit b37e8a9
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Fix memory leak in the global `MeterProvider` when identical instruments are repeatedly created. (#5754)
- Fix panic instruments creation when setting meter provider. (#5758)
- Fix an issue where `SetMeterProvider` in `go.opentelemetry.io/otel` might miss the delegation for instruments and registries. (#5780)

### Removed

Expand Down
133 changes: 80 additions & 53 deletions internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"container/list"
"reflect"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
Expand Down Expand Up @@ -97,7 +96,7 @@ type meter struct {

registry list.List

delegate atomic.Value // metric.Meter
delegate metric.Meter
}

type delegatedInstrument interface {
Expand All @@ -123,12 +122,12 @@ type instID struct {
//
// It is guaranteed by the caller that this happens only once.
func (m *meter) setDelegate(provider metric.MeterProvider) {
meter := provider.Meter(m.name, m.opts...)
m.delegate.Store(meter)

m.mtx.Lock()
defer m.mtx.Unlock()

meter := provider.Meter(m.name, m.opts...)
m.delegate = meter

for _, inst := range m.instruments {
inst.setDelegate(meter)
}
Expand All @@ -141,16 +140,18 @@ func (m *meter) setDelegate(provider metric.MeterProvider) {
m.registry.Remove(e)
}

clear(m.instruments)
m.instruments = nil
m.registry.Init()
}

func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Int64Counter(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Int64Counter(name, options...)
}

i := &siCounter{name: name, opts: options}
cfg := metric.NewInt64CounterConfig(options...)
id := instID{
Expand All @@ -164,11 +165,13 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption)
}

func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Int64UpDownCounter(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Int64UpDownCounter(name, options...)
}

i := &siUpDownCounter{name: name, opts: options}
cfg := metric.NewInt64UpDownCounterConfig(options...)
id := instID{
Expand All @@ -182,11 +185,13 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
}

func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Int64Histogram(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Int64Histogram(name, options...)
}

i := &siHistogram{name: name, opts: options}
cfg := metric.NewInt64HistogramConfig(options...)
id := instID{
Expand All @@ -200,11 +205,13 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
}

func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Int64Gauge(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Int64Gauge(name, options...)
}

i := &siGauge{name: name, opts: options}
cfg := metric.NewInt64GaugeConfig(options...)
id := instID{
Expand All @@ -218,11 +225,13 @@ func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (met
}

func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Int64ObservableCounter(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Int64ObservableCounter(name, options...)
}

i := &aiCounter{name: name, opts: options}
cfg := metric.NewInt64ObservableCounterConfig(options...)
id := instID{
Expand All @@ -236,11 +245,13 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
}

func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Int64ObservableUpDownCounter(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Int64ObservableUpDownCounter(name, options...)
}

i := &aiUpDownCounter{name: name, opts: options}
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
id := instID{
Expand All @@ -254,11 +265,13 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
}

func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Int64ObservableGauge(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Int64ObservableGauge(name, options...)
}

i := &aiGauge{name: name, opts: options}
cfg := metric.NewInt64ObservableGaugeConfig(options...)
id := instID{
Expand All @@ -272,11 +285,13 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa
}

func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Float64Counter(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Float64Counter(name, options...)
}

i := &sfCounter{name: name, opts: options}
cfg := metric.NewFloat64CounterConfig(options...)
id := instID{
Expand All @@ -290,11 +305,13 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti
}

func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Float64UpDownCounter(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Float64UpDownCounter(name, options...)
}

i := &sfUpDownCounter{name: name, opts: options}
cfg := metric.NewFloat64UpDownCounterConfig(options...)
id := instID{
Expand All @@ -308,11 +325,13 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
}

func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Float64Histogram(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Float64Histogram(name, options...)
}

i := &sfHistogram{name: name, opts: options}
cfg := metric.NewFloat64HistogramConfig(options...)
id := instID{
Expand All @@ -326,11 +345,13 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
}

func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption) (metric.Float64Gauge, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Float64Gauge(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Float64Gauge(name, options...)
}

i := &sfGauge{name: name, opts: options}
cfg := metric.NewFloat64GaugeConfig(options...)
id := instID{
Expand All @@ -344,11 +365,13 @@ func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption)
}

func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Float64ObservableCounter(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Float64ObservableCounter(name, options...)
}

i := &afCounter{name: name, opts: options}
cfg := metric.NewFloat64ObservableCounterConfig(options...)
id := instID{
Expand All @@ -362,11 +385,13 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
}

func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Float64ObservableUpDownCounter(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Float64ObservableUpDownCounter(name, options...)
}

i := &afUpDownCounter{name: name, opts: options}
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
id := instID{
Expand All @@ -380,11 +405,13 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
}

func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.Float64ObservableGauge(name, options...)
}
m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
return m.delegate.Float64ObservableGauge(name, options...)
}

i := &afGauge{name: name, opts: options}
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
id := instID{
Expand All @@ -399,14 +426,14 @@ func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64Obs

// RegisterCallback captures the function that will be called during Collect.
func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
if del, ok := m.delegate.Load().(metric.Meter); ok {
insts = unwrapInstruments(insts)
return del.RegisterCallback(f, insts...)
}

m.mtx.Lock()
defer m.mtx.Unlock()

if m.delegate != nil {
insts = unwrapInstruments(insts)
return m.delegate.RegisterCallback(f, insts...)
}

reg := &registration{instruments: insts, function: f}
e := m.registry.PushBack(reg)
reg.unreg = func() error {
Expand Down
15 changes: 11 additions & 4 deletions internal/global/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func TestMeterConcurrentSafe(t *testing.T) {
mtr.setDelegate(noop.NewMeterProvider())
close(finish)
<-done

// No instruments should be left after the meter is replaced.
assert.Empty(t, mtr.instruments)

// No callbacks should be left after the meter is replaced.
assert.Zero(t, mtr.registry.Len())
}

func TestUnregisterConcurrentSafe(t *testing.T) {
Expand Down Expand Up @@ -162,8 +168,9 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (metric.Float64Co
// This is to emulate a read from an exporter.
func testCollect(t *testing.T, m metric.Meter) {
if tMeter, ok := m.(*meter); ok {
m, ok = tMeter.delegate.Load().(metric.Meter)
if !ok {
// This changes the input m to the delegate.
m = tMeter.delegate
if m == nil {
t.Error("meter was not delegated")
return
}
Expand Down Expand Up @@ -261,7 +268,7 @@ func TestMeterDelegatesCalls(t *testing.T) {

// Calls to Meter methods after setDelegate() should be executed by the delegate
require.IsType(t, &meter{}, m)
tMeter := m.(*meter).delegate.Load().(*testMeter)
tMeter := m.(*meter).delegate.(*testMeter)
require.NotNil(t, tMeter)
assert.Equal(t, 1, tMeter.afCount)
assert.Equal(t, 1, tMeter.afUDCount)
Expand Down Expand Up @@ -309,7 +316,7 @@ func TestMeterDefersDelegations(t *testing.T) {

// Calls to Meter() before setDelegate() should be the delegated type
require.IsType(t, &meter{}, m)
tMeter := m.(*meter).delegate.Load().(*testMeter)
tMeter := m.(*meter).delegate.(*testMeter)
require.NotNil(t, tMeter)
assert.Equal(t, 1, tMeter.afCount)
assert.Equal(t, 1, tMeter.afUDCount)
Expand Down

0 comments on commit b37e8a9

Please sign in to comment.