diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java similarity index 87% rename from server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java rename to server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java index bf88b69a3a26e..e9222b0787433 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTransportChannel.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java @@ -15,6 +15,7 @@ import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.transport.BaseTcpTransportChannel; import org.opensearch.transport.TcpChannel; import org.opensearch.transport.TransportChannel; @@ -23,7 +24,7 @@ /** * Tracer wrapped {@link TransportChannel} */ -public class TraceableTransportChannel implements TransportChannel { +public class TraceableTcpTransportChannel extends BaseTcpTransportChannel { private final TransportChannel delegate; private final Span span; @@ -34,8 +35,10 @@ public class TraceableTransportChannel implements TransportChannel { * @param delegate delegate * @param span span * @param tracer tracer + * @param channel channel */ - public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel tcpChannel) { + public TraceableTcpTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel channel) { + super(channel); this.delegate = delegate; this.span = span; this.tracer = tracer; @@ -47,6 +50,7 @@ public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tr * @param delegate delegate * @param span span * @param tracer tracer + * @param tcpChannel tcpChannel * @return transport channel */ public static TransportChannel create(TransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) { @@ -65,7 +69,7 @@ public void onFailure(Exception e) { } }); - return new TraceableTransportChannel(delegate, span, tracer, tcpChannel); + return new TraceableTcpTransportChannel(delegate, span, tracer, tcpChannel); } else { return delegate; } diff --git a/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java new file mode 100644 index 0000000000000..b7c4523c98dd2 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +/** + * Base class TcpTransportChannel + */ +public abstract class BaseTcpTransportChannel implements TransportChannel { + private final TcpChannel channel; + + /** + * Constructor. + * @param channel tcp channel + */ + public BaseTcpTransportChannel(TcpChannel channel) { + this.channel = channel; + } + + /** + * Returns {@link TcpChannel} + * @return TcpChannel + */ + public TcpChannel getChannel() { + return channel; + } +} diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index 38b8dab2bd83f..f6e90aed860b3 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -50,7 +50,7 @@ import org.opensearch.telemetry.tracing.SpanBuilder; import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.telemetry.tracing.channels.TraceableTransportChannel; +import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel; import org.opensearch.threadpool.ThreadPool; import java.io.EOFException; @@ -200,7 +200,7 @@ private void handleRequest(TcpChannel channel, Head assert message.isShortCircuit() == false; final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); assertRemoteVersion(stream, header.getVersion()); - final TransportChannel transportChannel = new TcpTransportChannel( + final TcpTransportChannel transportChannel = new TcpTransportChannel( outboundHandler, channel, action, @@ -211,7 +211,7 @@ private void handleRequest(TcpChannel channel, Head header.isHandshake(), message.takeBreakerReleaseControl() ); - TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel); + TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel); try { handshaker.handleHandshake(traceableTransportChannel, requestId, stream); } catch (Exception e) { @@ -230,7 +230,7 @@ private void handleRequest(TcpChannel channel, Head } } } else { - final TransportChannel transportChannel = new TcpTransportChannel( + final TcpTransportChannel transportChannel = new TcpTransportChannel( outboundHandler, channel, action, @@ -241,7 +241,7 @@ private void handleRequest(TcpChannel channel, Head header.isHandshake(), message.takeBreakerReleaseControl() ); - TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel); + TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel); try { messageListener.onRequestReceived(requestId, action); if (message.isShortCircuit()) { diff --git a/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java b/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java index 464282730d2b2..98c182c562928 100644 --- a/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java +++ b/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java @@ -91,14 +91,14 @@ public void processMessageReceived(Request request, TransportChannel channel) th Releasable unregisterTask = () -> taskManager.unregister(task); try { - if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) { + if (channel instanceof BaseTcpTransportChannel && task instanceof CancellableTask) { if (request instanceof ShardSearchRequest) { // on receiving request, update the inbound network time to reflect time spent in transit over the network ((ShardSearchRequest) request).setInboundNetworkTime( Math.max(0, System.currentTimeMillis() - ((ShardSearchRequest) request).getInboundNetworkTime()) ); } - final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel(); + final TcpChannel tcpChannel = ((BaseTcpTransportChannel) channel).getChannel(); final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task); unregisterTask = Releasables.wrap(unregisterTask, stopTracking); } diff --git a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java index 00702d08902a9..34ea3c0ab9996 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java @@ -46,7 +46,7 @@ * * @opensearch.internal */ -public final class TcpTransportChannel implements TransportChannel { +public final class TcpTransportChannel extends BaseTcpTransportChannel { private final AtomicBoolean released = new AtomicBoolean(); private final OutboundHandler outboundHandler; @@ -70,6 +70,7 @@ public final class TcpTransportChannel implements TransportChannel { boolean isHandshake, Releasable breakerRelease ) { + super(channel); this.version = version; this.features = features; this.channel = channel; @@ -131,7 +132,4 @@ public Version getVersion() { return version; } - public TcpChannel getChannel() { - return channel; - } }