Skip to content

Commit

Permalink
make runWithContext(Flux, ..) public
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Dec 6, 2021
1 parent cc22505 commit b19b66f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
1 change: 1 addition & 0 deletions instrumentation/reactor-3.1/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ muzzle {
group.set("io.projectreactor")
module.set("reactor-core")
versions.set("[3.1.0.RELEASE,)")
extraDependency("io.opentelemetry:opentelemetry-api:1.0.0")
assertInverse.set(true)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

Expand Down Expand Up @@ -52,10 +53,10 @@ public void transform(TypeTransformer transformer) {
.and(isPublic())
.and(isStatic())
.and(named("runWithContext"))
.and(takesArgument(0, named("reactor.core.publisher.Mono")))
.and(takesArgument(0, namedOneOf("reactor.core.publisher.Mono", "reactor.core.publisher.Flux")))
.and(takesArgument(1, named("application.io.opentelemetry.context.Context")))
.and(returns(named("reactor.core.publisher.Mono"))),
ContextPropagationOperatorInstrumentation.class.getName() + "$RunMonoAdvice");
.and(returns(namedOneOf("reactor.core.publisher.Mono", "reactor.core.publisher.Flux"))),
ContextPropagationOperatorInstrumentation.class.getName() + "$RunWithAdvice");
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -100,7 +101,7 @@ public static void methodExit(
}

@SuppressWarnings("unused")
public static class RunMonoAdvice {
public static class RunWithAdvice {
@Advice.OnMethodEnter
public static void methodEnter(
@Advice.FieldValue(value = "enabled", readOnly = false) boolean enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator
import io.opentelemetry.instrumentation.reactor.TracedWithSpan
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier

Expand Down Expand Up @@ -70,7 +71,7 @@ class ContextPropagationOperatorInstrumentationTest extends AgentInstrumentation
}
}

def "run with context forces it to become current"() {
def "run Mono with context forces it to become current"() {
setup:
def result = Mono.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("parent").startSpan()
Expand Down Expand Up @@ -103,6 +104,39 @@ class ContextPropagationOperatorInstrumentationTest extends AgentInstrumentation
}
}

def "run Flux with context forces it to become current"() {
setup:
def result = Flux.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("parent").startSpan()
def outer = Flux.defer({ -> new TracedWithSpan().flux(Flux.just("Value") )});
return ContextPropagationOperator
.runWithContext(outer, Context.current().with(span))
.doFinally({ i -> span.end() })
})

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
}
}
}

def "store context forces it to become current"() {
setup:
def result = Mono.defer({ ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingConte
}

/** Forces Flux to run in traceContext scope. */
static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
public static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
if (!enabled) {
return publisher;
}
Expand Down

0 comments on commit b19b66f

Please sign in to comment.