Skip to content

Commit

Permalink
Make RemoteStoreReplicationSource#getSegmentFiles asynchronous (opens…
Browse files Browse the repository at this point in the history
…earch-project#10725)

* Make RemoteStoreReplicationSource#getSegmentFiles asynchronous

Also make the remote store download process cancellable in case the
replication event is canceled.

Signed-off-by: Andrew Ross <andrross@amazon.com>

* Add ITs ensuring segRep targets are cleaned up on cancellation during metadata and segment fetch steps.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Wrap metadata fetch in cancellableThreads.executeIO

Signed-off-by: Marc Handalian <handalm@amazon.com>

* self review

Signed-off-by: Marc Handalian <handalm@amazon.com>

* spotless

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add missing node settings when bootstrapping nodes in tests.

Signed-off-by: Marc Handalian <handalm@amazon.com>

---------

Signed-off-by: Andrew Ross <andrross@amazon.com>
Signed-off-by: Marc Handalian <handalm@amazon.com>
Co-authored-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
2 people authored and austintlee committed Oct 23, 2023
1 parent eda0db5 commit e4911df
Show file tree
Hide file tree
Showing 8 changed files with 373 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ protected void cleanupRepo() {
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
return setup(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure, 0);
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure, int replicaCount) {
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
// repository creation can happen without failure.
Expand All @@ -128,6 +132,7 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep

internalCluster().startClusterManagerOnlyNode(settings.build());
String dataNodeName = internalCluster().startDataOnlyNode(settings.build());
internalCluster().startDataOnlyNodes(replicaCount, settings.build());
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;

/**
* This class runs tests with remote store + segRep while blocking file downloads
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationUsingRemoteStoreDisruptionIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(1);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public void testCancelReplicationWhileSyncingSegments() throws Exception {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 0d, "metadata", Long.MAX_VALUE, 1);

final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
final String replicaNode = getNode(dataNodeNames, false);
final String primaryNode = getNode(dataNodeNames, true);

SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
ensureGreen(INDEX_NAME);
blockNodeOnAnySegmentFile(REPOSITORY_NAME, replicaNode);
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage());
ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
state.getReplicationId()
);
final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
// close the target ref here otherwise it will hold a refcount
segmentReplicationTargetReplicationRef.close();
assertNotNull(segmentReplicationTarget);
assertTrue(segmentReplicationTarget.refCount() > 0);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
assertBusy(() -> {
assertTrue(indexShard.routingEntry().primary());
assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
});
unblockNode(REPOSITORY_NAME, replicaNode);
cleanupRepo();
}

public void testCancelReplicationWhileFetchingMetadata() throws Exception {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 0d, "metadata", Long.MAX_VALUE, 1);

final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
final String replicaNode = getNode(dataNodeNames, false);
final String primaryNode = getNode(dataNodeNames, true);

SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
ensureGreen(INDEX_NAME);
blockNodeOnAnyFiles(REPOSITORY_NAME, replicaNode);
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage());
ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
state.getReplicationId()
);
final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
// close the target ref here otherwise it will hold a refcount
segmentReplicationTargetReplicationRef.close();
assertNotNull(segmentReplicationTarget);
assertTrue(segmentReplicationTarget.refCount() > 0);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
assertBusy(() -> {
assertTrue(indexShard.routingEntry().primary());
assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
});
unblockNode(REPOSITORY_NAME, replicaNode);
cleanupRepo();
}

private String getNode(Set<String> dataNodeNames, boolean primary) {
assertEquals(2, dataNodeNames.size());
for (String name : dataNodeNames) {
final IndexShard indexShard = getIndexShard(name, INDEX_NAME);
if (indexShard.routingEntry().primary() == primary) {
return name;
}
}
return null;
}

private IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return shardId.map(indexService::getShard).orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -51,9 +51,16 @@ public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, Recover
* @param source The remote directory to copy segment files from
* @param destination The local directory to copy segment files to
* @param toDownloadSegments The list of segment files to download
* @param listener Callback listener to be notified upon completion
*/
public void download(Directory source, Directory destination, Collection<String> toDownloadSegments) throws IOException {
downloadInternal(source, destination, null, toDownloadSegments, () -> {});
public void downloadAsync(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
Collection<String> toDownloadSegments,
ActionListener<Void> listener
) {
downloadInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener);
}

/**
Expand All @@ -74,17 +81,37 @@ public void download(
Directory secondDestination,
Collection<String> toDownloadSegments,
Runnable onFileCompletion
) throws IOException {
downloadInternal(source, destination, secondDestination, toDownloadSegments, onFileCompletion);
) throws InterruptedException, IOException {
final CancellableThreads cancellableThreads = new CancellableThreads();
final PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
downloadInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener);
try {
listener.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new RuntimeException(e);
} catch (InterruptedException e) {
// If the blocking call on the PlainActionFuture itself is interrupted, then we must
// cancel the asynchronous work we were waiting on
cancellableThreads.cancel(e.getMessage());
Thread.currentThread().interrupt();
throw e;
}
}

private void downloadInternal(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
@Nullable Directory secondDestination,
Collection<String> toDownloadSegments,
Runnable onFileCompletion
) throws IOException {
Runnable onFileCompletion,
ActionListener<Void> listener
) {
final Queue<String> queue = new ConcurrentLinkedQueue<>(toDownloadSegments);
// Choose the minimum of:
// - number of files to download
Expand All @@ -95,25 +122,14 @@ private void downloadInternal(
Math.min(threadPool.info(ThreadPool.Names.REMOTE_RECOVERY).getMax(), recoverySettings.getMaxConcurrentRemoteStoreStreams())
);
logger.trace("Starting download of {} files with {} threads", queue.size(), threads);
final PlainActionFuture<Collection<Void>> listener = PlainActionFuture.newFuture();
final ActionListener<Void> allFilesListener = new GroupedActionListener<>(listener, threads);
final ActionListener<Void> allFilesListener = new GroupedActionListener<>(ActionListener.map(listener, r -> null), threads);
for (int i = 0; i < threads; i++) {
copyOneFile(source, destination, secondDestination, queue, onFileCompletion, allFilesListener);
}
try {
listener.actionGet();
} catch (UncategorizedExecutionException e) {
// Any IOException will be double-wrapped so dig it out and throw it
if (e.getCause() instanceof ExecutionException) {
if (e.getCause().getCause() instanceof IOException) {
throw (IOException) e.getCause().getCause();
}
}
throw e;
copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, allFilesListener);
}
}

private void copyOneFile(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
@Nullable Directory secondDestination,
Expand All @@ -129,18 +145,20 @@ private void copyOneFile(
threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY).submit(() -> {
logger.trace("Downloading file {}", file);
try {
destination.copyFrom(source, file, file, IOContext.DEFAULT);
onFileCompletion.run();
if (secondDestination != null) {
secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT);
}
cancellableThreads.executeIO(() -> {
destination.copyFrom(source, file, file, IOContext.DEFAULT);
onFileCompletion.run();
if (secondDestination != null) {
secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT);
}
});
} catch (Exception e) {
// Clear the queue to stop any future processing, report the failure, then return
queue.clear();
listener.onFailure(e);
return;
}
copyOneFile(source, destination, secondDestination, queue, onFileCompletion, listener);
copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, listener);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.util.Version;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
Expand All @@ -24,11 +25,14 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

Expand All @@ -43,6 +47,7 @@ public class RemoteStoreReplicationSource implements SegmentReplicationSource {

private final IndexShard indexShard;
private final RemoteSegmentStoreDirectory remoteDirectory;
private final CancellableThreads cancellableThreads = new CancellableThreads();

public RemoteStoreReplicationSource(IndexShard indexShard) {
this.indexShard = indexShard;
Expand All @@ -61,7 +66,7 @@ public void getCheckpointMetadata(
// TODO: Need to figure out a way to pass this information for segment metadata via remote store.
try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) {
final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion();
RemoteSegmentMetadata mdFile = remoteDirectory.init();
final RemoteSegmentMetadata mdFile = getRemoteSegmentMetadata();
// During initial recovery flow, the remote store might not
// have metadata as primary hasn't uploaded anything yet.
if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) {
Expand Down Expand Up @@ -106,39 +111,50 @@ public void getSegmentFiles(
}
logger.debug("Downloading segment files from remote store {}", filesToFetch);

RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile();
Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll());
if (remoteSegmentMetadata != null) {
try {
indexShard.store().incRef();
indexShard.remoteStore().incRef();
final Directory storeDirectory = indexShard.store().directory();
final List<String> toDownloadSegmentNames = new ArrayList<>();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
toDownloadSegmentNames.add(file);
}
indexShard.getFileDownloader()
.download(
remoteDirectory,
new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker),
toDownloadSegmentNames
);
logger.debug("Downloaded segment files from remote store {}", filesToFetch);
} finally {
indexShard.store().decRef();
indexShard.remoteStore().decRef();
if (remoteMetadataExists()) {
final Directory storeDirectory = indexShard.store().directory();
final Collection<String> directoryFiles = List.of(storeDirectory.listAll());
final List<String> toDownloadSegmentNames = new ArrayList<>();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
toDownloadSegmentNames.add(file);
}
indexShard.getFileDownloader()
.downloadAsync(
cancellableThreads,
remoteDirectory,
new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker),
toDownloadSegmentNames,
ActionListener.map(listener, r -> new GetSegmentFilesResponse(filesToFetch))
);
} else {
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
}
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
} catch (Exception e) {
} catch (IOException | RuntimeException e) {
listener.onFailure(e);
}
}

@Override
public void cancel() {
this.cancellableThreads.cancel("Canceled by target");
}

@Override
public String getDescription() {
return "RemoteStoreReplicationSource";
}

private boolean remoteMetadataExists() throws IOException {
final AtomicBoolean metadataExists = new AtomicBoolean(false);
cancellableThreads.executeIO(() -> metadataExists.set(remoteDirectory.readLatestMetadataFile() != null));
return metadataExists.get();
}

private RemoteSegmentMetadata getRemoteSegmentMetadata() throws IOException {
AtomicReference<RemoteSegmentMetadata> mdFile = new AtomicReference<>();
cancellableThreads.executeIO(() -> mdFile.set(remoteDirectory.init()));
return mdFile.get();
}
}
Loading

0 comments on commit e4911df

Please sign in to comment.