[Segment Replication] Fix OngoingSegmentReplications to key by allocation ID instead of DiscoveryNode. #4182

Merged: 2 commits, Aug 10, 2022
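
In brief, from the diff below: OngoingSegmentReplications previously keyed its in-flight SegmentReplicationSourceHandler instances by DiscoveryNode, which allows at most one ongoing replication per target node and collides once a node hosts replica copies of several shards (see the new testMultipleShards and testCancelAllReplicationsForShard tests). The map is now keyed by the target shard copy's allocation ID, and cancellation goes through a predicate-based cancelHandlers helper. Below is a minimal sketch of that shape; the types are hypothetical stand-ins, not the actual OpenSearch classes.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Simplified sketch of the keying change; names mirror the diff, but Handler is a
// hypothetical stand-in (no CopyState, IndexShard, or transport layer).
class ReplicationTrackerSketch {

    interface Handler {
        String targetNodeId();
        void cancel(String reason);
    }

    // Before this PR the map was keyed by DiscoveryNode, allowing at most one ongoing
    // copy per target node. An allocation ID is unique per shard copy, so one node can
    // now receive several replicas concurrently.
    private final Map<String, Handler> allocationIdToHandlers = new ConcurrentHashMap<>();

    void prepareForReplication(String targetAllocationId, Handler handler) {
        // putIfAbsent mirrors the diff: a second prepare for the same shard copy fails fast.
        if (allocationIdToHandlers.putIfAbsent(targetAllocationId, handler) != null) {
            throw new IllegalStateException("Shard copy " + targetAllocationId + " is already replicating");
        }
    }

    // Cancellation is expressed as a predicate over handlers, so "node left" and
    // "primary closing this shard" reuse one removal path (cancelHandlers in the diff).
    void cancelHandlers(Predicate<Handler> predicate, String reason) {
        final List<String> ids = allocationIdToHandlers.entrySet()
            .stream()
            .filter(entry -> predicate.test(entry.getValue()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
        for (String id : ids) {
            final Handler removed = allocationIdToHandlers.remove(id);
            if (removed != null) {
                removed.cancel(reason);
            }
        }
    }

    // "Node left" cancellation, mirroring cancelReplication(DiscoveryNode) in the diff.
    void cancelReplicationsForNode(String nodeId, String reason) {
        cancelHandlers(handler -> handler.targetNodeId().equals(nodeId), reason);
    }
}
```

Keying by allocation ID also keeps cancellation precise: a "node left" event removes only that node's handlers, while closing a primary removes only the handlers whose copy state belongs to that shard.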
@@ -111,6 +111,54 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}
}

public void testMultipleShards() throws Exception {
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(1, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);
assertSegmentStats(REPLICA_COUNT);
}
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
@@ -262,15 +310,17 @@ private void waitForReplicaUpdate() throws Exception {
final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

// if we don't have any segments yet, proceed.
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
assertTrue(isReplicaCaughtUpToPrimary);
if (primaryShardSegments.getSegments().isEmpty() == false) {
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
assertTrue(isReplicaCaughtUpToPrimary);
}
}
}
});
@@ -24,7 +24,10 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Manages references to ongoing segrep events on a node.
@@ -38,7 +41,7 @@ class OngoingSegmentReplications {
private final RecoverySettings recoverySettings;
private final IndicesService indicesService;
private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
private final Map<DiscoveryNode, SegmentReplicationSourceHandler> nodesToHandlers;
private final Map<String, SegmentReplicationSourceHandler> allocationIdToHandlers;
@ankitkala (Member) commented on Aug 10, 2022:

Should we also remove the handler from allocationIdToHandlers after the segments have been copied()?

LGTM


/**
* Constructor.
@@ -50,7 +53,7 @@ class OngoingSegmentReplications {
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
this.copyStateMap = Collections.synchronizedMap(new HashMap<>());
this.nodesToHandlers = ConcurrentCollections.newConcurrentMap();
this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap();
}

/**
@@ -96,8 +99,7 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro
* @param listener {@link ActionListener} that resolves when sending files is complete.
*/
void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
final DiscoveryNode node = request.getTargetNode();
final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node);
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.get(request.getTargetAllocationId());
if (handler != null) {
if (handler.isReplicating()) {
throw new OpenSearchException(
@@ -108,7 +110,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
}
// update the given listener to release the CopyState before it resolves.
final ActionListener<GetSegmentFilesResponse> wrappedListener = ActionListener.runBefore(listener, () -> {
final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node);
final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId());
if (sourceHandler != null) {
removeCopyState(sourceHandler.getCopyState());
}
@@ -123,19 +125,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
}
}

/**
* Cancel any ongoing replications for a given {@link DiscoveryNode}
*
* @param node {@link DiscoveryNode} node for which to cancel replication events.
*/
void cancelReplication(DiscoveryNode node) {
final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node);
if (handler != null) {
handler.cancel("Cancel on node left");
removeCopyState(handler.getCopyState());
}
}

/**
* Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current
     * node's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
@@ -149,9 +138,9 @@ void cancelReplication(DiscoveryNode node) {
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (nodesToHandlers.putIfAbsent(
request.getTargetNode(),
createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter)
if (allocationIdToHandlers.putIfAbsent(
request.getTargetAllocationId(),
createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter)
) != null) {
throw new OpenSearchException(
"Shard copy {} on node {} already replicating",
@@ -163,18 +152,23 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f
}

/**
* Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down.
* Cancel all Replication events for the given shard, intended to be called when a primary is shutting down.
*
* @param shard {@link IndexShard}
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(IndexShard shard, String reason) {
for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) {
if (entry.getCopyState().getShard().equals(shard)) {
entry.cancel(reason);
}
}
copyStateMap.clear();
cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
}

/**
* Cancel any ongoing replications for a given {@link DiscoveryNode}
*
* @param node {@link DiscoveryNode} node for which to cancel replication events.
*/
void cancelReplication(DiscoveryNode node) {
cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left");

}

/**
@@ -186,19 +180,25 @@ boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
}

int size() {
return nodesToHandlers.size();
return allocationIdToHandlers.size();
}

int cachedCopyStateSize() {
return copyStateMap.size();
}

private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) {
private SegmentReplicationSourceHandler createTargetHandler(
DiscoveryNode node,
CopyState copyState,
String allocationId,
FileChunkWriter fileChunkWriter
) {
return new SegmentReplicationSourceHandler(
node,
fileChunkWriter,
copyState.getShard().getThreadPool(),
copyState,
allocationId,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks()
);
@@ -231,4 +231,23 @@ private synchronized void removeCopyState(CopyState copyState) {
copyStateMap.remove(copyState.getRequestedReplicationCheckpoint());
}
}

/**
* Remove handlers from allocationIdToHandlers map based on a filter predicate.
* This will also decref the handler's CopyState reference.
*/
private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> predicate, String reason) {
final List<String> allocationIds = allocationIdToHandlers.values()
.stream()
.filter(predicate)
.map(SegmentReplicationSourceHandler::getAllocationId)
.collect(Collectors.toList());
for (String allocationId : allocationIds) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
if (handler != null) {
handler.cancel(reason);
removeCopyState(handler.getCopyState());
}
}
}
Comment on lines +239 to +252 (Member):

👍 This is better than the previous iteration :)

}
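
On the review question above about removing the handler from allocationIdToHandlers once segments have been copied: the diff handles this inside startSegmentCopy by wrapping the caller's listener with ActionListener.runBefore, so the handler entry is removed and its CopyState reference released before the listener resolves (runBefore fires before both onResponse and onFailure). A rough sketch of that wrapping pattern, using a hypothetical Listener interface in place of OpenSearch's ActionListener:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the completion-time cleanup done in startSegmentCopy. Listener and Handler
// are stand-ins for ActionListener<GetSegmentFilesResponse> and the source handler.
class CopyCleanupSketch {

    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    interface Handler {
        void close(); // stands in for removeCopyState(handler.getCopyState())
    }

    private final Map<String, Handler> allocationIdToHandlers = new ConcurrentHashMap<>();

    // Equivalent in spirit to ActionListener.runBefore(listener, cleanup): the handler
    // entry is dropped and its state released before the original listener is notified,
    // on success and on failure alike.
    <T> Listener<T> wrapWithCleanup(String targetAllocationId, Listener<T> listener) {
        return new Listener<T>() {
            private void cleanup() {
                final Handler handler = allocationIdToHandlers.remove(targetAllocationId);
                if (handler != null) {
                    handler.close();
                }
            }

            @Override
            public void onResponse(T response) {
                cleanup();
                listener.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                cleanup();
                listener.onFailure(e);
            }
        };
    }
}
```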
@@ -54,6 +54,8 @@ class SegmentReplicationSourceHandler {
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final Logger logger;
private final AtomicBoolean isReplicating = new AtomicBoolean();
private final DiscoveryNode targetNode;
private final String allocationId;

/**
* Constructor.
@@ -70,9 +72,11 @@
FileChunkWriter writer,
ThreadPool threadPool,
CopyState copyState,
String allocationId,
int fileChunkSizeInBytes,
int maxConcurrentFileChunks
) {
this.targetNode = targetNode;
this.shard = copyState.getShard();
this.logger = Loggers.getLogger(
SegmentReplicationSourceHandler.class,
@@ -89,6 +93,7 @@ class SegmentReplicationSourceHandler {
fileChunkSizeInBytes,
maxConcurrentFileChunks
);
this.allocationId = allocationId;
this.copyState = copyState;
}

@@ -118,7 +123,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
logger.debug(
"delaying replication of {} as it is not listed as assigned to target node {}",
shard.shardId(),
request.getTargetNode()
targetNode
);
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
@@ -175,4 +180,12 @@ CopyState getCopyState() {
public boolean isReplicating() {
return isReplicating.get();
}

public DiscoveryNode getTargetNode() {
return targetNode;
}

public String getAllocationId() {
return allocationId;
}
}
@@ -155,6 +155,9 @@ public void testCancelReplication() throws IOException {
}

public void testMultipleReplicasUseSameCheckpoint() throws IOException {
IndexShard secondReplica = newShard(primary.shardId(), false);
recoverReplica(secondReplica, primary, true);

OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
final CheckpointInfoRequest request = new CheckpointInfoRequest(
1L,
@@ -172,7 +175,7 @@

final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
1L,
replica.routingEntry().allocationId().getId(),
secondReplica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
testCheckpoint
);
@@ -187,6 +190,7 @@
assertEquals(0, copyState.refCount());
assertEquals(0, replications.size());
assertEquals(0, replications.cachedCopyStateSize());
closeShards(secondReplica);
}

public void testStartCopyWithoutPrepareStep() {
@@ -272,4 +276,40 @@ public void onFailure(Exception e) {
}
});
}

public void testCancelAllReplicationsForShard() throws IOException {
// This tests when primary has multiple ongoing replications.
IndexShard replica_2 = newShard(primary.shardId(), false);
recoverReplica(replica_2, primary, true);

OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings);
final CheckpointInfoRequest request = new CheckpointInfoRequest(
1L,
replica.routingEntry().allocationId().getId(),
primaryDiscoveryNode,
testCheckpoint
);

final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class));
assertEquals(1, copyState.refCount());

final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest(
1L,
replica_2.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
testCheckpoint
);
replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class));

assertEquals(2, copyState.refCount());
assertEquals(2, replications.size());
assertEquals(1, replications.cachedCopyStateSize());

// cancel the primary's ongoing replications.
replications.cancel(primary, "Test");
assertEquals(0, copyState.refCount());
assertEquals(0, replications.size());
assertEquals(0, replications.cachedCopyStateSize());
closeShards(replica_2);
}
}
@@ -63,6 +63,7 @@ public void testSendFiles() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
@@ -100,6 +101,7 @@ public void testSendFiles_emptyRequest() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
@@ -138,6 +140,7 @@ public void testSendFileFails() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);
@@ -175,6 +178,7 @@ public void testReplicationAlreadyRunning() throws IOException {
chunkWriter,
threadPool,
copyState,
primary.routingEntry().allocationId().getId(),
5000,
1
);