Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] [2.x] - Add Tracing Instrumentation at Network and Rest layer #9939

Merged
merged 2 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Move ZStd to a plugin ([#9658](https://github.com/opensearch-project/OpenSearch/pull/9658))
- [Remote Store] Add support for Remote Translog Store upload stats in `_nodes/stats/` API ([#8908](https://github.com/opensearch-project/OpenSearch/pull/8908))
- [Remote Store] Removing feature flag to mark feature GA ([#9761](https://github.com/opensearch-project/OpenSearch/pull/9761))
- Add instrumentation in rest and network layer. ([#9415](https://github.com/opensearch-project/OpenSearch/pull/9415))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ protected void addDefaultAttributes(Span span) {
}

@Override
public Span startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> headers) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanName, propagatedSpan.map(SpanContext::new).orElse(null), attributes);
return startSpan(
spanCreationContext.getSpanName(),
propagatedSpan.map(SpanContext::new).orElse(null),
spanCreationContext.getAttributes()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.telemetry.tracing.SpanCreationContext;

import java.util.List;
import java.util.Map;
Expand All @@ -28,10 +28,9 @@ public interface HttpTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanName span name.
* @param spanCreationContext span name.
* @param header http request header.
* @param attributes span attributes.
* @return span.
*/
Span startSpan(String spanName, Map<String, List<String>> header, Attributes attributes);
Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header);
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void close() {
}

@Override
public Span startSpan(String spanName, Map<String, List<String>> header, Attributes attributes) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header) {
return NoopSpan.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.http.HttpHandlingSettings;
import org.opensearch.http.HttpReadTimeoutException;
import org.opensearch.http.HttpServerChannel;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NettyAllocator;
import org.opensearch.transport.NettyByteBufSizer;
Expand Down Expand Up @@ -174,9 +175,10 @@ public Netty4HttpServerTransport(
NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher,
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings);
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer);
Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
NettyAllocator.logAllocatorDescriptionIfNeeded();
this.sharedGroupFactory = sharedGroupFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.http.netty4.Netty4HttpServerTransport;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.netty4.Netty4Transport;

Expand Down Expand Up @@ -122,7 +123,8 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings
ClusterSettings clusterSettings,
Tracer tracer
) {
return Collections.singletonMap(
NETTY_HTTP_TRANSPORT_NAME,
Expand All @@ -134,7 +136,8 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
xContentRegistry,
dispatcher,
clusterSettings,
getSharedGroupFactory(settings)
getSharedGroupFactory(settings),
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -112,7 +113,8 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
xContentRegistry(),
dispatcher,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(Settings.EMPTY)
new SharedGroupFactory(Settings.EMPTY),
NoopTracer.INSTANCE
)
) {
httpServerTransport.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.http.HttpResponse;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.NullDispatcher;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -135,7 +136,8 @@ class CustomNettyHttpServerTransport extends Netty4HttpServerTransport {
xContentRegistry(),
new NullDispatcher(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -198,7 +199,8 @@ public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext,
xContentRegistry(),
dispatcher,
clusterSettings,
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down Expand Up @@ -247,7 +249,8 @@ public void testBindUnavailableAddress() {
xContentRegistry(),
new NullDispatcher(),
clusterSettings,
new SharedGroupFactory(Settings.EMPTY)
new SharedGroupFactory(Settings.EMPTY),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand All @@ -265,7 +268,8 @@ public void testBindUnavailableAddress() {
xContentRegistry(),
new NullDispatcher(),
clusterSettings,
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start);
Expand Down Expand Up @@ -317,7 +321,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
xContentRegistry(),
dispatcher,
clusterSettings,
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down Expand Up @@ -379,7 +384,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
xContentRegistry(),
dispatcher,
clusterSettings,
new SharedGroupFactory(Settings.EMPTY)
new SharedGroupFactory(Settings.EMPTY),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down Expand Up @@ -448,7 +454,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
xContentRegistry(),
dispatcher,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down Expand Up @@ -521,7 +528,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
xContentRegistry(),
dispatcher,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
)
) {
transport.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -99,7 +100,14 @@ public TransportAddress[] addressesFromString(String address) {
return new TransportAddress[] { poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress()) };
}
};
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
return new MockTransportService(
Settings.EMPTY,
transport,
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null,
NoopTracer.INSTANCE
);
}

protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.discovery.SeedHostsProvider;
import org.opensearch.discovery.SeedHostsResolver;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.nio.MockNioTransport;
Expand Down Expand Up @@ -80,7 +81,8 @@ protected MockTransportService createTransportService() {
),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null
null,
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -109,7 +110,7 @@ public void setProjectName() {

@Before
public void createTransportService() {
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null);
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, NoopTracer.INSTANCE);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import org.opensearch.test.telemetry.tracing.MockSpanData;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import io.opentelemetry.sdk.common.CompletableResultCode;
Expand All @@ -21,7 +23,7 @@

public class InMemorySingletonSpanExporter implements SpanExporter {

private static final InMemorySingletonSpanExporter INSTANCE = new InMemorySingletonSpanExporter(InMemorySpanExporter.create());
public static final InMemorySingletonSpanExporter INSTANCE = new InMemorySingletonSpanExporter(InMemorySpanExporter.create());

private static InMemorySpanExporter delegate;

Expand Down Expand Up @@ -62,10 +64,30 @@ private List<MockSpanData> convertSpanDataListToMockSpanDataList(List<SpanData>
spanData.getStartEpochNanos(),
spanData.getEndEpochNanos(),
spanData.hasEnded(),
spanData.getName()
spanData.getName(),
getAttributes(spanData)
)
)
.collect(Collectors.toList());
return mockSpanDataList;
}

private Map<String, Object> getAttributes(SpanData spanData) {
if (spanData.getAttributes() != null) {
return spanData.getAttributes()
.asMap()
.entrySet()
.stream()
.collect(Collectors.toMap(e -> e.getKey().getKey(), e -> e.getValue()));
} else {
return Collections.emptyMap();
}
}

/**
* Clears the state.
*/
public void reset() {
delegate.reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ public void testSanityCheckWhenTracingDisabled() throws Exception {

ensureGreen();
refresh();
InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE;
exporter.reset();

// Make the search call;
client.prepareSearch().setQuery(queryStringQuery("fox")).get();

// Sleep for about 3s to wait for traces are published (the delay is 1s)
Thread.sleep(3000);

InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.create();
assertTrue(exporter.getFinishedSpanItems().isEmpty());
}

Expand Down
Loading
Loading