From b14308dcba431d0ddde49373b90923bef0f136e9 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 5 Jan 2022 00:15:35 +0200 Subject: [PATCH] Instrumentation for rxjava3.1.1 (#4954) * Instrumentation for rxjava3.1.1 * Move common test classes * spotless * move rxjava 3.1.1 classes to separate package --- .../rxjava-3-common/library/build.gradle.kts | 8 + .../RxJava3AsyncOperationEndStrategy.java | 0 ...Java3AsyncOperationEndStrategyBuilder.java | 0 .../rxjava3/TracingCompletableObserver.java | 4 +- .../rxjava3/TracingMaybeObserver.java | 4 +- .../rxjava3/TracingSingleObserver.java | 4 +- ...xJava3AsyncOperationEndStrategyTest.groovy | 0 .../testing/build.gradle.kts | 2 +- .../AbstractRxJava3SubscriptionTest.groovy | 0 .../rxjava3/AbstractRxJava3Test.groovy | 0 ...tRxJava3WithSpanInstrumentationTest.groovy | 1045 +++++++++++++++++ .../rxjava3/RxJava3ConcurrencyTestHelper.java | 0 .../rxjava3/TracedWithSpan.java | 0 .../rxjava-3.0/javaagent/build.gradle.kts | 4 +- .../rxjava3/RxJava3InstrumentationModule.java | 8 + .../groovy/RxJava3SubscriptionTest.groovy | 1 - .../RxJava3WithSpanInstrumentationTest.groovy | 1039 +--------------- .../rxjava-3.0/library/build.gradle.kts | 3 +- .../rxjava-3.1.1/javaagent/build.gradle.kts | 28 + .../v3_1_1/RxJava3InstrumentationModule.java | 33 + .../v3_1_1/RxJavaPluginsInstrumentation.java | 40 + .../v3_1_1/TracingAssemblyActivation.java | 34 + .../groovy/RxJava3SubscriptionTest.groovy | 10 + .../src/test/groovy/RxJava3Test.groovy | 10 + .../RxJava3WithSpanInstrumentationTest.groovy | 9 + .../rxjava-3.1.1/library/build.gradle.kts | 15 + .../rxjava3/v3_1_1/TracingAssembly.java | 328 ++++++ .../v3_1_1/TracingAssemblyBuilder.java | 22 + .../v3_1_1/TracingConditionalSubscriber.java | 83 ++ .../rxjava3/v3_1_1/TracingObserver.java | 76 ++ .../v3_1_1/TracingParallelFlowable.java | 67 ++ .../rxjava3/v3_1_1/TracingSubscriber.java | 76 ++ .../groovy/RxJava3SubscriptionTest.groovy | 18 + .../src/test/groovy/RxJava3Test.groovy | 18 + settings.gradle.kts | 5 +- 35 files changed, 1946 insertions(+), 1048 deletions(-) create mode 100644 instrumentation/rxjava/rxjava-3-common/library/build.gradle.kts rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategy.java (100%) rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategyBuilder.java (100%) rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java (91%) rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java (92%) rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java (92%) rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/library/src/test/groovy/RxJava3AsyncOperationEndStrategyTest.groovy (100%) rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/testing/build.gradle.kts (82%) rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3SubscriptionTest.groovy (100%) rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3Test.groovy (100%) create mode 100644 instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3WithSpanInstrumentationTest.groovy rename instrumentation/rxjava/{rxjava-3.0 => rxjava-3-common}/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/RxJava3ConcurrencyTestHelper.java (100%) rename instrumentation/rxjava/{rxjava-3.0/javaagent/src/test => rxjava-3-common/testing/src/main}/java/io/opentelemetry/instrumentation/rxjava3/TracedWithSpan.java (100%) create mode 100644 instrumentation/rxjava/rxjava-3.1.1/javaagent/build.gradle.kts create mode 100644 instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/RxJava3InstrumentationModule.java create mode 100644 instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/RxJavaPluginsInstrumentation.java create mode 100644 instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/TracingAssemblyActivation.java create mode 100644 instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy create mode 100644 instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3Test.groovy create mode 100644 instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy create mode 100644 instrumentation/rxjava/rxjava-3.1.1/library/build.gradle.kts create mode 100644 instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingAssembly.java create mode 100644 instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingAssemblyBuilder.java create mode 100644 instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingConditionalSubscriber.java create mode 100644 instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingObserver.java create mode 100644 instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingParallelFlowable.java create mode 100644 instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingSubscriber.java create mode 100644 instrumentation/rxjava/rxjava-3.1.1/library/src/test/groovy/RxJava3SubscriptionTest.groovy create mode 100644 instrumentation/rxjava/rxjava-3.1.1/library/src/test/groovy/RxJava3Test.groovy diff --git a/instrumentation/rxjava/rxjava-3-common/library/build.gradle.kts b/instrumentation/rxjava/rxjava-3-common/library/build.gradle.kts new file mode 100644 index 000000000000..302f19d718dd --- /dev/null +++ b/instrumentation/rxjava/rxjava-3-common/library/build.gradle.kts @@ -0,0 +1,8 @@ +plugins { + id("otel.library-instrumentation") +} + +dependencies { + library("io.reactivex.rxjava3:rxjava:3.0.12") + implementation(project(":instrumentation-api-annotation-support")) +} diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategy.java b/instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategy.java similarity index 100% rename from instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategy.java rename to instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategy.java diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategyBuilder.java b/instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategyBuilder.java similarity index 100% rename from instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategyBuilder.java rename to instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/RxJava3AsyncOperationEndStrategyBuilder.java diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java b/instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java similarity index 91% rename from instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java rename to instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java index d310a8577456..c4326664e9bd 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java +++ b/instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingCompletableObserver.java @@ -28,13 +28,13 @@ import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -class TracingCompletableObserver implements CompletableObserver, Disposable { +public class TracingCompletableObserver implements CompletableObserver, Disposable { private final CompletableObserver actual; private final Context context; private Disposable disposable; - TracingCompletableObserver(CompletableObserver actual, Context context) { + public TracingCompletableObserver(CompletableObserver actual, Context context) { this.actual = actual; this.context = context; } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java b/instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java similarity index 92% rename from instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java rename to instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java index 2e8e1f09cc63..135454f1d985 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java +++ b/instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingMaybeObserver.java @@ -28,13 +28,13 @@ import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -class TracingMaybeObserver implements MaybeObserver, Disposable { +public class TracingMaybeObserver implements MaybeObserver, Disposable { private final MaybeObserver actual; private final Context context; private Disposable disposable; - TracingMaybeObserver(MaybeObserver actual, Context context) { + public TracingMaybeObserver(MaybeObserver actual, Context context) { this.actual = actual; this.context = context; } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java b/instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java similarity index 92% rename from instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java rename to instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java index 56ed482df99e..f3cead2e32a7 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java +++ b/instrumentation/rxjava/rxjava-3-common/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracingSingleObserver.java @@ -28,13 +28,13 @@ import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -class TracingSingleObserver implements SingleObserver, Disposable { +public class TracingSingleObserver implements SingleObserver, Disposable { private final SingleObserver actual; private final Context context; private Disposable disposable; - TracingSingleObserver(SingleObserver actual, Context context) { + public TracingSingleObserver(SingleObserver actual, Context context) { this.actual = actual; this.context = context; } diff --git a/instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncOperationEndStrategyTest.groovy b/instrumentation/rxjava/rxjava-3-common/library/src/test/groovy/RxJava3AsyncOperationEndStrategyTest.groovy similarity index 100% rename from instrumentation/rxjava/rxjava-3.0/library/src/test/groovy/RxJava3AsyncOperationEndStrategyTest.groovy rename to instrumentation/rxjava/rxjava-3-common/library/src/test/groovy/RxJava3AsyncOperationEndStrategyTest.groovy diff --git a/instrumentation/rxjava/rxjava-3.0/testing/build.gradle.kts b/instrumentation/rxjava/rxjava-3-common/testing/build.gradle.kts similarity index 82% rename from instrumentation/rxjava/rxjava-3.0/testing/build.gradle.kts rename to instrumentation/rxjava/rxjava-3-common/testing/build.gradle.kts index bc444dde7af7..ed6899efb422 100644 --- a/instrumentation/rxjava/rxjava-3.0/testing/build.gradle.kts +++ b/instrumentation/rxjava/rxjava-3-common/testing/build.gradle.kts @@ -7,8 +7,8 @@ dependencies { api("io.reactivex.rxjava3:rxjava:3.0.12") + implementation("io.opentelemetry:opentelemetry-extension-annotations") implementation("com.google.guava:guava") - implementation("org.codehaus.groovy:groovy-all") implementation("io.opentelemetry:opentelemetry-api") implementation("org.spockframework:spock-core") diff --git a/instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3SubscriptionTest.groovy similarity index 100% rename from instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3SubscriptionTest.groovy rename to instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3SubscriptionTest.groovy diff --git a/instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3Test.groovy b/instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3Test.groovy similarity index 100% rename from instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3Test.groovy rename to instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3Test.groovy diff --git a/instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3WithSpanInstrumentationTest.groovy new file mode 100644 index 000000000000..703d001f3427 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/AbstractRxJava3WithSpanInstrumentationTest.groovy @@ -0,0 +1,1045 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3 + +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import io.reactivex.rxjava3.core.Completable +import io.reactivex.rxjava3.core.Flowable +import io.reactivex.rxjava3.core.Maybe +import io.reactivex.rxjava3.core.Observable +import io.reactivex.rxjava3.core.Single +import io.reactivex.rxjava3.observers.TestObserver +import io.reactivex.rxjava3.processors.UnicastProcessor +import io.reactivex.rxjava3.subjects.CompletableSubject +import io.reactivex.rxjava3.subjects.MaybeSubject +import io.reactivex.rxjava3.subjects.SingleSubject +import io.reactivex.rxjava3.subjects.UnicastSubject +import io.reactivex.rxjava3.subscribers.TestSubscriber +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription + +import static io.opentelemetry.api.trace.SpanKind.INTERNAL +import static io.opentelemetry.api.trace.StatusCode.ERROR + +class AbstractRxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecification { + + def "should capture span for already completed Completable"() { + setup: + def observer = new TestObserver() + def source = Completable.complete() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Completable"() { + setup: + def source = CompletableSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for already errored Completable"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestObserver() + def source = Completable.error(error) + new TracedWithSpan() + .completable(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Completable"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = CompletableSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for canceled Completable"() { + setup: + def source = CompletableSubject.create() + def observer = new TestObserver() + new TracedWithSpan() + .completable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.completable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + + def "should capture span for already completed Maybe"() { + setup: + def observer = new TestObserver() + def source = Maybe.just("Value") + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for already empty Maybe"() { + setup: + def observer = new TestObserver() + def source = Maybe. empty() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Maybe"() { + setup: + def source = MaybeSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onSuccess("Value") + observer.assertValue("Value") + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for already errored Maybe"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestObserver() + def source = Maybe. error(error) + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Maybe"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = MaybeSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for canceled Maybe"() { + setup: + def source = MaybeSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .maybe(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.maybe" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + + def "should capture span for already completed Single"() { + setup: + def observer = new TestObserver() + def source = Single.just("Value") + new TracedWithSpan() + .single(source) + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Single"() { + setup: + def source = SingleSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .single(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onSuccess("Value") + observer.assertValue("Value") + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for already errored Single"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestObserver() + def source = Single. error(error) + new TracedWithSpan() + .single(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Single"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = SingleSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .single(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for canceled Single"() { + setup: + def source = SingleSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .single(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.single" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + + def "should capture span for already completed Observable"() { + setup: + def observer = new TestObserver() + def source = Observable. just("Value") + new TracedWithSpan() + .observable(source) + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Observable"() { + setup: + def source = UnicastSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .observable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for already errored Observable"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestObserver() + def source = Observable. error(error) + new TracedWithSpan() + .observable(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Observable"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .observable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for canceled Observable"() { + setup: + def source = UnicastSubject. create() + def observer = new TestObserver() + new TracedWithSpan() + .observable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.dispose() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.observable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + + def "should capture span for already completed Flowable"() { + setup: + def observer = new TestSubscriber() + def source = Flowable. just("Value") + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for eventually completed Flowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for already errored Flowable"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestSubscriber() + def source = Flowable. error(error) + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Flowable"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for canceled Flowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .flowable(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.flowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + + def "should capture span for already completed ParallelFlowable"() { + setup: + def observer = new TestSubscriber() + def source = Flowable. just("Value") + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + observer.assertValue("Value") + observer.assertComplete() + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for eventually completed ParallelFlowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for already errored ParallelFlowable"() { + setup: + def error = new IllegalArgumentException("Boom") + def observer = new TestSubscriber() + def source = Flowable. error(error) + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + observer.assertError(error) + + expect: + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for eventually errored ParallelFlowable"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for canceled ParallelFlowable"() { + setup: + def source = UnicastProcessor. create() + def observer = new TestSubscriber() + new TracedWithSpan() + .parallelFlowable(source.parallel()) + .sequential() + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onNext("Value") + observer.assertValue("Value") + + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.parallelFlowable" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + + def "should capture span for eventually completed Publisher"() { + setup: + def source = new CustomPublisher() + def observer = new TestSubscriber() + new TracedWithSpan() + .publisher(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onComplete() + observer.assertComplete() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.publisher" + kind INTERNAL + hasNoParent() + attributes { + } + } + } + } + } + + def "should capture span for eventually errored Publisher"() { + setup: + def error = new IllegalArgumentException("Boom") + def source = new CustomPublisher() + def observer = new TestSubscriber() + new TracedWithSpan() + .publisher(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + source.onError(error) + observer.assertError(error) + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.publisher" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(IllegalArgumentException, "Boom") + attributes { + } + } + } + } + } + + def "should capture span for canceled Publisher"() { + setup: + def source = new CustomPublisher() + def observer = new TestSubscriber() + new TracedWithSpan() + .publisher(source) + .subscribe(observer) + + expect: + Thread.sleep(500) // sleep a bit just to make sure no span is captured + assertTraces(0) {} + + observer.cancel() + + assertTraces(1) { + trace(0, 1) { + span(0) { + name "TracedWithSpan.publisher" + kind INTERNAL + hasNoParent() + attributes { + "rxjava.canceled" true + } + } + } + } + } + + static class CustomPublisher implements Publisher, Subscription { + Subscriber subscriber + + @Override + void subscribe(Subscriber subscriber) { + this.subscriber = subscriber + subscriber.onSubscribe(this) + } + + void onComplete() { + this.subscriber.onComplete() + } + + void onError(Throwable exception) { + this.subscriber.onError(exception) + } + + @Override + void request(long l) {} + + @Override + void cancel() {} + } +} diff --git a/instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/RxJava3ConcurrencyTestHelper.java b/instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/RxJava3ConcurrencyTestHelper.java similarity index 100% rename from instrumentation/rxjava/rxjava-3.0/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/RxJava3ConcurrencyTestHelper.java rename to instrumentation/rxjava/rxjava-3-common/testing/src/main/groovy/io/opentelemetry/instrumentation/rxjava3/RxJava3ConcurrencyTestHelper.java diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rxjava3/TracedWithSpan.java b/instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracedWithSpan.java similarity index 100% rename from instrumentation/rxjava/rxjava-3.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rxjava3/TracedWithSpan.java rename to instrumentation/rxjava/rxjava-3-common/testing/src/main/java/io/opentelemetry/instrumentation/rxjava3/TracedWithSpan.java diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/build.gradle.kts b/instrumentation/rxjava/rxjava-3.0/javaagent/build.gradle.kts index 9d03dd300ceb..8846e2fa0303 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/build.gradle.kts +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/build.gradle.kts @@ -18,7 +18,9 @@ dependencies { implementation(project(":instrumentation:rxjava:rxjava-3.0:library")) testImplementation("io.opentelemetry:opentelemetry-extension-annotations") - testImplementation(project(":instrumentation:rxjava:rxjava-3.0:testing")) + testImplementation(project(":instrumentation:rxjava:rxjava-3-common:testing")) + + testInstrumentation(project(":instrumentation:rxjava:rxjava-3.1.1:javaagent")) latestDepTestLibrary("io.reactivex.rxjava3:rxjava:3.1.0") } diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java index 8f2848abd9d7..6cdc5f798cb7 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/RxJava3InstrumentationModule.java @@ -5,11 +5,14 @@ package io.opentelemetry.javaagent.instrumentation.rxjava3; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; + import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import java.util.Collections; import java.util.List; +import net.bytebuddy.matcher.ElementMatcher; @AutoService(InstrumentationModule.class) public class RxJava3InstrumentationModule extends InstrumentationModule { @@ -18,6 +21,11 @@ public RxJava3InstrumentationModule() { super("rxjava3"); } + @Override + public ElementMatcher.Junction classLoaderMatcher() { + return hasClassesNamed("io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber"); + } + @Override public List typeInstrumentations() { return Collections.singletonList(new RxJavaPluginsInstrumentation()); diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy index bf947bd67bfc..7330ce48311a 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy @@ -7,5 +7,4 @@ import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3SubscriptionTest import io.opentelemetry.instrumentation.test.AgentTestTrait class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements AgentTestTrait { - } diff --git a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy index dfd353ae8457..091350c65789 100644 --- a/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy +++ b/instrumentation/rxjava/rxjava-3.0/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy @@ -3,1042 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import io.opentelemetry.instrumentation.rxjava3.TracedWithSpan -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.reactivex.rxjava3.core.Completable -import io.reactivex.rxjava3.core.Flowable -import io.reactivex.rxjava3.core.Maybe -import io.reactivex.rxjava3.core.Observable -import io.reactivex.rxjava3.core.Single -import io.reactivex.rxjava3.observers.TestObserver -import io.reactivex.rxjava3.processors.UnicastProcessor -import io.reactivex.rxjava3.subjects.CompletableSubject -import io.reactivex.rxjava3.subjects.MaybeSubject -import io.reactivex.rxjava3.subjects.SingleSubject -import io.reactivex.rxjava3.subjects.UnicastSubject -import io.reactivex.rxjava3.subscribers.TestSubscriber -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber -import org.reactivestreams.Subscription +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3WithSpanInstrumentationTest -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.StatusCode.ERROR - -class RxJava3WithSpanInstrumentationTest extends AgentInstrumentationSpecification { - - def "should capture span for already completed Completable"() { - setup: - def observer = new TestObserver() - def source = Completable.complete() - new TracedWithSpan() - .completable(source) - .subscribe(observer) - observer.assertComplete() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.completable" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for eventually completed Completable"() { - setup: - def source = CompletableSubject.create() - def observer = new TestObserver() - new TracedWithSpan() - .completable(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onComplete() - observer.assertComplete() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.completable" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for already errored Completable"() { - setup: - def error = new IllegalArgumentException("Boom") - def observer = new TestObserver() - def source = Completable.error(error) - new TracedWithSpan() - .completable(source) - .subscribe(observer) - observer.assertError(error) - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.completable" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for eventually errored Completable"() { - setup: - def error = new IllegalArgumentException("Boom") - def source = CompletableSubject.create() - def observer = new TestObserver() - new TracedWithSpan() - .completable(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onError(error) - observer.assertError(error) - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.completable" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for canceled Completable"() { - setup: - def source = CompletableSubject.create() - def observer = new TestObserver() - new TracedWithSpan() - .completable(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - observer.dispose() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.completable" - kind INTERNAL - hasNoParent() - attributes { - "rxjava.canceled" true - } - } - } - } - } - - def "should capture span for already completed Maybe"() { - setup: - def observer = new TestObserver() - def source = Maybe.just("Value") - new TracedWithSpan() - .maybe(source) - .subscribe(observer) - observer.assertValue("Value") - observer.assertComplete() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.maybe" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for already empty Maybe"() { - setup: - def observer = new TestObserver() - def source = Maybe. empty() - new TracedWithSpan() - .maybe(source) - .subscribe(observer) - observer.assertComplete() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.maybe" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for eventually completed Maybe"() { - setup: - def source = MaybeSubject. create() - def observer = new TestObserver() - new TracedWithSpan() - .maybe(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onSuccess("Value") - observer.assertValue("Value") - observer.assertComplete() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.maybe" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for already errored Maybe"() { - setup: - def error = new IllegalArgumentException("Boom") - def observer = new TestObserver() - def source = Maybe. error(error) - new TracedWithSpan() - .maybe(source) - .subscribe(observer) - observer.assertError(error) - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.maybe" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for eventually errored Maybe"() { - setup: - def error = new IllegalArgumentException("Boom") - def source = MaybeSubject. create() - def observer = new TestObserver() - new TracedWithSpan() - .maybe(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onError(error) - observer.assertError(error) - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.maybe" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for canceled Maybe"() { - setup: - def source = MaybeSubject. create() - def observer = new TestObserver() - new TracedWithSpan() - .maybe(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - observer.dispose() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.maybe" - kind INTERNAL - hasNoParent() - attributes { - "rxjava.canceled" true - } - } - } - } - } - - def "should capture span for already completed Single"() { - setup: - def observer = new TestObserver() - def source = Single.just("Value") - new TracedWithSpan() - .single(source) - .subscribe(observer) - observer.assertValue("Value") - observer.assertComplete() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.single" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for eventually completed Single"() { - setup: - def source = SingleSubject. create() - def observer = new TestObserver() - new TracedWithSpan() - .single(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onSuccess("Value") - observer.assertValue("Value") - observer.assertComplete() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.single" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for already errored Single"() { - setup: - def error = new IllegalArgumentException("Boom") - def observer = new TestObserver() - def source = Single. error(error) - new TracedWithSpan() - .single(source) - .subscribe(observer) - observer.assertError(error) - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.single" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for eventually errored Single"() { - setup: - def error = new IllegalArgumentException("Boom") - def source = SingleSubject. create() - def observer = new TestObserver() - new TracedWithSpan() - .single(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onError(error) - observer.assertError(error) - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.single" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for canceled Single"() { - setup: - def source = SingleSubject. create() - def observer = new TestObserver() - new TracedWithSpan() - .single(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - observer.dispose() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.single" - kind INTERNAL - hasNoParent() - attributes { - "rxjava.canceled" true - } - } - } - } - } - - def "should capture span for already completed Observable"() { - setup: - def observer = new TestObserver() - def source = Observable. just("Value") - new TracedWithSpan() - .observable(source) - .subscribe(observer) - observer.assertValue("Value") - observer.assertComplete() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.observable" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for eventually completed Observable"() { - setup: - def source = UnicastSubject. create() - def observer = new TestObserver() - new TracedWithSpan() - .observable(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onNext("Value") - observer.assertValue("Value") - - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onComplete() - observer.assertComplete() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.observable" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for already errored Observable"() { - setup: - def error = new IllegalArgumentException("Boom") - def observer = new TestObserver() - def source = Observable. error(error) - new TracedWithSpan() - .observable(source) - .subscribe(observer) - observer.assertError(error) - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.observable" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for eventually errored Observable"() { - setup: - def error = new IllegalArgumentException("Boom") - def source = UnicastSubject. create() - def observer = new TestObserver() - new TracedWithSpan() - .observable(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onNext("Value") - observer.assertValue("Value") - - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onError(error) - observer.assertError(error) - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.observable" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for canceled Observable"() { - setup: - def source = UnicastSubject. create() - def observer = new TestObserver() - new TracedWithSpan() - .observable(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onNext("Value") - observer.assertValue("Value") - - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - observer.dispose() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.observable" - kind INTERNAL - hasNoParent() - attributes { - "rxjava.canceled" true - } - } - } - } - } - - def "should capture span for already completed Flowable"() { - setup: - def observer = new TestSubscriber() - def source = Flowable. just("Value") - new TracedWithSpan() - .flowable(source) - .subscribe(observer) - observer.assertValue("Value") - observer.assertComplete() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.flowable" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for eventually completed Flowable"() { - setup: - def source = UnicastProcessor. create() - def observer = new TestSubscriber() - new TracedWithSpan() - .flowable(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onNext("Value") - observer.assertValue("Value") - - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onComplete() - observer.assertComplete() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.flowable" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for already errored Flowable"() { - setup: - def error = new IllegalArgumentException("Boom") - def observer = new TestSubscriber() - def source = Flowable. error(error) - new TracedWithSpan() - .flowable(source) - .subscribe(observer) - observer.assertError(error) - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.flowable" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for eventually errored Flowable"() { - setup: - def error = new IllegalArgumentException("Boom") - def source = UnicastProcessor. create() - def observer = new TestSubscriber() - new TracedWithSpan() - .flowable(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onNext("Value") - observer.assertValue("Value") - - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onError(error) - observer.assertError(error) - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.flowable" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for canceled Flowable"() { - setup: - def source = UnicastProcessor. create() - def observer = new TestSubscriber() - new TracedWithSpan() - .flowable(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onNext("Value") - observer.assertValue("Value") - - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - observer.cancel() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.flowable" - kind INTERNAL - hasNoParent() - attributes { - "rxjava.canceled" true - } - } - } - } - } - - def "should capture span for already completed ParallelFlowable"() { - setup: - def observer = new TestSubscriber() - def source = Flowable. just("Value") - new TracedWithSpan() - .parallelFlowable(source.parallel()) - .sequential() - .subscribe(observer) - observer.assertValue("Value") - observer.assertComplete() - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.parallelFlowable" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for eventually completed ParallelFlowable"() { - setup: - def source = UnicastProcessor. create() - def observer = new TestSubscriber() - new TracedWithSpan() - .parallelFlowable(source.parallel()) - .sequential() - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onNext("Value") - observer.assertValue("Value") - - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onComplete() - observer.assertComplete() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.parallelFlowable" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for already errored ParallelFlowable"() { - setup: - def error = new IllegalArgumentException("Boom") - def observer = new TestSubscriber() - def source = Flowable. error(error) - new TracedWithSpan() - .parallelFlowable(source.parallel()) - .sequential() - .subscribe(observer) - observer.assertError(error) - - expect: - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.parallelFlowable" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for eventually errored ParallelFlowable"() { - setup: - def error = new IllegalArgumentException("Boom") - def source = UnicastProcessor. create() - def observer = new TestSubscriber() - new TracedWithSpan() - .parallelFlowable(source.parallel()) - .sequential() - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onNext("Value") - observer.assertValue("Value") - - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onError(error) - observer.assertError(error) - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.parallelFlowable" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for canceled ParallelFlowable"() { - setup: - def source = UnicastProcessor. create() - def observer = new TestSubscriber() - new TracedWithSpan() - .parallelFlowable(source.parallel()) - .sequential() - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onNext("Value") - observer.assertValue("Value") - - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - observer.cancel() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.parallelFlowable" - kind INTERNAL - hasNoParent() - attributes { - "rxjava.canceled" true - } - } - } - } - } - - def "should capture span for eventually completed Publisher"() { - setup: - def source = new CustomPublisher() - def observer = new TestSubscriber() - new TracedWithSpan() - .publisher(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onComplete() - observer.assertComplete() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.publisher" - kind INTERNAL - hasNoParent() - attributes { - } - } - } - } - } - - def "should capture span for eventually errored Publisher"() { - setup: - def error = new IllegalArgumentException("Boom") - def source = new CustomPublisher() - def observer = new TestSubscriber() - new TracedWithSpan() - .publisher(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - source.onError(error) - observer.assertError(error) - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.publisher" - kind INTERNAL - hasNoParent() - status ERROR - errorEvent(IllegalArgumentException, "Boom") - attributes { - } - } - } - } - } - - def "should capture span for canceled Publisher"() { - setup: - def source = new CustomPublisher() - def observer = new TestSubscriber() - new TracedWithSpan() - .publisher(source) - .subscribe(observer) - - expect: - Thread.sleep(500) // sleep a bit just to make sure no span is captured - assertTraces(0) {} - - observer.cancel() - - assertTraces(1) { - trace(0, 1) { - span(0) { - name "TracedWithSpan.publisher" - kind INTERNAL - hasNoParent() - attributes { - "rxjava.canceled" true - } - } - } - } - } - - static class CustomPublisher implements Publisher, Subscription { - Subscriber subscriber - - @Override - void subscribe(Subscriber subscriber) { - this.subscriber = subscriber - subscriber.onSubscribe(this) - } - - void onComplete() { - this.subscriber.onComplete() - } - - void onError(Throwable exception) { - this.subscriber.onError(exception) - } - - @Override - void request(long l) {} - - @Override - void cancel() {} - } +class RxJava3WithSpanInstrumentationTest extends AbstractRxJava3WithSpanInstrumentationTest { } diff --git a/instrumentation/rxjava/rxjava-3.0/library/build.gradle.kts b/instrumentation/rxjava/rxjava-3.0/library/build.gradle.kts index e10bd0468e8b..41935bcb1855 100644 --- a/instrumentation/rxjava/rxjava-3.0/library/build.gradle.kts +++ b/instrumentation/rxjava/rxjava-3.0/library/build.gradle.kts @@ -5,8 +5,9 @@ plugins { dependencies { library("io.reactivex.rxjava3:rxjava:3.0.12") implementation(project(":instrumentation-api-annotation-support")) + implementation(project(":instrumentation:rxjava:rxjava-3-common:library")) - testImplementation(project(":instrumentation:rxjava:rxjava-3.0:testing")) + testImplementation(project(":instrumentation:rxjava:rxjava-3-common:testing")) latestDepTestLibrary("io.reactivex.rxjava3:rxjava:3.1.0") } diff --git a/instrumentation/rxjava/rxjava-3.1.1/javaagent/build.gradle.kts b/instrumentation/rxjava/rxjava-3.1.1/javaagent/build.gradle.kts new file mode 100644 index 000000000000..20730e889e5e --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/javaagent/build.gradle.kts @@ -0,0 +1,28 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("io.reactivex.rxjava3") + module.set("rxjava") + versions.set("[3.1.1,)") + assertInverse.set(true) + } +} + +dependencies { + library("io.reactivex.rxjava3:rxjava:3.1.1") + compileOnly(project(":instrumentation-api-annotation-support")) + + implementation(project(":instrumentation:rxjava:rxjava-3.1.1:library")) + + testImplementation(project(":instrumentation:rxjava:rxjava-3-common:testing")) + + testInstrumentation(project(":instrumentation:rxjava:rxjava-3.0:javaagent")) +} + +tasks.withType().configureEach { + // TODO run tests both with and without experimental span attributes + jvmArgs("-Dotel.instrumentation.rxjava.experimental-span-attributes=true") +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/RxJava3InstrumentationModule.java b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/RxJava3InstrumentationModule.java new file mode 100644 index 000000000000..7af4d0da03c1 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/RxJava3InstrumentationModule.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rxjava3.v3_1_1; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.Collections; +import java.util.List; +import net.bytebuddy.matcher.ElementMatcher; + +@AutoService(InstrumentationModule.class) +public class RxJava3InstrumentationModule extends InstrumentationModule { + + public RxJava3InstrumentationModule() { + super("rxjava3.1.1", "rxjava3"); + } + + @Override + public ElementMatcher.Junction classLoaderMatcher() { + return hasClassesNamed("io.reactivex.rxjava3.operators.ConditionalSubscriber"); + } + + @Override + public List typeInstrumentations() { + return Collections.singletonList(new RxJavaPluginsInstrumentation()); + } +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/RxJavaPluginsInstrumentation.java b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/RxJavaPluginsInstrumentation.java new file mode 100644 index 000000000000..cd6863f56f67 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/RxJavaPluginsInstrumentation.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rxjava3.v3_1_1; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class RxJavaPluginsInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("io.reactivex.rxjava3.plugins.RxJavaPlugins"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod(isMethod(), this.getClass().getName() + "$MethodAdvice"); + } + + @SuppressWarnings("unused") + public static class MethodAdvice { + + // TODO(anuraaga): Replace with adding a type initializer to RxJavaPlugins + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2685 + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void activateOncePerClassloader() { + TracingAssemblyActivation.activate(RxJavaPlugins.class); + } + } +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/TracingAssemblyActivation.java b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/TracingAssemblyActivation.java new file mode 100644 index 000000000000..ee10d984c415 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rxjava3/v3_1_1/TracingAssemblyActivation.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rxjava3.v3_1_1; + +import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.rxjava3.v3_1_1.TracingAssembly; +import java.util.concurrent.atomic.AtomicBoolean; + +public final class TracingAssemblyActivation { + + private static final ClassValue activated = + new ClassValue() { + @Override + protected AtomicBoolean computeValue(Class type) { + return new AtomicBoolean(); + } + }; + + public static void activate(Class clz) { + if (activated.get(clz).compareAndSet(false, true)) { + TracingAssembly.builder() + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBoolean("otel.instrumentation.rxjava.experimental-span-attributes", false)) + .build() + .enable(); + } + } + + private TracingAssemblyActivation() {} +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy new file mode 100644 index 000000000000..7330ce48311a --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3SubscriptionTest.groovy @@ -0,0 +1,10 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3SubscriptionTest +import io.opentelemetry.instrumentation.test.AgentTestTrait + +class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements AgentTestTrait { +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3Test.groovy b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3Test.groovy new file mode 100644 index 000000000000..1cd151d9fbd8 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3Test.groovy @@ -0,0 +1,10 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3Test +import io.opentelemetry.instrumentation.test.AgentTestTrait + +class RxJava3Test extends AbstractRxJava3Test implements AgentTestTrait { +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy new file mode 100644 index 000000000000..091350c65789 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/javaagent/src/test/groovy/RxJava3WithSpanInstrumentationTest.groovy @@ -0,0 +1,9 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3WithSpanInstrumentationTest + +class RxJava3WithSpanInstrumentationTest extends AbstractRxJava3WithSpanInstrumentationTest { +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/build.gradle.kts b/instrumentation/rxjava/rxjava-3.1.1/library/build.gradle.kts new file mode 100644 index 000000000000..2feae3b34292 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/library/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + id("otel.library-instrumentation") +} + +dependencies { + library("io.reactivex.rxjava3:rxjava:3.1.1") + implementation(project(":instrumentation-api-annotation-support")) + implementation(project(":instrumentation:rxjava:rxjava-3-common:library")) + + testImplementation(project(":instrumentation:rxjava:rxjava-3-common:testing")) +} + +tasks.withType().configureEach { + jvmArgs("-Dio.opentelemetry.context.enableStrictContext=false") +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingAssembly.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingAssembly.java new file mode 100644 index 000000000000..0ae7dc5d0e7e --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingAssembly.java @@ -0,0 +1,328 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3.v3_1_1; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.annotation.support.async.AsyncOperationEndStrategies; +import io.opentelemetry.instrumentation.api.internal.GuardedBy; +import io.opentelemetry.instrumentation.rxjava3.RxJava3AsyncOperationEndStrategy; +import io.opentelemetry.instrumentation.rxjava3.TracingCompletableObserver; +import io.opentelemetry.instrumentation.rxjava3.TracingMaybeObserver; +import io.opentelemetry.instrumentation.rxjava3.TracingSingleObserver; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.functions.BiFunction; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; +import io.reactivex.rxjava3.parallel.ParallelFlowable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import javax.annotation.Nullable; +import org.reactivestreams.Subscriber; + +/** + * RxJava3 library instrumentation. + * + *

In order to enable RxJava3 instrumentation one has to call the {@link + * TracingAssembly#enable()} method. + * + *

Instrumentation uses on*Assembly and on*Subscribe RxJavaPlugin hooks + * to wrap RxJava3 classes in their tracing equivalents. + * + *

Instrumentation can be disabled by calling the {@link TracingAssembly#disable()} method. + */ +public final class TracingAssembly { + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + private static BiFunction + oldOnObservableSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + private static BiFunction< + ? super Completable, ? super CompletableObserver, ? extends CompletableObserver> + oldOnCompletableSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + private static BiFunction + oldOnSingleSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + private static BiFunction + oldOnMaybeSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + private static BiFunction + oldOnFlowableSubscribe; + + @SuppressWarnings("rawtypes") + @GuardedBy("TracingAssembly.class") + @Nullable + private static Function + oldOnParallelAssembly; + + @GuardedBy("TracingAssembly.class") + private static boolean enabled; + + public static TracingAssembly create() { + return builder().build(); + } + + public static TracingAssemblyBuilder builder() { + return new TracingAssemblyBuilder(); + } + + private final boolean captureExperimentalSpanAttributes; + + TracingAssembly(boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } + + public void enable() { + synchronized (TracingAssembly.class) { + if (enabled) { + return; + } + + enableObservable(); + + enableCompletable(); + + enableSingle(); + + enableMaybe(); + + enableFlowable(); + + enableParallel(); + + enableWithSpanStrategy(captureExperimentalSpanAttributes); + + enabled = true; + } + } + + public void disable() { + synchronized (TracingAssembly.class) { + if (!enabled) { + return; + } + + disableObservable(); + + disableCompletable(); + + disableSingle(); + + disableMaybe(); + + disableFlowable(); + + disableParallel(); + + disableWithSpanStrategy(); + + enabled = false; + } + } + + @GuardedBy("TracingAssembly.class") + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableParallel() { + oldOnParallelAssembly = RxJavaPlugins.getOnParallelAssembly(); + RxJavaPlugins.setOnParallelAssembly( + compose( + oldOnParallelAssembly, + parallelFlowable -> new TracingParallelFlowable(parallelFlowable, Context.current()))); + } + + @GuardedBy("TracingAssembly.class") + private static void enableCompletable() { + oldOnCompletableSubscribe = RxJavaPlugins.getOnCompletableSubscribe(); + RxJavaPlugins.setOnCompletableSubscribe( + biCompose( + oldOnCompletableSubscribe, + (completable, observer) -> { + Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingCompletableObserver(observer, context); + } + })); + } + + @GuardedBy("TracingAssembly.class") + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableFlowable() { + oldOnFlowableSubscribe = RxJavaPlugins.getOnFlowableSubscribe(); + RxJavaPlugins.setOnFlowableSubscribe( + biCompose( + oldOnFlowableSubscribe, + (flowable, subscriber) -> { + Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + if (subscriber instanceof ConditionalSubscriber) { + return new TracingConditionalSubscriber<>( + (ConditionalSubscriber) subscriber, context); + } else { + return new TracingSubscriber<>(subscriber, context); + } + } + })); + } + + @GuardedBy("TracingAssembly.class") + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableObservable() { + oldOnObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe(); + RxJavaPlugins.setOnObservableSubscribe( + biCompose( + oldOnObservableSubscribe, + (observable, observer) -> { + Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingObserver(observer, context); + } + })); + } + + @GuardedBy("TracingAssembly.class") + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableSingle() { + oldOnSingleSubscribe = RxJavaPlugins.getOnSingleSubscribe(); + RxJavaPlugins.setOnSingleSubscribe( + biCompose( + oldOnSingleSubscribe, + (single, singleObserver) -> { + Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingSingleObserver(singleObserver, context); + } + })); + } + + @GuardedBy("TracingAssembly.class") + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void enableMaybe() { + oldOnMaybeSubscribe = RxJavaPlugins.getOnMaybeSubscribe(); + RxJavaPlugins.setOnMaybeSubscribe( + (BiFunction) + biCompose( + oldOnMaybeSubscribe, + (BiFunction) + (maybe, maybeObserver) -> { + Context context = Context.current(); + try (Scope ignored = context.makeCurrent()) { + return new TracingMaybeObserver(maybeObserver, context); + } + })); + } + + private static RxJava3AsyncOperationEndStrategy asyncOperationEndStrategy; + + private static void enableWithSpanStrategy(boolean captureExperimentalSpanAttributes) { + asyncOperationEndStrategy = + RxJava3AsyncOperationEndStrategy.builder() + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) + .build(); + + AsyncOperationEndStrategies.instance().registerStrategy(asyncOperationEndStrategy); + } + + @GuardedBy("TracingAssembly.class") + private static void disableParallel() { + RxJavaPlugins.setOnParallelAssembly(oldOnParallelAssembly); + oldOnParallelAssembly = null; + } + + @GuardedBy("TracingAssembly.class") + private static void disableObservable() { + RxJavaPlugins.setOnObservableSubscribe(oldOnObservableSubscribe); + oldOnObservableSubscribe = null; + } + + @GuardedBy("TracingAssembly.class") + private static void disableCompletable() { + RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe); + oldOnCompletableSubscribe = null; + } + + @GuardedBy("TracingAssembly.class") + private static void disableFlowable() { + RxJavaPlugins.setOnFlowableSubscribe(oldOnFlowableSubscribe); + oldOnFlowableSubscribe = null; + } + + @GuardedBy("TracingAssembly.class") + private static void disableSingle() { + RxJavaPlugins.setOnSingleSubscribe(oldOnSingleSubscribe); + oldOnSingleSubscribe = null; + } + + @GuardedBy("TracingAssembly.class") + @SuppressWarnings({"rawtypes", "unchecked"}) + private static void disableMaybe() { + RxJavaPlugins.setOnMaybeSubscribe( + (BiFunction) oldOnMaybeSubscribe); + oldOnMaybeSubscribe = null; + } + + private static void disableWithSpanStrategy() { + if (asyncOperationEndStrategy != null) { + AsyncOperationEndStrategies.instance().unregisterStrategy(asyncOperationEndStrategy); + asyncOperationEndStrategy = null; + } + } + + private static Function compose( + Function before, Function after) { + if (before == null) { + return after; + } + return (T v) -> after.apply(before.apply(v)); + } + + private static BiFunction biCompose( + BiFunction before, + BiFunction after) { + if (before == null) { + return after; + } + return (T v, U u) -> after.apply(v, before.apply(v, u)); + } +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingAssemblyBuilder.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingAssemblyBuilder.java new file mode 100644 index 000000000000..a0ae6a93ca54 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingAssemblyBuilder.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rxjava3.v3_1_1; + +public final class TracingAssemblyBuilder { + private boolean captureExperimentalSpanAttributes; + + TracingAssemblyBuilder() {} + + public TracingAssemblyBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public TracingAssembly build() { + return new TracingAssembly(captureExperimentalSpanAttributes); + } +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingConditionalSubscriber.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingConditionalSubscriber.java new file mode 100644 index 000000000000..b15c94df269d --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingConditionalSubscriber.java @@ -0,0 +1,83 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3.v3_1_1; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.internal.subscribers.BasicFuseableConditionalSubscriber; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; +import io.reactivex.rxjava3.operators.QueueSubscription; + +class TracingConditionalSubscriber extends BasicFuseableConditionalSubscriber { + + private final Context context; + + TracingConditionalSubscriber(ConditionalSubscriber downstream, Context context) { + super(downstream); + this.context = context; + } + + @Override + public boolean tryOnNext(T t) { + try (Scope ignored = context.makeCurrent()) { + return downstream.tryOnNext(t); + } + } + + @Override + public void onNext(T t) { + try (Scope ignored = context.makeCurrent()) { + downstream.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + try (Scope ignored = context.makeCurrent()) { + downstream.onError(t); + } + } + + @Override + public void onComplete() { + try (Scope ignored = context.makeCurrent()) { + downstream.onComplete(); + } + } + + @Override + public int requestFusion(int mode) { + QueueSubscription qs = this.qs; + if (qs != null) { + int m = qs.requestFusion(mode); + sourceMode = m; + return m; + } + return NONE; + } + + @Override + public T poll() throws Throwable { + return qs.poll(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingObserver.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingObserver.java new file mode 100644 index 000000000000..e72ce3c449d3 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingObserver.java @@ -0,0 +1,76 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3.v3_1_1; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver; +import io.reactivex.rxjava3.operators.QueueDisposable; + +class TracingObserver extends BasicFuseableObserver { + + private final Context context; + + TracingObserver(Observer downstream, Context context) { + super(downstream); + this.context = context; + } + + @Override + public void onNext(T t) { + try (Scope ignored = context.makeCurrent()) { + downstream.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + try (Scope ignored = context.makeCurrent()) { + downstream.onError(t); + } + } + + @Override + public void onComplete() { + try (Scope ignored = context.makeCurrent()) { + downstream.onComplete(); + } + } + + @Override + public int requestFusion(int mode) { + QueueDisposable qd = this.qd; + if (qd != null) { + int m = qd.requestFusion(mode); + sourceMode = m; + return m; + } + return NONE; + } + + @Override + public T poll() throws Throwable { + return qd.poll(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingParallelFlowable.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingParallelFlowable.java new file mode 100644 index 000000000000..8b9a40cb0193 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingParallelFlowable.java @@ -0,0 +1,67 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3.v3_1_1; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; +import io.reactivex.rxjava3.parallel.ParallelFlowable; +import org.reactivestreams.Subscriber; + +class TracingParallelFlowable extends ParallelFlowable { + + private final ParallelFlowable source; + private final Context context; + + TracingParallelFlowable(ParallelFlowable source, Context context) { + this.source = source; + this.context = context; + } + + @SuppressWarnings("unchecked") + @Override + public void subscribe(Subscriber[] subscribers) { + if (!validate(subscribers)) { + return; + } + int n = subscribers.length; + Subscriber[] parents = new Subscriber[n]; + for (int i = 0; i < n; i++) { + Subscriber z = subscribers[i]; + if (z instanceof ConditionalSubscriber) { + parents[i] = + new TracingConditionalSubscriber<>((ConditionalSubscriber) z, context); + } else { + parents[i] = new TracingSubscriber<>(z, context); + } + } + try (Scope ignored = context.makeCurrent()) { + source.subscribe(parents); + } + } + + @Override + public int parallelism() { + return source.parallelism(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingSubscriber.java b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingSubscriber.java new file mode 100644 index 000000000000..0300c621eb93 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/main/java/io/opentelemetry/instrumentation/rxjava3/v3_1_1/TracingSubscriber.java @@ -0,0 +1,76 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +// Includes work from: +/* + * Copyright 2018 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.opentelemetry.instrumentation.rxjava3.v3_1_1; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber; +import io.reactivex.rxjava3.operators.QueueSubscription; +import org.reactivestreams.Subscriber; + +class TracingSubscriber extends BasicFuseableSubscriber { + + private final Context context; + + TracingSubscriber(Subscriber downstream, Context context) { + super(downstream); + this.context = context; + } + + @Override + public void onNext(T t) { + try (Scope ignored = context.makeCurrent()) { + downstream.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + try (Scope ignored = context.makeCurrent()) { + downstream.onError(t); + } + } + + @Override + public void onComplete() { + try (Scope ignored = context.makeCurrent()) { + downstream.onComplete(); + } + } + + @Override + public int requestFusion(int mode) { + QueueSubscription qs = this.qs; + if (qs != null) { + int m = qs.requestFusion(mode); + sourceMode = m; + return m; + } + return NONE; + } + + @Override + public T poll() throws Throwable { + return qs.poll(); + } +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/test/groovy/RxJava3SubscriptionTest.groovy b/instrumentation/rxjava/rxjava-3.1.1/library/src/test/groovy/RxJava3SubscriptionTest.groovy new file mode 100644 index 000000000000..2813139efc99 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/test/groovy/RxJava3SubscriptionTest.groovy @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3SubscriptionTest +import io.opentelemetry.instrumentation.rxjava3.v3_1_1.TracingAssembly +import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared + +class RxJava3SubscriptionTest extends AbstractRxJava3SubscriptionTest implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() + + def setupSpec() { + tracingAssembly.enable() + } +} diff --git a/instrumentation/rxjava/rxjava-3.1.1/library/src/test/groovy/RxJava3Test.groovy b/instrumentation/rxjava/rxjava-3.1.1/library/src/test/groovy/RxJava3Test.groovy new file mode 100644 index 000000000000..94770dee7326 --- /dev/null +++ b/instrumentation/rxjava/rxjava-3.1.1/library/src/test/groovy/RxJava3Test.groovy @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.rxjava3.AbstractRxJava3Test +import io.opentelemetry.instrumentation.rxjava3.v3_1_1.TracingAssembly +import io.opentelemetry.instrumentation.test.LibraryTestTrait +import spock.lang.Shared + +class RxJava3Test extends AbstractRxJava3Test implements LibraryTestTrait { + @Shared + TracingAssembly tracingAssembly = TracingAssembly.create() + + def setupSpec() { + tracingAssembly.enable() + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 7f1692ad8a45..2ab03001e314 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -345,8 +345,11 @@ include(":instrumentation:rxjava:rxjava-2.0:library") include(":instrumentation:rxjava:rxjava-2.0:testing") include(":instrumentation:rxjava:rxjava-2.0:javaagent") include(":instrumentation:rxjava:rxjava-3.0:library") -include(":instrumentation:rxjava:rxjava-3.0:testing") include(":instrumentation:rxjava:rxjava-3.0:javaagent") +include(":instrumentation:rxjava:rxjava-3.1.1:library") +include(":instrumentation:rxjava:rxjava-3.1.1:javaagent") +include(":instrumentation:rxjava:rxjava-3-common:library") +include(":instrumentation:rxjava:rxjava-3-common:testing") include(":instrumentation:scala-executors:javaagent") include(":instrumentation:servlet:servlet-common:bootstrap") include(":instrumentation:servlet:servlet-common:javaagent")