Skip to content

Commit

Permalink
Merge pull request #640 from AndreKurait/ReadTimeoutHandlerFix
Browse files Browse the repository at this point in the history
Fix ReadTimeoutException between requests
  • Loading branch information
AndreKurait committed May 9, 2024
2 parents 516ba38 + 2ae2959 commit e80b6c2
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.opensearch.migrations.replay.util.TrackedFuture;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

@Slf4j
Expand All @@ -32,7 +31,6 @@ public class ClientConnectionPool {
private final SslContext sslContext;
public final NioEventLoopGroup eventLoopGroup;
private final LoadingCache<Key, ConnectionReplaySession> connectionId2ChannelCache;
private final Duration timeout;

@EqualsAndHashCode
@AllArgsConstructor
Expand All @@ -48,11 +46,9 @@ private Key getKey(String connectionId, int sessionNumber) {
public ClientConnectionPool(@NonNull URI serverUri,
SslContext sslContext,
@NonNull String targetConnectionPoolName,
int numThreads,
@NonNull Duration timeout) {
int numThreads) {
this.serverUri = serverUri;
this.sslContext = sslContext;
this.timeout = timeout;
this.eventLoopGroup =
new NioEventLoopGroup(numThreads, new DefaultThreadFactory(targetConnectionPoolName));

Expand Down Expand Up @@ -80,7 +76,7 @@ public ConnectionReplaySession buildConnectionReplaySession(IReplayContexts.ICha
return new AdaptiveRateLimiter<String, ChannelFuture>()
.get(() ->
NettyPacketToHttpConsumer.createClientConnection(eventLoop,
sslContext, serverUri, connectionContext, timeout)
sslContext, serverUri, connectionContext)
.whenComplete((v,t)-> {
if (t == null) {
log.atDebug().setMessage(() -> "New network connection result for " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,7 @@ public static void main(String[] args) throws Exception {
var tr = new TrafficReplayerTopLevel(topContext, uri, authTransformer,
new TransformationLoader().getTransformerFactoryLoader(uri.getHost(), params.userAgent, transformerConfig),
TrafficReplayerTopLevel.makeClientConnectionPool(
uri, params.allowInsecureConnections, params.numClientThreads,
Duration.ofSeconds(params.targetServerResponseTimeoutSeconds)),
uri, params.allowInsecureConnections, params.numClientThreads),
new TrafficStreamLimiter(params.maxConcurrentRequests), orderedRequestTracker);
activeContextMonitor = new ActiveContextMonitor(
globalContextTracker, perContextTracker, orderedRequestTracker, 64,
Expand All @@ -326,7 +325,9 @@ public static void main(String[] args) throws Exception {
setupShutdownHookForReplayer(tr);
var tupleWriter = new TupleParserChainConsumer(new ResultsToLogsConsumer());
var timeShifter = new TimeShifter(params.speedupFactor);
tr.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(params.observedPacketConnectionTimeout),
tr.setupRunAndWaitForReplayWithShutdownChecks(
Duration.ofSeconds(params.observedPacketConnectionTimeout),
Duration.ofSeconds(params.targetServerResponseTimeoutSeconds),
blockingTrafficSource, timeShifter, tupleWriter);
log.info("Done processing TrafficStreams");
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,19 @@ public TrafficReplayerTopLevel(IRootReplayerContext context,
}

public static ClientConnectionPool makeClientConnectionPool(URI serverUri, boolean allowInsecureConnections,
int numSendingThreads, Duration timeout)
int numSendingThreads)
throws SSLException {
return makeClientConnectionPool(serverUri, allowInsecureConnections, numSendingThreads, null, timeout);
return makeClientConnectionPool(serverUri, allowInsecureConnections, numSendingThreads, null);
}

public static ClientConnectionPool makeClientConnectionPool(URI serverUri,
boolean allowInsecureConnections,
int numSendingThreads,
String connectionPoolName,
Duration timeout) throws SSLException {
String connectionPoolName) throws SSLException {
return new ClientConnectionPool(serverUri, loadSslContext(serverUri, allowInsecureConnections),
connectionPoolName != null ? connectionPoolName :
getTargetConnectionPoolName(targetConnectionPoolUniqueCounter.getAndIncrement()),
numSendingThreads, timeout);
numSendingThreads);
}

public static String getTargetConnectionPoolName(int i) {
Expand All @@ -130,12 +129,15 @@ public static SslContext loadSslContext(URI serverUri, boolean allowInsecureConn
}

public void setupRunAndWaitForReplayToFinish(Duration observedPacketConnectionTimeout,
Duration targetServerResponseTimeout,
BlockingTrafficSource trafficSource,
TimeShifter timeShifter,
Consumer<SourceTargetCaptureTuple> resultTupleConsumer)
throws InterruptedException, ExecutionException {

var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool, NettyPacketToHttpConsumer::new);
var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool,
(replaySession, ctx) -> new NettyPacketToHttpConsumer(replaySession, ctx,
targetServerResponseTimeout));
var replayEngine = new ReplayEngine(senderOrchestrator, trafficSource, timeShifter);

CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator =
Expand Down Expand Up @@ -204,13 +206,14 @@ protected void wrapUpWorkAndEmitSummary(ReplayEngine replayEngine,
}

public void setupRunAndWaitForReplayWithShutdownChecks(Duration observedPacketConnectionTimeout,
Duration targetServerResponseTimeout,
BlockingTrafficSource trafficSource,
TimeShifter timeShifter,
Consumer<SourceTargetCaptureTuple> resultTupleConsumer)
throws TrafficReplayer.TerminationException, ExecutionException, InterruptedException {
try {
setupRunAndWaitForReplayToFinish(observedPacketConnectionTimeout, trafficSource,
timeShifter, resultTupleConsumer);
setupRunAndWaitForReplayToFinish(observedPacketConnectionTimeout, targetServerResponseTimeout,
trafficSource, timeShifter, resultTupleConsumer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TrafficReplayer.TerminationException(shutdownReasonRef.get(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class NettyPacketToHttpConsumer implements IPacketFinalizingConsumer<Aggr
public static final String BACKSIDE_HTTP_WATCHER_HANDLER_NAME = "BACKSIDE_HTTP_WATCHER_HANDLER";
public static final String CONNECTION_CLOSE_HANDLER_NAME = "CONNECTION_CLOSE_HANDLER";
public static final String SSL_HANDLER_NAME = "ssl";
public static final String READ_TIMEOUT_HANDLER_NAME = "readTimeoutHandler";
public static final String WRITE_COUNT_WATCHER_HANDLER_NAME = "writeCountWatcher";
public static final String READ_COUNT_WATCHER_HANDLER_NAME = "readCountWatcher";

Expand All @@ -66,29 +68,40 @@ public class NettyPacketToHttpConsumer implements IPacketFinalizingConsumer<Aggr
private Channel channel;
AggregatedRawResponse.Builder responseBuilder;
IWithTypedEnclosingScope<IReplayContexts.ITargetRequestContext> currentRequestContextUnion;
Duration readTimeoutDuration;

private static class ConnectionClosedListenerHandler extends ReadTimeoutHandler {
private static class ConnectionClosedListenerHandler extends ChannelInboundHandlerAdapter {
private final IReplayContexts.ISocketContext socketContext;
ConnectionClosedListenerHandler(IReplayContexts.IChannelKeyContext channelKeyContext,
Duration timeout) {
super(timeout.toMillis(), TimeUnit.MILLISECONDS);
ConnectionClosedListenerHandler(IReplayContexts.IChannelKeyContext channelKeyContext) {
socketContext = channelKeyContext.createSocketContext();
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
socketContext.close();
super.channelUnregistered(ctx);
super.channelInactive(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
socketContext.addTraceException(cause, true);
log.atDebug().setMessage("Exception caught in ConnectionClosedListenerHandler." +
"Closing channel due to exception").setCause(cause).log();
ctx.close();
super.exceptionCaught(ctx, cause);
}
}

public NettyPacketToHttpConsumer(ConnectionReplaySession replaySession,
IReplayContexts.IReplayerHttpTransactionContext ctx) {
IReplayContexts.IReplayerHttpTransactionContext ctx,
Duration readTimeoutDuration) {
this.replaySession = replaySession;
var parentContext = ctx.createTargetRequestContext();
this.setCurrentMessageContext(parentContext.createHttpSendingContext());
responseBuilder = AggregatedRawResponse.builder(Instant.now());
log.atDebug().setMessage(() -> "C'tor: incoming session=" + replaySession).log();
this.activeChannelFuture = activateLiveChannel();
this.readTimeoutDuration = readTimeoutDuration;
}

private TrackedFuture<String, Void> activateLiveChannel() {
Expand Down Expand Up @@ -141,8 +154,7 @@ public IReplayContexts.ITargetRequestContext getParentContext() {
createClientConnection(EventLoopGroup eventLoopGroup,
SslContext sslContext,
URI serverUri,
IReplayContexts.IChannelKeyContext channelKeyContext,
Duration timeout) {
IReplayContexts.IChannelKeyContext channelKeyContext) {
String host = serverUri.getHost();
int port = serverUri.getPort();
log.atTrace().setMessage(()->"Active - setting up backend connection to " + host + ":" + port).log();
Expand All @@ -153,7 +165,7 @@ public IReplayContexts.ITargetRequestContext getParentContext() {
@Override
protected void initChannel(@NonNull Channel ch) throws Exception {
ch.pipeline().addFirst(CONNECTION_CLOSE_HANDLER_NAME,
new ConnectionClosedListenerHandler(channelKeyContext, timeout));
new ConnectionClosedListenerHandler(channelKeyContext));
}
})
.channel(NioSocketChannel.class)
Expand Down Expand Up @@ -223,6 +235,8 @@ private void initializeChannelPipeline() {
}
getParentContext().onBytesReceived(size);
}));
pipeline.addLast(READ_TIMEOUT_HANDLER_NAME,
new ReadTimeoutHandler(this.readTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS));
addLoggingHandlerLast(pipeline, "B");
pipeline.addLast(new BacksideSnifferHandler(responseBuilder));
addLoggingHandlerLast(pipeline, "C");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package org.opensearch.migrations.replay.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.replay.AggregatedRawResponse;
Expand Down Expand Up @@ -54,7 +51,8 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
aggregatedRawResponseBuilder.addErrorCause(cause);
triggerResponseCallbackAndRemoveCallback();
super.exceptionCaught(ctx, cause);
// AggregatedRawResponseBuilder will contain exception context so
// Exception caught event should not to propagate downstream
}

private void triggerResponseCallbackAndRemoveCallback() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.migrations.replay;

import static org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumerTest.REGULAR_RESPONSE_TIMEOUT;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.FullHttpResponse;
Expand Down Expand Up @@ -90,8 +92,7 @@ public void testFutureGraphBuildout() throws Exception {
final int NUM_PACKETS = 3;

var clientConnectionPool = TrafficReplayerTopLevel.makeClientConnectionPool(new URI("http://localhost"),
false, 1, "testFutureGraphBuildout targetConnectionPool",
Duration.ofSeconds(30));
false, 1, "testFutureGraphBuildout targetConnectionPool");
var connectionToConsumerMap = new HashMap<Long, BlockingPacketConsumer>();
var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool, (s,c) ->
connectionToConsumerMap.get(c.getSourceRequestIndex()));
Expand Down Expand Up @@ -168,10 +169,10 @@ public void testThatSchedulingWorks() throws Exception {
r -> TestHttpServerContext.makeResponse(r, Duration.ofMillis(100)))) {
var testServerUri = httpServer.localhostEndpoint();
var clientConnectionPool = TrafficReplayerTopLevel.makeClientConnectionPool(testServerUri, false,
1, "targetConnectionPool for testThatSchedulingWorks",
Duration.ofSeconds(30));
1, "targetConnectionPool for testThatSchedulingWorks");
var senderOrchestrator =
new RequestSenderOrchestrator(clientConnectionPool, NettyPacketToHttpConsumer::new);
new RequestSenderOrchestrator(clientConnectionPool,
(replaySession, ctx) -> new NettyPacketToHttpConsumer(replaySession, ctx, REGULAR_RESPONSE_TIMEOUT));
var baseTime = Instant.now();
Instant lastEndTime = baseTime;
var scheduledItems = new ArrayList<TrackedFuture<String, AggregatedRawResponse>>();
Expand Down
Loading

0 comments on commit e80b6c2

Please sign in to comment.