diff --git a/CHANGELOG.md b/CHANGELOG.md index f3a18154b023a..a72bee49cf7a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -125,6 +125,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Tracing Framework] Add support for SpanKind. ([#10122](https://github.com/opensearch-project/OpenSearch/pull/10122)) - Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246)) - Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200)) +- Add instrumentation in Inbound Handler. ([#100143](https://github.com/opensearch-project/OpenSearch/pull/10143)) - Enable remote segment upload backpressure by default ([#10356](https://github.com/opensearch-project/OpenSearch/pull/10356)) ### Deprecated diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java index ca51d70702a82..1a34dfd2c9ee4 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java @@ -96,7 +96,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap( NETTY_TRANSPORT_NAME, @@ -108,7 +109,8 @@ public Map> getTransports( pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, - getSharedGroupFactory(settings) + getSharedGroupFactory(settings), + tracer ) ); } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4Transport.java index f77c29c8bfa60..e76a227630dc1 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/netty4/Netty4Transport.java @@ -50,6 +50,7 @@ import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.indices.breaker.CircuitBreakerService; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Netty4NioSocketChannel; import org.opensearch.transport.NettyAllocator; @@ -131,9 +132,10 @@ public Netty4Transport( PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, - SharedGroupFactory sharedGroupFactory + SharedGroupFactory sharedGroupFactory, + Tracer tracer ) { - super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer); Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings)); NettyAllocator.logAllocatorDescriptionIfNeeded(); this.sharedGroupFactory = sharedGroupFactory; diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java index 3e5f71f1464a1..c92ccba82835f 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java @@ -40,6 +40,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.SharedGroupFactory; @@ -86,7 +87,8 @@ public void startThreadPool() { recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), - new SharedGroupFactory(settings) + new SharedGroupFactory(settings), + NoopTracer.INSTANCE ); nettyTransport.start(); diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/NettyTransportMultiPortTests.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/NettyTransportMultiPortTests.java index 98a001b8ae4bb..7cca00db68559 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/NettyTransportMultiPortTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/NettyTransportMultiPortTests.java @@ -40,6 +40,7 @@ import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -141,7 +142,8 @@ private TcpTransport startTransport(Settings settings, ThreadPool threadPool) { recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), - new SharedGroupFactory(settings) + new SharedGroupFactory(settings), + NoopTracer.INSTANCE ); transport.start(); diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/SimpleNetty4TransportTests.java index 35b19002dce8d..710b3ff6bd0ca 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/SimpleNetty4TransportTests.java @@ -44,6 +44,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.transport.MockTransportService; import org.opensearch.test.transport.StubbableTransport; import org.opensearch.transport.AbstractSimpleTransportTestCase; @@ -82,7 +83,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService(), - new SharedGroupFactory(settings) + new SharedGroupFactory(settings), + NoopTracer.INSTANCE ) { @Override diff --git a/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2DiscoveryTests.java index 6bed6564cfd36..02e1ff40f7ed6 100644 --- a/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2DiscoveryTests.java @@ -92,7 +92,8 @@ protected MockTransportService createTransportService() { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, writableRegistry(), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override public TransportAddress[] addressesFromString(String address) { diff --git a/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2RetriesTests.java b/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2RetriesTests.java index 3311ddc8842f2..ce097667f9c4b 100644 --- a/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2RetriesTests.java +++ b/plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2RetriesTests.java @@ -77,7 +77,8 @@ protected MockTransportService createTransportService() { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java index 8ad03d807d9da..fc917968579e1 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java @@ -57,7 +57,9 @@ public void addAttribute(String key, Boolean value) { @Override public void setError(Exception exception) { - delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage()); + if (exception != null) { + delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage()); + } } @Override diff --git a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransport.java index dfa72d6d59a0d..55920bab4efd3 100644 --- a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransport.java @@ -52,6 +52,7 @@ import org.opensearch.nio.NioSelector; import org.opensearch.nio.NioSocketChannel; import org.opensearch.nio.ServerChannelContext; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TcpTransport; import org.opensearch.transport.TransportSettings; @@ -84,9 +85,10 @@ protected NioTransport( PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService, - NioGroupFactory groupFactory + NioGroupFactory groupFactory, + Tracer tracer ) { - super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService); + super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer); this.pageAllocator = new PageAllocator(pageCacheRecycler); this.groupFactory = groupFactory; } diff --git a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java index ec266d76eff3d..d4be876867651 100644 --- a/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java +++ b/plugins/transport-nio/src/main/java/org/opensearch/transport/nio/NioTransportPlugin.java @@ -91,7 +91,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap( NIO_TRANSPORT_NAME, @@ -103,7 +104,8 @@ public Map> getTransports( pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, - getNioGroupFactory(settings) + getNioGroupFactory(settings), + tracer ) ); } diff --git a/plugins/transport-nio/src/test/java/org/opensearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/opensearch/transport/nio/SimpleNioTransportTests.java index 24cc38c17a9d1..f5d1c618f5ace 100644 --- a/plugins/transport-nio/src/test/java/org/opensearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/opensearch/transport/nio/SimpleNioTransportTests.java @@ -44,6 +44,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.transport.MockTransportService; import org.opensearch.test.transport.StubbableTransport; import org.opensearch.transport.AbstractSimpleTransportTestCase; @@ -81,7 +82,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService(), - new NioGroupFactory(settings, logger) + new NioGroupFactory(settings, logger), + NoopTracer.INSTANCE ) { @Override diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 8870e26c373e9..0734659d8ee72 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -174,7 +174,8 @@ public NetworkModule( pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, - networkService + networkService, + tracer ); for (Map.Entry> entry : transportFactory.entrySet()) { registerTransport(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java index f2f8e84f04e02..07df40bafe6a1 100644 --- a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java @@ -83,7 +83,8 @@ default Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.emptyMap(); } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java index e86b21ae0fd3b..a9514c298ef88 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/AttributeNames.java @@ -60,6 +60,11 @@ private AttributeNames() { */ public static final String TRANSPORT_TARGET_HOST = "target_host"; + /** + * Transport Service send request local host. + */ + public static final String TRANSPORT_HOST = "host"; + /** * Action Name. */ diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java index a93ce11f374fe..d97fbd371ab2a 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/SpanBuilder.java @@ -13,6 +13,7 @@ import org.opensearch.http.HttpRequest; import org.opensearch.rest.RestRequest; import org.opensearch.telemetry.tracing.attributes.Attributes; +import org.opensearch.transport.TcpChannel; import org.opensearch.transport.Transport; import java.util.Arrays; @@ -127,4 +128,26 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio return attributes; } + /** + * Creates {@link SpanCreationContext} from Inbound Handler. + * @param action action. + * @param tcpChannel tcp channel. + * @return context + */ + public static SpanCreationContext from(String action, TcpChannel tcpChannel) { + return SpanCreationContext.server().name(createSpanName(action, tcpChannel)).attributes(buildSpanAttributes(action, tcpChannel)); + } + + private static String createSpanName(String action, TcpChannel tcpChannel) { + return action + SEPARATOR + (tcpChannel.getRemoteAddress() != null + ? tcpChannel.getRemoteAddress().getHostString() + : tcpChannel.getLocalAddress().getHostString()); + } + + private static Attributes buildSpanAttributes(String action, TcpChannel tcpChannel) { + Attributes attributes = Attributes.create().addAttribute(AttributeNames.TRANSPORT_ACTION, action); + attributes.addAttribute(AttributeNames.TRANSPORT_HOST, tcpChannel.getLocalAddress().getHostString()); + return attributes; + } + } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java new file mode 100644 index 0000000000000..333e06eb037cb --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.channels; + +import org.opensearch.Version; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.transport.BaseTcpTransportChannel; +import org.opensearch.transport.TcpTransportChannel; +import org.opensearch.transport.TransportChannel; + +import java.io.IOException; + +/** + * Tracer wrapped {@link TransportChannel} + */ +public class TraceableTcpTransportChannel extends BaseTcpTransportChannel { + + private final TransportChannel delegate; + private final Span span; + private final Tracer tracer; + + /** + * Constructor. + * @param delegate delegate + * @param span span + * @param tracer tracer + */ + public TraceableTcpTransportChannel(TcpTransportChannel delegate, Span span, Tracer tracer) { + super(delegate.getChannel()); + this.delegate = delegate; + this.span = span; + this.tracer = tracer; + } + + /** + * Factory method. + * + * @param delegate delegate + * @param span span + * @param tracer tracer + * @return transport channel + */ + public static TransportChannel create(TcpTransportChannel delegate, final Span span, final Tracer tracer) { + if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { + delegate.getChannel().addCloseListener(new ActionListener() { + @Override + public void onResponse(Void unused) { + onFailure(null); + } + + @Override + public void onFailure(Exception e) { + span.addEvent("The TransportChannel was closed without sending the response"); + span.setError(e); + span.endSpan(); + } + }); + + return new TraceableTcpTransportChannel(delegate, span, tracer); + } else { + return delegate; + } + } + + @Override + public String getProfileName() { + return delegate.getProfileName(); + } + + @Override + public String getChannelType() { + return delegate.getChannelType(); + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.sendResponse(response); + } catch (final IOException ex) { + span.setError(ex); + throw ex; + } finally { + span.endSpan(); + } + } + + @Override + public void sendResponse(Exception exception) throws IOException { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.sendResponse(exception); + } finally { + span.setError(exception); + span.endSpan(); + } + } + + @Override + public Version getVersion() { + return delegate.getVersion(); + } +} diff --git a/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java new file mode 100644 index 0000000000000..14e065d3350c7 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/BaseTcpTransportChannel.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +/** + * Base class TcpTransportChannel + */ +public abstract class BaseTcpTransportChannel implements TransportChannel { + private final TcpChannel channel; + + /** + * Constructor. + * @param channel tcp channel + */ + public BaseTcpTransportChannel(TcpChannel channel) { + this.channel = channel; + } + + /** + * Returns {@link TcpChannel} + * @return TcpChannel + */ + public TcpChannel getChannel() { + return channel; + } + +} diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index 9f9232c18079a..c14a53e799319 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -46,6 +46,11 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel; import org.opensearch.threadpool.ThreadPool; import java.io.EOFException; @@ -74,6 +79,8 @@ public class InboundHandler { private volatile long slowLogThresholdMs = Long.MAX_VALUE; + private final Tracer tracer; + InboundHandler( ThreadPool threadPool, OutboundHandler outboundHandler, @@ -81,7 +88,8 @@ public class InboundHandler { TransportHandshaker handshaker, TransportKeepAlive keepAlive, Transport.RequestHandlers requestHandlers, - Transport.ResponseHandlers responseHandlers + Transport.ResponseHandlers responseHandlers, + Tracer tracer ) { this.threadPool = threadPool; this.outboundHandler = outboundHandler; @@ -90,6 +98,7 @@ public class InboundHandler { this.keepAlive = keepAlive; this.requestHandlers = requestHandlers; this.responseHandlers = responseHandlers; + this.tracer = tracer; } void setMessageListener(TransportMessageListener listener) { @@ -108,7 +117,6 @@ void inboundMessage(TcpChannel channel, InboundMessage message) throws Exception final long startTime = threadPool.relativeTimeInMillis(); channel.getChannelStats().markAccessed(startTime); TransportLogger.logInboundMessage(channel, message); - if (message.isPing()) { keepAlive.receiveKeepAlive(channel); } else { @@ -123,7 +131,6 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st final InetSocketAddress remoteAddress = channel.getRemoteAddress(); final Header header = message.getHeader(); assert header.needsToReadVariableHeader() == false; - ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext existing = threadContext.stashContext()) { // Place the context with the headers from the message @@ -165,6 +172,7 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st handleResponse(requestId, remoteAddress, EMPTY_STREAM_INPUT, handler); } } + } } finally { final long took = threadPool.relativeTimeInMillis() - startTime; @@ -184,80 +192,89 @@ private void handleRequest(TcpChannel channel, Head final String action = header.getActionName(); final long requestId = header.getRequestId(); final Version version = header.getVersion(); - if (header.isHandshake()) { - messageListener.onRequestReceived(requestId, action); - // Cannot short circuit handshakes - assert message.isShortCircuit() == false; - final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); - assertRemoteVersion(stream, header.getVersion()); - final TransportChannel transportChannel = new TcpTransportChannel( - outboundHandler, - channel, - action, - requestId, - version, - header.getFeatures(), - header.isCompressed(), - header.isHandshake(), - message.takeBreakerReleaseControl() - ); - try { - handshaker.handleHandshake(transportChannel, requestId, stream); - } catch (Exception e) { - if (Version.CURRENT.isCompatible(header.getVersion())) { - sendErrorResponse(action, transportChannel, e); - } else { - logger.warn( - new ParameterizedMessage( - "could not send error response to handshake received on [{}] using wire format version [{}], closing channel", - channel, - header.getVersion() - ), - e - ); - channel.close(); - } - } - } else { - final TransportChannel transportChannel = new TcpTransportChannel( - outboundHandler, - channel, - action, - requestId, - version, - header.getFeatures(), - header.isCompressed(), - header.isHandshake(), - message.takeBreakerReleaseControl() - ); - try { + Span span = tracer.startSpan(SpanBuilder.from(action, channel)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + if (header.isHandshake()) { messageListener.onRequestReceived(requestId, action); - if (message.isShortCircuit()) { - sendErrorResponse(action, transportChannel, message.getException()); - } else { - final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); - assertRemoteVersion(stream, header.getVersion()); - final RequestHandlerRegistry reg = requestHandlers.getHandler(action); - assert reg != null; - - final T request = newRequest(requestId, action, stream, reg); - request.remoteAddress(new TransportAddress(channel.getRemoteAddress())); - checkStreamIsFullyConsumed(requestId, action, stream); - - final String executor = reg.getExecutor(); - if (ThreadPool.Names.SAME.equals(executor)) { - try { - reg.processMessageReceived(request, transportChannel); - } catch (Exception e) { - sendErrorResponse(reg.getAction(), transportChannel, e); - } + // Cannot short circuit handshakes + assert message.isShortCircuit() == false; + final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); + assertRemoteVersion(stream, header.getVersion()); + final TcpTransportChannel transportChannel = new TcpTransportChannel( + outboundHandler, + channel, + action, + requestId, + version, + header.getFeatures(), + header.isCompressed(), + header.isHandshake(), + message.takeBreakerReleaseControl() + ); + TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer); + try { + handshaker.handleHandshake(traceableTransportChannel, requestId, stream); + } catch (Exception e) { + if (Version.CURRENT.isCompatible(header.getVersion())) { + sendErrorResponse(action, traceableTransportChannel, e); } else { - threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel)); + logger.warn( + new ParameterizedMessage( + "could not send error response to handshake received on [{}] using wire format version [{}], closing channel", + channel, + header.getVersion() + ), + e + ); + channel.close(); } } - } catch (Exception e) { - sendErrorResponse(action, transportChannel, e); + } else { + final TcpTransportChannel transportChannel = new TcpTransportChannel( + outboundHandler, + channel, + action, + requestId, + version, + header.getFeatures(), + header.isCompressed(), + header.isHandshake(), + message.takeBreakerReleaseControl() + ); + TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer); + try { + messageListener.onRequestReceived(requestId, action); + if (message.isShortCircuit()) { + sendErrorResponse(action, traceableTransportChannel, message.getException()); + } else { + final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput()); + assertRemoteVersion(stream, header.getVersion()); + final RequestHandlerRegistry reg = requestHandlers.getHandler(action); + assert reg != null; + + final T request = newRequest(requestId, action, stream, reg); + request.remoteAddress(new TransportAddress(channel.getRemoteAddress())); + checkStreamIsFullyConsumed(requestId, action, stream); + + final String executor = reg.getExecutor(); + if (ThreadPool.Names.SAME.equals(executor)) { + try { + reg.processMessageReceived(request, traceableTransportChannel); + } catch (Exception e) { + sendErrorResponse(reg.getAction(), traceableTransportChannel, e); + } + } else { + threadPool.executor(executor).execute(new RequestHandler<>(reg, request, traceableTransportChannel)); + } + } + } catch (Exception e) { + sendErrorResponse(action, traceableTransportChannel, e); + } } + } catch (Exception e) { + span.setError(e); + span.endSpan(); + throw e; } } diff --git a/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java b/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java index 464282730d2b2..98c182c562928 100644 --- a/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java +++ b/server/src/main/java/org/opensearch/transport/RequestHandlerRegistry.java @@ -91,14 +91,14 @@ public void processMessageReceived(Request request, TransportChannel channel) th Releasable unregisterTask = () -> taskManager.unregister(task); try { - if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) { + if (channel instanceof BaseTcpTransportChannel && task instanceof CancellableTask) { if (request instanceof ShardSearchRequest) { // on receiving request, update the inbound network time to reflect time spent in transit over the network ((ShardSearchRequest) request).setInboundNetworkTime( Math.max(0, System.currentTimeMillis() - ((ShardSearchRequest) request).getInboundNetworkTime()) ); } - final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel(); + final TcpChannel tcpChannel = ((BaseTcpTransportChannel) channel).getChannel(); final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task); unregisterTask = Releasables.wrap(unregisterTask, stopTracking); } diff --git a/server/src/main/java/org/opensearch/transport/TcpTransport.java b/server/src/main/java/org/opensearch/transport/TcpTransport.java index 7da7dcad13120..d0e6516973382 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransport.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransport.java @@ -68,6 +68,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.node.Node; +import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -159,7 +160,8 @@ public TcpTransport( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { this.settings = settings; this.profileSettings = getProfileSettings(settings); @@ -208,7 +210,8 @@ public TcpTransport( handshaker, keepAlive, requestHandlers, - responseHandlers + responseHandlers, + tracer ); } diff --git a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java index 00702d08902a9..a7bedcf93e129 100644 --- a/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/opensearch/transport/TcpTransportChannel.java @@ -46,11 +46,10 @@ * * @opensearch.internal */ -public final class TcpTransportChannel implements TransportChannel { +public final class TcpTransportChannel extends BaseTcpTransportChannel { private final AtomicBoolean released = new AtomicBoolean(); private final OutboundHandler outboundHandler; - private final TcpChannel channel; private final String action; private final long requestId; private final Version version; @@ -70,9 +69,9 @@ public final class TcpTransportChannel implements TransportChannel { boolean isHandshake, Releasable breakerRelease ) { + super(channel); this.version = version; this.features = features; - this.channel = channel; this.outboundHandler = outboundHandler; this.action = action; this.requestId = requestId; @@ -83,7 +82,7 @@ public final class TcpTransportChannel implements TransportChannel { @Override public String getProfileName() { - return channel.getProfile(); + return getChannel().getProfile(); } @Override @@ -93,7 +92,7 @@ public void sendResponse(TransportResponse response) throws IOException { // update outbound network time with current time before sending response over network ((QuerySearchResult) response).getShardSearchRequest().setOutboundNetworkTime(System.currentTimeMillis()); } - outboundHandler.sendResponse(version, features, channel, requestId, action, response, compressResponse, isHandshake); + outboundHandler.sendResponse(version, features, getChannel(), requestId, action, response, compressResponse, isHandshake); } finally { release(false); } @@ -102,7 +101,7 @@ public void sendResponse(TransportResponse response) throws IOException { @Override public void sendResponse(Exception exception) throws IOException { try { - outboundHandler.sendErrorResponse(version, features, channel, requestId, action, exception); + outboundHandler.sendErrorResponse(version, features, getChannel(), requestId, action, exception); } finally { release(true); } @@ -131,7 +130,4 @@ public Version getVersion() { return version; } - public TcpChannel getChannel() { - return channel; - } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 07e149dd72164..a3fa0f9cb16e4 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -211,7 +211,8 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, diff --git a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java index fbac465f946f4..3bd8930064563 100644 --- a/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/resync/TransportResyncReplicationActionTests.java @@ -135,7 +135,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { new NetworkService(emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) ) { diff --git a/server/src/test/java/org/opensearch/action/support/replication/BroadcastReplicationTests.java b/server/src/test/java/org/opensearch/action/support/replication/BroadcastReplicationTests.java index 77c9c64ad6611..19a9918fa4561 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/BroadcastReplicationTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/BroadcastReplicationTests.java @@ -116,7 +116,8 @@ public void setUp() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - circuitBreakerService + circuitBreakerService, + NoopTracer.INSTANCE ); clusterService = createClusterService(threadPool); transportService = new TransportService( diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java index 0bee99f4d5656..03150d6e30755 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationActionTests.java @@ -1318,7 +1318,8 @@ public void testRetryOnReplicaWithRealTransport() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( Settings.EMPTY, diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index d28a4a51999e6..0ca118fe422a5 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -118,7 +118,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap("custom", custom); } @@ -176,7 +177,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap("default_custom", customTransport); } @@ -220,7 +222,8 @@ public Map> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap("default_custom", customTransport); } diff --git a/server/src/test/java/org/opensearch/discovery/FileBasedSeedHostsProviderTests.java b/server/src/test/java/org/opensearch/discovery/FileBasedSeedHostsProviderTests.java index 688a532a61c4a..f4515361a89b8 100644 --- a/server/src/test/java/org/opensearch/discovery/FileBasedSeedHostsProviderTests.java +++ b/server/src/test/java/org/opensearch/discovery/FileBasedSeedHostsProviderTests.java @@ -100,7 +100,8 @@ private void createTransportSvc() { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override public BoundTransportAddress boundAddress() { diff --git a/server/src/test/java/org/opensearch/discovery/SeedHostsResolverTests.java b/server/src/test/java/org/opensearch/discovery/SeedHostsResolverTests.java index dc0829adac101..421f6c6fe279b 100644 --- a/server/src/test/java/org/opensearch/discovery/SeedHostsResolverTests.java +++ b/server/src/test/java/org/opensearch/discovery/SeedHostsResolverTests.java @@ -185,7 +185,8 @@ public void testRemovingLocalAddresses() { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override @@ -237,7 +238,8 @@ public void testUnknownHost() { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override @@ -292,7 +294,8 @@ public void testResolveTimeout() { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override @@ -368,7 +371,8 @@ public void testCancellationOnClose() throws InterruptedException { networkService, PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override @@ -432,7 +436,8 @@ public void testInvalidHosts() throws IllegalAccessException { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override public BoundTransportAddress boundAddress() { diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index c61afdd5c5261..3c25dbdff3342 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -117,7 +117,8 @@ public void setup() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( settings, diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java index 1dede94c68208..c4d2f81f7cf79 100644 --- a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java @@ -68,7 +68,8 @@ public void setup() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( settings, diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java index e237214ab88f5..0dae0ae1b4e0b 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestInitializeExtensionActionTests.java @@ -68,7 +68,8 @@ public void setup() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( settings, diff --git a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java index fe738ff7d85e6..9da976de7d7f6 100644 --- a/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java +++ b/server/src/test/java/org/opensearch/extensions/rest/RestSendToExtensionActionTests.java @@ -79,7 +79,8 @@ public void setup() throws Exception { new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()), - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ); transportService = new MockTransportService( settings, diff --git a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java index 9a261c5745bc2..e002297911788 100644 --- a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java @@ -50,6 +50,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -117,7 +118,8 @@ public void sendMessage(BytesReference reference, ActionListener listener) handshaker, keepAlive, requestHandlers, - responseHandlers + responseHandlers, + NoopTracer.INSTANCE ); } diff --git a/server/src/test/java/org/opensearch/transport/TcpTransportTests.java b/server/src/test/java/org/opensearch/transport/TcpTransportTests.java index 06545b77c6d76..7ab78cca7d615 100644 --- a/server/src/test/java/org/opensearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/opensearch/transport/TcpTransportTests.java @@ -47,6 +47,7 @@ import org.opensearch.common.util.MockPageCacheRecycler; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.junit.annotations.TestLogging; @@ -255,7 +256,8 @@ private void testDefaultSeedAddresses(final Settings settings, Matcher> getTransports( PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NetworkService networkService + NetworkService networkService, + Tracer tracer ) { return Collections.singletonMap( MOCK_NIO_TRANSPORT_NAME, @@ -68,7 +70,8 @@ public Map> getTransports( networkService, pageCacheRecycler, namedWriteableRegistry, - circuitBreakerService + circuitBreakerService, + tracer ) ); } diff --git a/test/framework/src/test/java/org/opensearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/opensearch/transport/nio/SimpleMockNioTransportTests.java index fb77161a02aef..ce401ad99fad7 100644 --- a/test/framework/src/test/java/org/opensearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/opensearch/transport/nio/SimpleMockNioTransportTests.java @@ -42,6 +42,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.transport.AbstractSimpleTransportTestCase; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.ConnectionProfile; @@ -71,7 +72,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti networkService, new MockPageCacheRecycler(settings), namedWriteableRegistry, - new NoneCircuitBreakerService() + new NoneCircuitBreakerService(), + NoopTracer.INSTANCE ) { @Override diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java index 4b661dc0ad0fe..c5d179f6412a8 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockSpan.java @@ -166,7 +166,9 @@ public Long getEndTime() { } public void setError(Exception exception) { - putMetadata("ERROR", exception.getMessage()); + if (exception != null) { + putMetadata("ERROR", exception.getMessage()); + } } private static class IdGenerator {