Skip to content

Commit

Permalink
BaseGatewayShardAllocator changes for Assigning the batch of shards
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <chngau@amazon.com>
  • Loading branch information
Gaurav614 committed Jul 19, 2023
1 parent e1da84d commit 37796c1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.AllocationDecision;
Expand All @@ -46,6 +47,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
* An abstract class that implements basic functionality for allocating
Expand Down Expand Up @@ -74,7 +77,26 @@ public void allocateUnassigned(
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler
) {
final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
executeDecision(shardRouting, allocateUnassignedDecision, allocation, unassignedAllocationHandler);
}

public void allocateUnassignedBatch(Set<ShardRouting> shards, RoutingAllocation allocation) {
// make Allocation Decisions for all shards
ConcurrentMap<ShardRouting, AllocateUnassignedDecision> decisionMap = makeAllocationDecision(shards, allocation, logger);
// get all unassigned shards
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()){
ShardRouting shard = iterator.next();
if (shards.contains(shard)) {
executeDecision(shard, decisionMap.get(shard), allocation, iterator);
}
}
}

private void executeDecision(ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
RoutingAllocation allocation,
ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
if (allocateUnassignedDecision.isDecisionTaken() == false) {
// no decision was taken by this allocator
return;
Expand All @@ -91,7 +113,6 @@ public void allocateUnassigned(
unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
}
}

protected long getExpectedShardSize(ShardRouting shardRouting, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
Expand Down Expand Up @@ -120,6 +141,12 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(
Logger logger
);

public abstract ConcurrentMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shards,
RoutingAllocation allocation,
Logger logger
);

/**
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -457,6 +458,12 @@ private static NodesToAllocate buildNodesToAllocate(

protected abstract FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);

@Override
// to be override
public ConcurrentMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(Set<ShardRouting> shards, RoutingAllocation allocation, Logger logger) {
return null;
}

private static class NodeShardsResult {
final List<NodeGatewayStartedShards> orderedAllocationCandidates;
final int allocationsFound;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

Expand Down Expand Up @@ -495,6 +496,13 @@ private static boolean canPerformOperationBasedRecovery(

protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation);

@Override
// to be override
public ConcurrentMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(Set<ShardRouting> shards, RoutingAllocation allocation, Logger logger) {
return null;
}


/**
* Returns a boolean indicating whether fetching shard data has been triggered at any point for the given shard.
*/
Expand Down

0 comments on commit 37796c1

Please sign in to comment.