This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit ffbb46e

Merge pull request opensearch-project#475 from gregschohn/ReplayerInstrumentation

Replayer instrumentation

gregschohn committed Feb 9, 2024
2 parents: ed96552 + 8a875b3
Showing 213 changed files with 6,999 additions and 1,727 deletions.
18 changes: 10 additions & 8 deletions TrafficCapture/captureKafkaOffloader/build.gradle
@@ -9,20 +9,22 @@ repositories {
}

dependencies {
api 'io.netty:netty-buffer:4.1.100.Final'
implementation project(':captureOffloader')
implementation project(':coreUtilities')
implementation 'org.projectlombok:lombok:1.18.26'
implementation 'com.google.protobuf:protobuf-java:3.22.2'
implementation 'org.apache.kafka:kafka-clients:3.6.0'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.9'
implementation 'org.slf4j:slf4j-api:2.0.7'
implementation group: 'com.google.protobuf', name:'protobuf-java', version:'3.22.2'
api group:'io.netty', name:'netty-buffer', version: '4.1.100.Final'
implementation group: 'org.projectlombok', name:'lombok', version:'1.18.26'
implementation group: 'org.apache.kafka', name:'kafka-clients', version:'3.6.0'
implementation group: 'org.slf4j', name:'slf4j-api', version:'2.0.7'
implementation group: 'software.amazon.msk', name:'aws-msk-iam-auth', version:'1.1.9'

testImplementation project(':captureProtobufs')
testImplementation 'org.mockito:mockito-core:4.6.1'
testImplementation 'org.mockito:mockito-junit-jupiter:4.6.1'
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl', version: '2.20.0'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '4.6.1'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '4.6.1'
testImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'
}

KafkaCaptureFactory.java
@@ -7,17 +7,22 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.opensearch.migrations.coreutils.MetricsAttributeKey;
import org.opensearch.migrations.coreutils.MetricsEvent;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.opensearch.migrations.coreutils.MetricsLogger;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;
import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory;
import org.opensearch.migrations.trafficcapture.OrderedStreamLifecyleManager;
import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializer;
import org.opensearch.migrations.coreutils.MetricsLogger;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing.IRootKafkaOffloaderContext;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing.KafkaRecordContext;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

@@ -32,27 +37,31 @@ public class KafkaCaptureFactory implements IConnectionCaptureFactory<RecordMeta
// general Kafka message overhead
public static final int KAFKA_MESSAGE_OVERHEAD_BYTES = 500;

private final IRootKafkaOffloaderContext rootScope;
private final String nodeId;
// Potential future optimization here to use a direct buffer (e.g. nio) instead of byte array
private final Producer<String, byte[]> producer;
private final String topicNameForTraffic;
private final int bufferSize;

public KafkaCaptureFactory(String nodeId, Producer<String, byte[]> producer,
public KafkaCaptureFactory(IRootKafkaOffloaderContext rootScope, String nodeId, Producer<String, byte[]> producer,
String topicNameForTraffic, int messageSize) {
this.rootScope = rootScope;
this.nodeId = nodeId;
this.producer = producer;
this.topicNameForTraffic = topicNameForTraffic;
this.bufferSize = messageSize - KAFKA_MESSAGE_OVERHEAD_BYTES;
}

public KafkaCaptureFactory(String nodeId, Producer<String, byte[]> producer, int messageSize) {
this(nodeId, producer, DEFAULT_TOPIC_NAME_FOR_TRAFFIC, messageSize);
public KafkaCaptureFactory(IRootKafkaOffloaderContext rootScope, String nodeId, Producer<String, byte[]> producer, int messageSize) {
this(rootScope, nodeId, producer, DEFAULT_TOPIC_NAME_FOR_TRAFFIC, messageSize);
}

@Override
public IChannelConnectionCaptureSerializer<RecordMetadata> createOffloader(String connectionId) {
return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId, new StreamManager(connectionId));
public IChannelConnectionCaptureSerializer<RecordMetadata>
createOffloader(IConnectionContext ctx) {
return new StreamChannelConnectionCaptureSerializer<>(nodeId, ctx.getConnectionId(),
new StreamManager(rootScope, ctx));
}

@AllArgsConstructor
@@ -65,12 +74,22 @@ static class CodedOutputStreamWrapper implements CodedOutputStreamHolder {
}
}

@AllArgsConstructor
class StreamManager extends OrderedStreamLifecyleManager<RecordMetadata> {
String connectionId;
IConnectionContext telemetryContext;
IRootKafkaOffloaderContext rootScope;
Instant startTime;

public StreamManager(IRootKafkaOffloaderContext rootScope, IConnectionContext ctx) {
// TODO - add https://opentelemetry.io/blog/2022/instrument-kafka-clients/
this.rootScope = rootScope;
this.telemetryContext = ctx;
this.startTime = Instant.now();
}

@Override
public CodedOutputStreamWrapper createStream() {
telemetryContext.getCurrentSpan().addEvent("streamCreated");

ByteBuffer bb = ByteBuffer.allocate(bufferSize);
return new CodedOutputStreamWrapper(CodedOutputStream.newInstance(bb), bb);
}
@@ -84,7 +103,7 @@ public CodedOutputStreamWrapper createStream() {
}
var osh = (CodedOutputStreamWrapper) outputStreamHolder;

// Structured context for MetricsLogger
final var connectionId = telemetryContext.getConnectionId();
try {
String recordId = String.format("%s.%d", connectionId, index);
var byteBuffer = osh.byteBuffer;
@@ -93,8 +112,12 @@ public CodedOutputStreamWrapper createStream() {
// Used to essentially wrap Future returned by Producer to CompletableFuture
var cf = new CompletableFuture<RecordMetadata>();
log.debug("Sending Kafka producer record: {} for topic: {}", recordId, topicNameForTraffic);

var flushContext = rootScope.createKafkaRecordContext(telemetryContext,
topicNameForTraffic, recordId, kafkaRecord.value().length);

// Async request to Kafka cluster
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId));
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId, flushContext));
metricsLogger.atSuccess(MetricsEvent.RECORD_SENT_TO_KAFKA)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, connectionId)
.setAttribute(MetricsAttributeKey.TOPIC_NAME, topicNameForTraffic)
@@ -108,29 +131,35 @@ public CodedOutputStreamWrapper createStream() {
throw e;
}
}

/**
* The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the
* documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
* "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE,
* and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries."
* <p>
* Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be
* retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html
* as well as basic retry backoff
*/
private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf, String recordId) {
return (metadata, exception) -> {
if (exception != null) {
log.error("Error sending producer record: {}", recordId, exception);
cf.completeExceptionally(exception);
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
recordId, metadata.topic(), metadata.partition());
cf.complete(metadata);
}
};
}
}

/**
* The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the
* documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
* "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE,
* and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries."
* <p>
* Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be
* retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html
* as well as basic retry backoff
*/
private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf, String recordId,
KafkaRecordContext flushContext) {
// Keep this out of the inner class because it is more unsafe to include it within
// the inner class since the inner class has context that shouldn't be used. This keeps
// that field out of scope.
return (metadata, exception) -> {
log.atInfo().setMessage(()->"kafka completed sending a record").log();
if (exception != null) {
flushContext.addException(exception);
log.error("Error sending producer record: {}", recordId, exception);
cf.completeExceptionally(exception);
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
recordId, metadata.topic(), metadata.partition());
cf.complete(metadata);
}
flushContext.close();
};
}
}
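
Taken together, the changes to this file alter the offloader-creation contract: the factory now requires a root instrumentation scope, createOffloader() accepts a connection-scoped telemetry context instead of a bare connection-id string, and each producer.send() is wrapped in a KafkaRecordContext that is closed in the send callback. The sketch below shows how a caller might wire this up; it is illustrative only — ExampleRootOffloaderContext is a hypothetical IRootKafkaOffloaderContext implementation, the broker address, node and connection ids, and timeout are made-up values, and the ConnectionContext argument order is assumed to match the test usage later in this diff. The delivery.timeout.ms setting follows the retry guidance quoted in the Javadoc above.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.KafkaCaptureFactory;
import org.opensearch.migrations.trafficcapture.tracing.ConnectionContext;

public class OffloaderWiringSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Bound total send-plus-retry time rather than tuning `retries`, per the Javadoc above.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        Producer<String, byte[]> producer = new KafkaProducer<>(props);

        // Hypothetical root scope; it must supply the Kafka-offloading metric instruments.
        var rootScope = new ExampleRootOffloaderContext();
        var factory = new KafkaCaptureFactory(rootScope, "node-1", producer, 1024 * 1024);

        // The offloader is now created from a telemetry context, not a connection-id string;
        // the constructor arguments here mirror the test fixture usage shown below.
        var offloader = factory.createOffloader(new ConnectionContext(rootScope, "connection-1", "node-1"));
        // ... offloader.addReadEvent(...) / offloader.flushCommitAndResetStream(...) as before
    }
}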
IRootKafkaOffloaderContext.java (new file)
@@ -0,0 +1,15 @@
package org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing;

import org.opensearch.migrations.tracing.IInstrumentConstructor;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;

public interface IRootKafkaOffloaderContext extends IInstrumentConstructor {
KafkaRecordContext.MetricInstruments getKafkaOffloadingInstruments();

default KafkaRecordContext createKafkaRecordContext(IConnectionContext telemetryContext,
String topicNameForTraffic,
String recordId,
int length) {
return new KafkaRecordContext(this, telemetryContext, topicNameForTraffic, recordId, length);
}
}
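
The interface above lets the root instrumentation scope own the per-activity metric instruments while the default method hands out per-record span contexts. Below is a minimal sketch of what an implementor could look like; the class name and Meter wiring are assumptions rather than code from this commit, and the class is left abstract because the span-construction methods inherited from IInstrumentConstructor are provided elsewhere in the repository.

package org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing;

import io.opentelemetry.api.metrics.Meter;

public abstract class SketchRootOffloaderContext implements IRootKafkaOffloaderContext {
    private final KafkaRecordContext.MetricInstruments kafkaInstruments;

    protected SketchRootOffloaderContext(Meter meter) {
        // Build the instruments once per root scope; every KafkaRecordContext reuses them.
        this.kafkaInstruments = KafkaRecordContext.makeMetrics(meter);
    }

    @Override
    public KafkaRecordContext.MetricInstruments getKafkaOffloadingInstruments() {
        return kafkaInstruments;
    }
}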
KafkaRecordContext.java (new file)
@@ -0,0 +1,60 @@
package org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import lombok.Getter;
import lombok.NonNull;
import org.opensearch.migrations.tracing.BaseNestedSpanContext;
import org.opensearch.migrations.tracing.CommonScopedMetricInstruments;
import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;

public class KafkaRecordContext extends
BaseNestedSpanContext<IRootKafkaOffloaderContext, IConnectionContext>
implements IScopedInstrumentationAttributes {
public static final String ACTIVITY_NAME = "kafkaCommit";

static final AttributeKey<String> TOPIC_ATTR = AttributeKey.stringKey("topic");
static final AttributeKey<String> RECORD_ID_ATTR = AttributeKey.stringKey("recordId");
static final AttributeKey<Long> RECORD_SIZE_ATTR = AttributeKey.longKey("recordSize");

@Getter
public final String topic;
@Getter
public final String recordId;

public KafkaRecordContext(IRootKafkaOffloaderContext rootScope, IConnectionContext enclosingScope,
String topic, String recordId, int recordSize) {
super(rootScope, enclosingScope);
this.topic = topic;
this.recordId = recordId;
initializeSpan();
getCurrentSpan().setAttribute(RECORD_SIZE_ATTR, recordSize);
}

public static class MetricInstruments extends CommonScopedMetricInstruments {
private MetricInstruments(Meter meter, String activityName) {
super(meter, activityName);
}
}

public static @NonNull MetricInstruments makeMetrics(Meter meter) {
return new MetricInstruments(meter, ACTIVITY_NAME);
}

@Override
public @NonNull MetricInstruments getMetrics() {
return getRootInstrumentationScope().getKafkaOffloadingInstruments();
}

@Override
public String getActivityName() { return "stream_flush_called"; }

@Override
public AttributesBuilder fillAttributesForSpansBelow(AttributesBuilder builder) {
return super.fillAttributesForSpansBelow(builder)
.put(TOPIC_ATTR, getTopic())
.put(RECORD_ID_ATTR, getRecordId());
}
}
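
For orientation, the snippet below condenses how the factory changes earlier in this commit use this context: one KafkaRecordContext per producer.send(), closed in the send callback so the span covers the asynchronous round trip to the broker. It restates code already shown above rather than introducing new API; the helper method name and parameters are illustrative.

static CompletableFuture<RecordMetadata> sendInstrumented(IRootKafkaOffloaderContext rootScope,
                                                          IConnectionContext connectionCtx,
                                                          Producer<String, byte[]> producer,
                                                          ProducerRecord<String, byte[]> kafkaRecord,
                                                          String topic, String recordId) {
    var cf = new CompletableFuture<RecordMetadata>();
    var flushContext = rootScope.createKafkaRecordContext(connectionCtx, topic, recordId,
            kafkaRecord.value().length);
    producer.send(kafkaRecord, (metadata, exception) -> {
        if (exception != null) {
            flushContext.addException(exception);   // recorded on the span before it ends
            cf.completeExceptionally(exception);
        } else {
            cf.complete(metadata);
        }
        flushContext.close();                       // ends the span and emits its metrics
    });
    return cf;
}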
KafkaCaptureFactoryTest.java
@@ -18,15 +18,15 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing.TestRootKafkaOffloaderContext;
import org.opensearch.migrations.trafficcapture.tracing.ConnectionContext;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.mockito.ArgumentMatchers.any;
@@ -42,26 +42,24 @@ public class KafkaCaptureFactoryTest {
private String connectionId = "0242c0fffea82008-0000000a-00000003-62993a3207f92af6-9093ce33";
private String topic = "test_topic";


@Test
public void testLargeRequestIsWithinKafkaMessageSizeLimit() throws IOException, ExecutionException, InterruptedException {
final var referenceTimestamp = Instant.now(Clock.systemUTC());

int maxAllowableMessageSize = 1024*1024;
MockProducer<String, byte[]> producer = new MockProducer<>(true, new StringSerializer(), new ByteArraySerializer());
KafkaCaptureFactory kafkaCaptureFactory =
new KafkaCaptureFactory(TEST_NODE_ID_STRING, producer, maxAllowableMessageSize);
IChannelConnectionCaptureSerializer serializer = kafkaCaptureFactory.createOffloader(connectionId);

StringBuilder sb = new StringBuilder();
for (int i = 0; i < 15000; i++) {
sb.append("{ \"create\": { \"_index\": \"office-index\" } }\n{ \"title\": \"Malone's Cones\", \"year\": 2013 }\n");
}
Assertions.assertTrue(sb.toString().getBytes().length > 1024*1024);
byte[] fakeDataBytes = sb.toString().getBytes(StandardCharsets.UTF_8);
new KafkaCaptureFactory(TestRootKafkaOffloaderContext.noTracking(),
TEST_NODE_ID_STRING, producer, maxAllowableMessageSize);
var serializer = kafkaCaptureFactory.createOffloader(createCtx());

var testStr = "{ \"create\": { \"_index\": \"office-index\" } }\n{ \"title\": \"Malone's Cones\", \"year\": 2013 }\n"
.repeat(15000);
var fakeDataBytes = testStr.getBytes(StandardCharsets.UTF_8);
Assertions.assertTrue(fakeDataBytes.length > 1024*1024);
var bb = Unpooled.wrappedBuffer(fakeDataBytes);
serializer.addReadEvent(referenceTimestamp, bb);
CompletableFuture future = serializer.flushCommitAndResetStream(true);
var future = serializer.flushCommitAndResetStream(true);
future.get();
for (ProducerRecord<String, byte[]> record : producer.history()) {
int recordSize = calculateRecordSize(record, null);
@@ -73,6 +71,10 @@ public void testLargeRequestIsWithinKafkaMessageSizeLimit() throws IOException,
producer.close();
}

private static ConnectionContext createCtx() {
return new ConnectionContext(new TestRootKafkaOffloaderContext(), "test", "test");
}

/**
* This size calculation is based off the KafkaProducer client request size validation check done when Producer
* records are sent. This validation appears to be consistent for several versions now, here is a reference to
@@ -96,8 +98,9 @@ private int calculateRecordSize(ProducerRecord<String, byte[]> record, String re
@Test
public void testLinearOffloadingIsSuccessful() throws IOException {
KafkaCaptureFactory kafkaCaptureFactory =
new KafkaCaptureFactory(TEST_NODE_ID_STRING, mockProducer, 1024*1024);
IChannelConnectionCaptureSerializer offloader = kafkaCaptureFactory.createOffloader(connectionId);
new KafkaCaptureFactory(TestRootKafkaOffloaderContext.noTracking(),
TEST_NODE_ID_STRING, mockProducer, 1024*1024);
var offloader = kafkaCaptureFactory.createOffloader(createCtx());

List<Callback> recordSentCallbacks = new ArrayList<>(3);
when(mockProducer.send(any(), any())).thenAnswer(invocation -> {
@@ -112,11 +115,11 @@ public void testLinearOffloadingIsSuccessful() throws IOException {
byte[] fakeDataBytes = "FakeData".getBytes(StandardCharsets.UTF_8);
var bb = Unpooled.wrappedBuffer(fakeDataBytes);
offloader.addReadEvent(ts, bb);
CompletableFuture cf1 = offloader.flushCommitAndResetStream(false);
var cf1 = offloader.flushCommitAndResetStream(false);
offloader.addReadEvent(ts, bb);
CompletableFuture cf2 = offloader.flushCommitAndResetStream(false);
var cf2 = offloader.flushCommitAndResetStream(false);
offloader.addReadEvent(ts, bb);
CompletableFuture cf3 = offloader.flushCommitAndResetStream(false);
var cf3 = offloader.flushCommitAndResetStream(false);
bb.release();

Assertions.assertEquals(false, cf1.isDone());