Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix grpc instrumentation of callbacks #4097

Merged
merged 4 commits into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ final class TracingClientInterceptor implements ClientInterceptor {
public <REQUEST, RESPONSE> ClientCall<REQUEST, RESPONSE> interceptCall(
MethodDescriptor<REQUEST, RESPONSE> method, CallOptions callOptions, Channel next) {
GrpcRequest request = new GrpcRequest(method, null, null);
Context context = instrumenter.start(Context.current(), request);
Context parentContext = Context.current();
Context context = instrumenter.start(parentContext, request);
final ClientCall<REQUEST, RESPONSE> result;
try (Scope ignored = context.makeCurrent()) {
try {
Expand All @@ -61,18 +62,23 @@ public <REQUEST, RESPONSE> ClientCall<REQUEST, RESPONSE> interceptCall(
SocketAddress address = result.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
request.setRemoteAddress(address);

return new TracingClientCall<>(result, context, request);
return new TracingClientCall<>(result, parentContext, context, request);
}

final class TracingClientCall<REQUEST, RESPONSE>
extends ForwardingClientCall.SimpleForwardingClientCall<REQUEST, RESPONSE> {

private final Context parentContext;
private final Context context;
private final GrpcRequest request;

TracingClientCall(
ClientCall<REQUEST, RESPONSE> delegate, Context context, GrpcRequest request) {
ClientCall<REQUEST, RESPONSE> delegate,
Context parentContext,
Context context,
GrpcRequest request) {
super(delegate);
this.parentContext = parentContext;
this.context = context;
this.request = request;
}
Expand All @@ -81,7 +87,9 @@ final class TracingClientCall<REQUEST, RESPONSE>
public void start(Listener<RESPONSE> responseListener, Metadata headers) {
propagators.getTextMapPropagator().inject(context, headers, SETTER);
try (Scope ignored = context.makeCurrent()) {
super.start(new TracingClientCallListener<>(responseListener, context, request), headers);
super.start(
new TracingClientCallListener<>(responseListener, parentContext, context, request),
headers);
} catch (Throwable e) {
instrumenter.end(context, request, null, e);
throw e;
Expand All @@ -102,15 +110,18 @@ public void sendMessage(REQUEST message) {
final class TracingClientCallListener<RESPONSE>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RESPONSE> {

private final Context parentContext;
private final Context context;
private final GrpcRequest request;

// Used by MESSAGE_ID_UPDATER
@SuppressWarnings("UnusedVariable")
volatile long messageId;

TracingClientCallListener(Listener<RESPONSE> delegate, Context context, GrpcRequest request) {
TracingClientCallListener(
Listener<RESPONSE> delegate, Context parentContext, Context context, GrpcRequest request) {
super(delegate);
this.parentContext = parentContext;
this.context = context;
this.request = request;
}
Expand All @@ -135,13 +146,10 @@ public void onMessage(RESPONSE message) {

@Override
public void onClose(Status status, Metadata trailers) {
try (Scope ignored = context.makeCurrent()) {
instrumenter.end(context, request, status, status.getCause());
try (Scope ignored = parentContext.makeCurrent()) {
delegate().onClose(status, trailers);
} catch (Throwable e) {
instrumenter.end(context, request, status, e);
throw e;
}
instrumenter.end(context, request, status, status.getCause());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.instrumentation.grpc.v1_6

import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.MoreExecutors
import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
Expand Down Expand Up @@ -143,6 +145,233 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
paramName << ["some name", "some other name"]
}

def "test ListenableFuture callback"() {
setup:
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
@Override
void sayHello(
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build()
responseObserver.onNext(reply)
responseObserver.onCompleted()
}
}
def port = PortUtils.findOpenPort()
Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start()
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))

// Depending on the version of gRPC usePlainText may or may not take an argument.
try {
channelBuilder.usePlaintext()
} catch (MissingMethodException e) {
channelBuilder.usePlaintext(true)
}
ManagedChannel channel = channelBuilder.build()
GreeterGrpc.GreeterFutureStub client = GreeterGrpc.newFutureStub(channel)

when:
AtomicReference<Helloworld.Response> response = new AtomicReference<>()
AtomicReference<Throwable> error = new AtomicReference<>()
runWithSpan("parent") {
def future = Futures.transform(
client.sayHello(Helloworld.Request.newBuilder().setName("test").build()),
{
runWithSpan("child") {}
return it
},
MoreExecutors.directExecutor())
try {
response.set(future.get())
} catch (Exception e) {
error.set(e)
}
}

then:
error.get() == null
response.get().message == "Hello test"

assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "example.Greeter/SayHello"
kind CLIENT
childOf span(0)
event(0) {
eventName "message"
attributes {
"message.type" "SENT"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value()
}
}
span(2) {
name "example.Greeter/SayHello"
kind SERVER
childOf span(1)
event(0) {
eventName "message"
attributes {
"message.type" "RECEIVED"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
// "localhost" on linux, "127.0.0.1" on windows
"${SemanticAttributes.NET_PEER_NAME.key}" { it == "localhost" || it == "127.0.0.1" }
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value()
}
}
span(3) {
name "child"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}

cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()
}


def "test onCompleted callback"() {
setup:
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
@Override
void sayHello(
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build()
responseObserver.onNext(reply)
responseObserver.onCompleted()
}
}
def port = PortUtils.findOpenPort()
Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start()
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))

// Depending on the version of gRPC usePlainText may or may not take an argument.
try {
channelBuilder.usePlaintext()
} catch (MissingMethodException e) {
channelBuilder.usePlaintext(true)
}
ManagedChannel channel = channelBuilder.build()
GreeterGrpc.GreeterStub client = GreeterGrpc.newStub(channel)

when:
AtomicReference<Helloworld.Response> response = new AtomicReference<>()
AtomicReference<Throwable> error = new AtomicReference<>()
CountDownLatch latch = new CountDownLatch(1)
runWithSpan("parent") {
client.sayHello(Helloworld.Request.newBuilder().setName("test").build(),
new StreamObserver<Helloworld.Response>() {
@Override
void onNext(Helloworld.Response r) {
response.set(r)
}

@Override
void onError(Throwable throwable) {
error.set(throwable)
}

@Override
void onCompleted() {
runWithSpan("child") {}
latch.countDown()
}
}
)
}

latch.await(10, TimeUnit.SECONDS)

then:
error.get() == null
response.get().message == "Hello test"

assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "example.Greeter/SayHello"
kind CLIENT
childOf span(0)
event(0) {
eventName "message"
attributes {
"message.type" "SENT"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_TRANSPORT}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE}" Status.Code.OK.value()
}
}
span(2) {
name "example.Greeter/SayHello"
kind SERVER
childOf span(1)
event(0) {
eventName "message"
attributes {
"message.type" "RECEIVED"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
// "localhost" on linux, "127.0.0.1" on windows
"${SemanticAttributes.NET_PEER_NAME.key}" { it == "localhost" || it == "127.0.0.1" }
"${SemanticAttributes.NET_PEER_PORT.key}" Long
"${SemanticAttributes.NET_TRANSPORT.key}" SemanticAttributes.NetTransportValues.IP_TCP
"${SemanticAttributes.RPC_GRPC_STATUS_CODE.key}" Status.Code.OK.value()
}
}
span(3) {
name "child"
kind SpanKind.INTERNAL
childOf span(0)
}
}
}

cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()
}

def "test error - #paramName"() {
setup:
def error = grpcStatus.asException()
Expand Down