From 1a043e7914091671b11466bc2ca50b8a9b7c3cce Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Thu, 28 Sep 2023 12:48:26 +0530 Subject: [PATCH] Removed filtering per bacth in allocateUnassignedBatch Signed-off-by: Gaurav Chandani --- .../gateway/BaseGatewayShardAllocator.java | 50 +++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java index f48a9bcc42cd4..865eeccb8bb82 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/BaseGatewayShardAllocator.java @@ -49,12 +49,11 @@ import java.util.HashMap; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; /** * An abstract class that implements basic functionality for allocating * shards to nodes based on shard copies that already exist in the cluster. - * + *

* Individual implementations of this class are responsible for providing * the logic to determine to which nodes (if any) those shards are allocated. * @@ -68,8 +67,9 @@ public abstract class BaseGatewayShardAllocator { * Allocate an unassigned shard to nodes (if any) where valid copies of the shard already exist. * It is up to the individual implementations of {@link #makeAllocationDecision(ShardRouting, RoutingAllocation, Logger)} * to make decisions on assigning shards to nodes. - * @param shardRouting the shard to allocate - * @param allocation the allocation state container object + * + * @param shardRouting the shard to allocate + * @param allocation the allocation state container object * @param unassignedAllocationHandler handles the allocation of the current shard */ public void allocateUnassigned( @@ -81,36 +81,28 @@ public void allocateUnassigned( executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler); } + /** + * Allocate Set of unassigned shard to nodes where valid copies of the shard already exists + * @param shards the shards to allocate + * @param allocation the allocation state container object + */ public void allocateUnassignedBatch(Set shards, RoutingAllocation allocation) { // make Allocation Decisions for all shards HashMap decisionMap = makeAllocationDecision(shards, allocation, logger); assert shards.size() == decisionMap.size() : "make allocation decision didn't return allocation decision for " + "some shards"; - // get all unassigned shards + // get all unassigned shards iterator RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { ShardRouting shard = iterator.next(); try { if (decisionMap.isEmpty() == false) { - if (shards.stream() - .filter(shardRouting -> shardRouting.shardId().equals(shard.shardId()) && shardRouting.primary() == shard.primary()) - .count() == 1) { - List matchedShardRouting = decisionMap.keySet() - .stream() - .filter( - shardRouting -> shardRouting.shardId().equals(shard.shardId()) && shardRouting.primary() == shard.primary() - ) - .collect(Collectors.toList()); - if (matchedShardRouting.size() == 1) { - executeDecision(shard, decisionMap.remove(matchedShardRouting.get(0)), allocation, iterator); - } else if (matchedShardRouting.size() > 1) { - // Adding this just to check the behaviour if we ever land up here. - throw new IllegalStateException("decision map must have single entry for 1 shard"); - } + if (decisionMap.containsKey(shard)) { + executeDecision(shard, decisionMap.remove(shard), allocation, iterator); } } } catch (Exception e) { - logger.error("failed to execute decision for shard {} ", shard, e); + logger.error("Failed to execute decision for shard {} ", shard, e); } } } @@ -155,9 +147,9 @@ protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation * {@link #allocateUnassigned(ShardRouting, RoutingAllocation, ExistingShardsAllocator.UnassignedAllocationHandler)} to make decisions * about whether or not the shard can be allocated by this allocator and if so, to which node it will be allocated. * - * @param unassignedShard the unassigned shard to allocate - * @param allocation the current routing state - * @param logger the logger + * @param unassignedShard the unassigned shard to allocate + * @param allocation the current routing state + * @param logger the logger * @return an {@link AllocateUnassignedDecision} with the final decision of whether to allocate and details of the decision */ public abstract AllocateUnassignedDecision makeAllocationDecision( @@ -166,11 +158,17 @@ public abstract AllocateUnassignedDecision makeAllocationDecision( Logger logger ); - public abstract HashMap makeAllocationDecision( + public HashMap makeAllocationDecision( Set shards, RoutingAllocation allocation, Logger logger - ); + ) { + HashMap allocationDecisions = new HashMap<>(); + for (ShardRouting unassignedShard : shards) { + allocationDecisions.put(unassignedShard, makeAllocationDecision(unassignedShard, allocation, logger)); + } + return allocationDecisions; + } /** * Builds decisions for all nodes in the cluster, so that the explain API can provide information on