Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added transport action for bulk async shard fetch for primary shards #8218

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a9bc1c4
Added transport action for bulk async shard fetch for primary shards
sudarshan-baliga Jul 24, 2023
13b026d
Add PSA Async batch shard fetch transport integ test
sudarshan-baliga Jul 31, 2023
8404fc8
Refactor PSA Async batch shard fetch transport integ test
sudarshan-baliga Aug 3, 2023
767c47b
Merge branch 'main' into async-shard-fetch-psatransport
sudarshan-baliga Aug 4, 2023
d80f441
Update the documentation of TransportNodesListGatewayStartedShardsBatch.
sudarshan-baliga Aug 6, 2023
b8b5d88
Merge branch 'main' into async-shard-fetch-psatransport
sudarshan-baliga Aug 6, 2023
06e1243
Transport PSA change
shiv0408 Sep 25, 2023
933bcfc
Renamed Transport File to TransportNodesListGatewayStartedBatchShards
shiv0408 Sep 25, 2023
1c381dc
Removed AsyncBatchShardFetch
shiv0408 Sep 26, 2023
b8a141d
Renamed test files
shiv0408 Sep 26, 2023
84f63ac
Removed changes part of other PRs
shiv0408 Sep 26, 2023
ffd6031
Corrected the references of TransportNodesListGatewayStartedBatchShards
shiv0408 Sep 28, 2023
bc84bd9
Rename NodeGatewayStartedShards to NodeGatewayStartedShard
shiv0408 Dec 11, 2023
94f27cc
modify the request signature
shiv0408 Dec 12, 2023
4a62b19
Remove ShardAttributes because added in #8742
shiv0408 Dec 12, 2023
5c1f446
Added a Helper class to remove code duplication
shiv0408 Dec 12, 2023
0ff9a02
Merge branch 'main' into async-shard-fetch-psatransport
shiv0408 Dec 12, 2023
93a548b
Merge branch 'main' into async-shard-fetch-psatransport
shiv0408 Jan 8, 2024
89c1143
Fix build failure after main merge
shiv0408 Jan 8, 2024
55e43cf
Renamed TransportNodesListGatewayStartedShardsBatch
shiv0408 Jan 25, 2024
f8fb3cd
Merge branch 'main' into async-shard-fetch-psatransport
shiv0408 Jan 25, 2024
551fc60
Address review comment
shiv0408 Jan 25, 2024
b57c330
Move integration tests to RecoveryFromGatewayIT
shiv0408 Jan 29, 2024
1dbd248
Added UsingBatchAction suffix to ITs
shiv0408 Feb 1, 2024
f8d8a6a
Fixed comment
shiv0408 Feb 1, 2024
d1e90a1
Merge branch 'main' into async-shard-fetch-psatransport
shiv0408 Feb 1, 2024
8f113fc
empty commit to trigger workflow
shiv0408 Feb 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.store.ShardAttributes;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchIntegTestCase.internalCluster;
import static org.opensearch.test.OpenSearchIntegTestCase.resolveIndex;

public class GatewayRecoveryTestUtils {

public static DiscoveryNode[] getDiscoveryNodes() throws ExecutionException, InterruptedException {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(false);
clusterStateRequest.clear().nodes(true).routingTable(true).indices("*");
ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get();
final List<DiscoveryNode> nodes = new LinkedList<>(clusterStateResponse.getState().nodes().getDataNodes().values());
DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()];
nodes.toArray(disNodesArr);
return disNodesArr;
}

public static Map<ShardId, ShardAttributes> prepareRequestMap(String[] indices, int primaryShardCount) {
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = new HashMap<>();
for (String indexName : indices) {
final Index index = resolveIndex(indexName);
final String customDataPath = IndexMetadata.INDEX_DATA_PATH_SETTING.get(
client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName)
);
for (int shardIdNum = 0; shardIdNum < primaryShardCount; shardIdNum++) {
final ShardId shardId = new ShardId(index, shardIdNum);
shardIdShardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
}
}
return shardIdShardAttributesMap;
}

public static void corruptShard(String nodeName, ShardId shardId) throws IOException, InterruptedException {
for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
if (Files.exists(indexPath)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
Files.delete(item);
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
Expand All @@ -60,6 +62,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.store.ShardAttributes;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.InternalTestCluster.RestartCallback;
Expand All @@ -85,6 +88,9 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.corruptShard;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.getDiscoveryNodes;
import static org.opensearch.gateway.GatewayRecoveryTestUtils.prepareRequestMap;
import static org.opensearch.gateway.GatewayService.RECOVER_AFTER_NODES_SETTING;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
Expand Down Expand Up @@ -734,4 +740,97 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
internalCluster().fullRestart();
ensureGreen("test");
}

public void testSingleShardFetchUsingBatchAction() {
String indexName = "test";
int numOfShards = 1;
prepareIndex(indexName, numOfShards);
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards);

ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();

TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap)
);
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
.get(searchShardsResponse.getNodes()[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
}

public void testShardFetchMultiNodeMultiIndexesUsingBatchAction() {
// start node
internalCluster().startNode();
String indexName1 = "test1";
String indexName2 = "test2";
int numShards = internalCluster().numDataNodes();
// assign one primary shard each to the data nodes
prepareIndex(indexName1, numShards);
prepareIndex(indexName2, numShards);
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName1, indexName2 }, numShards);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName1, indexName2).get();
assertEquals(internalCluster().numDataNodes(), searchShardsResponse.getNodes().length);
TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
new TransportNodesListGatewayStartedShardsBatch.Request(searchShardsResponse.getNodes(), shardIdShardAttributesMap)
);
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
ShardId shardId = clusterSearchShardsGroup.getShardId();
assertEquals(1, clusterSearchShardsGroup.getShards().length);
String nodeId = clusterSearchShardsGroup.getShards()[0].currentNodeId();
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
.get(nodeId)
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNodeGatewayStartedShardsHappyCase(nodeGatewayStartedShards);
}
}

public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception {
String indexName = "test";
int numOfShards = 1;
prepareIndex(indexName, numOfShards);
Map<ShardId, ShardAttributes> shardIdShardAttributesMap = prepareRequestMap(new String[] { indexName }, numOfShards);
ClusterSearchShardsResponse searchShardsResponse = client().admin().cluster().prepareSearchShards(indexName).get();
final Index index = resolveIndex(indexName);
final ShardId shardId = new ShardId(index, 0);
corruptShard(searchShardsResponse.getNodes()[0].getName(), shardId);
TransportNodesListGatewayStartedShardsBatch.NodesGatewayStartedShardsBatch response;
internalCluster().restartNode(searchShardsResponse.getNodes()[0].getName());
response = ActionTestUtils.executeBlocking(
internalCluster().getInstance(TransportNodesListGatewayStartedShardsBatch.class),
new TransportNodesListGatewayStartedShardsBatch.Request(getDiscoveryNodes(), shardIdShardAttributesMap)
);
DiscoveryNode[] discoveryNodes = getDiscoveryNodes();
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards = response.getNodesMap()
.get(discoveryNodes[0].getId())
.getNodeGatewayStartedShardsBatch()
.get(shardId);
assertNotNull(nodeGatewayStartedShards.storeException());
assertNotNull(nodeGatewayStartedShards.allocationId());
assertTrue(nodeGatewayStartedShards.primary());
}

private void assertNodeGatewayStartedShardsHappyCase(
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard nodeGatewayStartedShards
) {
assertNull(nodeGatewayStartedShards.storeException());
assertNotNull(nodeGatewayStartedShards.allocationId());
assertTrue(nodeGatewayStartedShards.primary());
}

private void prepareIndex(String indexName, int numberOfPrimaryShards) {
createIndex(
indexName,
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, numberOfPrimaryShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
);
index(indexName, "type", "1", Collections.emptyMap());
flush(indexName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ protected void configure() {
bind(GatewayService.class).asEagerSingleton();
bind(TransportNodesListGatewayMetaState.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedShards.class).asEagerSingleton();
bind(TransportNodesListGatewayStartedShardsBatch.class).asEagerSingleton();
bind(LocalAllocateDangledIndices.class).asEagerSingleton();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.shard.ShardStateMetadata;
import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;

import java.io.IOException;

/**
* This class has the common code used in {@link TransportNodesListGatewayStartedShards} and
* {@link TransportNodesListGatewayStartedShardsBatch} to get the shard info on the local node.
* <p>
* This class should not be used to add more functions and will be removed when the
* {@link TransportNodesListGatewayStartedShards} will be deprecated and all the code will be moved to
* {@link TransportNodesListGatewayStartedShardsBatch}
*
* @opensearch.internal
*/
public class TransportNodesGatewayStartedShardHelper {

Check warning on line 39 in server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java#L39

Added line #L39 was not covered by tests
public static TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard getShardInfoOnLocalNode(
Logger logger,
final ShardId shardId,
NamedXContentRegistry namedXContentRegistry,
NodeEnvironment nodeEnv,
IndicesService indicesService,
String shardDataPathInRequest,
Settings settings,
ClusterService clusterService
) throws IOException {
logger.trace("{} loading local shard state info", shardId);
ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState(
logger,
namedXContentRegistry,
nodeEnv.availableShardPaths(shardId)
);
if (shardStateMetadata != null) {
if (indicesService.getShardOrNull(shardId) == null
&& shardStateMetadata.indexDataLocation == ShardStateMetadata.IndexDataLocation.LOCAL) {
final String customDataPath;
if (shardDataPathInRequest != null) {
customDataPath = shardDataPathInRequest;
} else {
// TODO: Fallback for BWC with older OpenSearch versions.
// Remove once request.getCustomDataPath() always returns non-null
final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex());

Check warning on line 65 in server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java#L65

Added line #L65 was not covered by tests
if (metadata != null) {
customDataPath = new IndexSettings(metadata, settings).customDataPath();

Check warning on line 67 in server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java#L67

Added line #L67 was not covered by tests
} else {
logger.trace("{} node doesn't have meta data for the requests index", shardId);
throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex());

Check warning on line 70 in server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java#L69-L70

Added lines #L69 - L70 were not covered by tests
}
}
// we don't have an open shard on the store, validate the files on disk are openable
ShardPath shardPath = null;
try {
shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
if (shardPath == null) {
throw new IllegalStateException(shardId + " no shard path found");

Check warning on line 78 in server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java#L78

Added line #L78 was not covered by tests
}
Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger);
} catch (Exception exception) {
final ShardPath finalShardPath = shardPath;
logger.trace(
() -> new ParameterizedMessage(

Check warning on line 84 in server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java#L81-L84

Added lines #L81 - L84 were not covered by tests
"{} can't open index for shard [{}] in path [{}]",
shardId,
shardStateMetadata,
(finalShardPath != null) ? finalShardPath.resolveIndex() : ""
),
exception
);
String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null;
return new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(

Check warning on line 93 in server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java#L93

Added line #L93 was not covered by tests
allocationId,
shardStateMetadata.primary,
null,
exception
);
}
}

logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata);
String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null;
final IndexShard shard = indicesService.getShardOrNull(shardId);
return new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(
allocationId,
shardStateMetadata.primary,
shard != null ? shard.getLatestReplicationCheckpoint() : null
);
}
logger.trace("{} no local shard info found", shardId);
return new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShard(null, false, null);
}
}
Loading
Loading