diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index d7c12559a7238..9b7a9cf69063d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -51,7 +51,7 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final SegmentReplicationSourceFactory sourceFactory; - private static final Map latestReceivedCheckpoint = new HashMap<>(); + private final Map latestReceivedCheckpoint = new HashMap<>(); /** * The internal actions @@ -88,7 +88,12 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } - ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { + /** + * Returns the Latest checkpoint received by replica shard based on shard ID. + * Returns null if replica shard has not received a checkpoint before + * @param shardId Shard Id of replica shard + */ + private ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { if (latestReceivedCheckpoint.containsKey(shardId)) { return latestReceivedCheckpoint.get(shardId); } @@ -102,6 +107,8 @@ ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { * @param replicaShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + + // Checks if received checkpoint is already present and ahead then it replaces old received checkpoint if (getLatestReceivedCheckpoint(replicaShard.shardId()) != null) { if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) { latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index ab3388b0389d0..b72f089defc8a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -210,15 +210,6 @@ public void testReplicationOnDone() throws IOException { } - public void testLatestReceivedCheckpoint() { - SegmentReplicationTargetService spy = spy(sut); - assertEquals(null, spy.getLatestReceivedCheckpoint(indexShard.shardId())); - spy.onNewCheckpoint(checkpoint, indexShard); - assertEquals(checkpoint, spy.getLatestReceivedCheckpoint(indexShard.shardId())); - spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); - assertEquals(indexShard.getLatestReplicationCheckpoint(), spy.getLatestReceivedCheckpoint(indexShard.shardId())); - } - public void testBeforeIndexShardClosed_CancelsOngoingReplications() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint,