From c3ea2ca04304b46e7c17d2be448fea74e97ba79a Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 5 Sep 2022 10:32:17 -0700 Subject: [PATCH 1/3] [Semgnet Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationTarget.java | 2 +- .../SegmentReplicationTargetServiceTests.java | 20 +++++++++++-------- .../index/shard/IndexShardTestCase.java | 1 + 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7c28406036ddd..6a9406aca13b9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -160,9 +160,9 @@ public void startReplication(ActionListener listener) { final StepListener getFilesListener = new StepListener<>(); final StepListener finalizeListener = new StepListener<>(); + cancellableThreads.checkForCancel(); logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId()); // Get list of files to copy from this checkpoint. - cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1d253b0a9a300..f2eb635f24bbf 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.doAnswer; @@ -37,6 +39,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.eq; +import static org.opensearch.indices.replication.SegmentReplicationState.Stage.CANCELLED; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -215,24 +218,25 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I // Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown // of latch. doAnswer(invocation -> { - final ActionListener listener = invocation.getArgument(0); + // short circuit loop on new checkpoint request + doReturn(null).when(serviceSpy).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any()); // a new checkpoint arrives before we've completed. serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard); - listener.onResponse(null); - latch.countDown(); + try { + invocation.callRealMethod(); + } catch (CancellableThreads.ExecutionCancelledException e) { + latch.countDown(); + } return null; }).when(targetSpy).startReplication(any()); - doNothing().when(targetSpy).onDone(); // start replication. This adds the target to on-ongoing replication collection serviceSpy.startReplication(targetSpy); - + latch.await(); // wait for the new checkpoint to arrive, before the listener completes. - latch.await(5, TimeUnit.SECONDS); - doNothing().when(targetSpy).startReplication(any()); + assertEquals(CANCELLED, targetSpy.state().getStage()); verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary"); verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any()); - closeShards(replicaShard); } public void testNewCheckpointBehindCurrentCheckpoint() { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 1b40cb4f2dfa3..0838a1fe87aa4 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1207,6 +1207,7 @@ public void getCheckpointMetadata( copyState.getPendingDeleteFiles() ) ); + copyState.decRef(); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState"); From 48e4c69c2f340130de449d8ba92a1fc8fc24c2ae Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 5 Sep 2022 11:06:49 -0700 Subject: [PATCH 2/3] Add changelog entry Signed-off-by: Suraj Singh --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c5af055dca8a6..f1cf14c1871dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352)) - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) - [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363)) +- [Semgnet Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) From 3f9b840c25fc1f814fd726efc71ef54d137daf99 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 5 Sep 2022 12:34:45 -0700 Subject: [PATCH 3/3] Update changelog entry Signed-off-by: Suraj Singh --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1cf14c1871dd..1bc3e156aa634 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352)) - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) - [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363)) -- [Semgnet Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414)) +- [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))