diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 5f7433126db57..dfffeaf860734 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -113,10 +113,15 @@ protected DiscoveryNode getNodeContainingPrimaryShard() { * @param nodes - List of node names. */ protected void waitForSearchableDocs(long docCount, List nodes) throws Exception { + // wait until the replica has the latest segment generation. + waitForSearchableDocs(INDEX_NAME, docCount, nodes); + } + + public static void waitForSearchableDocs(String indexName, long docCount, List nodes) throws Exception { // wait until the replica has the latest segment generation. assertBusy(() -> { for (String node : nodes) { - final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); + final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get(); final long hits = response.getHits().getTotalHits().value; if (hits < docCount) { fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java index 2b44a20406b05..f69b8a8ad3323 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -8,7 +8,6 @@ package org.opensearch.snapshots; -import com.carrotsearch.randomizedtesting.RandomizedTest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; @@ -22,15 +21,15 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.rest.RestStatus; -import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.indices.replication.SegmentReplicationBaseIT.waitForSearchableDocs; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -79,20 +78,8 @@ protected boolean addMockInternalEngine() { } public void ingestData(int docCount, String indexName) throws Exception { - try ( - BackgroundIndexer indexer = new BackgroundIndexer( - indexName, - "_doc", - client(), - -1, - RandomizedTest.scaledRandomIntBetween(2, 5), - false, - random() - ) - ) { - indexer.start(docCount); - waitForDocs(docCount, indexer); - refresh(indexName); + for (int i = 0; i < docCount; i++) { + client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } } @@ -123,11 +110,8 @@ public void createSnapshot() { .setWaitForCompletion(true) .setIndices(INDEX_NAME) .get(); - assertThat( - createSnapshotResponse.getSnapshotInfo().successfulShards(), - equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) - ); - assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + assertEquals(createSnapshotResponse.getSnapshotInfo().successfulShards(), createSnapshotResponse.getSnapshotInfo().totalShards()); + assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS); } public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) { @@ -154,7 +138,7 @@ public void testRestoreOnSegRep() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin() .indices() @@ -165,9 +149,9 @@ public void testRestoreOnSegRep() throws Exception { assertHitCount(resp, DOC_COUNT); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception { - startClusterWithSettings(segRepEnableIndexSettings(), 1); + List nodes = startClusterWithSettings(segRepEnableIndexSettings(), 1); + waitForSearchableDocs(INDEX_NAME, DOC_COUNT, nodes); createSnapshot(); // Delete index assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); @@ -176,16 +160,23 @@ public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Excepti RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); - ingestData(5000, RESTORED_INDEX_NAME); - ensureGreen(RESTORED_INDEX_NAME); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); + assertBusy(() -> ensureGreen(RESTORED_INDEX_NAME), 60, TimeUnit.SECONDS); + final int docCountPostRestore = 1001; + final int totalDocCount = DOC_COUNT + docCountPostRestore; + for (int i = DOC_COUNT; i < totalDocCount; i++) { + client().prepareIndex(RESTORED_INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + flushAndRefresh(RESTORED_INDEX_NAME); + assertBusy(() -> ensureGreen(RESTORED_INDEX_NAME), 60, TimeUnit.SECONDS); + waitForSearchableDocs(RESTORED_INDEX_NAME, totalDocCount, nodes); GetSettingsResponse settingsResponse = client().admin() .indices() .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) .get(); assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); - assertHitCount(resp, DOC_COUNT + 5000); + assertHitCount(resp, totalDocCount); } public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception { @@ -197,7 +188,7 @@ public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings()); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin() .indices() @@ -219,7 +210,7 @@ public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin() .indices() @@ -239,7 +230,7 @@ public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin() .indices() @@ -265,7 +256,7 @@ public void testRestoreOnReplicaNode() throws Exception { RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); // Assertions - assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED); internalCluster().startNode(); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin()