diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/DocRepMigrationTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/DocRepMigrationTestCase.java index 5240949ff87b9..61def4ec6e2a4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/DocRepMigrationTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/DocRepMigrationTestCase.java @@ -29,7 +29,7 @@ public void testMixedModeAddDocRep() throws Exception { ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - addRemote = false; + setAddRemote(false); internalCluster().startNode(); String[] allNodes = internalCluster().getNodeNames(); assertBusy(() -> { assertEquals(client.admin().cluster().prepareClusterStats().get().getNodes().size(), allNodes.length); }); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 0c35f91121059..6f468f25ee5f1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -16,15 +16,20 @@ import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; import java.nio.file.Path; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -51,6 +56,16 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase { randomAlphaOfLength(5) ); + void setAddRemote(boolean addRemote) { + this.addRemote = addRemote; + } + + @Before + public void setUp() throws Exception { + super.setUp(); + setAddRemote(false); + } + protected Settings nodeSettings(int nodeOrdinal) { if (segmentRepoPath == null || translogRepoPath == null) { segmentRepoPath = randomRepoPath().toAbsolutePath(); @@ -114,6 +129,20 @@ public BulkResponse indexBulk(String indexName, int numDocs) { return client().bulk(bulkRequest).actionGet(); } + Map getShardCountByNodeId() { + final Map shardCountByNodeId = new HashMap<>(); + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + for (final RoutingNode node : clusterState.getRoutingNodes()) { + logger.info( + "----> node {} has {} shards", + node.nodeId(), + clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards() + ); + shardCountByNodeId.put(node.nodeId(), clusterState.getRoutingNodes().node(node.nodeId()).numberOfOwningShards()); + } + return shardCountByNodeId; + } + private void indexSingleDoc(String indexName) { IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get(); assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); @@ -128,6 +157,8 @@ public class AsyncIndexingService { private AtomicBoolean finished = new AtomicBoolean(); private Thread indexingThread; + private int refreshFrequency = 3; + AsyncIndexingService(String indexName) { this.indexName = indexName; } @@ -151,10 +182,21 @@ private Thread getIndexingThread() { while (finished.get() == false) { indexSingleDoc(indexName); long currentDocCount = indexedDocs.incrementAndGet(); + if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) { + logger.info("--> [iteration {}] flushing index", currentDocCount); + if (rarely()) { + client().admin().indices().prepareFlush(indexName).get(); + } else { + client().admin().indices().prepareRefresh(indexName).get(); + } + } logger.info("Completed ingestion of {} docs", currentDocCount); - } }); } + + public void setRefreshFrequency(int refreshFrequency) { + this.refreshFrequency = refreshFrequency; + } } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java index 18f07910403d4..24a332212be6a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteDualReplicationIT.java @@ -89,7 +89,7 @@ public void testRemotePrimaryDocRepReplica() throws Exception { initDocRepToRemoteMigration(); logger.info("---> Starting 1 remote enabled data node"); - addRemote = true; + setAddRemote(true); String remoteNodeName = internalCluster().startDataOnlyNode(); internalCluster().validateClusterFormed(); assertEquals( @@ -161,7 +161,7 @@ public void testRemotePrimaryDocRepAndRemoteReplica() throws Exception { initDocRepToRemoteMigration(); logger.info("---> Starting 1 remote enabled data node"); - addRemote = true; + setAddRemote(true); String remoteNodeName = internalCluster().startDataOnlyNode(); internalCluster().validateClusterFormed(); @@ -346,7 +346,7 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception { ensureGreen(FAILOVER_REMOTE_TO_DOCREP); initDocRepToRemoteMigration(); logger.info("---> Starting 1 remote enabled data node"); - addRemote = true; + setAddRemote(true); String remoteNodeName = internalCluster().startDataOnlyNode(); internalCluster().validateClusterFormed(); assertEquals( @@ -603,7 +603,7 @@ public void testFailoverRemotePrimaryToDocrepReplicaReseedToRemotePrimary() thro ensureGreen(FAILOVER_REMOTE_TO_DOCREP); logger.info("---> Starting a new remote enabled node"); - addRemote = true; + setAddRemote(true); String remoteNodeName = internalCluster().startDataOnlyNode(); internalCluster().validateClusterFormed(); assertEquals( diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index 4a4057def4207..8f6c1e2d9a68c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -30,10 +30,12 @@ import org.opensearch.test.transport.MockTransportService; import java.util.Collection; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -43,7 +45,6 @@ protected int maximumNumberOfShards() { return 1; } - // ToDo : Fix me when we support migration of replicas protected int maximumNumberOfReplicas() { return 0; } @@ -52,9 +53,9 @@ protected Collection> nodePlugins() { return asList(MockTransportService.TestPlugin.class); } - public void testMixedModeRelocation() throws Exception { - String docRepNode = internalCluster().startNode(); - Client client = internalCluster().client(docRepNode); + public void testRemotePrimaryRelocation() throws Exception { + List docRepNodes = internalCluster().startNodes(2); + Client client = internalCluster().client(docRepNodes.get(0)); ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); @@ -70,10 +71,13 @@ public void testMixedModeRelocation() throws Exception { refresh("test"); // add remote node in mixed mode cluster - addRemote = true; + setAddRemote(true); String remoteNode = internalCluster().startNode(); internalCluster().validateClusterFormed(); + updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + String remoteNode2 = internalCluster().startNode(); internalCluster().validateClusterFormed(); @@ -87,8 +91,13 @@ public void testMixedModeRelocation() throws Exception { int finalCurrentDoc1 = currentDoc; waitUntil(() -> numAutoGenDocs.get() > finalCurrentDoc1 + 5); - logger.info("--> relocating from {} to {} ", docRepNode, remoteNode); - client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet(); + logger.info("--> relocating from {} to {} ", docRepNodes, remoteNode); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode)) + .execute() + .actionGet(); ClusterHealthResponse clusterHealthResponse = client().admin() .cluster() .prepareHealth() @@ -159,7 +168,7 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { refresh("test"); // add remote node in mixed mode cluster - addRemote = true; + setAddRemote(true); String remoteNode = internalCluster().startNode(); internalCluster().validateClusterFormed(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java new file mode 100644 index 0000000000000..196ecb991bbc0 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotemigration; + +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; + +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.hamcrest.OpenSearchAssertions; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) + +public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase { + + protected int maximumNumberOfShards() { + return 1; + } + + protected int maximumNumberOfReplicas() { + return 1; + } + + protected int minimumNumberOfReplicas() { + return 1; + } + + /* + Brings up new replica copies on remote and docrep nodes, when primary is on a remote node + Live indexing is happening meanwhile + */ + public void testReplicaRecovery() throws Exception { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + String primaryNode = internalCluster().startNode(); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // create shard with 0 replica and 1 shard + client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); + String replicaNode = internalCluster().startNode(); + ensureGreen("test"); + + AtomicInteger numAutoGenDocs = new AtomicInteger(); + final AtomicBoolean finished = new AtomicBoolean(false); + Thread indexingThread = getThread(finished, numAutoGenDocs); + + refresh("test"); + + // add remote node in mixed mode cluster + setAddRemote(true); + String remoteNode = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + + updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + String remoteNode2 = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + + // identify the primary + + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); + logger.info("--> relocating primary from {} to {} ", primaryNode, remoteNode); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand("test", 0, primaryNode, remoteNode)) + .execute() + .actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setTimeout(TimeValue.timeValueSeconds(60)) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .execute() + .actionGet(); + + assertEquals(0, clusterHealthResponse.getRelocatingShards()); + logger.info("--> relocation of primary from docrep to remote complete"); + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); + + logger.info("--> getting up the new replicas now to doc rep node as well as remote node "); + // Increase replica count to 3 + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest("test").settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 3) + .put("index.routing.allocation.exclude._name", remoteNode) + .build() + ) + ) + .get(); + + client().admin() + .cluster() + .prepareHealth() + .setTimeout(TimeValue.timeValueSeconds(60)) + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .execute() + .actionGet(); + logger.info("--> replica is up now on another docrep now as well as remote node"); + + assertEquals(0, clusterHealthResponse.getRelocatingShards()); + + Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); + + // Stop replicas on docrep now. + // ToDo : Remove once we have dual replication enabled + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest("test").settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.exclude._name", primaryNode + "," + replicaNode) + .build() + ) + ) + .get(); + + finished.set(true); + indexingThread.join(); + refresh("test"); + OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); + OpenSearchAssertions.assertHitCount( + client().prepareSearch("test") + .setTrackTotalHits(true)// extra paranoia ;) + .setQuery(QueryBuilders.termQuery("auto", true)) + // .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2)) + .get(), + numAutoGenDocs.get() + ); + + } + + private Thread getThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { + Thread indexingThread = new Thread(() -> { + while (finished.get() == false && numAutoGenDocs.get() < 100) { + IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); + assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); + DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); + assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); + client().prepareIndex("test").setSource("auto", true).get(); + numAutoGenDocs.incrementAndGet(); + logger.info("Indexed {} docs here", numAutoGenDocs.get()); + } + }); + indexingThread.start(); + return indexingThread; + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java index 7d816a5e18698..4b1c91f1d57ca 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationTestCase.java @@ -8,34 +8,48 @@ package org.opensearch.remotemigration; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.query.QueryBuilders; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.hamcrest.OpenSearchAssertions; import java.nio.file.Path; import java.util.List; +import java.util.Map; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) public class RemoteStoreMigrationTestCase extends MigrationBaseTestCase { + protected int maximumNumberOfReplicas() { + return 1; + } + + protected int minimumNumberOfReplicas() { + return 1; + } + public void testMixedModeAddRemoteNodes() throws Exception { internalCluster().setBootstrapClusterManagerNodeIndex(0); List cmNodes = internalCluster().startNodes(1); Client client = internalCluster().client(cmNodes.get(0)); - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + initDocRepToRemoteMigration(); // add remote node in mixed mode cluster - addRemote = true; + setAddRemote(true); internalCluster().startNode(); internalCluster().startNode(); internalCluster().validateClusterFormed(); @@ -46,7 +60,7 @@ public void testMixedModeAddRemoteNodes() throws Exception { assertEquals(1, getRepositoriesResponse.repositories().size()); // add docrep mode in mixed mode cluster - addRemote = true; + setAddRemote(true); internalCluster().startNode(); assertBusy(() -> { assertEquals(client.admin().cluster().prepareClusterStats().get().getNodes().size(), internalCluster().getNodeNames().length); @@ -121,4 +135,81 @@ public void testNoShallowSnapshotInMixedMode() throws Exception { SnapshotInfo snapshotInfo2 = RemoteStoreMigrationShardAllocationBaseTestCase.createSnapshot(shallowSnapshotRepoName, snapshot2); assertEquals(snapshotInfo2.isRemoteStoreIndexShallowCopyEnabled(), false); } + + /* + Tests end to end remote migration via Blue Green mechanism + - Starts docrep nodes with multiple nodes, indices, replicas copies + - Adds remote nodes to cluster + - Excludes docrep nodes. + - Asserts all shards are migrated to remote store + - Asserts doc count across all shards + - Continuos indexing with refresh/flush happening + */ + public void testEndToEndRemoteMigration() throws Exception { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + List docRepNodes = internalCluster().startNodes(2); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); + ensureGreen("test"); + + logger.info("---> Starting doc ingestion in parallel thread"); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); + asyncIndexingService.startIndexing(); + + setAddRemote(true); + + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + internalCluster().startNodes(2); + + assertAcked( + internalCluster().client() + .admin() + .indices() + .prepareUpdateSettings() + .setIndices("test") + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.exclude._name", String.join(",", docRepNodes)) + .build() + ) + .get() + ); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setTimeout(TimeValue.timeValueSeconds(45)) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .execute() + .actionGet(); + assertTrue(clusterHealthResponse.getRelocatingShards() == 0); + logger.info("---> Stopping indexing thread"); + asyncIndexingService.stopIndexing(); + Map shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 0 shards", shardCountByNodeId.get(docRepNodes.get(0)), equalTo(null)); + assertThat("node1 has 0 shards", shardCountByNodeId.get(docRepNodes.get(1)), equalTo(null)); + refresh("test"); + waitForReplication("test"); + OpenSearchAssertions.assertHitCount( + client().prepareSearch("test").setTrackTotalHits(true).get(), + asyncIndexingService.getIndexedDocs() + ); + OpenSearchAssertions.assertHitCount( + client().prepareSearch("test") + .setTrackTotalHits(true)// extra paranoia ;) + .setQuery(QueryBuilders.termQuery("auto", true)) + .get(), + asyncIndexingService.getIndexedDocs() + ); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/ResizeIndexMigrationTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/ResizeIndexMigrationTestCase.java index b57bc60c50e8c..b817906a8f828 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/ResizeIndexMigrationTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/ResizeIndexMigrationTestCase.java @@ -32,7 +32,7 @@ public class ResizeIndexMigrationTestCase extends MigrationBaseTestCase { * and index is on DocRep node, and migration to remote store is in progress. * */ public void testFailResizeIndexWhileDocRepToRemoteStoreMigration() throws Exception { - addRemote = false; + setAddRemote(false); // create a docrep cluster internalCluster().startClusterManagerOnlyNode(); internalCluster().validateClusterFormed(); @@ -127,7 +127,7 @@ public void testFailResizeIndexWhileDocRepToRemoteStoreMigration() throws Except * */ public void testFailResizeIndexWhileRemoteStoreToDocRepMigration() throws Exception { // creates a remote cluster - addRemote = true; + setAddRemote(true); internalCluster().startClusterManagerOnlyNode(); internalCluster().validateClusterFormed(); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index f89ae42f1cdc5..33f6d1736f3a8 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -101,6 +101,7 @@ import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.regex.Regex; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.FeatureFlagSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -153,6 +154,7 @@ import org.opensearch.ingest.IngestMetadata; import org.opensearch.monitor.os.OsInfo; import org.opensearch.node.NodeMocksPlugin; +import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.fs.ReloadableFsRepository; @@ -225,6 +227,8 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.XContentTestUtils.convertToMap; import static org.opensearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -2445,6 +2449,15 @@ protected RefreshResponse refreshAndWaitForReplication(String... indices) { return refreshResponse; } + public boolean isMigratingToRemoteStore() { + ClusterSettings clusterSettings = clusterService().getClusterSettings(); + boolean isMixedMode = clusterSettings.get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING) + .equals(RemoteStoreNodeService.CompatibilityMode.MIXED); + boolean isRemoteStoreMigrationDirection = clusterSettings.get(MIGRATION_DIRECTION_SETTING) + .equals(RemoteStoreNodeService.Direction.REMOTE_STORE); + return (isMixedMode && isRemoteStoreMigrationDirection); + } + /** * Waits until active/started replica shards are caught up with primary shard only when Segment Replication is enabled. * This doesn't wait for inactive/non-started replica shards to become active/started. @@ -2469,11 +2482,13 @@ protected void waitForReplication(String... indices) { for (ShardRouting replica : replicaRouting) { if (replica.state().toString().equals("STARTED")) { IndexShard replicaShard = getIndexShard(replica, index); - assertEquals( - "replica shards haven't caught up with primary", - getLatestSegmentInfoVersion(primaryShard), - getLatestSegmentInfoVersion(replicaShard) - ); + if (replicaShard.indexSettings().isSegRepEnabledOrRemoteNode()) { + assertEquals( + "replica shards haven't caught up with primary", + getLatestSegmentInfoVersion(primaryShard), + getLatestSegmentInfoVersion(replicaShard) + ); + } } } } @@ -2497,7 +2512,7 @@ protected void waitForReplication(String... indices) { * Checks if Segment Replication is enabled on Index. */ protected boolean isSegmentReplicationEnabledForIndex(String index) { - return clusterService().state().getMetadata().isSegmentReplicationEnabled(index); + return clusterService().state().getMetadata().isSegmentReplicationEnabled(index) || isMigratingToRemoteStore(); } protected IndexShard getIndexShard(ShardRouting routing, String indexName) {