Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseGatewayShardAllocator changes for Assigning the batch of shards #8776

Merged
merged 9 commits into from
Mar 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
Expand All @@ -45,12 +46,14 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;

/**
* An abstract class that implements basic functionality for allocating
* shards to nodes based on shard copies that already exist in the cluster.
*
* <p>
* Individual implementations of this class are responsible for providing
* the logic to determine to which nodes (if any) those shards are allocated.
*
Expand All @@ -64,8 +67,9 @@ public abstract class BaseGatewayShardAllocator {
* Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist.
* It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)}
* to make decisions on assigning shards to nodes.
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
*
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
* @param unassignedAllocationHandler handles the allocation of the current shard
*/
public void allocateUnassigned(
Expand All @@ -74,7 +78,41 @@ public void allocateUnassigned(
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

/**
* Allocate Set of unassigned shard to nodes where valid copies of the shard already exists
* @param shards the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(Set<ShardRouting> shards, RoutingAllocation allocation) {
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
// make Allocation Decisions for all shards
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shards, allocation, logger);
assert shards.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for " + "some shards";
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
// get all unassigned shards iterator
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();
try {
if (decisionMap.isEmpty() == false) {
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
if (decisionMap.containsKey(shard)) {
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}
}
} catch (Exception e) {
logger.error("Failed to execute decision for shard {} ", shard, e);
}
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}

private void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
if (allocateUnassignedDecision.isDecisionTaken() == false) {
// no decision was taken by this allocator
return;
Expand Down Expand Up @@ -109,9 +147,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation
* {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions
* about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated.
*
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision
*/
public abstract AllocateUnassignedDecision makeAllocationDecision(
Expand All @@ -120,6 +158,18 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
RoutingAllocation allocation,
Logger logger
) {
HashMap<ShardRouting, AllocateUnassignedDecision> allocationDecisions = new HashMap<>();
for (ShardRouting unassignedShard : shards) {
allocationDecisions.put(unassignedShard, makeAllocationDecision(unassignedShard, allocation, logger));
}
return allocationDecisions;
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -457,6 +458,16 @@ private static NodesToAllocate buildNodesToAllocate(

protected abstract FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);

@Override
// to be override
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
return null;
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}

private static class NodeShardsResult {
final List<NodeGatewayStartedShards> orderedAllocationCandidates;
final int allocationsFound;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,16 @@ private static boolean canPerformOperationBasedRecovery(

protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation);

@Override
// to be override
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
) {
return null;
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Returns a boolean indicating whether fetching shard data has been triggered at any point for the given shard.
*/
Expand Down
Loading