Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RemoteStoreReplicationSource#getSegmentFiles asynchronous #10725

Merged
merged 6 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ protected void cleanupRepo() {
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
return setup(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure, 0);
}

protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure, int replicaCount) {
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
// repository creation can happen without failure.
Expand All @@ -128,6 +132,7 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep

internalCluster().startClusterManagerOnlyNode(settings.build());
String dataNodeName = internalCluster().startDataOnlyNode(settings.build());
internalCluster().startDataOnlyNodes(replicaCount, settings.build());
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;

/**
* This class runs tests with remote store + segRep while blocking file downloads
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationUsingRemoteStoreDisruptionIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(1);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public void testCancelReplicationWhileSyncingSegments() throws Exception {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 0d, "metadata", Long.MAX_VALUE, 1);

final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
final String replicaNode = getNode(dataNodeNames, false);
final String primaryNode = getNode(dataNodeNames, true);

SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
ensureGreen(INDEX_NAME);
blockNodeOnAnySegmentFile(REPOSITORY_NAME, replicaNode);
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage());
ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
state.getReplicationId()
);
final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
// close the target ref here otherwise it will hold a refcount
segmentReplicationTargetReplicationRef.close();
assertNotNull(segmentReplicationTarget);
assertTrue(segmentReplicationTarget.refCount() > 0);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
assertBusy(() -> {
assertTrue(indexShard.routingEntry().primary());
assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
});
unblockNode(REPOSITORY_NAME, replicaNode);
cleanupRepo();
}

public void testCancelReplicationWhileFetchingMetadata() throws Exception {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 0d, "metadata", Long.MAX_VALUE, 1);

final Set<String> dataNodeNames = internalCluster().getDataNodeNames();
final String replicaNode = getNode(dataNodeNames, false);
final String primaryNode = getNode(dataNodeNames, true);

SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode);
ensureGreen(INDEX_NAME);
blockNodeOnAnyFiles(REPOSITORY_NAME, replicaNode);
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
indexSingleDoc();
refresh(INDEX_NAME);
waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10));
final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId());
assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage());
ReplicationCollection.ReplicationRef<SegmentReplicationTarget> segmentReplicationTargetReplicationRef = targetService.get(
state.getReplicationId()
);
final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get();
// close the target ref here otherwise it will hold a refcount
segmentReplicationTargetReplicationRef.close();
assertNotNull(segmentReplicationTarget);
assertTrue(segmentReplicationTarget.refCount() > 0);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
assertBusy(() -> {
assertTrue(indexShard.routingEntry().primary());
assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()));
assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount());
});
unblockNode(REPOSITORY_NAME, replicaNode);
cleanupRepo();
}

private String getNode(Set<String> dataNodeNames, boolean primary) {
assertEquals(2, dataNodeNames.size());
for (String name : dataNodeNames) {
final IndexShard indexShard = getIndexShard(name, INDEX_NAME);
if (indexShard.routingEntry().primary() == primary) {
return name;
}
}
return null;
}

private IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return shardId.map(indexService::getShard).orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -51,9 +51,16 @@
* @param source The remote directory to copy segment files from
* @param destination The local directory to copy segment files to
* @param toDownloadSegments The list of segment files to download
* @param listener Callback listener to be notified upon completion
*/
public void download(Directory source, Directory destination, Collection<String> toDownloadSegments) throws IOException {
downloadInternal(source, destination, null, toDownloadSegments, () -> {});
public void downloadAsync(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
Collection<String> toDownloadSegments,
ActionListener<Void> listener
) {
downloadInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener);
}

/**
Expand All @@ -74,17 +81,37 @@
Directory secondDestination,
Collection<String> toDownloadSegments,
Runnable onFileCompletion
) throws IOException {
downloadInternal(source, destination, secondDestination, toDownloadSegments, onFileCompletion);
) throws InterruptedException, IOException {
final CancellableThreads cancellableThreads = new CancellableThreads();
final PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
downloadInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener);
try {
listener.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();

Check warning on line 92 in server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java#L92

Added line #L92 was not covered by tests
} else if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new RuntimeException(e);

Check warning on line 96 in server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java#L96

Added line #L96 was not covered by tests
} catch (InterruptedException e) {
// If the blocking call on the PlainActionFuture itself is interrupted, then we must
// cancel the asynchronous work we were waiting on
cancellableThreads.cancel(e.getMessage());
Thread.currentThread().interrupt();
throw e;
}
}

private void downloadInternal(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
@Nullable Directory secondDestination,
Collection<String> toDownloadSegments,
Runnable onFileCompletion
) throws IOException {
Runnable onFileCompletion,
ActionListener<Void> listener
) {
final Queue<String> queue = new ConcurrentLinkedQueue<>(toDownloadSegments);
// Choose the minimum of:
// - number of files to download
Expand All @@ -95,25 +122,14 @@
Math.min(threadPool.info(ThreadPool.Names.REMOTE_RECOVERY).getMax(), recoverySettings.getMaxConcurrentRemoteStoreStreams())
);
logger.trace("Starting download of {} files with {} threads", queue.size(), threads);
final PlainActionFuture<Collection<Void>> listener = PlainActionFuture.newFuture();
final ActionListener<Void> allFilesListener = new GroupedActionListener<>(listener, threads);
final ActionListener<Void> allFilesListener = new GroupedActionListener<>(ActionListener.map(listener, r -> null), threads);
for (int i = 0; i < threads; i++) {
copyOneFile(source, destination, secondDestination, queue, onFileCompletion, allFilesListener);
}
try {
listener.actionGet();
} catch (UncategorizedExecutionException e) {
// Any IOException will be double-wrapped so dig it out and throw it
if (e.getCause() instanceof ExecutionException) {
if (e.getCause().getCause() instanceof IOException) {
throw (IOException) e.getCause().getCause();
}
}
throw e;
copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, allFilesListener);
}
}

private void copyOneFile(
CancellableThreads cancellableThreads,
Directory source,
Directory destination,
@Nullable Directory secondDestination,
Expand All @@ -129,18 +145,20 @@
threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY).submit(() -> {
logger.trace("Downloading file {}", file);
try {
destination.copyFrom(source, file, file, IOContext.DEFAULT);
onFileCompletion.run();
if (secondDestination != null) {
secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT);
}
cancellableThreads.executeIO(() -> {
destination.copyFrom(source, file, file, IOContext.DEFAULT);
onFileCompletion.run();
if (secondDestination != null) {
secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT);
}
});
} catch (Exception e) {
// Clear the queue to stop any future processing, report the failure, then return
queue.clear();
listener.onFailure(e);
return;
}
copyOneFile(source, destination, secondDestination, queue, onFileCompletion, listener);
copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, listener);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.util.Version;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
Expand All @@ -24,11 +25,14 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

Expand All @@ -43,6 +47,7 @@

private final IndexShard indexShard;
private final RemoteSegmentStoreDirectory remoteDirectory;
private final CancellableThreads cancellableThreads = new CancellableThreads();

public RemoteStoreReplicationSource(IndexShard indexShard) {
this.indexShard = indexShard;
Expand All @@ -61,7 +66,7 @@
// TODO: Need to figure out a way to pass this information for segment metadata via remote store.
try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) {
final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion();
RemoteSegmentMetadata mdFile = remoteDirectory.init();
final RemoteSegmentMetadata mdFile = getRemoteSegmentMetadata();
// During initial recovery flow, the remote store might not
// have metadata as primary hasn't uploaded anything yet.
if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) {
Expand Down Expand Up @@ -106,39 +111,50 @@
}
logger.debug("Downloading segment files from remote store {}", filesToFetch);

RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile();
Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll());
if (remoteSegmentMetadata != null) {
try {
indexShard.store().incRef();
indexShard.remoteStore().incRef();
final Directory storeDirectory = indexShard.store().directory();
final List<String> toDownloadSegmentNames = new ArrayList<>();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
toDownloadSegmentNames.add(file);
}
indexShard.getFileDownloader()
.download(
remoteDirectory,
new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker),
toDownloadSegmentNames
);
logger.debug("Downloaded segment files from remote store {}", filesToFetch);
} finally {
indexShard.store().decRef();
indexShard.remoteStore().decRef();
if (remoteMetadataExists()) {
final Directory storeDirectory = indexShard.store().directory();
final Collection<String> directoryFiles = List.of(storeDirectory.listAll());
final List<String> toDownloadSegmentNames = new ArrayList<>();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
toDownloadSegmentNames.add(file);
}
indexShard.getFileDownloader()
.downloadAsync(
cancellableThreads,
remoteDirectory,
new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker),
toDownloadSegmentNames,
ActionListener.map(listener, r -> new GetSegmentFilesResponse(filesToFetch))
);
} else {
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));

Check warning on line 132 in server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java#L132

Added line #L132 was not covered by tests
}
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
} catch (Exception e) {
} catch (IOException | RuntimeException e) {

Check warning on line 134 in server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java#L134

Added line #L134 was not covered by tests
listener.onFailure(e);
}
}

@Override
public void cancel() {
this.cancellableThreads.cancel("Canceled by target");
}

Check warning on line 142 in server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java#L141-L142

Added lines #L141 - L142 were not covered by tests

@Override
public String getDescription() {
return "RemoteStoreReplicationSource";
}

private boolean remoteMetadataExists() throws IOException {
final AtomicBoolean metadataExists = new AtomicBoolean(false);
cancellableThreads.executeIO(() -> metadataExists.set(remoteDirectory.readLatestMetadataFile() != null));
return metadataExists.get();
}

private RemoteSegmentMetadata getRemoteSegmentMetadata() throws IOException {
AtomicReference<RemoteSegmentMetadata> mdFile = new AtomicReference<>();
cancellableThreads.executeIO(() -> mdFile.set(remoteDirectory.init()));
return mdFile.get();
}
}
Loading
Loading