Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One more micrometer AsyncInstrumentRegistry fix #5118

Merged
merged 1 commit into from
Jan 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.internal.GuardedBy;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
Expand All @@ -32,23 +31,17 @@ final class AsyncInstrumentRegistry {
// OpentelemetryMeterRegistry is GC'd
private final WeakReference<Meter> meter;

// we're always locking lock on the whole instrument map; the add/remove methods aren't called
// that often, so it's probably better to opt for correctness in that case - there is a small
// window between removing a single measurement and removing the whole instrument (if it has no
// more measurements) when potentially a new measurement could be added; a ConcurrentHashMap
// wouldn't be enough in this case
// values from the maps below are never removed - that is because the underlying OpenTelemetry
// async instruments are never removed; if we removed the recorder and tried to register it once
// again OTel would log an error and basically ignore the new callback
// these maps are GC'd together with this AsyncInstrumentRegistry instance - that is, when the
// whole OpenTelemetry Meter gets GC'd

@GuardedBy("gauges")
private final Map<String, DoubleMeasurementsRecorder> gauges = new HashMap<>();

@GuardedBy("doubleCounters")
private final Map<String, DoubleMeasurementsRecorder> doubleCounters = new HashMap<>();

@GuardedBy("longCounters")
private final Map<String, LongMeasurementsRecorder> longCounters = new HashMap<>();

@GuardedBy("upDownDoubleCounters")
private final Map<String, DoubleMeasurementsRecorder> upDownDoubleCounters = new HashMap<>();
private final Map<String, DoubleMeasurementsRecorder> gauges = new ConcurrentHashMap<>();
private final Map<String, DoubleMeasurementsRecorder> doubleCounters = new ConcurrentHashMap<>();
private final Map<String, LongMeasurementsRecorder> longCounters = new ConcurrentHashMap<>();
private final Map<String, DoubleMeasurementsRecorder> upDownDoubleCounters =
new ConcurrentHashMap<>();

AsyncInstrumentRegistry(Meter meter) {
this.meter = new WeakReference<>(meter);
Expand All @@ -71,29 +64,22 @@ <T> AsyncMeasurementHandle buildGauge(
@Nullable T obj,
ToDoubleFunction<T> objMetric) {

synchronized (gauges) {
// use the gauges map as lock for the recorder state - this way all gauge-related mutable
// state will always be accessed in synchronized(gauges)
Object recorderLock = gauges;

DoubleMeasurementsRecorder recorder =
gauges.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback =
new DoubleMeasurementsRecorder(recorderLock);
otelMeter()
.gaugeBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(gauges, name, attributes);
}
DoubleMeasurementsRecorder recorder =
gauges.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder();
otelMeter()
.gaugeBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(recorder, attributes);
}

<T> AsyncMeasurementHandle buildDoubleCounter(
Expand All @@ -113,30 +99,23 @@ <T> AsyncMeasurementHandle buildDoubleCounter(
@Nullable T obj,
ToDoubleFunction<T> objMetric) {

synchronized (doubleCounters) {
// use the counters map as lock for the recorder state - this way all double counter-related
// mutable state will always be accessed in synchronized(doubleCounters)
Object recorderLock = doubleCounters;

DoubleMeasurementsRecorder recorder =
doubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback =
new DoubleMeasurementsRecorder(recorderLock);
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(doubleCounters, name, attributes);
}
DoubleMeasurementsRecorder recorder =
doubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder();
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(recorder, attributes);
}

<T> AsyncMeasurementHandle buildLongCounter(
Expand All @@ -147,29 +126,22 @@ <T> AsyncMeasurementHandle buildLongCounter(
@Nullable T obj,
ToLongFunction<T> objMetric) {

synchronized (longCounters) {
// use the counters map as lock for the recorder state - this way all gauge-related mutable
// state will always be accessed in synchronized(longCounters)
Object recorderLock = longCounters;

LongMeasurementsRecorder recorder =
longCounters.computeIfAbsent(
name,
n -> {
LongMeasurementsRecorder recorderCallback =
new LongMeasurementsRecorder(recorderLock);
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new LongMeasurementSource(obj, (ToLongFunction<Object>) objMetric));

return new AsyncMeasurementHandle(longCounters, name, attributes);
}
LongMeasurementsRecorder recorder =
longCounters.computeIfAbsent(
name,
n -> {
LongMeasurementsRecorder recorderCallback = new LongMeasurementsRecorder();
otelMeter()
.counterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new LongMeasurementSource(obj, (ToLongFunction<Object>) objMetric));

return new AsyncMeasurementHandle(recorder, attributes);
}

<T> AsyncMeasurementHandle buildUpDownDoubleCounter(
Expand All @@ -180,30 +152,23 @@ <T> AsyncMeasurementHandle buildUpDownDoubleCounter(
T obj,
ToDoubleFunction<T> objMetric) {

synchronized (upDownDoubleCounters) {
// use the counters map as lock for the recorder state - this way all double counter-related
// mutable state will always be accessed in synchronized(upDownDoubleCounters)
Object recorderLock = upDownDoubleCounters;

DoubleMeasurementsRecorder recorder =
upDownDoubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback =
new DoubleMeasurementsRecorder(recorderLock);
otelMeter()
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(upDownDoubleCounters, name, attributes);
}
DoubleMeasurementsRecorder recorder =
upDownDoubleCounters.computeIfAbsent(
name,
n -> {
DoubleMeasurementsRecorder recorderCallback = new DoubleMeasurementsRecorder();
otelMeter()
.upDownCounterBuilder(name)
.setDescription(description)
.setUnit(baseUnit)
.ofDoubles()
.buildWithCallback(recorderCallback);
return recorderCallback;
});
recorder.addMeasurement(
attributes, new DoubleMeasurementSource(obj, (ToDoubleFunction<Object>) objMetric));

return new AsyncMeasurementHandle(recorder, attributes);
}

private Meter otelMeter() {
Expand All @@ -217,79 +182,46 @@ private Meter otelMeter() {

private abstract static class MeasurementsRecorder<I> {

private final Object lock;

@GuardedBy("lock")
private final Map<Attributes, I> measurements = new HashMap<>();

protected MeasurementsRecorder(Object lock) {
this.lock = lock;
}

Map<Attributes, I> copyForRead() {
synchronized (lock) {
return new HashMap<>(measurements);
}
}
final Map<Attributes, I> measurements = new ConcurrentHashMap<>();

void addMeasurement(Attributes attributes, I info) {
synchronized (lock) {
measurements.put(attributes, info);
}
measurements.put(attributes, info);
}

void removeMeasurement(Attributes attributes) {
synchronized (lock) {
measurements.remove(attributes);
}
}

boolean isEmpty() {
synchronized (lock) {
return measurements.isEmpty();
}
measurements.remove(attributes);
}
}

private static final class DoubleMeasurementsRecorder
extends MeasurementsRecorder<DoubleMeasurementSource>
implements Consumer<ObservableDoubleMeasurement> {

private DoubleMeasurementsRecorder(Object lock) {
super(lock);
}

@Override
public void accept(ObservableDoubleMeasurement measurement) {
copyForRead()
.forEach(
(attributes, gauge) -> {
Object obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsDouble(obj), attributes);
}
});
measurements.forEach(
(attributes, gauge) -> {
Object obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsDouble(obj), attributes);
}
});
}
}

private static final class LongMeasurementsRecorder
extends MeasurementsRecorder<LongMeasurementSource>
implements Consumer<ObservableLongMeasurement> {

private LongMeasurementsRecorder(Object lock) {
super(lock);
}

@Override
public void accept(ObservableLongMeasurement measurement) {
copyForRead()
.forEach(
(attributes, gauge) -> {
Object obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsLong(obj), attributes);
}
});
measurements.forEach(
(attributes, gauge) -> {
Object obj = gauge.objWeakRef.get();
if (obj != null) {
measurement.record(gauge.metricFunction.applyAsLong(obj), attributes);
}
});
}
}

Expand Down Expand Up @@ -317,32 +249,16 @@ private LongMeasurementSource(@Nullable Object obj, ToLongFunction<Object> metri

static final class AsyncMeasurementHandle {

@GuardedBy("instrumentRegistry")
private final Map<String, ? extends MeasurementsRecorder<?>> instrumentRegistry;

private final String name;
private final MeasurementsRecorder<?> measurementsRecorder;
private final Attributes attributes;

AsyncMeasurementHandle(
Map<String, ? extends MeasurementsRecorder<?>> instrumentRegistry,
String name,
Attributes attributes) {
this.instrumentRegistry = instrumentRegistry;
this.name = name;
AsyncMeasurementHandle(MeasurementsRecorder<?> measurementsRecorder, Attributes attributes) {
this.measurementsRecorder = measurementsRecorder;
this.attributes = attributes;
}

void remove() {
synchronized (instrumentRegistry) {
MeasurementsRecorder<?> recorder = instrumentRegistry.get(name);
if (recorder != null) {
recorder.removeMeasurement(attributes);
// if this was the last measurement then let's remove the whole recorder
if (recorder.isEmpty()) {
instrumentRegistry.remove(name);
}
}
}
measurementsRecorder.removeMeasurement(attributes);
}
}
}