Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument cassandra executeReactive method #6441

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
634755c
new cassandra 4.14 module to support reactive tracing
SimoneGiusso Aug 21, 2022
f59e4d7
fix DuplicateFileCopyingException
SimoneGiusso Aug 21, 2022
128f0f6
Move CassandraSingletons in instrumentation libraries and add new Cas…
SimoneGiusso Aug 27, 2022
25de63f
Merge branch 'main' into instrumenting-executereactive-cassandra-method
SimoneGiusso Nov 15, 2022
0ca0549
sync with main branch
SimoneGiusso Nov 15, 2022
8841346
WIP: Fix tests
SimoneGiusso Nov 16, 2022
5f03349
WIP: use lowest java-driver-core version that supports reactive
SimoneGiusso Nov 17, 2022
15172a4
WIP: create common test package + wrap original TracingCqlSession
SimoneGiusso Nov 24, 2022
eec8951
Merge branch 'main' into instrumenting-executereactive-cassandra-method
SimoneGiusso Nov 24, 2022
15e4b42
WIP: avoid having both cassandra 4.* modules applied
SimoneGiusso Nov 26, 2022
ad461f6
Address last comments from mateuszrzeszutek's review
SimoneGiusso Nov 27, 2022
3e8ea44
Share code keeping it package protected + clean-up
SimoneGiusso Nov 27, 2022
74fd803
Make CompletionStageFunction public and rename package for classes un…
SimoneGiusso Nov 27, 2022
308a11e
Merge branch 'main' into instrumenting-executereactive-cassandra-method
SimoneGiusso Nov 29, 2022
dc416a8
Merge remote-tracking branch 'upstream/main' into instrumenting-execu…
trask Feb 27, 2023
aced6ee
Move library to 4.4
trask Feb 27, 2023
ed61c54
more
trask Feb 27, 2023
865b337
more
trask Feb 27, 2023
a924133
todo
trask Feb 27, 2023
19225cd
Update docs
trask Feb 27, 2023
2385797
Rewrite cassandra reactive test in java
SimoneGiusso Feb 27, 2023
8593fe4
Remove deprecated ExecutionInfo#getStatement method
SimoneGiusso Feb 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ muzzle {
}

dependencies {
implementation(project(":instrumentation:cassandra:cassandra-4.0:library"))

library("com.datastax.oss:java-driver-core:4.0.0")

compileOnly("com.google.auto.value:auto-value-annotations")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,22 @@
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
import io.opentelemetry.instrumentation.cassandra.v4_0.CassandraRequest;
import io.opentelemetry.instrumentation.cassandra.v4_0.CassandraTelemetry;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;

public final class CassandraSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.cassandra-4.0";

// using ExecutionInfo because we can get that from ResultSet, AsyncResultSet and DriverException
private static final Instrumenter<CassandraRequest, ExecutionInfo> INSTRUMENTER;

static {
CassandraSqlAttributesGetter attributesGetter = new CassandraSqlAttributesGetter();
CassandraTelemetry telemetry =
CassandraTelemetry.builder(GlobalOpenTelemetry.get())
.setInstrumentationName("io.opentelemetry.cassandra-4.0")
.setStatementSanitizationEnabled(CommonConfig.get().isStatementSanitizationEnabled())
.build();

INSTRUMENTER =
Instrumenter.<CassandraRequest, ExecutionInfo>builder(
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
DbClientSpanNameExtractor.create(attributesGetter))
.addAttributesExtractor(
SqlClientAttributesExtractor.builder(attributesGetter)
.setTableAttribute(SemanticAttributes.DB_CASSANDRA_TABLE)
.setStatementSanitizationEnabled(
CommonConfig.get().isStatementSanitizationEnabled())
.build())
.addAttributesExtractor(
NetClientAttributesExtractor.create(new CassandraNetAttributesGetter()))
.addAttributesExtractor(new CassandraAttributesExtractor())
.buildInstrumenter(SpanKindExtractor.alwaysClient());
INSTRUMENTER = telemetry.getInstrumenter();
}

public static Instrumenter<CassandraRequest, ExecutionInfo> instrumenter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;

import static io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraSingletons.instrumenter;

import com.datastax.oss.driver.api.core.CqlSession;
import io.opentelemetry.instrumentation.cassandra.v4_0.TracingCqlSession;
import java.util.function.Function;

public class CompletionStageFunction implements Function<Object, Object> {
Expand All @@ -19,6 +22,6 @@ public Object apply(Object session) {
if (session.getClass().getName().endsWith("cassandra4.TracingCqlSession")) {
return session;
}
return new TracingCqlSession((CqlSession) session);
return new TracingCqlSession((CqlSession) session, instrumenter());
}
}
16 changes: 16 additions & 0 deletions instrumentation/cassandra/cassandra-4.0/library/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
plugins {
id("otel.library-instrumentation")
}

dependencies {
library("com.datastax.oss:java-driver-core:4.0.0")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
Expand All @@ -16,7 +16,7 @@
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import javax.annotation.Nullable;

final class CassandraAttributesExtractor
public final class CassandraAttributesExtractor
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
implements AttributesExtractor<CassandraRequest, ExecutionInfo> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.Node;
Expand All @@ -12,7 +12,7 @@
import java.net.SocketAddress;
import javax.annotation.Nullable;

final class CassandraNetAttributesGetter
public final class CassandraNetAttributesGetter
extends InetSocketAddressNetClientAttributesGetter<CassandraRequest, ExecutionInfo> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.session.Session;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import javax.annotation.Nullable;

final class CassandraSqlAttributesGetter implements SqlClientAttributesGetter<CassandraRequest> {
public final class CassandraSqlAttributesGetter
implements SqlClientAttributesGetter<CassandraRequest> {

@Override
public String system(CassandraRequest request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;

public final class CassandraTelemetry {
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved

SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
public static CassandraTelemetryBuilder builder(OpenTelemetry openTelemetry) {
return new CassandraTelemetryBuilder(openTelemetry);
}

private final Instrumenter<CassandraRequest, ExecutionInfo> instrumenter;

CassandraTelemetry(
OpenTelemetry openTelemetry,
String instrumentationName,
boolean statementSanitizationEnabled) {
this.instrumenter =
createInstrumenter(openTelemetry, instrumentationName, statementSanitizationEnabled);
}

private static Instrumenter<CassandraRequest, ExecutionInfo> createInstrumenter(
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
OpenTelemetry openTelemetry, String instrumenterName, boolean statementSanitizationEnabled) {
CassandraSqlAttributesGetter attributesGetter = new CassandraSqlAttributesGetter();

return Instrumenter.<CassandraRequest, ExecutionInfo>builder(
openTelemetry, instrumenterName, DbClientSpanNameExtractor.create(attributesGetter))
.addAttributesExtractor(
SqlClientAttributesExtractor.builder(attributesGetter)
.setTableAttribute(SemanticAttributes.DB_CASSANDRA_TABLE)
.setStatementSanitizationEnabled(statementSanitizationEnabled)
.build())
.addAttributesExtractor(
NetClientAttributesExtractor.create(new CassandraNetAttributesGetter()))
.addAttributesExtractor(new CassandraAttributesExtractor())
.buildInstrumenter(SpanKindExtractor.alwaysClient());
}

public Instrumenter<CassandraRequest, ExecutionInfo> getInstrumenter() {
return instrumenter;
}
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.cassandra.v4_0;

import io.opentelemetry.api.OpenTelemetry;

public class CassandraTelemetryBuilder {
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved

static final String DEFAULT_INSTRUMENTATION_NAME = "io.opentelemetry.cassandra";

private final OpenTelemetry openTelemetry;

private String instrumentationName = DEFAULT_INSTRUMENTATION_NAME;
private boolean statementSanitizationEnabled;

public CassandraTelemetryBuilder(OpenTelemetry openTelemetry) {
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
this.openTelemetry = openTelemetry;
}

public CassandraTelemetryBuilder setInstrumentationName(String instrumentationName) {
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
this.instrumentationName = instrumentationName;
return this;
}

public CassandraTelemetryBuilder setStatementSanitizationEnabled(boolean enabled) {
this.statementSanitizationEnabled = enabled;
return this;
}

public CassandraTelemetry build() {
return new CassandraTelemetry(openTelemetry, instrumentationName, statementSanitizationEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_0;

import static io.opentelemetry.javaagent.instrumentation.cassandra.v4_0.CassandraSingletons.instrumenter;
package io.opentelemetry.instrumentation.cassandra.v4_0;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
Expand All @@ -25,6 +23,7 @@
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -33,9 +32,12 @@

public class TracingCqlSession implements CqlSession {
private final CqlSession session;
private final Instrumenter<CassandraRequest, ExecutionInfo> instrumenter;

public TracingCqlSession(CqlSession session) {
public TracingCqlSession(
CqlSession session, Instrumenter<CassandraRequest, ExecutionInfo> instrumenter) {
this.session = session;
this.instrumenter = instrumenter;
}

@Override
Expand Down Expand Up @@ -158,31 +160,31 @@ public <REQUEST extends Request, RESULT> RESULT execute(
@Override
public ResultSet execute(String query) {
CassandraRequest request = CassandraRequest.create(session, query);
Context context = instrumenter().start(Context.current(), request);
Context context = instrumenter.start(Context.current(), request);
ResultSet resultSet;
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(query);
} catch (RuntimeException e) {
instrumenter().end(context, request, getExecutionInfo(e), e);
instrumenter.end(context, request, getExecutionInfo(e), e);
throw e;
}
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
instrumenter.end(context, request, resultSet.getExecutionInfo(), null);
return resultSet;
}

@Override
public ResultSet execute(Statement<?> statement) {
String query = getQuery(statement);
CassandraRequest request = CassandraRequest.create(session, query);
Context context = instrumenter().start(Context.current(), request);
Context context = instrumenter.start(Context.current(), request);
ResultSet resultSet;
try (Scope ignored = context.makeCurrent()) {
resultSet = session.execute(statement);
} catch (RuntimeException e) {
instrumenter().end(context, request, getExecutionInfo(e), e);
instrumenter.end(context, request, getExecutionInfo(e), e);
throw e;
}
instrumenter().end(context, request, resultSet.getExecutionInfo(), null);
instrumenter.end(context, request, resultSet.getExecutionInfo(), null);
return resultSet;
}

Expand All @@ -199,21 +201,17 @@ public CompletionStage<AsyncResultSet> executeAsync(String query) {
return executeAsync(request, () -> session.executeAsync(query));
}

private static CompletionStage<AsyncResultSet> executeAsync(
private CompletionStage<AsyncResultSet> executeAsync(
CassandraRequest request, Supplier<CompletionStage<AsyncResultSet>> query) {
Context parentContext = Context.current();
Context context = instrumenter().start(parentContext, request);
Context context = instrumenter.start(parentContext, request);
try (Scope ignored = context.makeCurrent()) {
CompletionStage<AsyncResultSet> stage = query.get();
return wrap(
stage.whenComplete(
(asyncResultSet, throwable) ->
instrumenter()
.end(
context,
request,
getExecutionInfo(asyncResultSet, throwable),
throwable)),
instrumenter.end(
context, request, getExecutionInfo(asyncResultSet, throwable), throwable)),
parentContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("com.datastax.oss")
module.set("java-driver-core")
versions.set("[4.14,)")
SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
assertInverse.set(true)
}
}

dependencies {
implementation(project(":instrumentation:cassandra:cassandra-4.0:library"))

library("com.datastax.oss:java-driver-core:4.14.1")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

testImplementation("io.projectreactor:reactor-core:3.4.21")
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v4_14;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class CassandraClientInstrumentationModule extends InstrumentationModule {
public CassandraClientInstrumentationModule() {
super("cassandra", "cassandra-4.14");
}

SimoneGiusso marked this conversation as resolved.
Show resolved Hide resolved
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new SessionBuilderInstrumentation());
}
}
Loading