From de3215599ab0f745da5bf9e1c7e2afc6e8f4ac3d Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Sat, 11 Sep 2021 13:30:12 -0700 Subject: [PATCH 1/3] Fix grpc instrumentation of callbacks --- .../grpc/v1_6/TracingClientInterceptor.java | 28 +++-- .../grpc/v1_6/AbstractGrpcTest.groovy | 118 ++++++++++++++++++ 2 files changed, 136 insertions(+), 10 deletions(-) diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java index 1edd68e152e0..b7607cf29811 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java @@ -46,7 +46,8 @@ final class TracingClientInterceptor implements ClientInterceptor { public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { GrpcRequest request = new GrpcRequest(method, null, null); - Context context = instrumenter.start(Context.current(), request); + Context parentContext = Context.current(); + Context context = instrumenter.start(parentContext, request); final ClientCall result; try (Scope ignored = context.makeCurrent()) { try { @@ -61,18 +62,23 @@ public ClientCall interceptCall( SocketAddress address = result.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); request.setRemoteAddress(address); - return new TracingClientCall<>(result, context, request); + return new TracingClientCall<>(result, parentContext, context, request); } final class TracingClientCall extends ForwardingClientCall.SimpleForwardingClientCall { + private final Context parentContext; private final Context context; private final GrpcRequest request; TracingClientCall( - ClientCall delegate, Context context, GrpcRequest request) { + ClientCall delegate, + Context parentContext, + Context context, + GrpcRequest request) { super(delegate); + this.parentContext = parentContext; this.context = context; this.request = request; } @@ -81,7 +87,9 @@ final class TracingClientCall public void start(Listener responseListener, Metadata headers) { propagators.getTextMapPropagator().inject(context, headers, SETTER); try (Scope ignored = context.makeCurrent()) { - super.start(new TracingClientCallListener<>(responseListener, context, request), headers); + super.start( + new TracingClientCallListener<>(responseListener, parentContext, context, request), + headers); } catch (Throwable e) { instrumenter.end(context, request, null, e); throw e; @@ -102,6 +110,7 @@ public void sendMessage(REQUEST message) { final class TracingClientCallListener extends ForwardingClientCallListener.SimpleForwardingClientCallListener { + private final Context parentContext; private final Context context; private final GrpcRequest request; @@ -109,8 +118,10 @@ final class TracingClientCallListener @SuppressWarnings("UnusedVariable") volatile long messageId; - TracingClientCallListener(Listener delegate, Context context, GrpcRequest request) { + TracingClientCallListener( + Listener delegate, Context parentContext, Context context, GrpcRequest request) { super(delegate); + this.parentContext = parentContext; this.context = context; this.request = request; } @@ -135,13 +146,10 @@ public void onMessage(RESPONSE message) { @Override public void onClose(Status status, Metadata trailers) { - try (Scope ignored = context.makeCurrent()) { + instrumenter.end(context, request, status, status.getCause()); + try (Scope ignored = parentContext.makeCurrent()) { delegate().onClose(status, trailers); - } catch (Throwable e) { - instrumenter.end(context, request, status, e); - throw e; } - instrumenter.end(context, request, status, status.getCause()); } @Override diff --git a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy index eec368feb72d..98ab1e22041d 100644 --- a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy +++ b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy @@ -143,6 +143,124 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification { paramName << ["some name", "some other name"] } + def "test callback"() { + setup: + BindableService greeter = new GreeterGrpc.GreeterImplBase() { + @Override + void sayHello( + final Helloworld.Request req, final StreamObserver responseObserver) { + final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() + responseObserver.onNext(reply) + responseObserver.onCompleted() + } + } + def port = PortUtils.findOpenPort() + Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() + ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) + + // Depending on the version of gRPC usePlainText may or may not take an argument. + try { + channelBuilder.usePlaintext() + } catch (MissingMethodException e) { + channelBuilder.usePlaintext(true) + } + ManagedChannel channel = channelBuilder.build() + GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel) + + when: + AtomicReference response = new AtomicReference<>() + AtomicReference error = new AtomicReference<>() + CountDownLatch latch = new CountDownLatch(1) + runWithSpan("parent") { + client.sayHello(Helloworld.Request.newBuilder().setName("test").build(), + new StreamObserver() { + @Override + void onNext(Helloworld.Response r) { + response.set(r) + } + + @Override + void onError(Throwable throwable) { + error.set(throwable) + } + + @Override + void onCompleted() { + runWithSpan("child") {} + latch.countDown() + } + } + ) + } + + latch.await(10, TimeUnit.SECONDS) + + then: + error.get() == null + response.get().message == "Hello test" + + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind SpanKind.INTERNAL + hasNoParent() + } + span(1) { + name "example.Greeter/SayHello" + kind CLIENT + childOf span(0) + event(0) { + eventName "message" + attributes { + "message.type" "SENT" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key}" "SayHello" + "${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP + "${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value() + } + } + span(2) { + name "example.Greeter/SayHello" + kind SERVER + childOf span(1) + event(0) { + eventName "message" + attributes { + "message.type" "RECEIVED" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key}" "SayHello" + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + // "localhost" on linux, "127.0.0.1" on windows + "${SemanticAttributes.NET_PEER_NAME.key}" { it == "localhost" || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_PORT.key}" Long + "${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP + "${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value() + } + } + span(3) { + name "child" + kind SpanKind.INTERNAL + childOf span(0) + } + } + } + + cleanup: + channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) + server?.shutdownNow()?.awaitTermination() + } + def "test error - #paramName"() { setup: def error = grpcStatus.asException() From 7560ad10c0c94a64c0ce871be8a032c04b1db822 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Sun, 12 Sep 2021 19:31:16 -0700 Subject: [PATCH 2/3] Add ListenableFuture test --- .../grpc/v1_6/AbstractGrpcTest.groovy | 113 +++++++++++++++++- 1 file changed, 112 insertions(+), 1 deletion(-) diff --git a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy index 6e749ac7b2d0..68fd3e47b9e0 100644 --- a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy +++ b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.grpc.v1_6 +import com.google.common.util.concurrent.MoreExecutors import example.GreeterGrpc import example.Helloworld import io.grpc.BindableService @@ -143,7 +144,117 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification { paramName << ["some name", "some other name"] } - def "test callback"() { + def "test ListenableFuture callback"() { + setup: + BindableService greeter = new GreeterGrpc.GreeterImplBase() { + @Override + void sayHello( + final Helloworld.Request req, final StreamObserver responseObserver) { + final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() + responseObserver.onNext(reply) + responseObserver.onCompleted() + } + } + def port = PortUtils.findOpenPort() + Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start() + ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) + + // Depending on the version of gRPC usePlainText may or may not take an argument. + try { + channelBuilder.usePlaintext() + } catch (MissingMethodException e) { + channelBuilder.usePlaintext(true) + } + ManagedChannel channel = channelBuilder.build() + GreeterGrpc.GreeterFutureStub client = GreeterGrpc.newFutureStub(channel) + + when: + AtomicReference response = new AtomicReference<>() + AtomicReference error = new AtomicReference<>() + CountDownLatch latch = new CountDownLatch(1) + runWithSpan("parent") { + def future = client.sayHello(Helloworld.Request.newBuilder().setName("test").build()) + future.addListener({ + try { + response.set(future.get()) + } catch (Exception e) { + error.set(e); + } + runWithSpan("child") {} + latch.countDown() + }, MoreExecutors.directExecutor()) + } + + latch.await(10, TimeUnit.SECONDS) + + then: + error.get() == null + response.get().message == "Hello test" + + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind SpanKind.INTERNAL + hasNoParent() + } + span(1) { + name "example.Greeter/SayHello" + kind CLIENT + childOf span(0) + event(0) { + eventName "message" + attributes { + "message.type" "SENT" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key}" "SayHello" + "${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP + "${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value() + } + } + span(2) { + name "example.Greeter/SayHello" + kind SERVER + childOf span(1) + event(0) { + eventName "message" + attributes { + "message.type" "RECEIVED" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key}" "SayHello" + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + // "localhost" on linux, "127.0.0.1" on windows + "${SemanticAttributes.NET_PEER_NAME.key}" { it == "localhost" || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_PORT.key}" Long + "${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP + "${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value() + } + } + span(3) { + name "child" + kind SpanKind.INTERNAL + childOf span(0) + } + } + } + + cleanup: + channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) + server?.shutdownNow()?.awaitTermination() + } + + + def "test onCompleted callback"() { setup: BindableService greeter = new GreeterGrpc.GreeterImplBase() { @Override From 383ca4a510cb2177e91b69a7800dbb9fdf8a2630 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Sun, 12 Sep 2021 20:12:13 -0700 Subject: [PATCH 3/3] Futures.transform --- .../grpc/v1_6/AbstractGrpcTest.groovy | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy index 68fd3e47b9e0..3618e5732541 100644 --- a/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy +++ b/instrumentation/grpc-1.6/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.groovy @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.grpc.v1_6 +import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.MoreExecutors import example.GreeterGrpc import example.Helloworld @@ -171,22 +172,21 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification { when: AtomicReference response = new AtomicReference<>() AtomicReference error = new AtomicReference<>() - CountDownLatch latch = new CountDownLatch(1) runWithSpan("parent") { - def future = client.sayHello(Helloworld.Request.newBuilder().setName("test").build()) - future.addListener({ - try { - response.set(future.get()) - } catch (Exception e) { - error.set(e); - } - runWithSpan("child") {} - latch.countDown() - }, MoreExecutors.directExecutor()) + def future = Futures.transform( + client.sayHello(Helloworld.Request.newBuilder().setName("test").build()), + { + runWithSpan("child") {} + return it + }, + MoreExecutors.directExecutor()) + try { + response.set(future.get()) + } catch (Exception e) { + error.set(e) + } } - latch.await(10, TimeUnit.SECONDS) - then: error.get() == null response.get().message == "Hello test"