Skip to content

Commit

Permalink
Add more remote store migration integration tests (#13115) (#13434)
Browse files Browse the repository at this point in the history
(cherry picked from commit 78087ac)

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent a9709d2 commit 29d7331
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void testMixedModeAddDocRep() throws Exception {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
addRemote = false;
setAddRemote(false);
internalCluster().startNode();
String[] allNodes = internalCluster().getNodeNames();
assertBusy(() -> { assertEquals(client.admin().cluster().prepareClusterStats().get().getNodes().size(), allNodes.length); });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -51,6 +56,16 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
randomAlphaOfLength(5)
);

void setAddRemote(boolean addRemote) {
this.addRemote = addRemote;
}

@Before
public void setUp() throws Exception {
super.setUp();
setAddRemote(false);
}

protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
Expand Down Expand Up @@ -114,6 +129,20 @@ public BulkResponse indexBulk(String indexName, int numDocs) {
return client().bulk(bulkRequest).actionGet();
}

Map<String, Integer> getShardCountByNodeId() {
final Map<String, Integer> shardCountByNodeId = new HashMap<>();
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (final RoutingNode node : clusterState.getRoutingNodes()) {
logger.info(
"----> node {} has {} shards",
node.nodeId(),
clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()
);
shardCountByNodeId.put(node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
}
return shardCountByNodeId;
}

private void indexSingleDoc(String indexName) {
IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
Expand All @@ -128,6 +157,8 @@ public class AsyncIndexingService {
private AtomicBoolean finished = new AtomicBoolean();
private Thread indexingThread;

private int refreshFrequency = 3;

AsyncIndexingService(String indexName) {
this.indexName = indexName;
}
Expand All @@ -151,10 +182,21 @@ private Thread getIndexingThread() {
while (finished.get() == false) {
indexSingleDoc(indexName);
long currentDocCount = indexedDocs.incrementAndGet();
if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
logger.info("--> [iteration {}] flushing index", currentDocCount);
if (rarely()) {
client().admin().indices().prepareFlush(indexName).get();
} else {
client().admin().indices().prepareRefresh(indexName).get();
}
}
logger.info("Completed ingestion of {} docs", currentDocCount);

}
});
}

public void setRefreshFrequency(int refreshFrequency) {
this.refreshFrequency = refreshFrequency;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testRemotePrimaryDocRepReplica() throws Exception {
initDocRepToRemoteMigration();

logger.info("---> Starting 1 remote enabled data node");
addRemote = true;
setAddRemote(true);
String remoteNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(
Expand Down Expand Up @@ -161,7 +161,7 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception {
initDocRepToRemoteMigration();

logger.info("---> Starting 1 remote enabled data node");
addRemote = true;
setAddRemote(true);

String remoteNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
Expand Down Expand Up @@ -346,7 +346,7 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
ensureGreen(FAILOVER_REMOTE_TO_DOCREP);
initDocRepToRemoteMigration();
logger.info("---> Starting 1 remote enabled data node");
addRemote = true;
setAddRemote(true);
String remoteNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(
Expand Down Expand Up @@ -603,7 +603,7 @@ public void testFailoverRemotePrimaryToDocrepReplicaReseedToRemotePrimary() thro
ensureGreen(FAILOVER_REMOTE_TO_DOCREP);

logger.info("---> Starting a new remote enabled node");
addRemote = true;
setAddRemote(true);
String remoteNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand All @@ -43,7 +45,6 @@ protected int maximumNumberOfShards() {
return 1;
}

// ToDo : Fix me when we support migration of replicas
protected int maximumNumberOfReplicas() {
return 0;
}
Expand All @@ -52,9 +53,9 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

public void testMixedModeRelocation() throws Exception {
String docRepNode = internalCluster().startNode();
Client client = internalCluster().client(docRepNode);
public void testRemotePrimaryRelocation() throws Exception {
List<String> docRepNodes = internalCluster().startNodes(2);
Client client = internalCluster().client(docRepNodes.get(0));
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
Expand All @@ -70,10 +71,13 @@ public void testMixedModeRelocation() throws Exception {
refresh("test");

// add remote node in mixed mode cluster
addRemote = true;
setAddRemote(true);
String remoteNode = internalCluster().startNode();
internalCluster().validateClusterFormed();

updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

String remoteNode2 = internalCluster().startNode();
internalCluster().validateClusterFormed();

Expand All @@ -87,8 +91,13 @@ public void testMixedModeRelocation() throws Exception {
int finalCurrentDoc1 = currentDoc;
waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 5);

logger.info("--> relocating from {} to {} ", docRepNode, remoteNode);
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet();
logger.info("--> relocating from {} to {} ", docRepNodes, remoteNode);
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
Expand Down Expand Up @@ -159,7 +168,7 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
refresh("test");

// add remote node in mixed mode cluster
addRemote = true;
setAddRemote(true);
String remoteNode = internalCluster().startNode();
internalCluster().validateClusterFormed();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotemigration;

import com.carrotsearch.randomizedtesting.generators.RandomNumbers;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)

public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase {

protected int maximumNumberOfShards() {
return 1;
}

protected int maximumNumberOfReplicas() {
return 1;
}

protected int minimumNumberOfReplicas() {
return 1;
}

/*
Brings up new replica copies on remote and docrep nodes, when primary is on a remote node
Live indexing is happening meanwhile
*/
public void testReplicaRecovery() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
String primaryNode = internalCluster().startNode();
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// create shard with 0 replica and 1 shard
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
String replicaNode = internalCluster().startNode();
ensureGreen("test");

AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = getThread(finished, numAutoGenDocs);

refresh("test");

// add remote node in mixed mode cluster
setAddRemote(true);
String remoteNode = internalCluster().startNode();
internalCluster().validateClusterFormed();

updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

String remoteNode2 = internalCluster().startNode();
internalCluster().validateClusterFormed();

// identify the primary

Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
logger.info("--> relocating primary from {} to {} ", primaryNode, remoteNode);
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand("test", 0, primaryNode, remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
logger.info("--> relocation of primary from docrep to remote complete");
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));

logger.info("--> getting up the new replicas now to doc rep node as well as remote node ");
// Increase replica count to 3
client().admin()
.indices()
.updateSettings(
new UpdateSettingsRequest("test").settings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3)
.put("index.routing.allocation.exclude._name", remoteNode)
.build()
)
)
.get();

client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.execute()
.actionGet();
logger.info("--> replica is up now on another docrep now as well as remote node");

assertEquals(0, clusterHealthResponse.getRelocatingShards());

Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));

// Stop replicas on docrep now.
// ToDo : Remove once we have dual replication enabled
client().admin()
.indices()
.updateSettings(
new UpdateSettingsRequest("test").settings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.exclude._name", primaryNode + "," + replicaNode)
.build()
)
)
.get();

finished.set(true);
indexingThread.join();
refresh("test");
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test")
.setTrackTotalHits(true)// extra paranoia ;)
.setQuery(QueryBuilders.termQuery("auto", true))
// .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2))
.get(),
numAutoGenDocs.get()
);

}

private Thread getThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
Thread indexingThread = new Thread(() -> {
while (finished.get() == false && numAutoGenDocs.get() < 100) {
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex("test").setSource("auto", true).get();
numAutoGenDocs.incrementAndGet();
logger.info("Indexed {} docs here", numAutoGenDocs.get());
}
});
indexingThread.start();
return indexingThread;
}

}
Loading

0 comments on commit 29d7331

Please sign in to comment.