From b7c5d6dabc6a0cb29e097aea2dde4fd65a5be952 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 23 Jun 2022 00:33:43 +0000 Subject: [PATCH 1/6] Adding Latest Received checkpoint, replay checkpoint logic along with tests Signed-off-by: Rishikesh1159 --- .../SegmentReplicationTargetService.java | 26 +++++++++++- .../SegmentReplicationTargetServiceTests.java | 42 +++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index c44b27911bb7a..d7c12559a7238 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -31,6 +31,8 @@ import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportService; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; /** @@ -49,6 +51,8 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final SegmentReplicationSourceFactory sourceFactory; + private static final Map latestReceivedCheckpoint = new HashMap<>(); + /** * The internal actions * @@ -84,6 +88,13 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { + if (latestReceivedCheckpoint.containsKey(shardId)) { + return latestReceivedCheckpoint.get(shardId); + } + return null; + } + /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. 
@@ -91,6 +102,13 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh * @param replicaShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + if (getLatestReceivedCheckpoint(replicaShard.shardId()) != null) { + if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) { + latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint); + } + } else { + latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint); + } if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { logger.trace( () -> new ParameterizedMessage( @@ -103,7 +121,13 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { @Override - public void onReplicationDone(SegmentReplicationState state) {} + public void onReplicationDone(SegmentReplicationState state) { + // if we received a checkpoint during the copy event that is ahead of this + // try and process it. 
+ if (getLatestReceivedCheckpoint(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { + onNewCheckpoint(getLatestReceivedCheckpoint(replicaShard.shardId()), replicaShard); + } + } @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 33734fe85def5..ab3388b0389d0 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -177,6 +177,48 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc closeShard(indexShard, false); } + public void testReplicationOnDone() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(indexShard); + ReplicationCheckpoint cp = indexShard.getLatestReplicationCheckpoint(); + ReplicationCheckpoint newCheckpoint = new ReplicationCheckpoint( + cp.getShardId(), + cp.getPrimaryTerm(), + cp.getSegmentsGen(), + cp.getSeqNo(), + cp.getSegmentInfosVersion() + 1 + ); + ReplicationCheckpoint anotherNewCheckpoint = new ReplicationCheckpoint( + cp.getShardId(), + cp.getPrimaryTerm(), + cp.getSegmentsGen(), + cp.getSeqNo(), + cp.getSegmentInfosVersion() + 2 + ); + ArgumentCaptor captor = ArgumentCaptor.forClass( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + doNothing().when(spy).startReplication(any(), any(), any()); + spy.onNewCheckpoint(newCheckpoint, spyShard); + spy.onNewCheckpoint(anotherNewCheckpoint, spyShard); + verify(spy, times(1)).startReplication(eq(newCheckpoint), any(), captor.capture()); + verify(spy, times(1)).onNewCheckpoint(eq(anotherNewCheckpoint), any()); + 
SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue(); + listener.onDone(new SegmentReplicationState(new ReplicationLuceneIndex())); + verify(spy, times(2)).onNewCheckpoint(eq(anotherNewCheckpoint), any()); + closeShard(indexShard, false); + + } + + public void testLatestReceivedCheckpoint() { + SegmentReplicationTargetService spy = spy(sut); + assertEquals(null, spy.getLatestReceivedCheckpoint(indexShard.shardId())); + spy.onNewCheckpoint(checkpoint, indexShard); + assertEquals(checkpoint, spy.getLatestReceivedCheckpoint(indexShard.shardId())); + spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); + assertEquals(indexShard.getLatestReplicationCheckpoint(), spy.getLatestReceivedCheckpoint(indexShard.shardId())); + } + public void testBeforeIndexShardClosed_CancelsOngoingReplications() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, From fcdfbd11b58f1620c49ff4565fa1584c0519c439 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Thu, 23 Jun 2022 00:53:41 +0000 Subject: [PATCH 2/6] Making getLatestReceivedCheckpoint() as private and adding java docs Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationTargetService.java | 11 +++++++++-- .../SegmentReplicationTargetServiceTests.java | 9 --------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index d7c12559a7238..9b7a9cf69063d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -51,7 +51,7 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final SegmentReplicationSourceFactory sourceFactory; - private static final Map 
latestReceivedCheckpoint = new HashMap<>(); + private final Map latestReceivedCheckpoint = new HashMap<>(); /** * The internal actions @@ -88,7 +88,12 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } - ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { + /** + * Returns the Latest checkpoint received by replica shard based on shard ID. + * Returns null if replica shard has not received a checkpoint before + * @param shardId Shard Id of replica shard + */ + private ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { if (latestReceivedCheckpoint.containsKey(shardId)) { return latestReceivedCheckpoint.get(shardId); } @@ -102,6 +107,8 @@ ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { * @param replicaShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + + // Checks if received checkpoint is already present and ahead then it replaces old received checkpoint if (getLatestReceivedCheckpoint(replicaShard.shardId()) != null) { if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) { latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index ab3388b0389d0..b72f089defc8a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -210,15 +210,6 @@ public void testReplicationOnDone() throws IOException { } - public void testLatestReceivedCheckpoint() { - SegmentReplicationTargetService spy = spy(sut); - assertEquals(null, 
spy.getLatestReceivedCheckpoint(indexShard.shardId())); - spy.onNewCheckpoint(checkpoint, indexShard); - assertEquals(checkpoint, spy.getLatestReceivedCheckpoint(indexShard.shardId())); - spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); - assertEquals(indexShard.getLatestReplicationCheckpoint(), spy.getLatestReceivedCheckpoint(indexShard.shardId())); - } - public void testBeforeIndexShardClosed_CancelsOngoingReplications() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, From 72c36ee80b4c9300e1a981f1bb457fc560e619be Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Tue, 28 Jun 2022 15:20:15 +0000 Subject: [PATCH 3/6] Wrapping checkpoint processing in a runnable and addressing PR comments Signed-off-by: Rishikesh1159 --- .../SegmentReplicationTargetService.java | 19 ++++--------------- .../SegmentReplicationTargetServiceTests.java | 3 ++- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 9b7a9cf69063d..820d5a79da465 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -88,18 +88,6 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } - /** - * Returns the Latest checkpoint received by replica shard based on shard ID. - * Returns null if replica shard has not received a checkpoint before - * @param shardId Shard Id of replica shard - */ - private ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { - if (latestReceivedCheckpoint.containsKey(shardId)) { - return latestReceivedCheckpoint.get(shardId); - } - return null; - } - /** * Invoked when a new checkpoint is received from a primary shard. 
* It checks if a new checkpoint should be processed or not and starts replication if needed. @@ -109,7 +97,7 @@ private ReplicationCheckpoint getLatestReceivedCheckpoint(ShardId shardId) { public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { // Checks if received checkpoint is already present and ahead then it replaces old received checkpoint - if (getLatestReceivedCheckpoint(replicaShard.shardId()) != null) { + if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) { if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) { latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint); } @@ -131,8 +119,9 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe public void onReplicationDone(SegmentReplicationState state) { // if we received a checkpoint during the copy event that is ahead of this // try and process it. - if (getLatestReceivedCheckpoint(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { - onNewCheckpoint(getLatestReceivedCheckpoint(replicaShard.shardId()), replicaShard); + if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { + threadPool.generic() + .execute(() -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard)); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index b72f089defc8a..57d86d7688a76 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -205,7 +205,8 @@ public void testReplicationOnDone() throws IOException { verify(spy, 
times(1)).onNewCheckpoint(eq(anotherNewCheckpoint), any()); SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue(); listener.onDone(new SegmentReplicationState(new ReplicationLuceneIndex())); - verify(spy, times(2)).onNewCheckpoint(eq(anotherNewCheckpoint), any()); + doNothing().when(spy).onNewCheckpoint(any(), any()); + verify(spy, timeout(0).times(2)).onNewCheckpoint(eq(anotherNewCheckpoint), any()); closeShard(indexShard, false); } From e0ec134cc2d4f94489d3e989a19f695d04caa253 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 20 Jul 2022 17:18:36 +0000 Subject: [PATCH 4/6] Adding condition to check if fork of thread is necessary. Signed-off-by: Rishikesh1159 --- .../replication/SegmentReplicationTargetService.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 820d5a79da465..fc7ef72b9a9f3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -113,6 +113,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe ); return; } + final Thread thread = Thread.currentThread(); if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { @Override @@ -120,8 +121,14 @@ public void onReplicationDone(SegmentReplicationState state) { // if we received a checkpoint during the copy event that is ahead of this // try and process it. 
if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { - threadPool.generic() - .execute(() -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard)); + Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard); + // Checks if we are using same thread and forks if necessary. + if (thread == Thread.currentThread()){ + threadPool.generic().execute(runnable); + } + else{ + runnable.run(); + } } } From 57afbd64fa5c6a3c75096aab029550f33e3e866e Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 20 Jul 2022 17:50:20 +0000 Subject: [PATCH 5/6] Apply spotlesscheck Signed-off-by: Rishikesh1159 --- .../indices/replication/SegmentReplicationTargetService.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index fc7ef72b9a9f3..4f9bf7d60c25b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -123,10 +123,9 @@ public void onReplicationDone(SegmentReplicationState state) { if (latestReceivedCheckpoint.get(replicaShard.shardId()).isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard); // Checks if we are using same thread and forks if necessary. 
- if (thread == Thread.currentThread()){ + if (thread == Thread.currentThread()) { threadPool.generic().execute(runnable); - } - else{ + } else { runnable.run(); } } From 4322d354f32f77b08a802aaae5b97af15d8afa20 Mon Sep 17 00:00:00 2001 From: Rishikesh1159 Date: Wed, 20 Jul 2022 20:24:32 +0000 Subject: [PATCH 6/6] Fix import statements Signed-off-by: Rishikesh1159 --- .../SegmentReplicationTargetServiceTests.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 57d86d7688a76..a4f70125e8b21 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -25,7 +25,15 @@ import java.io.IOException; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {