Skip to content

Commit

Permalink
[Segment Replication] Fix flaky testSnapshotOnSegRep_RestoreOnSegRepD…
Browse files Browse the repository at this point in the history
…uringIngestion test (#6280)

* [Segment Replication] Fix flaky testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion test

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Fix spotless failures

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Feb 10, 2023
1 parent cef446c commit bcdf0c9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,15 @@ protected DiscoveryNode getNodeContainingPrimaryShard() {
* @param nodes - List of node names.
*/
protected void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
// wait until the replica has the latest segment generation.
waitForSearchableDocs(INDEX_NAME, docCount, nodes);
}

public static void waitForSearchableDocs(String indexName, long docCount, List<String> nodes) throws Exception {
// wait until the replica has the latest segment generation.
assertBusy(() -> {
for (String node : nodes) {
final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
final SearchResponse response = client(node).prepareSearch(indexName).setSize(0).setPreference("_only_local").get();
final long hits = response.getHits().getTotalHits().value;
if (hits < docCount) {
fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.snapshots;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
Expand All @@ -22,15 +21,15 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.indices.replication.SegmentReplicationBaseIT.waitForSearchableDocs;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -79,20 +78,8 @@ protected boolean addMockInternalEngine() {
}

public void ingestData(int docCount, String indexName) throws Exception {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
indexName,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(docCount);
waitForDocs(docCount, indexer);
refresh(indexName);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
}

Expand Down Expand Up @@ -123,11 +110,8 @@ public void createSnapshot() {
.setWaitForCompletion(true)
.setIndices(INDEX_NAME)
.get();
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
assertEquals(createSnapshotResponse.getSnapshotInfo().successfulShards(), createSnapshotResponse.getSnapshotInfo().totalShards());
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS);
}

public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) {
Expand All @@ -154,7 +138,7 @@ public void testRestoreOnSegRep() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand All @@ -165,9 +149,9 @@ public void testRestoreOnSegRep() throws Exception {
assertHitCount(resp, DOC_COUNT);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception {
startClusterWithSettings(segRepEnableIndexSettings(), 1);
List<String> nodes = startClusterWithSettings(segRepEnableIndexSettings(), 1);
waitForSearchableDocs(INDEX_NAME, DOC_COUNT, nodes);
createSnapshot();
// Delete index
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
Expand All @@ -176,16 +160,23 @@ public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Excepti
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
ingestData(5000, RESTORED_INDEX_NAME);
ensureGreen(RESTORED_INDEX_NAME);
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
assertBusy(() -> ensureGreen(RESTORED_INDEX_NAME), 60, TimeUnit.SECONDS);
final int docCountPostRestore = 1001;
final int totalDocCount = DOC_COUNT + docCountPostRestore;
for (int i = DOC_COUNT; i < totalDocCount; i++) {
client().prepareIndex(RESTORED_INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
flushAndRefresh(RESTORED_INDEX_NAME);
assertBusy(() -> ensureGreen(RESTORED_INDEX_NAME), 60, TimeUnit.SECONDS);
waitForSearchableDocs(RESTORED_INDEX_NAME, totalDocCount, nodes);
GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME))
.get();
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT");
SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT + 5000);
assertHitCount(resp, totalDocCount);
}

public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception {
Expand All @@ -197,7 +188,7 @@ public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings());

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand All @@ -219,7 +210,7 @@ public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings());

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand All @@ -239,7 +230,7 @@ public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings());

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand All @@ -265,7 +256,7 @@ public void testRestoreOnReplicaNode() throws Exception {
RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
internalCluster().startNode();
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
Expand Down

0 comments on commit bcdf0c9

Please sign in to comment.