Skip to content

Commit

Permalink
Allow multiple async callbacks, allow callbacks to be removed (#4143)
Browse files Browse the repository at this point in the history
* Allow multiple async callbacks, allow callbacks to be removed

* Use AutoCloseable to remove callbacks, don't unregister from MetricStorageRegistry

* Use CopyOnWriteArrayList

* PR feedback

* PR feedback
  • Loading branch information
jack-berg committed Feb 25, 2022
1 parent c053393 commit 9f10e00
Show file tree
Hide file tree
Showing 32 changed files with 731 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* DoubleCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableDoubleCounter {}
public interface ObservableDoubleCounter extends AutoCloseable {

/**
* Remove the callback registered via {@link DoubleCounterBuilder#buildWithCallback(Consumer)}.
* After this is called, the callback won't be invoked on future collections. Subsequent calls to
* {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* DoubleGaugeBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableDoubleGauge {}
public interface ObservableDoubleGauge extends AutoCloseable {

/**
* Remove the callback registered via {@link DoubleGaugeBuilder#buildWithCallback(Consumer)}.
* After this is called, the callback won't be invoked on future collections. Subsequent calls to
* {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* DoubleUpDownCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableDoubleUpDownCounter {}
public interface ObservableDoubleUpDownCounter extends AutoCloseable {

/**
* Remove the callback registered via {@link
* DoubleUpDownCounterBuilder#buildWithCallback(Consumer)}. After this is called, the callback
* won't be invoked on future collections. Subsequent calls to {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* LongCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableLongCounter {}
public interface ObservableLongCounter extends AutoCloseable {

/**
* Remove the callback registered via {@link LongCounterBuilder#buildWithCallback(Consumer)}.
* After this is called, the callback won't be invoked on future collections. Subsequent calls to
* {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* LongGaugeBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableLongGauge {}
public interface ObservableLongGauge extends AutoCloseable {

/**
* Remove the callback registered via {@link LongGaugeBuilder#buildWithCallback(Consumer)}. After
* this is called, the callback won't be invoked on future collections. Subsequent calls to {@link
* #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* LongUpDownCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableLongUpDownCounter {}
public interface ObservableLongUpDownCounter extends AutoCloseable {

/**
* Remove the callback registered via {@link
* LongUpDownCounterBuilder#buildWithCallback(Consumer)}. After this is called, the callback won't
* be invoked on future collections. Subsequent calls to {@link #close()} have no effect.
*
* <p>Note: other callbacks registered to the metric with the same identity are unaffected.
*/
@Override
default void close() {}
}
24 changes: 24 additions & 0 deletions docs/apidiffs/current_vs_latest/opentelemetry-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,27 @@ Comparing source compatibility of against
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.common.AttributesBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++! NEW METHOD: PUBLIC(+) io.opentelemetry.api.common.AttributesBuilder put(io.opentelemetry.api.common.AttributeKey, java.lang.Object[])
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleGauge (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongGauge (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongUpDownCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ void testDuplicateExceptionMessage_pureInstruments() {
.contains("- Description [description2] does not match [description]")
.contains("- InstrumentType [COUNTER] does not match [OBSERVABLE_COUNTER]")
.contains("- InstrumentValueType [LONG] does not match [DOUBLE]")
.contains("- InstrumentType [OBSERVABLE_COUNTER] is async and already registered")
.contains(simple.getSourceInstrument().getSourceInfo().multiLineDebugString())
.contains("Original instrument registered with same name but is incompatible.")
.contains(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand All @@ -21,10 +23,11 @@ abstract class AbstractInstrumentBuilder<BuilderT extends AbstractInstrumentBuil

private final MeterProviderSharedState meterProviderSharedState;
private final MeterSharedState meterSharedState;
private final String instrumentName;
private String description;
private String unit;

protected final String instrumentName;

AbstractInstrumentBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
Expand Down Expand Up @@ -69,17 +72,19 @@ final <I extends AbstractInstrument> I buildSynchronousInstrument(
return instrumentFactory.apply(descriptor, storage);
}

final void registerDoubleAsynchronousInstrument(
InstrumentType type, Consumer<ObservableDoubleMeasurement> updater) {
final List<AsynchronousMetricStorage<?, ObservableDoubleMeasurement>>
registerDoubleAsynchronousInstrument(
InstrumentType type, Consumer<ObservableDoubleMeasurement> updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.DOUBLE);
meterSharedState.registerDoubleAsynchronousInstrument(
return meterSharedState.registerDoubleAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}

final void registerLongAsynchronousInstrument(
InstrumentType type, Consumer<ObservableLongMeasurement> updater) {
final List<AsynchronousMetricStorage<?, ObservableLongMeasurement>>
registerLongAsynchronousInstrument(
InstrumentType type, Consumer<ObservableLongMeasurement> updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.LONG);
meterSharedState.registerLongAsynchronousInstrument(
return meterSharedState.registerLongAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ public void unbind() {
static final class Builder extends AbstractInstrumentBuilder<SdkDoubleCounter.Builder>
implements DoubleCounterBuilder {

private static final ObservableDoubleCounter NOOP = new ObservableDoubleCounter() {};

Builder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
Expand All @@ -127,8 +125,10 @@ public SdkDoubleCounter build() {
@Override
public ObservableDoubleCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback),
callback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
final class SdkDoubleGaugeBuilder extends AbstractInstrumentBuilder<SdkDoubleGaugeBuilder>
implements DoubleGaugeBuilder {

private static final ObservableDoubleGauge NOOP = new ObservableDoubleGauge() {};

SdkDoubleGaugeBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
Expand Down Expand Up @@ -47,7 +45,9 @@ public LongGaugeBuilder ofLongs() {

@Override
public ObservableDoubleGauge buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback),
callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.function.Consumer;

final class SdkDoubleUpDownCounter extends AbstractInstrument implements DoubleUpDownCounter {
private static final ObservableDoubleUpDownCounter NOOP = new ObservableDoubleUpDownCounter() {};

private final WriteableMetricStorage storage;

Expand Down Expand Up @@ -101,8 +100,10 @@ public DoubleUpDownCounter build() {
@Override
public ObservableDoubleUpDownCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback),
callback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.logging.Logger;

final class SdkLongCounter extends AbstractInstrument implements LongCounter {
private static final ObservableLongCounter NOOP = new ObservableLongCounter() {};

private static final Logger logger = Logger.getLogger(SdkLongCounter.class.getName());

Expand Down Expand Up @@ -139,8 +138,10 @@ public DoubleCounterBuilder ofDoubles() {

@Override
public ObservableLongCounter buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback),
callback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
final class SdkLongGaugeBuilder extends AbstractInstrumentBuilder<SdkLongGaugeBuilder>
implements LongGaugeBuilder {

private static final ObservableLongGauge NOOP = new ObservableLongGauge() {};

SdkLongGaugeBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
Expand All @@ -34,7 +32,9 @@ protected SdkLongGaugeBuilder getThis() {

@Override
public ObservableLongGauge buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback),
callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.function.Consumer;

final class SdkLongUpDownCounter extends AbstractInstrument implements LongUpDownCounter {
private static final ObservableLongUpDownCounter NOOP = new ObservableLongUpDownCounter() {};

private final WriteableMetricStorage storage;

Expand Down Expand Up @@ -114,8 +113,10 @@ public DoubleUpDownCounterBuilder ofDoubles() {
@Override
public ObservableLongUpDownCounter buildWithCallback(
Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback),
callback);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.api.metrics.ObservableDoubleCounter;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

class SdkObservableInstrument<O>
implements ObservableDoubleCounter,
ObservableLongCounter,
ObservableDoubleGauge,
ObservableLongGauge,
ObservableDoubleUpDownCounter,
ObservableLongUpDownCounter {

private static final Logger logger = Logger.getLogger(SdkObservableInstrument.class.getName());

private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
private final String instrumentName;
private final List<AsynchronousMetricStorage<?, O>> storages;
private final Consumer<O> callback;
private final AtomicBoolean removed = new AtomicBoolean(false);

SdkObservableInstrument(
String instrumentName, List<AsynchronousMetricStorage<?, O>> storages, Consumer<O> callback) {
this.instrumentName = instrumentName;
this.storages = storages;
this.callback = callback;
}

@Override
public void close() {
if (!removed.compareAndSet(false, true)) {
throttlingLogger.log(
Level.WARNING, "Instrument " + instrumentName + " has called close() multiple times.");
return;
}
storages.forEach(storage -> storage.removeCallback(callback));
}
}
Loading

0 comments on commit 9f10e00

Please sign in to comment.