Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
andrross committed Feb 18, 2023
1 parent dae1566 commit cf5b295
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,13 @@

package org.opensearch.index.store.remote.file;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.opensearch.index.store.remote.utils.BlobFetchRequest;
import org.opensearch.index.store.remote.utils.TransferManager;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

/**
* This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
Expand All @@ -28,8 +24,6 @@
* @opensearch.internal
*/
public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class);

/**
* Where this class fetches IndexInput parts from
*/
Expand Down Expand Up @@ -146,12 +140,7 @@ protected IndexInput fetchBlock(int blockId) throws IOException {
.directory(directory)
.fileName(blockFileName)
.build();
try {
return transferManager.asyncFetchBlob(blobFetchRequest).get();
} catch (InterruptedException | ExecutionException e) {
logger.error(() -> new ParameterizedMessage("unexpected failure while fetching [{}]", blobFetchRequest), e);
throw new IllegalStateException(e);
}
return transferManager.fetchBlob(blobFetchRequest);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

Expand All @@ -50,59 +52,68 @@ public TransferManager(final BlobContainer blobContainer, final ExecutorService
/**
* Given a blobFetchRequest, return its corresponding IndexInput.
* @param blobFetchRequest to fetch
* @return future of IndexInput augmented with internal caching maintenance tasks
* @return the IndexInput corresponding to the request — a clone of the cached origin
*/
public CompletableFuture<IndexInput> asyncFetchBlob(BlobFetchRequest blobFetchRequest) {
return asyncFetchBlob(blobFetchRequest.getFilePath(), () -> {
try {
return fetchBlob(blobFetchRequest);
} catch (IOException e) {
throw new IllegalStateException(e);
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) {
try {
final IndexInput origin = fetchOriginIndexInput(blobFetchRequest).get();
return origin.clone();
} catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
});
}

private CompletableFuture<IndexInput> asyncFetchBlob(Path path, Supplier<IndexInput> indexInputSupplier) {
return invocationLinearizer.linearize(path, p -> indexInputSupplier.get());
throw new RuntimeException(e);
}
}

/*
This method accessed through the ConcurrentInvocationLinearizer so read-check-write is acceptable here
/**
* Fetches the "origin" IndexInput used in the cache. This instance must
* always be cloned before being returned. Concurrent calls to this method
* receive the same CompletableFuture instance, which avoids duplicate
* downloads; each caller must therefore independently clone the IndexInput
* it obtains from that future.
* See {@link #fetchBlob}.
*/
private IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {
// check if the origin is already in block cache
IndexInput origin = fileCache.computeIfPresent(blobFetchRequest.getFilePath(), (path, cachedIndexInput) -> {
if (cachedIndexInput.isClosed()) {
// if it's already in the file cache, but closed, open it and replace the original one
try {
IndexInput luceneIndexInput = blobFetchRequest.getDirectory().openInput(blobFetchRequest.getFileName(), IOContext.READ);
return new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), luceneIndexInput);
} catch (IOException ioe) {
logger.warn("Open index input " + blobFetchRequest.getFilePath() + " got error ", ioe);
// open failed so return null to download the file again
return null;
}
private CompletableFuture<IndexInput> fetchOriginIndexInput(BlobFetchRequest blobFetchRequest) {
return invocationLinearizer.linearize(blobFetchRequest.getFilePath(), p -> {
// check if the origin is already in block cache
IndexInput origin = fileCache.computeIfPresent(blobFetchRequest.getFilePath(), (path, cachedIndexInput) -> {
if (cachedIndexInput.isClosed()) {
// if it's already in the file cache, but closed, open it and replace the original one
try {
IndexInput luceneIndexInput =
blobFetchRequest.getDirectory().openInput(blobFetchRequest.getFileName(), IOContext.READ);
return new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), luceneIndexInput);
} catch (IOException ioe) {
logger.warn("Open index input " + blobFetchRequest.getFilePath() + " got error ", ioe);
// open failed so return null to download the file again
return null;
}

}
// already in the cache and ready to be used (open)
return cachedIndexInput;
});
}
// already in the cache and ready to be used (open)
return cachedIndexInput;
});

if (Objects.isNull(origin)) {
// origin is not in file cache, download origin
if (Objects.isNull(origin)) {
// origin is not in file cache, download origin

// open new origin
IndexInput downloaded = downloadBlockLocally(blobFetchRequest);
// open new origin
final IndexInput downloaded;
try {
downloaded = downloadBlockLocally(blobFetchRequest);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

// refcount = 0 at the beginning
FileCachedIndexInput newOrigin = new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), downloaded);
// refcount = 0 at the beginning
FileCachedIndexInput newOrigin = new FileCachedIndexInput(fileCache, blobFetchRequest.getFilePath(), downloaded);

// put origin into file cache
fileCache.put(blobFetchRequest.getFilePath(), newOrigin);
origin = newOrigin;
}
// always, need to clone to do refcount += 1, and rely on GC to clean these IndexInput which will refcount -= 1
return origin.clone();
// put origin into file cache
fileCache.put(blobFetchRequest.getFilePath(), newOrigin);
origin = newOrigin;
}
return origin;
});
}

private IndexInput downloadBlockLocally(BlobFetchRequest blobFetchRequest) throws IOException {
Expand Down

0 comments on commit cf5b295

Please sign in to comment.