Skip to content

Commit

Permalink
Removed filtering per bacth in allocateUnassignedBatch
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <chngau@amazon.com>
  • Loading branch information
Gaurav614 committed Sep 28, 2023
1 parent 20e11a4 commit 1a043e7
Showing 1 changed file with 24 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* An abstract class that implements basic functionality for allocating
* shards to nodes based on shard copies that already exist in the cluster.
*
* <p>
* Individual implementations of this class are responsible for providing
* the logic to determine to which nodes (if any) those shards are allocated.
*
Expand All @@ -68,8 +67,9 @@ public abstract class BaseGatewayShardAllocator {
* Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist.
* It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)}
* to make decisions on assigning shards to nodes.
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
*
* @param shardRouting the shard to allocate
* @param allocation the allocation state container object
* @param unassignedAllocationHandler handles the allocation of the current shard
*/
public void allocateUnassigned(
Expand All @@ -81,36 +81,28 @@ public void allocateUnassigned(
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

/**
* Allocate Set of unassigned shard to nodes where valid copies of the shard already exists
* @param shards the shards to allocate
* @param allocation the allocation state container object
*/
public void allocateUnassignedBatch(Set<ShardRouting> shards, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
HashMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shards, allocation, logger);
assert shards.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for " + "some shards";
// get all unassigned shards
// get all unassigned shards iterator
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();

while (iterator.hasNext()) {
ShardRouting shard = iterator.next();
try {
if (decisionMap.isEmpty() == false) {
if (shards.stream()
.filter(shardRouting -> shardRouting.shardId().equals(shard.shardId()) && shardRouting.primary() == shard.primary())
.count() == 1) {
List<ShardRouting> matchedShardRouting = decisionMap.keySet()
.stream()
.filter(
shardRouting -> shardRouting.shardId().equals(shard.shardId()) && shardRouting.primary() == shard.primary()
)
.collect(Collectors.toList());
if (matchedShardRouting.size() == 1) {
executeDecision(shard, decisionMap.remove(matchedShardRouting.get(0)), allocation, iterator);
} else if (matchedShardRouting.size() > 1) {
// Adding this just to check the behaviour if we ever land up here.
throw new IllegalStateException("decision map must have single entry for 1 shard");
}
if (decisionMap.containsKey(shard)) {
executeDecision(shard, decisionMap.remove(shard), allocation, iterator);
}
}
} catch (Exception e) {
logger.error("failed to execute decision for shard {} ", shard, e);
logger.error("Failed to execute decision for shard {} ", shard, e);
}
}
}
Expand Down Expand Up @@ -155,9 +147,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation
* {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions
* about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated.
*
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @param unassignedShard the unassigned shard to allocate
* @param allocation the current routing state
* @param logger the logger
* @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision
*/
public abstract AllocateUnassignedDecision makeAllocationDecision(
Expand All @@ -166,11 +158,17 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public abstract HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
);
) {
HashMap<ShardRouting, AllocateUnassignedDecision> allocationDecisions = new HashMap<>();
for (ShardRouting unassignedShard : shards) {
allocationDecisions.put(unassignedShard, makeAllocationDecision(unassignedShard, allocation, logger));
}
return allocationDecisions;
}

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
Expand Down

0 comments on commit 1a043e7

Please sign in to comment.