From 9f10e0048e13ff82a97a08c7cde187fa4329ba3f Mon Sep 17 00:00:00 2001
From: jack-berg <34418638+jack-berg@users.noreply.github.com>
Date: Fri, 25 Feb 2022 11:51:51 -0600
Subject: [PATCH] Allow multiple async callbacks, allow callbacks to be removed
(#4143)
* Allow multiple async callbacks, allow callbacks to be removed
* Use AutoCloseable to remove callbacks, don't unregister from MetricStorageRegistry
* Use CopyOnWriteArrayList
* PR feedback
* PR feedback
---
.../api/metrics/ObservableDoubleCounter.java | 16 +-
.../api/metrics/ObservableDoubleGauge.java | 16 +-
.../ObservableDoubleUpDownCounter.java | 16 +-
.../api/metrics/ObservableLongCounter.java | 16 +-
.../api/metrics/ObservableLongGauge.java | 16 +-
.../metrics/ObservableLongUpDownCounter.java | 16 +-
.../current_vs_latest/opentelemetry-api.txt | 24 +++
.../opentelemetry/testing/TestSourceInfo.java | 1 -
.../metrics/AbstractInstrumentBuilder.java | 19 +-
.../sdk/metrics/SdkDoubleCounter.java | 8 +-
.../sdk/metrics/SdkDoubleGaugeBuilder.java | 8 +-
.../sdk/metrics/SdkDoubleUpDownCounter.java | 7 +-
.../sdk/metrics/SdkLongCounter.java | 7 +-
.../sdk/metrics/SdkLongGaugeBuilder.java | 8 +-
.../sdk/metrics/SdkLongUpDownCounter.java | 7 +-
.../sdk/metrics/SdkObservableInstrument.java | 54 +++++
.../internal/descriptor/MetricDescriptor.java | 16 --
.../state/AsynchronousMetricStorage.java | 204 ++++++++++--------
.../metrics/internal/state/DebugUtils.java | 6 -
.../internal/state/MeterSharedState.java | 86 +++++---
.../internal/state/MetricStorageRegistry.java | 7 -
.../metrics/SdkDoubleGaugeBuilderTest.java | 18 +-
.../sdk/metrics/SdkLongGaugeBuilderTest.java | 21 +-
.../sdk/metrics/SdkMeterProviderTest.java | 62 ++++++
.../SdkObservableDoubleCounterTest.java | 21 ++
.../SdkObservableDoubleUpDownCounterTest.java | 22 ++
.../metrics/SdkObservableInstrumentTest.java | 52 +++++
.../metrics/SdkObservableLongCounterTest.java | 20 ++
.../SdkObservableLongUpDownCounterTest.java | 20 ++
.../descriptor/MetricDescriptorTest.java | 18 --
.../state/AsynchronousMetricStorageTest.java | 157 ++++++++++----
.../state/MetricStorageRegistryTest.java | 51 +++--
32 files changed, 731 insertions(+), 289 deletions(-)
create mode 100644 sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkObservableInstrument.java
create mode 100644 sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableInstrumentTest.java
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleCounter.java b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleCounter.java
index 26c57e96523..24ff38d491d 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleCounter.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleCounter.java
@@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* DoubleCounterBuilder#buildWithCallback(Consumer)}.
- *
- *
This interface currently has no methods but may be extended in the future with functionality
- * such as canceling the observable.
*/
-public interface ObservableDoubleCounter {}
+public interface ObservableDoubleCounter extends AutoCloseable {
+
+ /**
+ * Remove the callback registered via {@link DoubleCounterBuilder#buildWithCallback(Consumer)}.
+ * After this is called, the callback won't be invoked on future collections. Subsequent calls to
+ * {@link #close()} have no effect.
+ *
+ *
Note: other callbacks registered to the metric with the same identity are unaffected.
+ */
+ @Override
+ default void close() {}
+}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleGauge.java b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleGauge.java
index 7955b4afb9a..ec5683a40e7 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleGauge.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleGauge.java
@@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* DoubleGaugeBuilder#buildWithCallback(Consumer)}.
- *
- *
This interface currently has no methods but may be extended in the future with functionality
- * such as canceling the observable.
*/
-public interface ObservableDoubleGauge {}
+public interface ObservableDoubleGauge extends AutoCloseable {
+
+ /**
+ * Remove the callback registered via {@link DoubleGaugeBuilder#buildWithCallback(Consumer)}.
+ * After this is called, the callback won't be invoked on future collections. Subsequent calls to
+ * {@link #close()} have no effect.
+ *
+ *
Note: other callbacks registered to the metric with the same identity are unaffected.
+ */
+ @Override
+ default void close() {}
+}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleUpDownCounter.java b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleUpDownCounter.java
index d5897d9a5c7..e193eb675c7 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleUpDownCounter.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableDoubleUpDownCounter.java
@@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* DoubleUpDownCounterBuilder#buildWithCallback(Consumer)}.
- *
- *
This interface currently has no methods but may be extended in the future with functionality
- * such as canceling the observable.
*/
-public interface ObservableDoubleUpDownCounter {}
+public interface ObservableDoubleUpDownCounter extends AutoCloseable {
+
+ /**
+ * Remove the callback registered via {@link
+ * DoubleUpDownCounterBuilder#buildWithCallback(Consumer)}. After this is called, the callback
+ * won't be invoked on future collections. Subsequent calls to {@link #close()} have no effect.
+ *
+ *
Note: other callbacks registered to the metric with the same identity are unaffected.
+ */
+ @Override
+ default void close() {}
+}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongCounter.java b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongCounter.java
index 3fc29babdd2..c0553c226b7 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongCounter.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongCounter.java
@@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* LongCounterBuilder#buildWithCallback(Consumer)}.
- *
- *
This interface currently has no methods but may be extended in the future with functionality
- * such as canceling the observable.
*/
-public interface ObservableLongCounter {}
+public interface ObservableLongCounter extends AutoCloseable {
+
+ /**
+ * Remove the callback registered via {@link LongCounterBuilder#buildWithCallback(Consumer)}.
+ * After this is called, the callback won't be invoked on future collections. Subsequent calls to
+ * {@link #close()} have no effect.
+ *
+ *
Note: other callbacks registered to the metric with the same identity are unaffected.
+ */
+ @Override
+ default void close() {}
+}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongGauge.java b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongGauge.java
index e551f0cef8a..19ca351c850 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongGauge.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongGauge.java
@@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* LongGaugeBuilder#buildWithCallback(Consumer)}.
- *
- *
This interface currently has no methods but may be extended in the future with functionality
- * such as canceling the observable.
*/
-public interface ObservableLongGauge {}
+public interface ObservableLongGauge extends AutoCloseable {
+
+ /**
+ * Remove the callback registered via {@link LongGaugeBuilder#buildWithCallback(Consumer)}. After
+ * this is called, the callback won't be invoked on future collections. Subsequent calls to {@link
+ * #close()} have no effect.
+ *
+ *
Note: other callbacks registered to the metric with the same identity are unaffected.
+ */
+ @Override
+ default void close() {}
+}
diff --git a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongUpDownCounter.java b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongUpDownCounter.java
index 6452c10b92c..65d8b167816 100644
--- a/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongUpDownCounter.java
+++ b/api/all/src/main/java/io/opentelemetry/api/metrics/ObservableLongUpDownCounter.java
@@ -10,8 +10,16 @@
/**
* A reference to an observable metric registered with {@link
* LongUpDownCounterBuilder#buildWithCallback(Consumer)}.
- *
- *
This interface currently has no methods but may be extended in the future with functionality
- * such as canceling the observable.
*/
-public interface ObservableLongUpDownCounter {}
+public interface ObservableLongUpDownCounter extends AutoCloseable {
+
+ /**
+ * Remove the callback registered via {@link
+ * LongUpDownCounterBuilder#buildWithCallback(Consumer)}. After this is called, the callback won't
+ * be invoked on future collections. Subsequent calls to {@link #close()} have no effect.
+ *
+ *
Note: other callbacks registered to the metric with the same identity are unaffected.
+ */
+ @Override
+ default void close() {}
+}
diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-api.txt b/docs/apidiffs/current_vs_latest/opentelemetry-api.txt
index 573eeb42ebe..ac862d7bd1a 100644
--- a/docs/apidiffs/current_vs_latest/opentelemetry-api.txt
+++ b/docs/apidiffs/current_vs_latest/opentelemetry-api.txt
@@ -2,3 +2,27 @@ Comparing source compatibility of against
***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.common.AttributesBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++! NEW METHOD: PUBLIC(+) io.opentelemetry.api.common.AttributesBuilder put(io.opentelemetry.api.common.AttributeKey, java.lang.Object[])
+***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleCounter (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW INTERFACE: java.lang.AutoCloseable
+ +++! NEW METHOD: PUBLIC(+) void close()
+***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleGauge (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW INTERFACE: java.lang.AutoCloseable
+ +++! NEW METHOD: PUBLIC(+) void close()
+***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW INTERFACE: java.lang.AutoCloseable
+ +++! NEW METHOD: PUBLIC(+) void close()
+***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongCounter (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW INTERFACE: java.lang.AutoCloseable
+ +++! NEW METHOD: PUBLIC(+) void close()
+***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongGauge (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW INTERFACE: java.lang.AutoCloseable
+ +++! NEW METHOD: PUBLIC(+) void close()
+***! MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.api.metrics.ObservableLongUpDownCounter (not serializable)
+ === CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+ +++ NEW INTERFACE: java.lang.AutoCloseable
+ +++! NEW METHOD: PUBLIC(+) void close()
diff --git a/sdk/metrics/src/debugEnabledTest/java/io/opentelemetry/testing/TestSourceInfo.java b/sdk/metrics/src/debugEnabledTest/java/io/opentelemetry/testing/TestSourceInfo.java
index 37707d1f96f..c6500ff3a7d 100644
--- a/sdk/metrics/src/debugEnabledTest/java/io/opentelemetry/testing/TestSourceInfo.java
+++ b/sdk/metrics/src/debugEnabledTest/java/io/opentelemetry/testing/TestSourceInfo.java
@@ -51,7 +51,6 @@ void testDuplicateExceptionMessage_pureInstruments() {
.contains("- Description [description2] does not match [description]")
.contains("- InstrumentType [COUNTER] does not match [OBSERVABLE_COUNTER]")
.contains("- InstrumentValueType [LONG] does not match [DOUBLE]")
- .contains("- InstrumentType [OBSERVABLE_COUNTER] is async and already registered")
.contains(simple.getSourceInstrument().getSourceInfo().multiLineDebugString())
.contains("Original instrument registered with same name but is incompatible.")
.contains(
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrumentBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrumentBuilder.java
index f634548d87f..6865ee07a4b 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrumentBuilder.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/AbstractInstrumentBuilder.java
@@ -10,9 +10,11 @@
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
+import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import io.opentelemetry.sdk.metrics.internal.state.WriteableMetricStorage;
+import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -21,10 +23,11 @@ abstract class AbstractInstrumentBuilder I buildSynchronousInstrument(
return instrumentFactory.apply(descriptor, storage);
}
- final void registerDoubleAsynchronousInstrument(
- InstrumentType type, Consumer updater) {
+ final List>
+ registerDoubleAsynchronousInstrument(
+ InstrumentType type, Consumer updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.DOUBLE);
- meterSharedState.registerDoubleAsynchronousInstrument(
+ return meterSharedState.registerDoubleAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}
- final void registerLongAsynchronousInstrument(
- InstrumentType type, Consumer updater) {
+ final List>
+ registerLongAsynchronousInstrument(
+ InstrumentType type, Consumer updater) {
InstrumentDescriptor descriptor = makeDescriptor(type, InstrumentValueType.LONG);
- meterSharedState.registerLongAsynchronousInstrument(
+ return meterSharedState.registerLongAsynchronousInstrument(
descriptor, meterProviderSharedState, updater);
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleCounter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleCounter.java
index f30fbdfaf4a..ab03c5b6bd1 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleCounter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleCounter.java
@@ -102,8 +102,6 @@ public void unbind() {
static final class Builder extends AbstractInstrumentBuilder
implements DoubleCounterBuilder {
- private static final ObservableDoubleCounter NOOP = new ObservableDoubleCounter() {};
-
Builder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
@@ -127,8 +125,10 @@ public SdkDoubleCounter build() {
@Override
public ObservableDoubleCounter buildWithCallback(
Consumer callback) {
- registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback);
- return NOOP;
+ return new SdkObservableInstrument<>(
+ instrumentName,
+ registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback),
+ callback);
}
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeBuilder.java
index b47c0fa54bd..ec8f0e74a09 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeBuilder.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeBuilder.java
@@ -17,8 +17,6 @@
final class SdkDoubleGaugeBuilder extends AbstractInstrumentBuilder
implements DoubleGaugeBuilder {
- private static final ObservableDoubleGauge NOOP = new ObservableDoubleGauge() {};
-
SdkDoubleGaugeBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState meterSharedState,
@@ -47,7 +45,9 @@ public LongGaugeBuilder ofLongs() {
@Override
public ObservableDoubleGauge buildWithCallback(Consumer callback) {
- registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
- return NOOP;
+ return new SdkObservableInstrument<>(
+ instrumentName,
+ registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback),
+ callback);
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounter.java
index 1c23ac8c1e2..d8aafe1f4e2 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkDoubleUpDownCounter.java
@@ -22,7 +22,6 @@
import java.util.function.Consumer;
final class SdkDoubleUpDownCounter extends AbstractInstrument implements DoubleUpDownCounter {
- private static final ObservableDoubleUpDownCounter NOOP = new ObservableDoubleUpDownCounter() {};
private final WriteableMetricStorage storage;
@@ -101,8 +100,10 @@ public DoubleUpDownCounter build() {
@Override
public ObservableDoubleUpDownCounter buildWithCallback(
Consumer callback) {
- registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback);
- return NOOP;
+ return new SdkObservableInstrument<>(
+ instrumentName,
+ registerDoubleAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback),
+ callback);
}
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongCounter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongCounter.java
index fe83bf509bd..2008a1cd9a8 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongCounter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongCounter.java
@@ -26,7 +26,6 @@
import java.util.logging.Logger;
final class SdkLongCounter extends AbstractInstrument implements LongCounter {
- private static final ObservableLongCounter NOOP = new ObservableLongCounter() {};
private static final Logger logger = Logger.getLogger(SdkLongCounter.class.getName());
@@ -139,8 +138,10 @@ public DoubleCounterBuilder ofDoubles() {
@Override
public ObservableLongCounter buildWithCallback(Consumer callback) {
- registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback);
- return NOOP;
+ return new SdkObservableInstrument<>(
+ instrumentName,
+ registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_COUNTER, callback),
+ callback);
}
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongGaugeBuilder.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongGaugeBuilder.java
index 05888eecf04..bd2d7143b3f 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongGaugeBuilder.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongGaugeBuilder.java
@@ -16,8 +16,6 @@
final class SdkLongGaugeBuilder extends AbstractInstrumentBuilder
implements LongGaugeBuilder {
- private static final ObservableLongGauge NOOP = new ObservableLongGauge() {};
-
SdkLongGaugeBuilder(
MeterProviderSharedState meterProviderSharedState,
MeterSharedState sharedState,
@@ -34,7 +32,9 @@ protected SdkLongGaugeBuilder getThis() {
@Override
public ObservableLongGauge buildWithCallback(Consumer callback) {
- registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback);
- return NOOP;
+ return new SdkObservableInstrument<>(
+ instrumentName,
+ registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_GAUGE, callback),
+ callback);
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounter.java
index dc7643055a8..c076f4f17fc 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounter.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkLongUpDownCounter.java
@@ -23,7 +23,6 @@
import java.util.function.Consumer;
final class SdkLongUpDownCounter extends AbstractInstrument implements LongUpDownCounter {
- private static final ObservableLongUpDownCounter NOOP = new ObservableLongUpDownCounter() {};
private final WriteableMetricStorage storage;
@@ -114,8 +113,10 @@ public DoubleUpDownCounterBuilder ofDoubles() {
@Override
public ObservableLongUpDownCounter buildWithCallback(
Consumer callback) {
- registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback);
- return NOOP;
+ return new SdkObservableInstrument<>(
+ instrumentName,
+ registerLongAsynchronousInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER, callback),
+ callback);
}
}
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkObservableInstrument.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkObservableInstrument.java
new file mode 100644
index 00000000000..a5f4f523893
--- /dev/null
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkObservableInstrument.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.metrics;
+
+import io.opentelemetry.api.metrics.ObservableDoubleCounter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter;
+import io.opentelemetry.api.metrics.ObservableLongCounter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.opentelemetry.sdk.internal.ThrottlingLogger;
+import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+class SdkObservableInstrument
+ implements ObservableDoubleCounter,
+ ObservableLongCounter,
+ ObservableDoubleGauge,
+ ObservableLongGauge,
+ ObservableDoubleUpDownCounter,
+ ObservableLongUpDownCounter {
+
+ private static final Logger logger = Logger.getLogger(SdkObservableInstrument.class.getName());
+
+ private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
+ private final String instrumentName;
+ private final List> storages;
+ private final Consumer callback;
+ private final AtomicBoolean removed = new AtomicBoolean(false);
+
+ SdkObservableInstrument(
+ String instrumentName, List> storages, Consumer callback) {
+ this.instrumentName = instrumentName;
+ this.storages = storages;
+ this.callback = callback;
+ }
+
+ @Override
+ public void close() {
+ if (!removed.compareAndSet(false, true)) {
+ throttlingLogger.log(
+ Level.WARNING, "Instrument " + instrumentName + " has called close() multiple times.");
+ return;
+ }
+ storages.forEach(storage -> storage.removeCallback(callback));
+ }
+}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/descriptor/MetricDescriptor.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/descriptor/MetricDescriptor.java
index ff648d84fff..b99eb03ea74 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/descriptor/MetricDescriptor.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/descriptor/MetricDescriptor.java
@@ -86,20 +86,4 @@ public boolean isCompatibleWith(MetricDescriptor other) {
&& Objects.equals(
getSourceInstrument().getValueType(), other.getSourceInstrument().getValueType());
}
-
- /** Returns whether the descriptor describes an async {@link InstrumentType}. */
- public boolean isAsync() {
- switch (getSourceInstrument().getType()) {
- case OBSERVABLE_UP_DOWN_COUNTER:
- case OBSERVABLE_GAUGE:
- case OBSERVABLE_COUNTER:
- return true;
- case HISTOGRAM:
- case COUNTER:
- case UP_DOWN_COUNTER:
- return false;
- }
- throw new IllegalStateException(
- "Unrecognized instrument type " + getSourceInstrument().getType());
- }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java
index b58ee534f60..9bdd2daf113 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java
@@ -26,7 +26,9 @@
import io.opentelemetry.sdk.metrics.view.View;
import io.opentelemetry.sdk.resources.Resource;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.logging.Level;
@@ -38,104 +40,61 @@
* This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
-public final class AsynchronousMetricStorage implements MetricStorage {
+public class AsynchronousMetricStorage implements MetricStorage {
private static final Logger logger = Logger.getLogger(AsynchronousMetricStorage.class.getName());
private final ThrottlingLogger throttlingLogger = new ThrottlingLogger(logger);
- private final MetricDescriptor metricDescriptor;
private final ReentrantLock collectLock = new ReentrantLock();
- private final AsyncAccumulator asyncAccumulator;
+ private final List> callbacks = new CopyOnWriteArrayList<>();
+ private final MetricDescriptor metricDescriptor;
private final TemporalMetricStorage storage;
- private final Runnable metricUpdater;
+ private final AsyncAccumulator accumulator;
+ private final O measurement;
- /** Constructs asynchronous metric storage which stores nothing. */
- public static MetricStorage empty() {
- return EmptyMetricStorage.INSTANCE;
+ private AsynchronousMetricStorage(
+ MetricDescriptor metricDescriptor,
+ Aggregator aggregator,
+ AsyncAccumulator accumulator,
+ O measurement) {
+ this.metricDescriptor = metricDescriptor;
+ this.storage = new TemporalMetricStorage<>(aggregator, /* isSynchronous= */ false);
+ this.accumulator = accumulator;
+ this.measurement = measurement;
}
- /** Constructs storage for {@code double} valued instruments. */
- public static MetricStorage doubleAsynchronousAccumulator(
- View view,
- InstrumentDescriptor instrument,
- Consumer metricUpdater) {
+ /** Create an asynchronous storage instance for double measurements. */
+ public static
+ AsynchronousMetricStorage, ObservableDoubleMeasurement> createDoubleAsyncStorage(
+ View view, InstrumentDescriptor instrument) {
MetricDescriptor metricDescriptor = MetricDescriptor.create(view, instrument);
+ // TODO: optimize when aggregator is Aggregator.drop()
Aggregator aggregator =
((AggregatorFactory) view.getAggregation())
.createAggregator(instrument, ExemplarFilter.neverSample());
-
- AsyncAccumulator measurementAccumulator = new AsyncAccumulator<>(instrument);
- if (Aggregator.drop() == aggregator) {
- return empty();
- }
- AttributesProcessor attributesProcessor = view.getAttributesProcessor();
- // TODO: Find a way to grab the measurement JUST ONCE for all async metrics.
- ObservableDoubleMeasurement result =
- new ObservableDoubleMeasurement() {
- @Override
- public void record(double value, Attributes attributes) {
- T accumulation =
- aggregator.accumulateDoubleMeasurement(value, attributes, Context.current());
- if (accumulation != null) {
- measurementAccumulator.record(
- attributesProcessor.process(attributes, Context.current()), accumulation);
- }
- }
-
- @Override
- public void record(double value) {
- record(value, Attributes.empty());
- }
- };
- return new AsynchronousMetricStorage<>(
- metricDescriptor, aggregator, measurementAccumulator, () -> metricUpdater.accept(result));
+ AsyncAccumulator accumulator = new AsyncAccumulator<>(instrument);
+ ObservableDoubleMeasurement measurement =
+ new ObservableDoubleMeasurementImpl<>(
+ aggregator, accumulator, view.getAttributesProcessor());
+ return new AsynchronousMetricStorage<>(metricDescriptor, aggregator, accumulator, measurement);
}
- /** Constructs storage for {@code long} valued instruments. */
- public static MetricStorage longAsynchronousAccumulator(
- View view,
- InstrumentDescriptor instrument,
- Consumer metricUpdater) {
+ /** Create an asynchronous storage instance for long measurements. */
+ public static AsynchronousMetricStorage, ObservableLongMeasurement> createLongAsyncStorage(
+ View view, InstrumentDescriptor instrument) {
MetricDescriptor metricDescriptor = MetricDescriptor.create(view, instrument);
+ // TODO: optimize when aggregator is Aggregator.drop()
Aggregator aggregator =
((AggregatorFactory) view.getAggregation())
.createAggregator(instrument, ExemplarFilter.neverSample());
- AsyncAccumulator measurementAccumulator = new AsyncAccumulator<>(instrument);
- if (Aggregator.drop() == aggregator) {
- return empty();
- }
- AttributesProcessor attributesProcessor = view.getAttributesProcessor();
- // TODO: Find a way to grab the measurement JUST ONCE for all async metrics.
- ObservableLongMeasurement result =
- new ObservableLongMeasurement() {
-
- @Override
- public void record(long value, Attributes attributes) {
- T accumulation =
- aggregator.accumulateLongMeasurement(value, attributes, Context.current());
- if (accumulation != null) {
- measurementAccumulator.record(
- attributesProcessor.process(attributes, Context.current()), accumulation);
- }
- }
-
- @Override
- public void record(long value) {
- record(value, Attributes.empty());
- }
- };
- return new AsynchronousMetricStorage<>(
- metricDescriptor, aggregator, measurementAccumulator, () -> metricUpdater.accept(result));
+ AsyncAccumulator accumulator = new AsyncAccumulator<>(instrument);
+ ObservableLongMeasurement measurement =
+ new ObservableLongMeasurementImpl<>(aggregator, accumulator, view.getAttributesProcessor());
+ return new AsynchronousMetricStorage<>(metricDescriptor, aggregator, accumulator, measurement);
}
- private AsynchronousMetricStorage(
- MetricDescriptor metricDescriptor,
- Aggregator aggregator,
- AsyncAccumulator asyncAccumulator,
- Runnable metricUpdater) {
- this.metricDescriptor = metricDescriptor;
- this.asyncAccumulator = asyncAccumulator;
- this.metricUpdater = metricUpdater;
- this.storage = new TemporalMetricStorage<>(aggregator, /* isSynchronous= */ false);
+ @Override
+ public MetricDescriptor getMetricDescriptor() {
+ return metricDescriptor;
}
@Override
@@ -151,7 +110,14 @@ public MetricData collectAndReset(
collectLock.lock();
try {
try {
- metricUpdater.run();
+ boolean empty = true;
+ for (Consumer callback : callbacks) {
+ empty = false;
+ callback.accept(measurement);
+ }
+ if (empty) {
+ return EmptyMetricData.getInstance();
+ }
} catch (Throwable e) {
propagateIfFatal(e);
throttlingLogger.log(
@@ -168,7 +134,7 @@ public MetricData collectAndReset(
instrumentationLibraryInfo,
getMetricDescriptor(),
temporality,
- asyncAccumulator.collectAndReset(),
+ accumulator.collectAndReset(),
startEpochNanos,
epochNanos);
} finally {
@@ -176,9 +142,17 @@ public MetricData collectAndReset(
}
}
- @Override
- public MetricDescriptor getMetricDescriptor() {
- return metricDescriptor;
+ /** Add a callback to the storage. */
+ public void addCallback(Consumer callback) {
+ this.callbacks.add(callback);
+ }
+
+ /**
+ * Remove the callback from the storage. Called when {@link AutoCloseable#close()} is invoked on
+ * observable instruments.
+ */
+ public void removeCallback(Consumer callback) {
+ this.callbacks.remove(callback);
}
/** Helper class to record async measurements on demand. */
@@ -192,7 +166,7 @@ static final class AsyncAccumulator {
this.instrument = instrument;
}
- public void record(Attributes attributes, T accumulation) {
+ void record(Attributes attributes, T accumulation) {
// Check we're under the max allowed accumulations
if (currentAccumulation.size() >= MetricStorageUtils.MAX_ACCUMULATIONS) {
throttlingLogger.log(
@@ -218,10 +192,70 @@ public void record(Attributes attributes, T accumulation) {
currentAccumulation.put(attributes, accumulation);
}
- public Map collectAndReset() {
+ Map collectAndReset() {
Map result = currentAccumulation;
currentAccumulation = new HashMap<>();
return result;
}
}
+
+ private static class ObservableLongMeasurementImpl implements ObservableLongMeasurement {
+
+ private final Aggregator aggregator;
+ private final AsyncAccumulator asyncAccumulator;
+ private final AttributesProcessor attributesProcessor;
+
+ private ObservableLongMeasurementImpl(
+ Aggregator aggregator,
+ AsyncAccumulator asyncAccumulator,
+ AttributesProcessor attributesProcessor) {
+ this.aggregator = aggregator;
+ this.asyncAccumulator = asyncAccumulator;
+ this.attributesProcessor = attributesProcessor;
+ }
+
+ @Override
+ public void record(long value) {
+ record(value, Attributes.empty());
+ }
+
+ @Override
+ public void record(long value, Attributes attributes) {
+ T accumulation = aggregator.accumulateLongMeasurement(value, attributes, Context.current());
+ if (accumulation != null) {
+ asyncAccumulator.record(
+ attributesProcessor.process(attributes, Context.current()), accumulation);
+ }
+ }
+ }
+
+ private static class ObservableDoubleMeasurementImpl implements ObservableDoubleMeasurement {
+
+ private final Aggregator aggregator;
+ private final AsyncAccumulator asyncAccumulator;
+ private final AttributesProcessor attributesProcessor;
+
+ private ObservableDoubleMeasurementImpl(
+ Aggregator aggregator,
+ AsyncAccumulator asyncAccumulator,
+ AttributesProcessor attributesProcessor) {
+ this.aggregator = aggregator;
+ this.asyncAccumulator = asyncAccumulator;
+ this.attributesProcessor = attributesProcessor;
+ }
+
+ @Override
+ public void record(double value) {
+ record(value, Attributes.empty());
+ }
+
+ @Override
+ public void record(double value, Attributes attributes) {
+ T accumulation = aggregator.accumulateDoubleMeasurement(value, attributes, Context.current());
+ if (accumulation != null) {
+ asyncAccumulator.record(
+ attributesProcessor.process(attributes, Context.current()), accumulation);
+ }
+ }
+ }
}
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DebugUtils.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DebugUtils.java
index bfafa6bbc19..4d548d6d3b8 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DebugUtils.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/DebugUtils.java
@@ -96,12 +96,6 @@ public static String duplicateMetricErrorMessage(
.append(existing.getSourceInstrument().getValueType())
.append("]\n");
}
- if (existing.isAsync()) {
- result
- .append("- InstrumentType [")
- .append(existing.getSourceInstrument().getType())
- .append("] is async and already registered\n");
- }
// Next we write out where the existing metric descriptor came from, either a raw instrument
// or a view on a raw instrument.
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java
index 4fc7723e9f7..ddbb1069aea 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MeterSharedState.java
@@ -5,6 +5,8 @@
package io.opentelemetry.sdk.metrics.internal.state;
+import static java.util.stream.Collectors.toList;
+
import com.google.auto.value.AutoValue;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
@@ -19,7 +21,6 @@
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
@@ -88,7 +89,7 @@ public final WriteableMetricStorage registerSynchronousMetricStorage(
.filter(m -> !m.isEmpty())
.map(this::register)
.filter(Objects::nonNull)
- .collect(Collectors.toList());
+ .collect(toList());
if (storage.size() == 1) {
return storage.get(0);
@@ -98,39 +99,60 @@ public final WriteableMetricStorage registerSynchronousMetricStorage(
}
/** Registers new asynchronous storage associated with a given {@code long} instrument. */
- public final void registerLongAsynchronousInstrument(
- InstrumentDescriptor instrument,
- MeterProviderSharedState meterProviderSharedState,
- Consumer metricUpdater) {
-
- meterProviderSharedState
- .getViewRegistry()
- .findViews(instrument, getInstrumentationLibraryInfo())
- .stream()
- .map(
- view ->
- AsynchronousMetricStorage.longAsynchronousAccumulator(
- view, instrument, metricUpdater))
- .filter(m -> !m.isEmpty())
- .forEach(this::register);
+ public final List>
+ registerLongAsynchronousInstrument(
+ InstrumentDescriptor instrument,
+ MeterProviderSharedState meterProviderSharedState,
+ Consumer callback) {
+
+ List> storages =
+ meterProviderSharedState
+ .getViewRegistry()
+ .findViews(instrument, getInstrumentationLibraryInfo())
+ .stream()
+ .map(view -> AsynchronousMetricStorage.createLongAsyncStorage(view, instrument))
+ .filter(storage -> !storage.isEmpty())
+ .collect(toList());
+
+ List> registeredStorages =
+ new ArrayList<>();
+ for (AsynchronousMetricStorage, ObservableLongMeasurement> storage : storages) {
+ AsynchronousMetricStorage, ObservableLongMeasurement> registeredStorage = register(storage);
+ if (registeredStorage != null) {
+ registeredStorage.addCallback(callback);
+ registeredStorages.add(registeredStorage);
+ }
+ }
+ return registeredStorages;
}
/** Registers new asynchronous storage associated with a given {@code double} instrument. */
- public final void registerDoubleAsynchronousInstrument(
- InstrumentDescriptor instrument,
- MeterProviderSharedState meterProviderSharedState,
- Consumer metricUpdater) {
-
- meterProviderSharedState
- .getViewRegistry()
- .findViews(instrument, getInstrumentationLibraryInfo())
- .stream()
- .map(
- view ->
- AsynchronousMetricStorage.doubleAsynchronousAccumulator(
- view, instrument, metricUpdater))
- .filter(m -> !m.isEmpty())
- .forEach(this::register);
+ public final List>
+ registerDoubleAsynchronousInstrument(
+ InstrumentDescriptor instrument,
+ MeterProviderSharedState meterProviderSharedState,
+ Consumer callback) {
+
+ List> storages =
+ meterProviderSharedState
+ .getViewRegistry()
+ .findViews(instrument, getInstrumentationLibraryInfo())
+ .stream()
+ .map(view -> AsynchronousMetricStorage.createDoubleAsyncStorage(view, instrument))
+ .filter(storage -> !storage.isEmpty())
+ .collect(toList());
+
+ List> registeredStorages =
+ new ArrayList<>();
+ for (AsynchronousMetricStorage, ObservableDoubleMeasurement> storage : storages) {
+ AsynchronousMetricStorage, ObservableDoubleMeasurement> registeredStorage =
+ register(storage);
+ if (registeredStorage != null) {
+ registeredStorage.addCallback(callback);
+ registeredStorages.add(registeredStorage);
+ }
+ }
+ return registeredStorages;
}
@Nullable
diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistry.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistry.java
index 9041cdfe556..0f70a66b2c2 100644
--- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistry.java
+++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistry.java
@@ -69,13 +69,6 @@ public I register(I storage) {
descriptor,
"Metric with same name and different descriptor already created.");
}
- // Descriptors are compatible, but can't register async instruments multiple times.
- if (descriptor.isAsync()) {
- throw new DuplicateMetricStorageException(
- oldOrNewStorage.getMetricDescriptor(),
- descriptor,
- "Async metric with same name has already been created.");
- }
// Metric already existed, and is compatible with new storage.
return (I) oldOrNewStorage;
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeBuilderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeBuilderTest.java
index 9772467d3d1..2072611cfa1 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeBuilderTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkDoubleGaugeBuilderTest.java
@@ -10,6 +10,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
@@ -17,7 +18,7 @@
import java.time.Duration;
import org.junit.jupiter.api.Test;
-/** Unit tests for {@link DoubleValueObserverSdk}. */
+/** Unit tests for SDK {@link ObservableDoubleGauge}. */
class SdkDoubleGaugeBuilderTest {
private static final Resource RESOURCE =
Resource.create(Attributes.of(stringKey("resource_key"), "resource_value"));
@@ -33,6 +34,20 @@ class SdkDoubleGaugeBuilderTest {
.build();
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
+ @Test
+ void removeCallback() {
+ ObservableDoubleGauge gauge =
+ sdkMeter.gaugeBuilder("testGauge").buildWithCallback(measurement -> measurement.record(10));
+
+ assertThat(sdkMeterReader.collectAllMetrics())
+ .satisfiesExactly(
+ metric -> assertThat(metric).hasName("testGauge").hasDoubleGauge().points().hasSize(1));
+
+ gauge.close();
+
+ assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
+ }
+
@Test
void collectMetrics_NoRecords() {
sdkMeter
@@ -44,7 +59,6 @@ void collectMetrics_NoRecords() {
}
@Test
- @SuppressWarnings("unchecked")
void collectMetrics_WithOneRecord() {
sdkMeter
.gaugeBuilder("testObserver")
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeBuilderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeBuilderTest.java
index 1fe9623f3f7..39b0acb7b15 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeBuilderTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkLongGaugeBuilderTest.java
@@ -10,6 +10,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
@@ -17,7 +18,7 @@
import java.time.Duration;
import org.junit.jupiter.api.Test;
-/** Unit tests for {@link LongValueObserverSdk}. */
+/** Unit tests for SDK {@link ObservableLongGauge}. */
class SdkLongGaugeBuilderTest {
private static final Resource RESOURCE =
Resource.create(Attributes.of(stringKey("resource_key"), "resource_value"));
@@ -33,6 +34,23 @@ class SdkLongGaugeBuilderTest {
.build();
private final Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
+ @Test
+ void removeCallback() {
+ ObservableLongGauge gauge =
+ sdkMeter
+ .gaugeBuilder("testGauge")
+ .ofLongs()
+ .buildWithCallback(measurement -> measurement.record(10));
+
+ assertThat(sdkMeterReader.collectAllMetrics())
+ .satisfiesExactly(
+ metric -> assertThat(metric).hasName("testGauge").hasLongGauge().points().hasSize(1));
+
+ gauge.close();
+
+ assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
+ }
+
@Test
void collectMetrics_NoRecords() {
sdkMeter
@@ -45,7 +63,6 @@ void collectMetrics_NoRecords() {
}
@Test
- @SuppressWarnings("unchecked")
void collectMetrics_WithOneRecord() {
sdkMeter
.gaugeBuilder("testObserver")
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java
index b6b1232e4bf..156ec712b86 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkMeterProviderTest.java
@@ -6,6 +6,7 @@
package io.opentelemetry.sdk.metrics;
import static io.opentelemetry.sdk.testing.assertj.MetricAssertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
import io.opentelemetry.api.baggage.Baggage;
@@ -19,11 +20,13 @@
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
+import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.view.Aggregation;
@@ -35,6 +38,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -453,6 +457,64 @@ void collectAllAsyncInstruments() {
.hasValue(10.1)));
}
+ @Test
+ void removeAsyncInstrument() {
+ InMemoryMetricReader reader = InMemoryMetricReader.create();
+ Meter meter =
+ sdkMeterProviderBuilder.registerMetricReader(reader).build().get(getClass().getName());
+
+ ObservableLongCounter observableCounter1 =
+ meter
+ .counterBuilder("foo")
+ .buildWithCallback(
+ measurement ->
+ measurement.record(10, Attributes.builder().put("callback", "one").build()));
+ ObservableLongCounter observableCounter2 =
+ meter
+ .counterBuilder("foo")
+ .buildWithCallback(
+ measurement ->
+ measurement.record(10, Attributes.builder().put("callback", "two").build()));
+
+ assertThat(reader.collectAllMetrics())
+ .hasSize(1)
+ .satisfiesExactly(
+ metricData ->
+ assertThat(metricData)
+ .hasLongSum()
+ .points()
+ .hasSize(2)
+ .satisfiesExactlyInAnyOrder(
+ pointData ->
+ assertThat(pointData)
+ .hasAttributes(Attributes.builder().put("callback", "one").build()),
+ (Consumer)
+ longPointData ->
+ assertThat(longPointData)
+ .hasAttributes(
+ Attributes.builder().put("callback", "two").build())));
+
+ observableCounter1.close();
+
+ assertThat(reader.collectAllMetrics())
+ .hasSize(1)
+ .satisfiesExactly(
+ metricData ->
+ assertThat(metricData)
+ .hasLongSum()
+ .points()
+ .hasSize(1)
+ .satisfiesExactlyInAnyOrder(
+ (Consumer)
+ longPointData ->
+ assertThat(longPointData)
+ .hasAttributes(
+ Attributes.builder().put("callback", "two").build())));
+
+ observableCounter2.close();
+ assertThat(reader.collectAllMetrics()).hasSize(0);
+ }
+
@Test
void viewSdk_AllowRenames() {
InMemoryMetricReader reader = InMemoryMetricReader.create();
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableDoubleCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableDoubleCounterTest.java
index 9f575258263..d7f6353db02 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableDoubleCounterTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableDoubleCounterTest.java
@@ -32,6 +32,27 @@ class SdkObservableDoubleCounterTest {
private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
+ @Test
+ void removeCallback() {
+ InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
+ ObservableDoubleCounter counter =
+ sdkMeterProviderBuilder
+ .registerMetricReader(sdkMeterReader)
+ .build()
+ .get(getClass().getName())
+ .counterBuilder("testCounter")
+ .ofDoubles()
+ .buildWithCallback(measurement -> measurement.record(10));
+
+ assertThat(sdkMeterReader.collectAllMetrics())
+ .satisfiesExactly(
+ metric -> assertThat(metric).hasName("testCounter").hasDoubleSum().points().hasSize(1));
+
+ counter.close();
+
+ assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
+ }
+
@Test
void collectMetrics_NoRecords() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableDoubleUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableDoubleUpDownCounterTest.java
index a9256ad1858..a6dd83a5983 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableDoubleUpDownCounterTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableDoubleUpDownCounterTest.java
@@ -7,6 +7,7 @@
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.MetricAssertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter;
@@ -32,6 +33,27 @@ class SdkObservableDoubleUpDownCounterTest {
private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
+ @Test
+ void removeCallback() {
+ InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
+ ObservableDoubleUpDownCounter counter =
+ sdkMeterProviderBuilder
+ .registerMetricReader(sdkMeterReader)
+ .build()
+ .get(getClass().getName())
+ .upDownCounterBuilder("testCounter")
+ .ofDoubles()
+ .buildWithCallback(measurement -> measurement.record(10));
+
+ assertThat(sdkMeterReader.collectAllMetrics())
+ .satisfiesExactly(
+ metric -> assertThat(metric).hasName("testCounter").hasDoubleSum().points().hasSize(1));
+
+ counter.close();
+
+ assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
+ }
+
@Test
void collectMetrics_NoRecords() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableInstrumentTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableInstrumentTest.java
new file mode 100644
index 00000000000..d8bf7cbb3ef
--- /dev/null
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableInstrumentTest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.metrics;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import io.github.netmikey.logunit.api.LogCapturer;
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
+import io.opentelemetry.sdk.metrics.internal.state.AsynchronousMetricStorage;
+import java.util.Arrays;
+import java.util.function.Consumer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.Mockito;
+
+class SdkObservableInstrumentTest {
+
+ @RegisterExtension
+ LogCapturer logs = LogCapturer.create().captureForType(SdkObservableInstrument.class);
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void close() {
+ AsynchronousMetricStorage, ObservableDoubleMeasurement> storage1 =
+ mock(AsynchronousMetricStorage.class);
+ AsynchronousMetricStorage, ObservableDoubleMeasurement> storage2 =
+ mock(AsynchronousMetricStorage.class);
+
+ Consumer callback = unused -> {};
+ SdkObservableInstrument observableInstrument =
+ new SdkObservableInstrument<>("my-instrument", Arrays.asList(storage1, storage2), callback);
+
+ // First call to close should trigger remove from storage
+ observableInstrument.close();
+ verify(storage1).removeCallback(callback);
+ verify(storage2).removeCallback(callback);
+ logs.assertDoesNotContain("Instrument my-instrument has called close() multiple times.");
+
+ // Close a second time should not trigger remove from storage and should log a warning
+ Mockito.reset(storage1, storage2);
+ observableInstrument.close();
+ verify(storage1, never()).removeCallback(any());
+ verify(storage2, never()).removeCallback(any());
+ logs.assertContains("Instrument my-instrument has called close() multiple times.");
+ }
+}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableLongCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableLongCounterTest.java
index cc51f6af7cb..1b69c232d53 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableLongCounterTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableLongCounterTest.java
@@ -32,6 +32,26 @@ class SdkObservableLongCounterTest {
private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
+ @Test
+ void removeCallback() {
+ InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
+ ObservableLongCounter counter =
+ sdkMeterProviderBuilder
+ .registerMetricReader(sdkMeterReader)
+ .build()
+ .get(getClass().getName())
+ .counterBuilder("testCounter")
+ .buildWithCallback(measurement -> measurement.record(10));
+
+ assertThat(sdkMeterReader.collectAllMetrics())
+ .satisfiesExactly(
+ metric -> assertThat(metric).hasName("testCounter").hasLongSum().points().hasSize(1));
+
+ counter.close();
+
+ assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
+ }
+
@Test
void collectMetrics_NoRecords() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableLongUpDownCounterTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableLongUpDownCounterTest.java
index 9e3a3a55550..60c8af9bcdc 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableLongUpDownCounterTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkObservableLongUpDownCounterTest.java
@@ -32,6 +32,26 @@ class SdkObservableLongUpDownCounterTest {
private final SdkMeterProviderBuilder sdkMeterProviderBuilder =
SdkMeterProvider.builder().setClock(testClock).setResource(RESOURCE);
+ @Test
+ void removeCallback() {
+ InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
+ ObservableLongUpDownCounter counter =
+ sdkMeterProviderBuilder
+ .registerMetricReader(sdkMeterReader)
+ .build()
+ .get(getClass().getName())
+ .upDownCounterBuilder("testCounter")
+ .buildWithCallback(measurement -> measurement.record(10));
+
+ assertThat(sdkMeterReader.collectAllMetrics())
+ .satisfiesExactly(
+ metric -> assertThat(metric).hasName("testCounter").hasLongSum().points().hasSize(1));
+
+ counter.close();
+
+ assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
+ }
+
@Test
void collectMetrics_NoRecords() {
InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create();
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/descriptor/MetricDescriptorTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/descriptor/MetricDescriptorTest.java
index d587c129206..ebb78c86ded 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/descriptor/MetricDescriptorTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/descriptor/MetricDescriptorTest.java
@@ -123,22 +123,4 @@ void metricDescriptor_isCompatible() {
InstrumentValueType.LONG))))
.isFalse();
}
-
- @Test
- void isAsync() {
- assertThat(descriptorForInstrument(InstrumentType.OBSERVABLE_UP_DOWN_COUNTER).isAsync())
- .isTrue();
- assertThat(descriptorForInstrument(InstrumentType.OBSERVABLE_GAUGE).isAsync()).isTrue();
- assertThat(descriptorForInstrument(InstrumentType.OBSERVABLE_COUNTER).isAsync()).isTrue();
- assertThat(descriptorForInstrument(InstrumentType.HISTOGRAM).isAsync()).isFalse();
- assertThat(descriptorForInstrument(InstrumentType.COUNTER).isAsync()).isFalse();
- assertThat(descriptorForInstrument(InstrumentType.UP_DOWN_COUNTER).isAsync()).isFalse();
- }
-
- private static MetricDescriptor descriptorForInstrument(InstrumentType instrumentType) {
- InstrumentDescriptor instrument =
- InstrumentDescriptor.create(
- "name", "description", "unit", instrumentType, InstrumentValueType.DOUBLE);
- return MetricDescriptor.create(View.builder().build(), instrument);
- }
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java
index c2efec95043..3c4478e7e85 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java
@@ -6,9 +6,12 @@
package io.opentelemetry.sdk.metrics.internal.state;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.verify;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.context.Context;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
@@ -28,12 +31,13 @@
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.time.TestClock;
import java.util.Set;
+import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
-import org.mockito.Mockito;
+import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
@@ -42,7 +46,6 @@ public class AsynchronousMetricStorageTest {
private MeterProviderSharedState meterProviderSharedState;
private final MeterSharedState meterSharedState =
MeterSharedState.create(InstrumentationLibraryInfo.empty());
- private AttributesProcessor spyAttributesProcessor;
private View view;
private CollectionHandle handle;
private Set all;
@@ -51,10 +54,10 @@ public class AsynchronousMetricStorageTest {
LogCapturer logs = LogCapturer.create().captureForType(AsynchronousMetricStorage.class);
@Mock private MetricReader reader;
+ @Spy private AttributesProcessor spyAttributesProcessor = AttributesProcessor.noop();
@BeforeEach
void setup() {
- spyAttributesProcessor = Mockito.spy(AttributesProcessor.noop());
view =
View.builder()
.setAggregation(Aggregation.lastValue())
@@ -80,62 +83,133 @@ void setup() {
@Test
void doubleAsynchronousAccumulator_AttributesProcessor_used() {
- AsynchronousMetricStorage.doubleAsynchronousAccumulator(
+ AsynchronousMetricStorage, ObservableDoubleMeasurement> doubleAsyncStorage =
+ AsynchronousMetricStorage.createDoubleAsyncStorage(
view,
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.OBSERVABLE_GAUGE,
- InstrumentValueType.DOUBLE),
- value -> value.record(1.0, Attributes.empty()))
- .collectAndReset(
- CollectionInfo.create(handle, all, reader),
- meterProviderSharedState.getResource(),
- meterSharedState.getInstrumentationLibraryInfo(),
- 0,
- testClock.now(),
- false);
- Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
+ InstrumentValueType.DOUBLE));
+ doubleAsyncStorage.addCallback(value -> value.record(1.0, Attributes.empty()));
+ doubleAsyncStorage.collectAndReset(
+ CollectionInfo.create(handle, all, reader),
+ meterProviderSharedState.getResource(),
+ meterSharedState.getInstrumentationLibraryInfo(),
+ 0,
+ testClock.now(),
+ /* suppressSynchronousCollection= */ false);
+ verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
}
@Test
void longAsynchronousAccumulator_AttributesProcessor_used() {
- AsynchronousMetricStorage.longAsynchronousAccumulator(
+ AsynchronousMetricStorage, ObservableLongMeasurement> longAsyncStorage =
+ AsynchronousMetricStorage.createLongAsyncStorage(
view,
InstrumentDescriptor.create(
"name",
"description",
"unit",
InstrumentType.OBSERVABLE_GAUGE,
- InstrumentValueType.LONG),
- value -> value.record(1, Attributes.empty()))
- .collectAndReset(
- CollectionInfo.create(handle, all, reader),
- meterProviderSharedState.getResource(),
- meterSharedState.getInstrumentationLibraryInfo(),
- 0,
- testClock.nanoTime(),
- false);
- Mockito.verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
+ InstrumentValueType.LONG));
+ longAsyncStorage.addCallback(value -> value.record(1, Attributes.empty()));
+ longAsyncStorage.collectAndReset(
+ CollectionInfo.create(handle, all, reader),
+ meterProviderSharedState.getResource(),
+ meterSharedState.getInstrumentationLibraryInfo(),
+ 0,
+ testClock.nanoTime(),
+ /* suppressSynchronousCollection= */ false);
+ verify(spyAttributesProcessor).process(Attributes.empty(), Context.current());
+ }
+
+ @Test
+ void collectAndReset_CallsMultipleCallbacks() {
+ AsynchronousMetricStorage, ObservableLongMeasurement> metricStorage =
+ AsynchronousMetricStorage.createLongAsyncStorage(
+ view,
+ InstrumentDescriptor.create(
+ "my-instrument",
+ "description",
+ "unit",
+ InstrumentType.OBSERVABLE_GAUGE,
+ InstrumentValueType.LONG));
+ // Callbacks partially overlap for the metrics they record, should take first registered
+ Consumer callback1 =
+ measurement -> measurement.record(1, Attributes.builder().put("key", "a").build());
+ Consumer callback2 =
+ measurement -> {
+ measurement.record(3, Attributes.builder().put("key", "a").build());
+ measurement.record(3, Attributes.builder().put("key", "b").build());
+ };
+ metricStorage.addCallback(callback1);
+ metricStorage.addCallback(callback2);
+
+ assertThat(
+ metricStorage.collectAndReset(
+ CollectionInfo.create(handle, all, reader),
+ meterProviderSharedState.getResource(),
+ meterSharedState.getInstrumentationLibraryInfo(),
+ 0,
+ testClock.nanoTime(),
+ /* suppressSynchronousCollection= */ false))
+ .satisfies(
+ metricData ->
+ assertThat(metricData.getLongGaugeData().getPoints())
+ .satisfiesExactlyInAnyOrder(
+ dataPoint -> {
+ assertThat(dataPoint.getValue()).isEqualTo(1);
+ assertThat(dataPoint.getAttributes())
+ .isEqualTo(Attributes.builder().put("key", "a").build());
+ },
+ dataPoint -> {
+ assertThat(dataPoint.getValue()).isEqualTo(3);
+ assertThat(dataPoint.getAttributes())
+ .isEqualTo(Attributes.builder().put("key", "b").build());
+ }));
+ logs.assertContains(
+ "Instrument my-instrument has recorded multiple values for the same attributes.");
+
+ // Remove callback2, verify only callback1 is called
+ metricStorage.removeCallback(callback2);
+ assertThat(
+ metricStorage.collectAndReset(
+ CollectionInfo.create(handle, all, reader),
+ meterProviderSharedState.getResource(),
+ meterSharedState.getInstrumentationLibraryInfo(),
+ 0,
+ testClock.nanoTime(),
+ /* suppressSynchronousCollection= */ false))
+ .satisfies(
+ metricData ->
+ assertThat(metricData.getLongGaugeData().getPoints())
+ .satisfiesExactlyInAnyOrder(
+ dataPoint -> {
+ assertThat(dataPoint.getValue()).isEqualTo(1);
+ assertThat(dataPoint.getAttributes())
+ .isEqualTo(Attributes.builder().put("key", "a").build());
+ }));
}
@Test
void collectAndReset_IgnoresDuplicates() {
- MetricStorage metricStorage =
- AsynchronousMetricStorage.longAsynchronousAccumulator(
+ AsynchronousMetricStorage, ObservableLongMeasurement> metricStorage =
+ AsynchronousMetricStorage.createLongAsyncStorage(
view,
InstrumentDescriptor.create(
"my-instrument",
"description",
"unit",
InstrumentType.OBSERVABLE_GAUGE,
- InstrumentValueType.LONG),
- measurement -> {
- measurement.record(1, Attributes.builder().put("key", "a").build());
- measurement.record(2, Attributes.builder().put("key", "a").build());
- measurement.record(3, Attributes.builder().put("key", "b").build());
- });
+ InstrumentValueType.LONG));
+ metricStorage.addCallback(
+ measurement -> {
+ measurement.record(1, Attributes.builder().put("key", "a").build());
+ measurement.record(2, Attributes.builder().put("key", "a").build());
+ measurement.record(3, Attributes.builder().put("key", "b").build());
+ });
assertThat(
metricStorage.collectAndReset(
CollectionInfo.create(handle, all, reader),
@@ -143,7 +217,7 @@ void collectAndReset_IgnoresDuplicates() {
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.nanoTime(),
- false))
+ /* suppressSynchronousCollection= */ false))
.satisfies(
metricData ->
assertThat(metricData.getLongGaugeData().getPoints())
@@ -165,18 +239,19 @@ void collectAndReset_IgnoresDuplicates() {
@Test
@SuppressLogger(AsynchronousMetricStorage.class)
void collectAndReset_CallbackException() {
- MetricStorage metricStorage =
- AsynchronousMetricStorage.longAsynchronousAccumulator(
+ AsynchronousMetricStorage, ObservableDoubleMeasurement> metricStorage =
+ AsynchronousMetricStorage.createDoubleAsyncStorage(
view,
InstrumentDescriptor.create(
"my-instrument",
"description",
"unit",
InstrumentType.OBSERVABLE_GAUGE,
- InstrumentValueType.LONG),
- unused -> {
- throw new RuntimeException("Error!");
- });
+ InstrumentValueType.LONG));
+ metricStorage.addCallback(
+ unused -> {
+ throw new RuntimeException("Error!");
+ });
assertThat(
metricStorage.collectAndReset(
CollectionInfo.create(handle, all, reader),
@@ -184,7 +259,7 @@ void collectAndReset_CallbackException() {
meterSharedState.getInstrumentationLibraryInfo(),
0,
testClock.nanoTime(),
- false))
+ /* suppressSynchronousCollection= */ false))
.isEqualTo(EmptyMetricData.getInstance());
logs.assertContains("An exception occurred invoking callback for instrument my-instrument.");
}
diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java
index 988ef909cb7..013cb680815 100644
--- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java
+++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/MetricStorageRegistryTest.java
@@ -28,52 +28,49 @@ class MetricStorageRegistryTest {
descriptor("sync", "other_description", InstrumentType.COUNTER);
private static final MetricDescriptor ASYNC_DESCRIPTOR =
descriptor("async", "description", InstrumentType.OBSERVABLE_GAUGE);
+ private static final MetricDescriptor OTHER_ASYNC_DESCRIPTOR =
+ descriptor("async", "other_description", InstrumentType.OBSERVABLE_GAUGE);
- private final MeterSharedState meterSharedState =
- MeterSharedState.create(InstrumentationLibraryInfo.empty());
+ private final MetricStorageRegistry metricStorageRegistry = new MetricStorageRegistry();
@Test
void register_Sync() {
- TestMetricStorage testInstrument = new TestMetricStorage(SYNC_DESCRIPTOR);
- assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
- .isSameAs(testInstrument);
- assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
- .isSameAs(testInstrument);
- assertThat(
- meterSharedState
- .getMetricStorageRegistry()
- .register(new TestMetricStorage(SYNC_DESCRIPTOR)))
- .isSameAs(testInstrument);
+ TestMetricStorage storage = new TestMetricStorage(SYNC_DESCRIPTOR);
+ assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
+ assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
+ assertThat(metricStorageRegistry.register(new TestMetricStorage(SYNC_DESCRIPTOR)))
+ .isSameAs(storage);
}
@Test
void register_SyncIncompatibleDescriptor() {
- TestMetricStorage testInstrument = new TestMetricStorage(SYNC_DESCRIPTOR);
- assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
- .isSameAs(testInstrument);
+ TestMetricStorage storage = new TestMetricStorage(SYNC_DESCRIPTOR);
+ assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
assertThatThrownBy(
- () ->
- meterSharedState
- .getMetricStorageRegistry()
- .register(new TestMetricStorage(OTHER_SYNC_DESCRIPTOR)))
+ () -> metricStorageRegistry.register(new TestMetricStorage(OTHER_SYNC_DESCRIPTOR)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Metric with same name and different descriptor already created.");
}
@Test
void register_Async() {
- TestMetricStorage testInstrument = new TestMetricStorage(ASYNC_DESCRIPTOR);
- assertThat(meterSharedState.getMetricStorageRegistry().register(testInstrument))
- .isSameAs(testInstrument);
+ TestMetricStorage storage = new TestMetricStorage(ASYNC_DESCRIPTOR);
+ assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
+ assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
+ assertThat(metricStorageRegistry.register(new TestMetricStorage(ASYNC_DESCRIPTOR)))
+ .isSameAs(storage);
+ }
+
+ @Test
+ void register_AsyncIncompatibleDescriptor() {
+ TestMetricStorage storage = new TestMetricStorage(ASYNC_DESCRIPTOR);
+ assertThat(metricStorageRegistry.register(storage)).isSameAs(storage);
assertThatThrownBy(
- () ->
- meterSharedState
- .getMetricStorageRegistry()
- .register(new TestMetricStorage(ASYNC_DESCRIPTOR)))
+ () -> metricStorageRegistry.register(new TestMetricStorage(OTHER_ASYNC_DESCRIPTOR)))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Async metric with same name has already been created.");
+ .hasMessageContaining("Metric with same name and different descriptor already created.");
}
private static MetricDescriptor descriptor(