diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index bc5c5abb5386d..043a5850aef05 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -11,16 +11,14 @@
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import org.opensearch.OpenSearchCorruptionException;
 import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.action.admin.indices.segments.IndexShardSegments;
-import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
-import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
-import org.opensearch.action.admin.indices.segments.ShardSegments;
+import org.opensearch.action.search.SearchResponse;
 import org.opensearch.action.support.WriteRequest;
 import org.opensearch.action.update.UpdateResponse;
 import org.opensearch.client.Requests;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.routing.IndexRoutingTable;
 import org.opensearch.cluster.routing.IndexShardRoutingTable;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
@@ -32,8 +30,9 @@
 import org.opensearch.index.Index;
 import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexService;
-import org.opensearch.index.engine.Segment;
 import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.indices.replication.common.ReplicationType;
@@ -44,7 +43,6 @@
 import org.opensearch.test.transport.MockTransportService;
 import org.opensearch.transport.TransportService;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Arrays;
 import java.util.List;
@@ -53,9 +51,9 @@
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Arrays.asList;
 import static org.hamcrest.Matchers.equalTo;
 import static org.opensearch.index.query.QueryBuilders.matchQuery;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -71,7 +69,7 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return Arrays.asList(MockTransportService.TestPlugin.class);
+        return asList(MockTransportService.TestPlugin.class);
     }
 
     @Override
@@ -110,11 +108,9 @@ public void ingestDocs(int docCount) throws Exception {
             indexer.start(docCount);
             waitForDocs(docCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testPrimaryStopped_ReplicaPromoted() throws Exception {
         final String primary = internalCluster().startNode();
         createIndex(INDEX_NAME);
@@ -125,9 +121,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
 
         client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
-        assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
+        waitForSearchableDocs(1, primary, replica);
 
         // index another doc but don't refresh, we will ensure this is searchable once replica is promoted.
         client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
@@ -139,6 +133,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
         final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
         assertNotNull(replicaShardRouting);
         assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
+        refresh(INDEX_NAME);
         assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
 
         // assert we can index into the new primary.
@@ -150,13 +145,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
         ensureGreen(INDEX_NAME);
         client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
-        assertHitCount(client(nodeC).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4);
-        assertSegmentStats(REPLICA_COUNT);
+        waitForSearchableDocs(4, nodeC, replica);
+        verifyStoreContent();
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testRestartPrimary() throws Exception {
         final String primary = internalCluster().startNode();
         createIndex(INDEX_NAME);
@@ -170,8 +162,7 @@ public void testRestartPrimary() throws Exception {
 
         client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
-        assertDocCounts(initialDocCount, replica, primary);
+        waitForSearchableDocs(initialDocCount, replica, primary);
 
         internalCluster().restartNode(primary);
         ensureGreen(INDEX_NAME);
@@ -179,13 +170,10 @@ public void testRestartPrimary() throws Exception {
         assertEquals(getNodeContainingPrimaryShard().getName(), replica);
 
         flushAndRefresh(INDEX_NAME);
-        waitForReplicaUpdate();
-
-        assertDocCounts(initialDocCount, replica, primary);
-        assertSegmentStats(REPLICA_COUNT);
+        waitForSearchableDocs(initialDocCount, replica, primary);
+        verifyStoreContent();
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testCancelPrimaryAllocation() throws Exception {
         // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
         final String primary = internalCluster().startNode();
@@ -199,10 +187,9 @@ public void testCancelPrimaryAllocation() throws Exception {
 
         client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
-        assertDocCounts(initialDocCount, replica, primary);
+        waitForSearchableDocs(initialDocCount, replica, primary);
 
-        final IndexShard indexShard = getIndexShard(primary);
+        final IndexShard indexShard = getIndexShard(primary, INDEX_NAME);
         client().admin()
             .cluster()
             .prepareReroute()
@@ -214,15 +201,13 @@ public void testCancelPrimaryAllocation() throws Exception {
         assertEquals(getNodeContainingPrimaryShard().getName(), replica);
 
         flushAndRefresh(INDEX_NAME);
-        waitForReplicaUpdate();
-
-        assertDocCounts(initialDocCount, replica, primary);
-        assertSegmentStats(REPLICA_COUNT);
+        waitForSearchableDocs(initialDocCount, replica, primary);
+        verifyStoreContent();
     }
 
     /**
      * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
-     *
+     * <p>
      * TODO: Ignoring this test as its flaky and needs separate fix
     */
    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
@@ -234,6 +219,7 @@ public void testAddNewReplicaFailure() throws Exception {
         prepareCreate(
             INDEX_NAME,
             Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+
         ).get();
 
         logger.info("--> index 10 docs");
@@ -292,7 +278,6 @@ public void testAddNewReplicaFailure() throws Exception {
         assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
         final String nodeA = internalCluster().startNode();
         final String nodeB = internalCluster().startNode();
@@ -314,10 +299,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
             final int additionalDocCount = scaledRandomIntBetween(0, 200);
             final int expectedHitCount = initialDocCount + additionalDocCount;
@@ -325,12 +307,10 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
             waitForDocs(expectedHitCount, indexer);
 
             flushAndRefresh(INDEX_NAME);
-            waitForReplicaUpdate();
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            waitForSearchableDocs(expectedHitCount, nodeA, nodeB);
 
             ensureGreen(INDEX_NAME);
-            assertSegmentStats(REPLICA_COUNT);
+            verifyStoreContent();
         }
     }
 
@@ -355,12 +335,8 @@ public void testIndexReopenClose() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             flush(INDEX_NAME);
-            waitForReplicaUpdate();
+            waitForSearchableDocs(initialDocCount, primary, replica);
         }
-
-        assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-
         logger.info("--> Closing the index ");
         client().admin().indices().prepareClose(INDEX_NAME).get();
@@ -368,8 +344,8 @@ public void testIndexReopenClose() throws Exception {
         client().admin().indices().prepareOpen(INDEX_NAME).get();
         ensureGreen(INDEX_NAME);
-        assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-        assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+        waitForSearchableDocs(initialDocCount, primary, replica);
+        verifyStoreContent();
     }
 
     public void testMultipleShards() throws Exception {
@@ -400,10 +376,7 @@ public void testMultipleShards() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
             final int additionalDocCount = scaledRandomIntBetween(0, 200);
             final int expectedHitCount = initialDocCount + additionalDocCount;
@@ -411,12 +384,10 @@ public void testMultipleShards() throws Exception {
             waitForDocs(expectedHitCount, indexer);
 
             flushAndRefresh(INDEX_NAME);
-            waitForReplicaUpdate();
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            waitForSearchableDocs(expectedHitCount, nodeA, nodeB);
 
             ensureGreen(INDEX_NAME);
-            assertSegmentStats(REPLICA_COUNT);
+            verifyStoreContent();
         }
     }
 
@@ -444,24 +415,17 @@ public void testReplicationAfterForceMerge() throws Exception {
             waitForDocs(initialDocCount, indexer);
             flush(INDEX_NAME);
-            waitForReplicaUpdate();
-            // wait a short amount of time to give replication a chance to complete.
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
             // Index a second set of docs so we can merge into one segment.
             indexer.start(additionalDocCount);
             waitForDocs(expectedHitCount, indexer);
+            waitForSearchableDocs(expectedHitCount, nodeA, nodeB);
 
             // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
             client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-
-            ensureGreen(INDEX_NAME);
-            assertSegmentStats(REPLICA_COUNT);
+            verifyStoreContent();
         }
     }
 
@@ -476,7 +440,7 @@ public void testCancellation() throws Exception {
             SegmentReplicationSourceService.class,
             primaryNode
         );
-        final IndexShard primaryShard = getIndexShard(primaryNode);
+        final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
 
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -556,10 +520,10 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
         client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();
         refresh(INDEX_NAME);
-        waitForReplicaUpdate();
+        waitForSearchableDocs(3, primaryNode, replicaNode);
         assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
         assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
-        assertSegmentStats(REPLICA_COUNT);
+        verifyStoreContent();
     }
 
     public void testDeleteOperations() throws Exception {
@@ -583,20 +547,13 @@ public void testDeleteOperations() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-
-            // wait a short amount of time to give replication a chance to complete.
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
             final int additionalDocCount = scaledRandomIntBetween(0, 200);
             final int expectedHitCount = initialDocCount + additionalDocCount;
             indexer.start(additionalDocCount);
             waitForDocs(expectedHitCount, indexer);
-            waitForReplicaUpdate();
-
-            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            waitForSearchableDocs(expectedHitCount, nodeA, nodeB);
 
             ensureGreen(INDEX_NAME);
 
@@ -605,32 +562,18 @@ public void testDeleteOperations() throws Exception {
 
             client(nodeA).prepareDelete(INDEX_NAME, id).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-            assertBusy(() -> {
-                final long nodeA_Count = client(nodeA).prepareSearch(INDEX_NAME)
-                    .setSize(0)
-                    .setPreference("_only_local")
-                    .get()
-                    .getHits()
-                    .getTotalHits().value;
-                assertEquals(expectedHitCount - 1, nodeA_Count);
-                final long nodeB_Count = client(nodeB).prepareSearch(INDEX_NAME)
-                    .setSize(0)
-                    .setPreference("_only_local")
-                    .get()
-                    .getHits()
-                    .getTotalHits().value;
-                assertEquals(expectedHitCount - 1, nodeB_Count);
-            }, 5, TimeUnit.SECONDS);
+            waitForSearchableDocs(expectedHitCount - 1, nodeA, nodeB);
+            verifyStoreContent();
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
     public void testUpdateOperations() throws Exception {
-        final String primary = internalCluster().startNode();
+        internalCluster().startClusterManagerOnlyNode();
+        final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellow(INDEX_NAME);
-        final String replica = internalCluster().startNode();
+        final String replica = internalCluster().startDataOnlyNode();
+        ensureGreen(INDEX_NAME);
 
         final int initialDocCount = scaledRandomIntBetween(0, 200);
         try (
@@ -647,20 +590,13 @@ public void testUpdateOperations() throws Exception {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
-
-            // wait a short amount of time to give replication a chance to complete.
-            assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
-            assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            waitForSearchableDocs(initialDocCount, asList(primary, replica));
 
             final int additionalDocCount = scaledRandomIntBetween(0, 200);
             final int expectedHitCount = initialDocCount + additionalDocCount;
             indexer.start(additionalDocCount);
             waitForDocs(expectedHitCount, indexer);
-            waitForReplicaUpdate();
-
-            assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
-            assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            waitForSearchableDocs(expectedHitCount, asList(primary, replica));
 
             Set<String> ids = indexer.getIds();
             String id = ids.toArray()[0].toString();
@@ -672,69 +608,24 @@ public void testUpdateOperations() throws Exception {
             assertEquals(2, updateResponse.getVersion());
 
             refresh(INDEX_NAME);
-            waitForReplicaUpdate();
+            verifyStoreContent();
 
             assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
             assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
-
-        }
-    }
-
-    private void assertSegmentStats(int numberOfReplicas) throws IOException {
-        final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();
-
-        List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
-
-        // There will be an entry in the list for each index.
-        for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
-
-            // Separate Primary & replica shards ShardSegments.
-            final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
-            final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
-            final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
-
-            assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1);
-            final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
-            final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
-
-            assertEquals(
-                "There should be a ShardSegment entry for each replica in the replicationGroup",
-                numberOfReplicas,
-                replicaShardSegments.size()
-            );
-
-            for (ShardSegments shardSegment : replicaShardSegments) {
-                final Map<String, Segment> latestReplicaSegments = getLatestSegments(shardSegment);
-                for (Segment replicaSegment : latestReplicaSegments.values()) {
-                    final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName());
-                    assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration());
-                    assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs());
-                    assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs());
-                    assertEquals(replicaSegment.getSize(), primarySegment.getSize());
-                }
-
-                // Fetch the IndexShard for this replica and try and build its SegmentInfos from the previous commit point.
-                // This ensures the previous commit point is not wiped.
-                final ShardRouting replicaShardRouting = shardSegment.getShardRouting();
-                ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
-                final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
-                IndexShard indexShard = getIndexShard(replicaNode.getName());
-                // calls to readCommit will fail if a valid commit point and all its segments are not in the store.
-                indexShard.store().readLastCommittedSegmentsInfo();
-            }
         }
     }
 
     public void testDropPrimaryDuringReplication() throws Exception {
+        final int replica_count = 6;
         final Settings settings = Settings.builder()
             .put(indexSettings())
-            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count)
             .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
             .build();
         final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
         final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, settings);
-        internalCluster().startDataOnlyNodes(6);
+        final List<String> dataNodes = internalCluster().startDataOnlyNodes(6);
         ensureGreen(INDEX_NAME);
 
         int initialDocCount = scaledRandomIntBetween(100, 200);
@@ -757,85 +648,93 @@ public void testDropPrimaryDuringReplication() throws Exception {
         ensureYellow(INDEX_NAME);
         // start another replica.
-        internalCluster().startDataOnlyNode();
+        dataNodes.add(internalCluster().startDataOnlyNode());
         ensureGreen(INDEX_NAME);
 
         // index another doc and refresh - without this the new replica won't catch up.
-        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
+        String docId = String.valueOf(initialDocCount + 1);
+        client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get();
         flushAndRefresh(INDEX_NAME);
-        waitForReplicaUpdate();
-        assertSegmentStats(6);
+        waitForSearchableDocs(initialDocCount + 1, dataNodes);
+        verifyStoreContent();
     }
 }
 
     /**
-     * Waits until the replica is caught up to the latest primary segments gen.
-     * @throws Exception if assertion fails
+     * Waits until all given nodes have at least the expected docCount.
+     *
+     * @param docCount - Expected Doc count.
+     * @param nodes - List of node names.
      */
-    private void waitForReplicaUpdate() throws Exception {
+    private void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
         // wait until the replica has the latest segment generation.
         assertBusy(() -> {
-            final IndicesSegmentResponse indicesSegmentResponse = client().admin()
-                .indices()
-                .segments(new IndicesSegmentsRequest())
-                .actionGet();
-            List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
-            for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
-                final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
-                final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
-                final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
-                // if we don't have any segments yet, proceed.
-                final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
-                logger.debug("Primary Segments: {}", primaryShardSegments.getSegments());
-                if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) {
-                    final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
-                    final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
-                    for (ShardSegments shardSegments : replicaShardSegments) {
-                        logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments());
-                        final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
-                            .stream()
-                            .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
-                        assertTrue(isReplicaCaughtUpToPrimary);
-                    }
-                }
+            for (String node : nodes) {
+                final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
+                final long hits = response.getHits().getTotalHits().value;
+                if (hits < docCount) {
+                    fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
+                }
             }
-        });
+        }, 1, TimeUnit.MINUTES);
     }
 
-    private IndexShard getIndexShard(String node) {
-        final Index index = resolveIndex(INDEX_NAME);
-        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
-        IndexService indexService = indicesService.indexServiceSafe(index);
-        final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
-        return indexService.getShard(shardId.get());
+    private void waitForSearchableDocs(long docCount, String... nodes) throws Exception {
+        waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
     }
 
-    private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSegmentResponse) {
-        return indicesSegmentResponse.getIndices()
-            .values()
-            .stream() // get list of IndexSegments
-            .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
-            .map(IndexShardSegments::getShards) // get list of segments across replication group
-            .collect(Collectors.toList());
+    private void verifyStoreContent() throws Exception {
+        assertBusy(() -> {
+            final ClusterState clusterState = getClusterState();
+            for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
+                for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
+                    final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
+                    final String indexName = primaryRouting.getIndexName();
+                    final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
+                    final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
+                    final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
+                    for (ShardRouting replica : replicaRouting) {
+                        IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
+                        final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
+                            primarySegmentMetadata,
+                            replicaShard.getSegmentMetadataMap()
+                        );
+                        if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
+                            fail(
+                                "Expected no missing or different segments between primary and replica but diff was missing: "
+                                    + recoveryDiff.missing
+                                    + " Different: "
+                                    + recoveryDiff.different
+                                    + " Primary Replication Checkpoint : "
+                                    + primaryShard.getLatestReplicationCheckpoint()
+                                    + " Replica Replication Checkpoint: "
+                                    + replicaShard.getLatestReplicationCheckpoint()
+                            );
+                        }
+                        // calls to readCommit will fail if a valid commit point and all its segments are not in the store.
+                        replicaShard.store().readLastCommittedSegmentsInfo();
+                    }
+                }
+            }
+        }, 1, TimeUnit.MINUTES);
     }
 
-    private Map<String, Segment> getLatestSegments(ShardSegments segments) {
-        final Optional<Long> generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare);
-        final Long latestPrimaryGen = generation.get();
-        return segments.getSegments()
-            .stream()
-            .filter(s -> s.getGeneration() == latestPrimaryGen)
-            .collect(Collectors.toMap(Segment::getName, Function.identity()));
+    private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
+        return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
     }
 
-    private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
-        return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
+    private IndexShard getIndexShard(String node, String indexName) {
+        final Index index = resolveIndex(indexName);
+        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+        IndexService indexService = indicesService.indexServiceSafe(index);
+        final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
+        return indexService.getShard(shardId.get());
     }
 
     @Nullable
     private ShardRouting getShardRoutingForNodeName(String nodeName) {
-        final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
+        final ClusterState state = getClusterState();
         for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) {
             for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
                 final String nodeId = shardRouting.currentNodeId();
@@ -854,8 +753,12 @@ private void assertDocCounts(int expectedDocCount, String... nodeNames) {
         }
     }
 
+    private ClusterState getClusterState() {
+        return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
+    }
+
     private DiscoveryNode getNodeContainingPrimaryShard() {
-        final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
+        final ClusterState state = getClusterState();
         final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
         return state.nodes().resolveNode(primaryShard.currentNodeId());
     }
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index d05dc3dabe1ae..385655b93bd10 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1580,6 +1580,19 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
         }
     }
 
+    /**
+     * Fetch a map of StoreFileMetadata for each segment from the latest SegmentInfos.
+     * This is used to compute diffs for segment replication.
+     *
+     * @return - Map of Segment Filename to its {@link StoreFileMetadata}
+     * @throws IOException - When there is an error loading metadata from the store.
+     */
+    public Map<String, StoreFileMetadata> getSegmentMetadataMap() throws IOException {
+        try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
+            return store.getSegmentMetadataMap(snapshot.get());
+        }
+    }
+
     /**
      * Fails the shard and marks the shard store as corrupted if
      * <code>e</code> is caused by index corruption
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index aadcb577f6174..3ecdbdfd7be6e 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -22,7 +22,6 @@
 import org.opensearch.action.StepListener;
 import org.opensearch.common.UUIDs;
 import org.opensearch.common.bytes.BytesReference;
-import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.util.CancellableThreads;
 import org.opensearch.index.shard.IndexShard;
@@ -38,8 +37,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
 
 /**
  * Represents the target of a replication event.
@@ -173,8 +170,8 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
-    private Map<String, StoreFileMetadata> getMetadataMap() throws IOException {
-        if (indexShard.getSegmentInfosSnapshot() == null) {
-            return Collections.emptyMap();
-        }
-        try (final GatedCloseable<SegmentInfos> snapshot = indexShard.getSegmentInfosSnapshot()) {
-            return store.getSegmentMetadataMap(snapshot.get());
-        }
-    }
-
     @Override
     protected void onCancel(String reason) {
         cancellableThreads.cancel(reason);
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
index ffea4aaf6b7c4..5900ac3ad29f3 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java
@@ -363,8 +363,8 @@ public void getSegmentFiles(
         SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
-        segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
-        when(segrepTarget.getMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT);
+        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT);
         segrepTarget.startReplication(new ActionListener<Void>() {
             @Override
             public void onResponse(Void replicationResponse) {
@@ -415,8 +415,8 @@ public void getSegmentFiles(
             SegmentReplicationTargetService.SegmentReplicationListener.class
         );
 
-        segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener));
-        when(segrepTarget.getMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
+        segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener);
+        when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap());
         segrepTarget.startReplication(new ActionListener<Void>() {
             @Override
             public void onResponse(Void replicationResponse) {
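
For context on how the two new test helpers compose, here is a minimal sketch of the intended pattern (illustrative only: the test name and doc counts are hypothetical, while waitForSearchableDocs and verifyStoreContent are the helpers introduced in this patch):

    public void testPrimaryReplicaConvergence() throws Exception {
        final String primary = internalCluster().startNode();
        createIndex(INDEX_NAME);
        ensureYellow(INDEX_NAME);
        final String replica = internalCluster().startNode();
        ensureGreen(INDEX_NAME);

        // Make the doc visible on the primary, then poll (up to the helper's
        // one-minute assertBusy timeout) until a local-only search on every
        // listed node returns at least the expected hit count.
        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
        refresh(INDEX_NAME);
        waitForSearchableDocs(1, primary, replica);

        // Compare primary and replica stores file-by-file: this fails if
        // Store.segmentReplicationDiff reports missing or different segment
        // files, and also verifies the replica's last commit point is readable.
        verifyStoreContent();
    }

The key behavioral change is that the removed waitForReplicaUpdate/assertSegmentStats pair asserted on segment generations, which can legitimately diverge across refresh timing, whereas these helpers assert on observable outcomes: searchable doc counts and identical segment file metadata.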