Skip to content

Commit

Permalink
Correctly clone IndexInputs to avoid race
Browse files Browse the repository at this point in the history
In PR opensearch-project#6345 I did remove a duplicate clone, however this resulted in
cloning the IndexInput in the wrong place. When requesting a file that
needs to be downloaded, we have a mechanism to ensure that concurrent
calls do not end up duplicating the download, which results in multiple
threads being given the same instance. The clone must happen _after_
this point to ensure that each thread gets its own clone.

Signed-off-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
andrross committed Feb 18, 2023
1 parent cf5b295 commit 41b1d0d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ public TransferManager(final BlobContainer blobContainer, final ExecutorService
/**
* Given a blobFetchRequest, return it's corresponding IndexInput.
* @param blobFetchRequest to fetch
* @return
* @return The IndexInput of the requested data
*/
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) {
try {
final IndexInput origin = fetchOriginIndexInput(blobFetchRequest).get();
// The origin instances stays in the cache with a ref count of zero
// and must be cloned before being returned.
return origin.clone();
} catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Expand Down Expand Up @@ -94,7 +96,7 @@ private CompletableFuture<IndexInput> fetchOriginIndexInput(BlobFetchRequest blo
return cachedIndexInput;
});

if (Objects.isNull(origin)) {
if (origin == null) {
// origin is not in file cache, download origin

// open new origin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

Expand Down Expand Up @@ -142,10 +141,8 @@ private OnDemandBlockSnapshotIndexInput createOnDemandBlockSnapshotIndexInput(in

doAnswer(invocation -> {
BlobFetchRequest blobFetchRequest = invocation.getArgument(0);
return CompletableFuture.completedFuture(
blobFetchRequest.getDirectory().openInput(blobFetchRequest.getFileName(), IOContext.READ)
);
}).when(transferManager).asyncFetchBlob(any());
return blobFetchRequest.getDirectory().openInput(blobFetchRequest.getFileName(), IOContext.READ);
}).when(transferManager).fetchBlob(any());

FSDirectory directory = null;
try {
Expand Down

0 comments on commit 41b1d0d

Please sign in to comment.