Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple async callbacks, allow callbacks to be removed #4143

Merged
merged 9 commits into from
Feb 25, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@
/**
* A reference to an observable metric registered with {@link
* DoubleCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableDoubleCounter {}
public interface ObservableDoubleCounter extends AutoCloseable {

/**
* Remove the callback associated with this instrument. After this is called, callbacks won't be
* invoked on future collections. Subsequent calls to {@link #close()} will have no effect.
*
* @see DoubleCounterBuilder#buildWithCallback(Consumer)
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
/**
* A reference to an observable metric registered with {@link
* DoubleGaugeBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableDoubleGauge {}
public interface ObservableDoubleGauge extends AutoCloseable {
/**
* Remove the callback associated with this instrument. After this is called, callbacks won't be
* invoked on future collections. Subsequent calls to {@link #close()} will have no effect.
*
* @see DoubleGaugeBuilder#buildWithCallback(Consumer)
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
/**
* A reference to an observable metric registered with {@link
* DoubleUpDownCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableDoubleUpDownCounter {}
public interface ObservableDoubleUpDownCounter extends AutoCloseable {
/**
* Remove the callback associated with this instrument. After this is called, callbacks won't be
* invoked on future collections. Subsequent calls to {@link #close()} will have no effect.
*
* @see DoubleUpDownCounterBuilder#buildWithCallback(Consumer)
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@
/**
* A reference to an observable metric registered with {@link
* LongCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableLongCounter {}
public interface ObservableLongCounter extends AutoCloseable {

/**
* Remove the callback associated with this instrument. After this is called, callbacks won't be
* invoked on future collections. Subsequent calls to {@link #close()} will have no effect.
*
* @see LongCounterBuilder#buildWithCallback(Consumer)
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@
/**
* A reference to an observable metric registered with {@link
* LongGaugeBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableLongGauge {}
public interface ObservableLongGauge extends AutoCloseable {

/**
* Remove the callback associated with this instrument. After this is called, callbacks won't be
* invoked on future collections. Subsequent calls to {@link #close()} will have no effect.
*
* @see LongGaugeBuilder#buildWithCallback(Consumer)
*/
@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
/**
* A reference to an observable metric registered with {@link
* LongUpDownCounterBuilder#buildWithCallback(Consumer)}.
*
* <p>This interface currently has no methods but may be extended in the future with functionality
* such as canceling the observable.
*/
public interface ObservableLongUpDownCounter {}
public interface ObservableLongUpDownCounter extends AutoCloseable {
/**
* Remove the callback associated with this instrument. After this is called, callbacks won't be
* invoked on future collections. Subsequent calls to {@link #close()} will have no effect.
*
* @see LongUpDownCounterBuilder#buildWithCallback(Consumer)
*/
@Override
default void close() {}
}
25 changes: 24 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-api.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,25 @@
Comparing source compatibility of against
No changes.
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleGauge (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongGauge (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongUpDownCounter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW INTERFACE: java.lang.AutoCloseable
+++! NEW METHOD: PUBLIC(+) void close()
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ void testDuplicateExceptionMessage_pureInstruments() {
.contains("- Description [description2] does not match [description]")
.contains("- InstrumentType [COUNTER] does not match [OBSERVABLE_COUNTER]")
.contains("- InstrumentValueType [LONG] does not match [DOUBLE]")
.contains("- InstrumentType [OBSERVABLE_COUNTER] is async and already registered")
.contains(simple.getSourceInstrument().getSourceInfo().multiLineDebugString())
.contains("Original instrument registered with same name but is incompatible.")
.contains(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand All @@ -21,10 +23,11 @@ public abstract class AbstractInstrumentBuilder<BuilderT extends AbstractInstrum

private final MeterProviderSharedState meterProviderSharedState;
private final MeterSharedState meterSharedState;
private final String instrumentName;
private String description;
private String unit;

protected final String instrumentName;

AbstractInstrumentBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
Expand Down Expand Up @@ -69,17 +72,19 @@ final <I extends AbstractInstrument> I buildSynchronousInstrument(
return instrumentFactory.apply(descriptor, storage);
}

final void registerDoubleAsynchronousInstrument(
InstrumentType type, Consumer<ObservableDoubleMeasurement> updater) {
final List<AsynchronousMetricStorage<?, ObservableDoubleMeasurement>>
registerDoubleAsynchronousInstrument(
InstrumentType type, Consumer<ObservableDoubleMeasurement> updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.DOUBLE);
meterSharedState.registerDoubleAsynchronousInstrument(
return meterSharedState.registerDoubleAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}

final void registerLongAsynchronousInstrument(
InstrumentType type, Consumer<ObservableLongMeasurement> updater) {
final List<AsynchronousMetricStorage<?, ObservableLongMeasurement>>
registerLongAsynchronousInstrument(
InstrumentType type, Consumer<ObservableLongMeasurement> updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.LONG);
meterSharedState.registerLongAsynchronousInstrument(
return meterSharedState.registerLongAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ public void unbind() {
static final class Builder extends AbstractInstrumentBuilder<SdkDoubleCounter.Builder>
implements DoubleCounterBuilder {

private static final ObservableDoubleCounter NOOP = new ObservableDoubleCounter() {};

Builder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
Expand All @@ -127,8 +125,10 @@ public SdkDoubleCounter build() {
@Override
public ObservableDoubleCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback),
callback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
final class SdkDoubleGaugeBuilder extends AbstractInstrumentBuilder<SdkDoubleGaugeBuilder>
implements DoubleGaugeBuilder {

private static final ObservableDoubleGauge NOOP = new ObservableDoubleGauge() {};

SdkDoubleGaugeBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
Expand Down Expand Up @@ -47,7 +45,9 @@ public LongGaugeBuilder ofLongs() {

@Override
public ObservableDoubleGauge buildWithCallback(Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback),
callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.function.Consumer;

final class SdkDoubleUpDownCounter extends AbstractInstrument implements DoubleUpDownCounter {
private static final ObservableDoubleUpDownCounter NOOP = new ObservableDoubleUpDownCounter() {};

private final WriteableMetricStorage storage;

Expand Down Expand Up @@ -101,8 +100,10 @@ public DoubleUpDownCounter build() {
@Override
public ObservableDoubleUpDownCounter buildWithCallback(
Consumer<ObservableDoubleMeasurement> callback) {
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback),
callback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.logging.Logger;

final class SdkLongCounter extends AbstractInstrument implements LongCounter {
private static final ObservableLongCounter NOOP = new ObservableLongCounter() {};

private static final Logger logger = Logger.getLogger(SdkLongCounter.class.getName());

Expand Down Expand Up @@ -139,8 +138,10 @@ public DoubleCounterBuilder ofDoubles() {

@Override
public ObservableLongCounter buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback),
callback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
final class SdkLongGaugeBuilder extends AbstractInstrumentBuilder<SdkLongGaugeBuilder>
implements LongGaugeBuilder {

private static final ObservableLongGauge NOOP = new ObservableLongGauge() {};

SdkLongGaugeBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
Expand All @@ -34,7 +32,9 @@ protected SdkLongGaugeBuilder getThis() {

@Override
public ObservableLongGauge buildWithCallback(Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback),
callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.function.Consumer;

final class SdkLongUpDownCounter extends AbstractInstrument implements LongUpDownCounter {
private static final ObservableLongUpDownCounter NOOP = new ObservableLongUpDownCounter() {};

private final WriteableMetricStorage storage;

Expand Down Expand Up @@ -114,8 +113,10 @@ public DoubleUpDownCounterBuilder ofDoubles() {
@Override
public ObservableLongUpDownCounter buildWithCallback(
Consumer<ObservableLongMeasurement> callback) {
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback);
return NOOP;
return new SdkObservableInstrument<>(
instrumentName,
registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback),
callback);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics;

import io.opentelemetry.api.metrics.ObservableDoubleCounter;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

class SdkObservableInstrument<O>
implements ObservableDoubleCounter,
ObservableLongCounter,
ObservableDoubleGauge,
ObservableLongGauge,
ObservableDoubleUpDownCounter,
ObservableLongUpDownCounter {

private static final Logger logger = Logger.getLogger(SdkObservableInstrument.class.getName());

private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
private final String instrumentName;
private final List<AsynchronousMetricStorage<?, O>> storages;
private final Consumer<O> callback;
private volatile boolean removed = false;

SdkObservableInstrument(
String instrumentName, List<AsynchronousMetricStorage<?, O>> storages, Consumer<O> callback) {
this.instrumentName = instrumentName;
this.storages = storages;
this.callback = callback;
}

@Override
public void close() {
if (removed) {
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
throttlingLogger.log(
Level.WARNING, "Instrument " + instrumentName + " has called remove() multiple times.");
return;
}
storages.forEach(storage -> storage.removeCallback(callback));
removed = true;
}
}
Loading