diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java index 2053800504c89..8166c0008ed83 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/AbstractRemoteStoreMockRepositoryIntegTestCase.java @@ -114,6 +114,10 @@ protected void cleanupRepo() { } protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) { + return setup(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure, 0); + } + + protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure, int replicaCount) { // The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in /// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the // repository creation can happen without failure. @@ -128,6 +132,7 @@ protected String setup(Path repoLocation, double ioFailureRate, String skipExcep internalCluster().startClusterManagerOnlyNode(settings.build()); String dataNodeName = internalCluster().startDataOnlyNode(settings.build()); + internalCluster().startDataOnlyNodes(replicaCount, settings.build()); createIndex(INDEX_NAME); logger.info("--> Created index={}", INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java new file mode 100644 index 0000000000000..b7b3f1d14f422 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationState; +import org.opensearch.indices.replication.SegmentReplicationTarget; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.Optional; +import java.util.Set; + +/** + * This class runs tests with remote store + segRep while blocking file downloads + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationUsingRemoteStoreDisruptionIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { + + @Override + public Settings indexSettings() { + return remoteStoreIndexSettings(1); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void testCancelReplicationWhileSyncingSegments() throws Exception { + Path location = randomRepoPath().toAbsolutePath(); + setup(location, 0d, "metadata", Long.MAX_VALUE, 1); + + final Set dataNodeNames = internalCluster().getDataNodeNames(); + final String replicaNode = getNode(dataNodeNames, false); + final String primaryNode = getNode(dataNodeNames, true); + + SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode); + ensureGreen(INDEX_NAME); + blockNodeOnAnySegmentFile(REPOSITORY_NAME, replicaNode); + final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); + indexSingleDoc(); + refresh(INDEX_NAME); + waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); + final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); + assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage()); + ReplicationCollection.ReplicationRef segmentReplicationTargetReplicationRef = targetService.get( + state.getReplicationId() + ); + final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get(); + // close the target ref here otherwise it will hold a refcount + segmentReplicationTargetReplicationRef.close(); + assertNotNull(segmentReplicationTarget); + assertTrue(segmentReplicationTarget.refCount() > 0); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + assertBusy(() -> { + assertTrue(indexShard.routingEntry().primary()); + assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); + assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); + }); + unblockNode(REPOSITORY_NAME, replicaNode); + cleanupRepo(); + } + + public void testCancelReplicationWhileFetchingMetadata() throws Exception { + Path location = randomRepoPath().toAbsolutePath(); + setup(location, 0d, "metadata", Long.MAX_VALUE, 1); + + final Set dataNodeNames = internalCluster().getDataNodeNames(); + final String replicaNode = getNode(dataNodeNames, false); + final String primaryNode = getNode(dataNodeNames, true); + + SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode); + ensureGreen(INDEX_NAME); + blockNodeOnAnyFiles(REPOSITORY_NAME, replicaNode); + final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); + indexSingleDoc(); + refresh(INDEX_NAME); + waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); + final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); + assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); + ReplicationCollection.ReplicationRef segmentReplicationTargetReplicationRef = targetService.get( + state.getReplicationId() + ); + final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get(); + // close the target ref here otherwise it will hold a refcount + segmentReplicationTargetReplicationRef.close(); + assertNotNull(segmentReplicationTarget); + assertTrue(segmentReplicationTarget.refCount() > 0); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + assertBusy(() -> { + assertTrue(indexShard.routingEntry().primary()); + assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); + assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); + }); + unblockNode(REPOSITORY_NAME, replicaNode); + cleanupRepo(); + } + + private String getNode(Set dataNodeNames, boolean primary) { + assertEquals(2, dataNodeNames.size()); + for (String name : dataNodeNames) { + final IndexShard indexShard = getIndexShard(name, INDEX_NAME); + if (indexShard.routingEntry().primary() == primary) { + return name; + } + } + return null; + } + + private IndexShard getIndexShard(String node, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexService(index); + assertNotNull(indexService); + final Optional shardId = indexService.shardIds().stream().findFirst(); + return shardId.map(indexService::getShard).orElse(null); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java index 4fc721f2b96b5..727c57afd289b 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteStoreFileDownloader.java @@ -16,7 +16,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.annotation.InternalApi; import org.opensearch.common.logging.Loggers; -import org.opensearch.common.util.concurrent.UncategorizedExecutionException; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.recovery.RecoverySettings; @@ -51,9 +51,16 @@ public RemoteStoreFileDownloader(ShardId shardId, ThreadPool threadPool, Recover * @param source The remote directory to copy segment files from * @param destination The local directory to copy segment files to * @param toDownloadSegments The list of segment files to download + * @param listener Callback listener to be notified upon completion */ - public void download(Directory source, Directory destination, Collection toDownloadSegments) throws IOException { - downloadInternal(source, destination, null, toDownloadSegments, () -> {}); + public void downloadAsync( + CancellableThreads cancellableThreads, + Directory source, + Directory destination, + Collection toDownloadSegments, + ActionListener listener + ) { + downloadInternal(cancellableThreads, source, destination, null, toDownloadSegments, () -> {}, listener); } /** @@ -74,17 +81,37 @@ public void download( Directory secondDestination, Collection toDownloadSegments, Runnable onFileCompletion - ) throws IOException { - downloadInternal(source, destination, secondDestination, toDownloadSegments, onFileCompletion); + ) throws InterruptedException, IOException { + final CancellableThreads cancellableThreads = new CancellableThreads(); + final PlainActionFuture listener = PlainActionFuture.newFuture(); + downloadInternal(cancellableThreads, source, destination, secondDestination, toDownloadSegments, onFileCompletion, listener); + try { + listener.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new RuntimeException(e); + } catch (InterruptedException e) { + // If the blocking call on the PlainActionFuture itself is interrupted, then we must + // cancel the asynchronous work we were waiting on + cancellableThreads.cancel(e.getMessage()); + Thread.currentThread().interrupt(); + throw e; + } } private void downloadInternal( + CancellableThreads cancellableThreads, Directory source, Directory destination, @Nullable Directory secondDestination, Collection toDownloadSegments, - Runnable onFileCompletion - ) throws IOException { + Runnable onFileCompletion, + ActionListener listener + ) { final Queue queue = new ConcurrentLinkedQueue<>(toDownloadSegments); // Choose the minimum of: // - number of files to download @@ -95,25 +122,14 @@ private void downloadInternal( Math.min(threadPool.info(ThreadPool.Names.REMOTE_RECOVERY).getMax(), recoverySettings.getMaxConcurrentRemoteStoreStreams()) ); logger.trace("Starting download of {} files with {} threads", queue.size(), threads); - final PlainActionFuture> listener = PlainActionFuture.newFuture(); - final ActionListener allFilesListener = new GroupedActionListener<>(listener, threads); + final ActionListener allFilesListener = new GroupedActionListener<>(ActionListener.map(listener, r -> null), threads); for (int i = 0; i < threads; i++) { - copyOneFile(source, destination, secondDestination, queue, onFileCompletion, allFilesListener); - } - try { - listener.actionGet(); - } catch (UncategorizedExecutionException e) { - // Any IOException will be double-wrapped so dig it out and throw it - if (e.getCause() instanceof ExecutionException) { - if (e.getCause().getCause() instanceof IOException) { - throw (IOException) e.getCause().getCause(); - } - } - throw e; + copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, allFilesListener); } } private void copyOneFile( + CancellableThreads cancellableThreads, Directory source, Directory destination, @Nullable Directory secondDestination, @@ -129,18 +145,20 @@ private void copyOneFile( threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY).submit(() -> { logger.trace("Downloading file {}", file); try { - destination.copyFrom(source, file, file, IOContext.DEFAULT); - onFileCompletion.run(); - if (secondDestination != null) { - secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT); - } + cancellableThreads.executeIO(() -> { + destination.copyFrom(source, file, file, IOContext.DEFAULT); + onFileCompletion.run(); + if (secondDestination != null) { + secondDestination.copyFrom(destination, file, file, IOContext.DEFAULT); + } + }); } catch (Exception e) { // Clear the queue to stop any future processing, report the failure, then return queue.clear(); listener.onFailure(e); return; } - copyOneFile(source, destination, secondDestination, queue, onFileCompletion, listener); + copyOneFile(cancellableThreads, source, destination, secondDestination, queue, onFileCompletion, listener); }); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 12eabf1e6554f..b06b3e0497cf7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -15,6 +15,7 @@ import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.Version; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; @@ -24,11 +25,14 @@ import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -43,6 +47,7 @@ public class RemoteStoreReplicationSource implements SegmentReplicationSource { private final IndexShard indexShard; private final RemoteSegmentStoreDirectory remoteDirectory; + private final CancellableThreads cancellableThreads = new CancellableThreads(); public RemoteStoreReplicationSource(IndexShard indexShard) { this.indexShard = indexShard; @@ -61,7 +66,7 @@ public void getCheckpointMetadata( // TODO: Need to figure out a way to pass this information for segment metadata via remote store. try (final GatedCloseable segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) { final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion(); - RemoteSegmentMetadata mdFile = remoteDirectory.init(); + final RemoteSegmentMetadata mdFile = getRemoteSegmentMetadata(); // During initial recovery flow, the remote store might not // have metadata as primary hasn't uploaded anything yet. if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) { @@ -106,39 +111,50 @@ public void getSegmentFiles( } logger.debug("Downloading segment files from remote store {}", filesToFetch); - RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile(); - Collection directoryFiles = List.of(indexShard.store().directory().listAll()); - if (remoteSegmentMetadata != null) { - try { - indexShard.store().incRef(); - indexShard.remoteStore().incRef(); - final Directory storeDirectory = indexShard.store().directory(); - final List toDownloadSegmentNames = new ArrayList<>(); - for (StoreFileMetadata fileMetadata : filesToFetch) { - String file = fileMetadata.name(); - assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; - toDownloadSegmentNames.add(file); - } - indexShard.getFileDownloader() - .download( - remoteDirectory, - new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker), - toDownloadSegmentNames - ); - logger.debug("Downloaded segment files from remote store {}", filesToFetch); - } finally { - indexShard.store().decRef(); - indexShard.remoteStore().decRef(); + if (remoteMetadataExists()) { + final Directory storeDirectory = indexShard.store().directory(); + final Collection directoryFiles = List.of(storeDirectory.listAll()); + final List toDownloadSegmentNames = new ArrayList<>(); + for (StoreFileMetadata fileMetadata : filesToFetch) { + String file = fileMetadata.name(); + assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; + toDownloadSegmentNames.add(file); } + indexShard.getFileDownloader() + .downloadAsync( + cancellableThreads, + remoteDirectory, + new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker), + toDownloadSegmentNames, + ActionListener.map(listener, r -> new GetSegmentFilesResponse(filesToFetch)) + ); + } else { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } - listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); - } catch (Exception e) { + } catch (IOException | RuntimeException e) { listener.onFailure(e); } } + @Override + public void cancel() { + this.cancellableThreads.cancel("Canceled by target"); + } + @Override public String getDescription() { return "RemoteStoreReplicationSource"; } + + private boolean remoteMetadataExists() throws IOException { + final AtomicBoolean metadataExists = new AtomicBoolean(false); + cancellableThreads.executeIO(() -> metadataExists.set(remoteDirectory.readLatestMetadataFile() != null)); + return metadataExists.get(); + } + + private RemoteSegmentMetadata getRemoteSegmentMetadata() throws IOException { + AtomicReference mdFile = new AtomicReference<>(); + cancellableThreads.executeIO(() -> mdFile.set(remoteDirectory.init())); + return mdFile.get(); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index ec6b4d06b32c3..aac59df4f6573 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -91,6 +91,9 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); store.incRef(); + if (indexShard.indexSettings().isRemoteStoreEnabled()) { + indexShard.remoteStore().incRef(); + } } public long getId() { @@ -278,6 +281,12 @@ public abstract void writeFileChunk( ); protected void closeInternal() { - store.decRef(); + try { + store.decRef(); + } finally { + if (indexShard.indexSettings().isRemoteStoreEnabled()) { + indexShard.remoteStore().decRef(); + } + } } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java index 588d9e8bb13a2..6d8b3fe4d69fb 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteStoreFileDownloaderTests.java @@ -9,12 +9,18 @@ package org.opensearch.index.store; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.NIOFSDirectory; +import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.test.OpenSearchTestCase; @@ -31,8 +37,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; public class RemoteStoreFileDownloaderTests extends OpenSearchTestCase { @@ -76,31 +84,132 @@ public void stopThreadPool() throws Exception { } public void testDownload() throws IOException { - fileDownloader.download(source, destination, files.keySet()); + final PlainActionFuture l = new PlainActionFuture<>(); + fileDownloader.downloadAsync(new CancellableThreads(), source, destination, files.keySet(), l); + l.actionGet(); assertContent(files, destination); } - public void testDownloadWithSecondDestination() throws IOException { + public void testDownloadWithSecondDestination() throws IOException, InterruptedException { fileDownloader.download(source, destination, secondDestination, files.keySet(), () -> {}); assertContent(files, destination); assertContent(files, secondDestination); } - public void testDownloadWithFileCompletionHandler() throws IOException { + public void testDownloadWithFileCompletionHandler() throws IOException, InterruptedException { final AtomicInteger counter = new AtomicInteger(0); fileDownloader.download(source, destination, null, files.keySet(), counter::incrementAndGet); assertContent(files, destination); assertEquals(files.size(), counter.get()); } - public void testDownloadNonExistentFile() { - assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, Set.of("not real"))); + public void testDownloadNonExistentFile() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + fileDownloader.downloadAsync(new CancellableThreads(), source, destination, Set.of("not real"), new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + assertEquals(NoSuchFileException.class, e.getClass()); + latch.countDown(); + } + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); } - public void testDownloadExtraNonExistentFile() { - List filesWithExtra = new ArrayList<>(files.keySet()); + public void testDownloadExtraNonExistentFile() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final List filesWithExtra = new ArrayList<>(files.keySet()); filesWithExtra.add("not real"); - assertThrows(NoSuchFileException.class, () -> fileDownloader.download(source, destination, filesWithExtra)); + fileDownloader.downloadAsync(new CancellableThreads(), source, destination, filesWithExtra, new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + assertEquals(NoSuchFileException.class, e.getClass()); + latch.countDown(); + } + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } + + public void testCancellable() { + final CancellableThreads cancellableThreads = new CancellableThreads(); + final PlainActionFuture blockingListener = new PlainActionFuture<>(); + final Directory blockingDestination = new FilterDirectory(destination) { + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) { + try { + Thread.sleep(60_000); // Will be interrupted + fail("Expected to be interrupted"); + } catch (InterruptedException e) { + throw new RuntimeException("Failed due to interrupt", e); + } + } + }; + fileDownloader.downloadAsync(cancellableThreads, source, blockingDestination, files.keySet(), blockingListener); + assertThrows( + "Expected to timeout due to blocking directory", + OpenSearchTimeoutException.class, + () -> blockingListener.actionGet(TimeValue.timeValueMillis(500)) + ); + cancellableThreads.cancel("test"); + assertThrows( + "Expected to complete with cancellation failure", + CancellableThreads.ExecutionCancelledException.class, + blockingListener::actionGet + ); + } + + public void testBlockingCallCanBeInterrupted() throws Exception { + final Directory blockingDestination = new FilterDirectory(destination) { + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) { + try { + Thread.sleep(60_000); // Will be interrupted + fail("Expected to be interrupted"); + } catch (InterruptedException e) { + throw new RuntimeException("Failed due to interrupt", e); + } + } + }; + final AtomicReference capturedException = new AtomicReference<>(); + final Thread thread = new Thread(() -> { + try { + fileDownloader.download(source, blockingDestination, null, files.keySet(), () -> {}); + } catch (Exception e) { + capturedException.set(e); + } + }); + thread.start(); + thread.interrupt(); + thread.join(); + assertEquals(InterruptedException.class, capturedException.get().getClass()); + } + + public void testIOException() throws IOException, InterruptedException { + final Directory failureDirectory = new FilterDirectory(destination) { + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { + throw new IOException("test"); + } + }; + assertThrows(IOException.class, () -> fileDownloader.download(source, failureDirectory, null, files.keySet(), () -> {})); + + final CountDownLatch latch = new CountDownLatch(1); + fileDownloader.downloadAsync(new CancellableThreads(), source, failureDirectory, files.keySet(), new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + assertEquals(IOException.class, e.getClass()); + latch.countDown(); + } + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); } private static void assertContent(Map expected, Directory destination) throws IOException { diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 1bb1e44a8a600..0ee889af5ce1a 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -331,6 +331,12 @@ public static void blockNodeOnAnyFiles(String repository, String nodeName) { ); } + public static void blockNodeOnAnySegmentFile(String repository, String nodeName) { + ((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName).repository(repository)).blockOnSegmentFiles( + true + ); + } + public static void blockDataNode(String repository, String nodeName) { ((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName).repository(repository)).blockOnDataFiles(true); } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java index 7db71c4be0968..72c4ba44d0a31 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java @@ -139,6 +139,8 @@ public long getFailureCount() { private volatile boolean blockOnDataFiles; + private volatile boolean blockOnSegmentFiles; + private volatile boolean blockOnDeleteIndexN; /** @@ -190,6 +192,7 @@ public MockRepository( maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false); blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false); + blockOnSegmentFiles = metadata.settings().getAsBoolean("block_on_segment", false); blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false); randomPrefix = metadata.settings().get("random", "default"); waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L); @@ -237,6 +240,7 @@ public synchronized void unblock() { blocked = false; // Clean blocking flags, so we wouldn't try to block again blockOnDataFiles = false; + blockOnSegmentFiles = false; blockOnAnyFiles = false; blockAndFailOnWriteIndexFile = false; blockOnWriteIndexFile = false; @@ -259,6 +263,14 @@ public void setBlockOnAnyFiles(boolean blocked) { blockOnAnyFiles = blocked; } + public void blockOnSegmentFiles(boolean blocked) { + blockOnSegmentFiles = blocked; + } + + public void setBlockOnSegmentFiles(boolean blocked) { + blockOnSegmentFiles = blocked; + } + public void setBlockAndFailOnWriteSnapFiles(boolean blocked) { blockAndFailOnWriteSnapFile = blocked; } @@ -306,6 +318,7 @@ private synchronized boolean blockExecution() { boolean wasBlocked = false; try { while (blockOnDataFiles + || blockOnSegmentFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile @@ -407,6 +420,8 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException { blockExecutionAndMaybeWait(blobName); } else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) { blockExecutionAndFail(blobName); + } else if (blockOnSegmentFiles && blobName.contains(".si__")) { + blockExecutionAndMaybeWait(blobName); } } }