From b7c5d6dabc6a0cb29e097aea2dde4fd65a5be952 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 23 Jun 2022 00:33:43 +0000 Subject: [PATCH] Adding Latest Recevied checkpoint, replay checkpoint logic along with tests Signed-off-by: Rishikesh1159 --- .../SegmentReplicationTargetService.java | 26 +++++++++++- .../SegmentReplicationTargetServiceTests.java | 42 +++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index c44b27911bb7a..d7c12559a7238 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -31,6 +31,8 @@ import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportService; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; /** @@ -49,6 +51,8 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final SegmentReplicationSourceFactory sourceFactory; + private static final Map latestReceivedCheckpoint = new HashMap<>(); + /** * The internal actions * @@ -84,6 +88,13 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { + if (latestReceivedCheckpoint.containsKey(shardId)) { + return latestReceivedCheckpoint.get(shardId); + } + return null; + } + /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. @@ -91,6 +102,13 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh * @param replicaShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + if (getLatestReceivedCheckpoint(replicaShard.shardId()) != null) { + if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) { + latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint); + } + } else { + latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint); + } if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { logger.trace( () -> new ParameterizedMessage( @@ -103,7 +121,13 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { @Override - public void onReplicationDone(SegmentReplicationState state) {} + public void onReplicationDone(SegmentReplicationState state) { + // if we received a checkpoint during the copy event that is ahead of this + // try and process it. + if (getLatestReceivedCheckpoint(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { + onNewCheckpoint(getLatestReceivedCheckpoint(replicaShard.shardId()), replicaShard); + } + } @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 33734fe85def5..ab3388b0389d0 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -177,6 +177,48 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc closeShard(indexShard, false); } + public void testReplicationOnDone() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(indexShard); + ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint(); + ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint( + cp.getShardId(), + cp.getPrimaryTerm(), + cp.getSegmentsGen(), + cp.getSeqNo(), + cp.getSegmentInfosVersion() + 1 + ); + ReplicationCheckpoint anotherNewCheckpoint = new ReplicationCheckpoint( + cp.getShardId(), + cp.getPrimaryTerm(), + cp.getSegmentsGen(), + cp.getSeqNo(), + cp.getSegmentInfosVersion() + 2 + ); + ArgumentCaptor captor = ArgumentCaptor.forClass( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + doNothing().when(spy).startReplication(any(), any(), any()); + spy.onNewCheckpoint(newCheckpoint, spyShard); + spy.onNewCheckpoint(anotherNewCheckpoint, spyShard); + verify(spy, times(1)).startReplication(eq(newCheckpoint), any(), captor.capture()); + verify(spy, times(1)).onNewCheckpoint(eq(anotherNewCheckpoint), any()); + SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue(); + listener.onDone(new SegmentReplicationState(new ReplicationLuceneIndex())); + verify(spy, times(2)).onNewCheckpoint(eq(anotherNewCheckpoint), any()); + closeShard(indexShard, false); + + } + + public void testLatestReceivedCheckpoint() { + SegmentReplicationTargetService spy = spy(sut); + assertEquals(null, spy.getLatestReceivedCheckpoint(indexShard.shardId())); + spy.onNewCheckpoint(checkpoint, indexShard); + assertEquals(checkpoint, spy.getLatestReceivedCheckpoint(indexShard.shardId())); + spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); + assertEquals(indexShard.getLatestReplicationCheckpoint(), spy.getLatestReceivedCheckpoint(indexShard.shardId())); + } + public void testBeforeIndexShardClosed_CancelsOngoingReplications() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint,