Commit

PR feedback.
Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Jan 26, 2023
1 parent 66f9042 commit eb6f377
Showing 1 changed file with 70 additions and 89 deletions.
@@ -22,6 +22,8 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
@@ -149,7 +151,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
waitForSearchableDocs(4, nodeC, replica);
assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
verifyStoreContent();
}

public void testRestartPrimary() throws Exception {
@@ -174,7 +176,7 @@ public void testRestartPrimary() throws Exception {

flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, replica, primary);
assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
verifyStoreContent();
}

public void testCancelPrimaryAllocation() throws Exception {
@@ -192,7 +194,7 @@ public void testCancelPrimaryAllocation() throws Exception {

waitForSearchableDocs(initialDocCount, replica, primary);

final IndexShard indexShard = getIndexShard(primary);
final IndexShard indexShard = getIndexShard(primary, INDEX_NAME);
client().admin()
.cluster()
.prepareReroute()
@@ -205,23 +207,26 @@ public void testCancelPrimaryAllocation() throws Exception {

flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, replica, primary);
assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
verifyStoreContent();
}

/**
* This test verifies that the replica shard is not added to the cluster when a round of segment replication fails during peer recovery.
* <p>
* TODO: Ignoring this test as it's flaky and needs a separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)

).get();

logger.info("--> index 10 docs");
@@ -312,7 +317,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
waitForSearchableDocs(expectedHitCount, nodeA, nodeB);

ensureGreen(INDEX_NAME);
assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
verifyStoreContent();
}
}

@@ -347,7 +352,7 @@ public void testIndexReopenClose() throws Exception {

ensureGreen(INDEX_NAME);
waitForSearchableDocs(initialDocCount, primary, replica);
assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
verifyStoreContent();
}

public void testMultipleShards() throws Exception {
@@ -389,7 +394,7 @@ public void testMultipleShards() throws Exception {
waitForSearchableDocs(expectedHitCount, nodeA, nodeB);

ensureGreen(INDEX_NAME);
assertIdenticalSegments(3, REPLICA_COUNT);
verifyStoreContent();
}
}

@@ -427,7 +432,7 @@ public void testReplicationAfterForceMerge() throws Exception {
// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
refresh(INDEX_NAME);
assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
verifyStoreContent();
}
}

@@ -442,7 +447,7 @@ public void testCancellation() throws Exception {
SegmentReplicationSourceService.class,
primaryNode
);
final IndexShard primaryShard = getIndexShard(primaryNode);
final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);

CountDownLatch latch = new CountDownLatch(1);

@@ -525,7 +530,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
waitForSearchableDocs(3, primaryNode, replicaNode);
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
verifyStoreContent();
}

public void testDeleteOperations() throws Exception {
@@ -565,7 +570,7 @@ public void testDeleteOperations() throws Exception {

refresh(INDEX_NAME);
waitForSearchableDocs(expectedHitCount - 1, nodeA, nodeB);
assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
verifyStoreContent();
}
}

@@ -611,70 +616,14 @@ public void testUpdateOperations() throws Exception {

refresh(INDEX_NAME);

assertIdenticalSegments(SHARD_COUNT, REPLICA_COUNT);
verifyStoreContent();
assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
}
}

private void assertIdenticalSegments(int numberOfShards, int numberOfReplicas) throws Exception {
assertBusy(() -> {
final IndicesSegmentResponse indicesSegmentResponse = client().admin()
.indices()
.segments(new IndicesSegmentsRequest())
.actionGet();
List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);

// There will be an entry in the list for each index.
assertEquals("Expected a different number of shards in the index", numberOfShards, segmentsByIndex.size());
for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
// Separate Primary & replica shards ShardSegments.
final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
final List<ShardSegments> replicaShardSegmentsList = segmentListMap.get(false);
assertEquals("There should only be one primary in the replicationGroup", 1, primaryShardSegmentsList.size());
assertEquals(
"There should be a ShardSegment entry for each replica in the replicationGroup",
numberOfReplicas,
replicaShardSegmentsList.size()
);
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
final IndexShard primaryShard = getIndexShard(primaryShardSegments);
final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
for (ShardSegments replicaShardSegments : replicaShardSegmentsList) {
final IndexShard replicaShard = getIndexShard(replicaShardSegments);
final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
primarySegmentMetadata,
replicaShard.getSegmentMetadataMap()
);
if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
fail(
"Expected no missing or different segments between primary and replica but diff was missing: "
+ recoveryDiff.missing
+ " Different: "
+ recoveryDiff.different
+ " Primary Replication Checkpoint : "
+ primaryShard.getLatestReplicationCheckpoint()
+ " Replica Replication Checkpoint: "
+ replicaShard.getLatestReplicationCheckpoint()
);
}
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
replicaShard.store().readLastCommittedSegmentsInfo();
}
}
}, 1, TimeUnit.MINUTES);
}

private IndexShard getIndexShard(ShardSegments shardSegments) {
final ShardRouting replicaShardRouting = shardSegments.getShardRouting();
ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
return getIndexShard(replicaNode.getName());
}

public void testDropPrimaryDuringReplication() throws Exception {
int replica_count = 6;
final int replica_count = 6;
final Settings settings = Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count)
@@ -715,7 +664,7 @@ public void testDropPrimaryDuringReplication() throws Exception {

flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount + 1, dataNodes);
assertIdenticalSegments(SHARD_COUNT, replica_count);
verifyStoreContent();
}
}

@@ -742,30 +691,58 @@ private void waitForSearchableDocs(long docCount, String... nodes) throws Except
waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
}

private IndexShard getIndexShard(String node) {
final Index index = resolveIndex(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return indexService.getShard(shardId.get());

private void verifyStoreContent() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = getClusterState();
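// Walk every index and shard routing table in the cluster and compare each replica's
// on-disk segment metadata against its primary's.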
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
final String indexName = primaryRouting.getIndexName();
final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
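// A non-empty segment replication diff means the replica's store has diverged from the primary's.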
for (ShardRouting replica : replicaRouting) {
IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
primarySegmentMetadata,
replicaShard.getSegmentMetadataMap()
);
if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
fail(
"Expected no missing or different segments between primary and replica but diff was missing: "
+ recoveryDiff.missing
+ " Different: "
+ recoveryDiff.different
+ " Primary Replication Checkpoint : "
+ primaryShard.getLatestReplicationCheckpoint()
+ " Replica Replication Checkpoint: "
+ replicaShard.getLatestReplicationCheckpoint()
);
}
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
replicaShard.store().readLastCommittedSegmentsInfo();
}
}
}
}, 1, TimeUnit.MINUTES);
}

private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSegmentResponse) {
return indicesSegmentResponse.getIndices()
.values()
.stream() // get list of IndexSegments
.flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
.map(IndexShardSegments::getShards) // get list of segments across replication group
.collect(Collectors.toList());
private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
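// Resolve the node currently hosting this shard routing and fetch the shard from that node.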
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
}

private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
private IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
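// Use the first shard id of this index hosted on the node.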
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return indexService.getShard(shardId.get());
}

@Nullable
private ShardRouting getShardRoutingForNodeName(String nodeName) {
final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final ClusterState state = getClusterState();
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) {
for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
final String nodeId = shardRouting.currentNodeId();
@@ -784,8 +761,12 @@ private void assertDocCounts(int expectedDocCount, String... nodeNames) {
}
}

private ClusterState getClusterState() {
return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
}

private DiscoveryNode getNodeContainingPrimaryShard() {
final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final ClusterState state = getClusterState();
final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
return state.nodes().resolveNode(primaryShard.currentNodeId());
}