From 432efc19f90a34a7b298f91a3b5ff7b342bdf0bd Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Wed, 26 Jul 2023 10:51:12 +0530 Subject: [PATCH 1/9] Allocation service changes for batch assignment Signed-off-by: Gaurav Chandani --- .../routing/allocation/AllocationService.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 5169e63aeb9a5..68df385248ce2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -547,6 +547,22 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) { existingShardsAllocator.beforeAllocation(allocation); } + // batch Mode enabled setting to be added + boolean batchModeEnabled = true; + if (batchModeEnabled) { + // since allocators is per index setting, to have batch assignment verify allocators same for all shards + // if not fallback to single assignment + ExistingShardsAllocator allocator = verifySameAllocatorForAllShards(allocation); + if (allocator != null) { + allocator.allocateUnassignedBatch(allocation, true); + for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) { + existingShardsAllocator.afterPrimariesBeforeReplicas(allocation); + } + allocator.allocateUnassignedBatch(allocation, false); + return; + } + } + final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator(); while (primaryIterator.hasNext()) { @@ -569,6 +585,27 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { } } + private ExistingShardsAllocator verifySameAllocatorForAllShards(RoutingAllocation allocation) { + // if there is a single Allocator set in Allocation Service then use it for all shards + if (existingShardsAllocators.size() == 1) { + return existingShardsAllocators.values().iterator().next(); + } + RoutingNodes.UnassignedShards unassignedShards = allocation.routingNodes().unassigned(); + RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassignedShards.iterator(); + ExistingShardsAllocator currentAllocatorForShard =null; + if (unassignedShards.size() > 0) { + ShardRouting shard = iterator.next(); + currentAllocatorForShard= getAllocatorForShard(shard, allocation); + while (iterator.hasNext()){ + ExistingShardsAllocator allocatorForShard = getAllocatorForShard(iterator.next(), allocation); + if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName())==false){ + return null; + } + } + } + return currentAllocatorForShard; + } + private void disassociateDeadNodes(RoutingAllocation allocation) { for (Iterator it = allocation.routingNodes().mutableIterator(); it.hasNext();) { RoutingNode node = it.next(); From eecf79fcc927b4bd2b46c09a98896240b2347ce4 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Thu, 28 Sep 2023 13:27:39 +0530 Subject: [PATCH 2/9] Added batch mode setting Signed-off-by: Gaurav Chandani --- .../routing/allocation/AllocationService.java | 46 +++++++++++++++---- .../allocation/ExistingShardsAllocator.java | 7 +++ 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 68df385248ce2..3c38445d558c7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.RestoreInProgress; @@ -55,6 +56,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.collect.ImmutableOpenMap; +import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PriorityComparator; import org.opensearch.snapshots.SnapshotsInfoService; @@ -73,6 +75,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; +import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED; /** * This service manages the node allocation of a cluster. For this reason the @@ -87,6 +90,7 @@ public class AllocationService { private static final Logger logger = LogManager.getLogger(AllocationService.class); private final AllocationDeciders allocationDeciders; + private Settings settings; private Map existingShardsAllocators; private final ShardsAllocator shardsAllocator; private final ClusterInfoService clusterInfoService; @@ -114,6 +118,22 @@ public AllocationService( this.shardsAllocator = shardsAllocator; this.clusterInfoService = clusterInfoService; this.snapshotsInfoService = snapshotsInfoService; + this.settings = Settings.EMPTY; + } + + public AllocationService( + AllocationDeciders allocationDeciders, + ShardsAllocator shardsAllocator, + ClusterInfoService clusterInfoService, + SnapshotsInfoService snapshotsInfoService, + Settings settings + + ) { + this.allocationDeciders = allocationDeciders; + this.shardsAllocator = shardsAllocator; + this.clusterInfoService = clusterInfoService; + this.snapshotsInfoService = snapshotsInfoService; + this.settings = settings; } /** @@ -547,12 +567,13 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) { existingShardsAllocator.beforeAllocation(allocation); } - // batch Mode enabled setting to be added - boolean batchModeEnabled = true; - if (batchModeEnabled) { + + Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED.get(settings); + + if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) { // since allocators is per index setting, to have batch assignment verify allocators same for all shards // if not fallback to single assignment - ExistingShardsAllocator allocator = verifySameAllocatorForAllShards(allocation); + ExistingShardsAllocator allocator = verifySameAllocatorForAllUnassignedShards(allocation); if (allocator != null) { allocator.allocateUnassignedBatch(allocation, true); for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) { @@ -563,7 +584,6 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { } } - final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator(); while (primaryIterator.hasNext()) { final ShardRouting shardRouting = primaryIterator.next(); @@ -585,20 +605,26 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { } } - private ExistingShardsAllocator verifySameAllocatorForAllShards(RoutingAllocation allocation) { + /** + * Verify if all unassigned shards are allocated by the same allocator, if yes then return the allocator, else + * return null + * @param allocation {@link RoutingAllocation} + * @return {@link ExistingShardsAllocator} or null + */ + private ExistingShardsAllocator verifySameAllocatorForAllUnassignedShards(RoutingAllocation allocation) { // if there is a single Allocator set in Allocation Service then use it for all shards if (existingShardsAllocators.size() == 1) { return existingShardsAllocators.values().iterator().next(); } RoutingNodes.UnassignedShards unassignedShards = allocation.routingNodes().unassigned(); RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassignedShards.iterator(); - ExistingShardsAllocator currentAllocatorForShard =null; + ExistingShardsAllocator currentAllocatorForShard = null; if (unassignedShards.size() > 0) { ShardRouting shard = iterator.next(); - currentAllocatorForShard= getAllocatorForShard(shard, allocation); - while (iterator.hasNext()){ + currentAllocatorForShard = getAllocatorForShard(shard, allocation); + while (iterator.hasNext()) { ExistingShardsAllocator allocatorForShard = getAllocatorForShard(iterator.next(), allocation); - if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName())==false){ + if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName()) == false) { return null; } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java index f1889cdf780d4..f18779d06e082 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java @@ -60,6 +60,13 @@ public interface ExistingShardsAllocator { Setting.Property.PrivateIndex ); + public static final Setting EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED = Setting.boolSetting( + "cluster.allocator.existing_shards_allocator.batch_enable", + true, + Setting.Property.NodeScope + ); + + /** * Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate. */ From c2be078e547febb21830571df40f3e1bdf16bcbe Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Wed, 29 Nov 2023 17:15:02 +0530 Subject: [PATCH 3/9] Added missed clusterModule changes Signed-off-by: Gaurav Chandani --- server/src/main/java/org/opensearch/cluster/ClusterModule.java | 2 +- .../java/org/opensearch/common/settings/ClusterSettings.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 8a4e17e5c0dc3..cbf514d051662 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -150,7 +150,7 @@ public ClusterModule( this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext); - this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService); + this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, settings); } public static List getNamedWriteables() { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 208a358d38395..eadf0abaee92b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -79,6 +79,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.OperationRouting; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; @@ -246,6 +247,7 @@ public void apply(Settings value, Settings current, Settings previous) { DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING, + ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED, FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, From 3bb7dd8a75e4024f099a9948ee142f28782689bb Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Tue, 5 Dec 2023 19:03:22 +0530 Subject: [PATCH 4/9] Addressed PR comments 1. Moved setting of batch enable disable in this PR 2. Added java docs Signed-off-by: Gaurav Chandani --- .../org/opensearch/cluster/ClusterModule.java | 8 +++- .../routing/allocation/AllocationService.java | 10 ++--- .../allocation/ExistingShardsAllocator.java | 38 +++++++++++++++++-- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index cbf514d051662..17e24a5b6220e 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -150,7 +150,13 @@ public ClusterModule( this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins); this.clusterService = clusterService; this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext); - this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, settings); + this.allocationService = new AllocationService( + allocationDeciders, + shardsAllocator, + clusterInfoService, + snapshotsInfoService, + settings + ); } public static List getNamedWriteables() { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 3c38445d558c7..9426f1cf3f80d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -75,7 +75,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; -import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED; +import static org.opensearch.cluster.routing.allocation.ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE; /** * This service manages the node allocation of a cluster. For this reason the @@ -114,11 +114,7 @@ public AllocationService( ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService ) { - this.allocationDeciders = allocationDeciders; - this.shardsAllocator = shardsAllocator; - this.clusterInfoService = clusterInfoService; - this.snapshotsInfoService = snapshotsInfoService; - this.settings = Settings.EMPTY; + this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, Settings.EMPTY); } public AllocationService( @@ -568,7 +564,7 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { existingShardsAllocator.beforeAllocation(allocation); } - Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED.get(settings); + Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(settings); if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) { // since allocators is per index setting, to have batch assignment verify allocators same for all shards diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java index f18779d06e082..f0e5342f8893a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java @@ -60,13 +60,30 @@ public interface ExistingShardsAllocator { Setting.Property.PrivateIndex ); - public static final Setting EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED = Setting.boolSetting( + /** + * Boolean setting to enable/disable batch allocation of unassigned shards already existing on disk. + * This will allow sending all Unassigned Shards to the ExistingShard Allocator to make decision to allocate + * in one or more go. + * + * Enable this setting if your ExistingShardAllocator is implementing the + * {@link ExistingShardsAllocator#allocateUnassignedBatch(RoutingAllocation, boolean)} method. + * The default implementation of this method is not optimized and assigns shards one by one. + * + * If enable to true then it expects all indices of the shard to use same {@link ExistingShardsAllocator}, otherwise + * Allocation Service will fallback to default implementation i.e. {@link ExistingShardsAllocator#allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} + * + * If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e, + * {@link GatewayAllocator} + * + * TODO: Currently its implementation is WIP for GatewayAllocator so setting enabling wont have any effect + * https://github.com/opensearch-project/OpenSearch/issues/5098 + */ + Setting EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting( "cluster.allocator.existing_shards_allocator.batch_enable", - true, + false, Setting.Property.NodeScope ); - /** * Called before starting a round of allocation, allowing the allocator to invalidate some caches if appropriate. */ @@ -87,6 +104,21 @@ void allocateUnassigned( UnassignedAllocationHandler unassignedAllocationHandler ); + /** + * Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible. + * Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard + * and is kept here for backward compatibility. + */ + default void allocateUnassignedBatch(RoutingAllocation allocation, boolean primary) { + RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); + while (iterator.hasNext()) { + ShardRouting shardRouting = iterator.next(); + if (shardRouting.primary() == primary) { + allocateUnassigned(shardRouting, allocation, iterator); + } + } + } + /** * Returns an explanation for a single unassigned shard. */ From 6e645a039e8573a53a1f8524a03d2f5e762d3bd0 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Thu, 14 Dec 2023 16:45:03 +0530 Subject: [PATCH 5/9] Added Changes for ShardBatchGatewayAllocator Signed-off-by: Gaurav Chandani --- .../cluster/routing/allocation/AllocationService.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 9426f1cf3f80d..04b5a2179b069 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -59,11 +59,13 @@ import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PriorityComparator; +import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.snapshots.SnapshotsInfoService; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -571,12 +573,21 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { // if not fallback to single assignment ExistingShardsAllocator allocator = verifySameAllocatorForAllUnassignedShards(allocation); if (allocator != null) { + // use batch mode implementation of GatewayAllocator + if (allocator.getClass() == GatewayAllocator.class) { + allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); + } + allocator.allocateUnassignedBatch(allocation, true); for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) { existingShardsAllocator.afterPrimariesBeforeReplicas(allocation); } allocator.allocateUnassignedBatch(allocation, false); return; + } else { + // it means though batch mode is enabled but some indices have custom allocator set and we cant do Batch recover in that + // case fallback to single assignment and + logger.debug("Batch mode is enabled but some indices have custom allocator set. Falling back to single assignment"); } } From b1994fa1aff4789ae1ea125522a198c2e5970935 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Wed, 20 Dec 2023 11:46:15 +0530 Subject: [PATCH 6/9] Cosmetic changes in AllocationService Signed-off-by: Gaurav Chandani --- .../cluster/routing/allocation/AllocationService.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 04b5a2179b069..9123724d648c4 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -571,7 +571,7 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) { // since allocators is per index setting, to have batch assignment verify allocators same for all shards // if not fallback to single assignment - ExistingShardsAllocator allocator = verifySameAllocatorForAllUnassignedShards(allocation); + ExistingShardsAllocator allocator = getAndVerifySameAllocatorForAllUnassignedShards(allocation); if (allocator != null) { // use batch mode implementation of GatewayAllocator if (allocator.getClass() == GatewayAllocator.class) { @@ -618,7 +618,7 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { * @param allocation {@link RoutingAllocation} * @return {@link ExistingShardsAllocator} or null */ - private ExistingShardsAllocator verifySameAllocatorForAllUnassignedShards(RoutingAllocation allocation) { + private ExistingShardsAllocator getAndVerifySameAllocatorForAllUnassignedShards(RoutingAllocation allocation) { // if there is a single Allocator set in Allocation Service then use it for all shards if (existingShardsAllocators.size() == 1) { return existingShardsAllocators.values().iterator().next(); @@ -627,8 +627,7 @@ private ExistingShardsAllocator verifySameAllocatorForAllUnassignedShards(Routin RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassignedShards.iterator(); ExistingShardsAllocator currentAllocatorForShard = null; if (unassignedShards.size() > 0) { - ShardRouting shard = iterator.next(); - currentAllocatorForShard = getAllocatorForShard(shard, allocation); + currentAllocatorForShard = getAllocatorForShard(iterator.next(), allocation); while (iterator.hasNext()) { ExistingShardsAllocator allocatorForShard = getAllocatorForShard(iterator.next(), allocation); if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName()) == false) { From 257f36d299f321f750fcd880b9d62601737ce44c Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Fri, 12 Jan 2024 13:34:31 +0530 Subject: [PATCH 7/9] PR comments 1. Made changes so that Allocation Service run only default implementation of batch mode 2. Renamed methods 3. Added and modified documenatation Signed-off-by: Gaurav Chandani --- .../routing/allocation/AllocationService.java | 77 +++++-------------- .../allocation/ExistingShardsAllocator.java | 14 ++-- 2 files changed, 30 insertions(+), 61 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 9123724d648c4..b721cf84d40cb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -55,7 +55,6 @@ import org.opensearch.cluster.routing.allocation.command.AllocationCommands; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; -import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PriorityComparator; @@ -201,9 +200,9 @@ private ClusterState buildResult(ClusterState oldState, RoutingAllocation alloca if (restoreInProgress != null) { RestoreInProgress updatedRestoreInProgress = allocation.updateRestoreInfoWithRoutingChanges(restoreInProgress); if (updatedRestoreInProgress != restoreInProgress) { - ImmutableOpenMap.Builder customsBuilder = ImmutableOpenMap.builder(allocation.getCustoms()); + final Map customsBuilder = new HashMap<>(allocation.getCustoms()); customsBuilder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); - newStateBuilder.customs(customsBuilder.build()); + newStateBuilder.customs(customsBuilder); } } return newStateBuilder.build(); @@ -566,30 +565,22 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { existingShardsAllocator.beforeAllocation(allocation); } + /* + Use batch mode if enabled and there is no custom allocator set for Allocation service + */ Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(settings); - - if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) { - // since allocators is per index setting, to have batch assignment verify allocators same for all shards - // if not fallback to single assignment - ExistingShardsAllocator allocator = getAndVerifySameAllocatorForAllUnassignedShards(allocation); - if (allocator != null) { - // use batch mode implementation of GatewayAllocator - if (allocator.getClass() == GatewayAllocator.class) { - allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); - } - - allocator.allocateUnassignedBatch(allocation, true); - for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) { - existingShardsAllocator.afterPrimariesBeforeReplicas(allocation); - } - allocator.allocateUnassignedBatch(allocation, false); - return; - } else { - // it means though batch mode is enabled but some indices have custom allocator set and we cant do Batch recover in that - // case fallback to single assignment and - logger.debug("Batch mode is enabled but some indices have custom allocator set. Falling back to single assignment"); - } - } + if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT) && existingShardsAllocators.size() == 2) { + /* + If we do not have any custom allocator set then we will be using ShardsBatchGatewayAllocator + Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards + */ + ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); + allocator.allocateAllUnassignedShards(allocation, true); + allocator.afterPrimariesBeforeReplicas(allocation); + allocator.allocateAllUnassignedShards(allocation, false); + return; + } + logger.warn("Falling back to single shard assignment since batch mode disable or multiple custom allocators set"); final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator(); while (primaryIterator.hasNext()) { @@ -612,32 +603,6 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { } } - /** - * Verify if all unassigned shards are allocated by the same allocator, if yes then return the allocator, else - * return null - * @param allocation {@link RoutingAllocation} - * @return {@link ExistingShardsAllocator} or null - */ - private ExistingShardsAllocator getAndVerifySameAllocatorForAllUnassignedShards(RoutingAllocation allocation) { - // if there is a single Allocator set in Allocation Service then use it for all shards - if (existingShardsAllocators.size() == 1) { - return existingShardsAllocators.values().iterator().next(); - } - RoutingNodes.UnassignedShards unassignedShards = allocation.routingNodes().unassigned(); - RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassignedShards.iterator(); - ExistingShardsAllocator currentAllocatorForShard = null; - if (unassignedShards.size() > 0) { - currentAllocatorForShard = getAllocatorForShard(iterator.next(), allocation); - while (iterator.hasNext()) { - ExistingShardsAllocator allocatorForShard = getAllocatorForShard(iterator.next(), allocation); - if (currentAllocatorForShard.getClass().getName().equals(allocatorForShard.getClass().getName()) == false) { - return null; - } - } - } - return currentAllocatorForShard; - } - private void disassociateDeadNodes(RoutingAllocation allocation) { for (Iterator it = allocation.routingNodes().mutableIterator(); it.hasNext();) { RoutingNode node = it.next(); @@ -677,9 +642,9 @@ private void applyStartedShards(RoutingAllocation routingAllocation, List EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting( - "cluster.allocator.existing_shards_allocator.batch_enable", + "cluster.allocator.existing_shards_allocator.batch_enabled", false, Setting.Property.NodeScope ); @@ -108,8 +110,10 @@ void allocateUnassigned( * Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible. * Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard * and is kept here for backward compatibility. + * + * Allocation service will currently run the default implementation of it implemented by {@link ShardsBatchGatewayAllocator} */ - default void allocateUnassignedBatch(RoutingAllocation allocation, boolean primary) { + default void allocateAllUnassignedShards(RoutingAllocation allocation, boolean primary) { RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); while (iterator.hasNext()) { ShardRouting shardRouting = iterator.next(); From 1574fc2ef469641fe59b1eac3ca3fbc92d795071 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Mon, 12 Feb 2024 15:14:27 +0530 Subject: [PATCH 8/9] Fixed PR comments around documentation Signed-off-by: Gaurav Chandani --- .../cluster/routing/allocation/AllocationService.java | 8 +++++--- .../routing/allocation/ExistingShardsAllocator.java | 9 ++------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index b721cf84d40cb..c2f793d6977aa 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -55,6 +55,7 @@ import org.opensearch.cluster.routing.allocation.command.AllocationCommands; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.PriorityComparator; @@ -200,9 +201,9 @@ private ClusterState buildResult(ClusterState oldState, RoutingAllocation alloca if (restoreInProgress != null) { RestoreInProgress updatedRestoreInProgress = allocation.updateRestoreInfoWithRoutingChanges(restoreInProgress); if (updatedRestoreInProgress != restoreInProgress) { - final Map customsBuilder = new HashMap<>(allocation.getCustoms()); + ImmutableOpenMap.Builder customsBuilder = ImmutableOpenMap.builder(allocation.getCustoms()); customsBuilder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); - newStateBuilder.customs(customsBuilder); + newStateBuilder.customs(customsBuilder.build()); } } return newStateBuilder.build(); @@ -577,6 +578,7 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); allocator.allocateAllUnassignedShards(allocation, true); allocator.afterPrimariesBeforeReplicas(allocation); + // Replicas Assignment allocator.allocateAllUnassignedShards(allocation, false); return; } @@ -704,7 +706,7 @@ private ExistingShardsAllocator getAllocatorForShard(ShardRouting shardRouting, final String allocatorName = ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.get( routingAllocation.metadata().getIndexSafe(shardRouting.index()).getSettings() ); - ExistingShardsAllocator existingShardsAllocator = existingShardsAllocators.get(allocatorName); + final ExistingShardsAllocator existingShardsAllocator = existingShardsAllocators.get(allocatorName); return existingShardsAllocator != null ? existingShardsAllocator : new NotFoundAllocator(allocatorName); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java index 4b3325ce73301..fb2a37237f8b6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java @@ -70,15 +70,10 @@ public interface ExistingShardsAllocator { * {@link ExistingShardsAllocator#allocateAllUnassignedShards(RoutingAllocation, boolean)} method. * The default implementation of this method is not optimized and assigns shards one by one. * - * If enable to true then it expects all indices of the shard to use same {@link ExistingShardsAllocator}, otherwise - * Allocation Service will fallback to default implementation i.e. {@link ExistingShardsAllocator#allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} - * * If no plugin overrides {@link ExistingShardsAllocator} then default implementation will be use for it , i.e, - * {@link ShardsBatchGatewayAllocator}. Right now even if plugin implements it, AllocationService will run the - * default implementation to enable Batch mode of assignment + * {@link ShardsBatchGatewayAllocator}. * - * TODO: Currently its implementation is WIP for GatewayAllocator so setting enabling wont have any effect - * https://github.com/opensearch-project/OpenSearch/issues/5098 + * This setting is experimental at this point. */ Setting EXISTING_SHARDS_ALLOCATOR_BATCH_MODE = Setting.boolSetting( "cluster.allocator.existing_shards_allocator.batch_enabled", From d3b11404e9f92d0f490c11c09042917e756df518 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Wed, 13 Mar 2024 12:52:18 +0530 Subject: [PATCH 9/9] Correct version check with 3.0.0 Signed-off-by: Aman Khare --- .../cluster/routing/allocation/AllocationService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index c2f793d6977aa..8c6820c0825e1 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -570,7 +570,7 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { Use batch mode if enabled and there is no custom allocator set for Allocation service */ Boolean batchModeEnabled = EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.get(settings); - if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.CURRENT) && existingShardsAllocators.size() == 2) { + if (batchModeEnabled && allocation.nodes().getMinNodeVersion().onOrAfter(Version.V_3_0_0) && existingShardsAllocators.size() == 2) { /* If we do not have any custom allocator set then we will be using ShardsBatchGatewayAllocator Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards