Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseGatewayShardAllocator changes for Assigning the batch of shards #8776

Merged
merged 9 commits into from
Mar 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
Expand All @@ -45,7 +46,9 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
Expand All @@ -64,8 +67,9 @@
* Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist.
* It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)}
* to make decisions on assigning shards to nodes.
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
*
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
* @param unassignedAllocationHandler handles the allocation of the current shard
*/
public void allocateUnassigned(
Expand All @@ -74,7 +78,46 @@
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

/**
* Allocate Batch of unassigned shard to nodes where valid copies of the shard already exists
* @param shardRoutings the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shardRoutings, allocation, logger);

Check warning on line 91 in server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L91

Added line #L91 was not covered by tests
assert shardRoutings.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for "
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
+ "some shards";
// get all unassigned shards iterator
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();

Check warning on line 95 in server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L95

Added line #L95 was not covered by tests
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();

Check warning on line 98 in server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L98

Added line #L98 was not covered by tests
try {
if (decisionMap.isEmpty() == false) {
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
if (decisionMap.containsKey(shard)) {
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);

Check warning on line 102 in server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L102

Added line #L102 was not covered by tests
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
// no need to keep iterating the unassigned shards, if we don't have anything in decision map
break;

Check warning on line 106 in server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L106

Added line #L106 was not covered by tests
}
} catch (Exception e) {
logger.error("Failed to execute decision for shard {} while initializing {}", shard, e);
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
throw e;
}
}
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 113 in server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L108-L113

Added lines #L108 - L113 were not covered by tests

private void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
if (allocateUnassignedDecision.isDecisionTaken() == false) {
// no decision was taken by this allocator
return;
Expand Down Expand Up @@ -109,9 +152,9 @@
* {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions
* about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated.
*
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision
*/
public abstract AllocateUnassignedDecision makeAllocationDecision(
Expand All @@ -120,6 +163,21 @@
Logger logger
);

public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
List<ShardRouting> unassignedShardBatch,
RoutingAllocation allocation,
Logger logger
) {

return (HashMap<ShardRouting, AllocateUnassignedDecision>) unassignedShardBatch.stream()
.collect(
Collectors.toMap(
unassignedShard -> unassignedShard,
unassignedShard -> makeAllocationDecision(unassignedShard, allocation, logger)

Check warning on line 176 in server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java#L172-L176

Added lines #L172 - L176 were not covered by tests
)
);
}

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
Expand Down
Loading