Skip to content

Commit

Permalink
Retry download of RemoteFSTranslog to fix transient race conditions (o…
Browse files Browse the repository at this point in the history
…pensearch-project#9565)

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
gbbafna authored and shiv0408 committed Apr 25, 2024
1 parent 43dad0f commit 804a8d8
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ protected Settings featureFlagSettings() {
.build();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9191")
public void testPrimaryRelocationWhileIndexing() throws Exception {
super.testPrimaryRelocationWhileIndexing();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Locale;
Expand Down Expand Up @@ -68,6 +70,7 @@ public class RemoteFsTranslog extends Translog {
private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

private static final int REMOTE_DELETION_PERMITS = 2;
private static final int DOWNLOAD_RETRIES = 2;
public static final String TRANSLOG = "translog";

// Semaphore used to allow only single remote generation to happen at a time
Expand Down Expand Up @@ -161,7 +164,28 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
RemoteFsTranslog.download(translogTransferManager, location, logger);
}

public static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
/*
In Primary to Primary relocation , there can be concurrent upload and download of translog.
While translog files are getting downloaded by new primary, it might hence be deleted by the primary
Hence we retry if tlog/ckp files are not found .
This doesn't happen in last download , where it is ensured that older primary has stopped modifying tlog data.
*/
IOException ex = null;
for (int i = 0; i <= DOWNLOAD_RETRIES; i++) {
try {
downloadOnce(translogTransferManager, location, logger);
return;
} catch (FileNotFoundException | NoSuchFileException e) {
// continue till download retries
ex = e;
}
}
throw ex;
}

static private void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
logger.trace("Downloading translog files from remote");
RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.opensearch.index.seqno.LocalCheckpointTrackerTests;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -59,12 +61,14 @@

import java.io.Closeable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -98,6 +102,10 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@LuceneTestCase.SuppressFileSystems("ExtrasFS")

Expand Down Expand Up @@ -1466,6 +1474,46 @@ public void testCloseIntoReader() throws IOException {
}
}

public void testDownloadWithRetries() throws IOException {
long generation = 1, primaryTerm = 1;
Path location = createTempDir();
TranslogTransferMetadata translogTransferMetadata = new TranslogTransferMetadata(primaryTerm, generation, generation, 1);
Map<String, String> generationToPrimaryTermMapper = new HashMap<>();
generationToPrimaryTermMapper.put(String.valueOf(generation), String.valueOf(primaryTerm));
translogTransferMetadata.setGenerationToPrimaryTermMapper(generationToPrimaryTermMapper);

TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
when(mockTransfer.readMetadata()).thenReturn(translogTransferMetadata);
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

// Always File not found
when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(new NoSuchFileException("File not found"));
TranslogTransferManager finalMockTransfer = mockTransfer;
assertThrows(NoSuchFileException.class, () -> RemoteFsTranslog.download(finalMockTransfer, location, logger));

// File not found in first attempt . File found in second attempt.
mockTransfer = mock(TranslogTransferManager.class);
when(mockTransfer.readMetadata()).thenReturn(translogTransferMetadata);
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);
String msg = "File not found";
Exception toThrow = randomBoolean() ? new NoSuchFileException(msg) : new FileNotFoundException(msg);
when(mockTransfer.downloadTranslog(any(), any(), any())).thenThrow(toThrow).thenReturn(true);

AtomicLong downloadCounter = new AtomicLong();
doAnswer(invocation -> {
if (downloadCounter.incrementAndGet() <= 1) {
throw new NoSuchFileException("File not found");
} else if (downloadCounter.get() == 2) {
Files.createFile(location.resolve(Translog.getCommitCheckpointFileName(generation)));
}
return true;
}).when(mockTransfer).downloadTranslog(any(), any(), any());

// no exception thrown
RemoteFsTranslog.download(mockTransfer, location, logger);
}

public class ThrowingBlobRepository extends FsRepository {
private final Environment environment;

Expand Down

0 comments on commit 804a8d8

Please sign in to comment.