Correctly clone IndexInputs to avoid race
In PR opensearch-project#6345 I removed a duplicate clone; however, this
resulted in cloning the IndexInput in the wrong place. When requesting a
file that needs to be downloaded, we have a mechanism to ensure that
concurrent calls do not duplicate the download, which results in multiple
threads being given the same instance. The clone must happen _after_ this
point so that each thread gets its own clone.

Signed-off-by: Andrew Ross <andrross@amazon.com>
andrross committed Feb 20, 2023
1 parent 70d25ef commit cc23e5a
Showing 2 changed files with 110 additions and 11 deletions.
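The race described in the commit message comes from Lucene's IndexInput being stateful: each instance carries a mutable file pointer, so two threads seeking and reading through the same instance interfere with each other, while clones share the underlying file but keep independent positions. Below is a minimal, standalone sketch of that behavior (illustrative only; the class name, temp directory, and "blob" file are made up and are not part of this change):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MMapDirectory;

public class IndexInputCloneSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new MMapDirectory(Files.createTempDirectory("sketch"))) {
            // Write an 8-byte "blob" so there is something to read back
            try (IndexOutput out = dir.createOutput("blob", IOContext.DEFAULT)) {
                out.writeBytes(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7 }, 8);
            }
            try (IndexInput origin = dir.openInput("blob", IOContext.DEFAULT)) {
                // Unsafe: handing `origin` itself to both threads would make them
                // fight over its single shared file pointer.
                // Safe: each thread reads through its own clone, which shares the
                // underlying file but has an independent position.
                IndexInput clone1 = origin.clone();
                IndexInput clone2 = origin.clone();
                Thread t1 = new Thread(() -> readLastByte(clone1));
                Thread t2 = new Thread(() -> readLastByte(clone2));
                t1.start();
                t2.start();
                t1.join();
                t2.join();
            } // only the origin is closed; Lucene never closes cloned IndexInputs
        }
    }

    private static void readLastByte(IndexInput in) {
        try {
            in.seek(7);
            assert in.readByte() == 7;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

This independence of clones is the same property the new testConcurrentAccess test below relies on when it seeks and reads each returned IndexInput separately.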
@@ -25,7 +25,6 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
 
 /**
  * This acts as entry point to fetch {@link BlobFetchRequest} and return actual {@link IndexInput}. Utilizes the BlobContainer interface to
@@ -53,21 +52,20 @@ public TransferManager(final BlobContainer blobContainer, final ExecutorService
      * @return future of IndexInput augmented with internal caching maintenance tasks
      */
     public CompletableFuture<IndexInput> asyncFetchBlob(BlobFetchRequest blobFetchRequest) {
-        return asyncFetchBlob(blobFetchRequest.getFilePath(), () -> {
+        return invocationLinearizer.linearize(blobFetchRequest.getFilePath(), p -> {
             try {
                 return fetchBlob(blobFetchRequest);
             } catch (IOException e) {
                 throw new IllegalStateException(e);
             }
-        });
-    }
-
-    private CompletableFuture<IndexInput> asyncFetchBlob(Path path, Supplier<IndexInput> indexInputSupplier) {
-        return invocationLinearizer.linearize(path, p -> indexInputSupplier.get());
+        }).thenApply(IndexInput::clone);
     }
 
-    /*
-    This method accessed through the ConcurrentInvocationLinearizer so read-check-write is acceptable here
+    /**
+     * Fetches the "origin" IndexInput from the cache, downloading it first if it is
+     * not already cached. This instance must be cloned before using. This method is
+     * accessed through the ConcurrentInvocationLinearizer so read-check-write is
+     * acceptable here
      */
     private IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {
         // check if the origin is already in block cache
@@ -101,8 +99,7 @@ private IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExcepti
             fileCache.put(blobFetchRequest.getFilePath(), newOrigin);
             origin = newOrigin;
         }
-        // always, need to clone to do refcount += 1, and rely on GC to clean these IndexInput which will refcount -= 1
-        return origin.clone();
+        return origin;
     }
 
     private IndexInput downloadBlockLocally(BlobFetchRequest blobFetchRequest) throws IOException {
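For readers unfamiliar with the deduplication mechanism referenced in the commit message, the shape of the fix is easier to see in isolation. Below is a much-simplified, hypothetical sketch of a linearize-then-clone helper; the class, field, and method names are invented for illustration and this is not the actual ConcurrentInvocationLinearizer or TransferManager code:

import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

import org.apache.lucene.store.IndexInput;

// Sketch: concurrent requests for the same path share one in-flight download,
// and every caller receives its own clone of the shared "origin" IndexInput.
class DedupThenCloneSketch {
    private final Map<Path, CompletableFuture<IndexInput>> inflight = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    DedupThenCloneSketch(ExecutorService executor) {
        this.executor = executor;
    }

    CompletableFuture<IndexInput> fetch(Path path, Function<Path, IndexInput> download) {
        // Every thread asking for the same path gets the same future, so the
        // download itself runs at most once while it is in flight.
        CompletableFuture<IndexInput> origin = inflight.computeIfAbsent(
            path,
            p -> CompletableFuture.supplyAsync(() -> download.apply(p), executor)
        );
        // Once the download finishes, drop the entry (only if it is still mapped
        // to this exact future) so later requests start fresh.
        origin.whenComplete((result, error) -> inflight.remove(path, origin));
        // The clone happens here, after the shared future, so each caller gets an
        // independent IndexInput rather than the single shared origin.
        return origin.thenApply(IndexInput::clone);
    }
}

A caller would use it the way the test below drives the real API: fetch the future, call get(), and then seek and read through the returned per-caller IndexInput.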
@@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.utils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.SimpleFSLockFactory;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.test.OpenSearchTestCase;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class TransferManagerTests extends OpenSearchTestCase {
private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 8);
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private MMapDirectory directory;
private BlobContainer blobContainer;
private TransferManager transferManager;

@Before
public void setUp() throws Exception {
super.setUp();
directory = new MMapDirectory(createTempDir(), SimpleFSLockFactory.INSTANCE);
blobContainer = mock(BlobContainer.class);
doAnswer(i -> new ByteArrayInputStream(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7 })).when(blobContainer).readBlob("blob", 0, 8);
transferManager = new TransferManager(blobContainer, executor, fileCache);
}

@After
public void tearDown() throws Exception {
super.tearDown();
executor.shutdown();
assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
}

public void testSingleAccess() throws Exception {
try (IndexInput i = fetchBlob()) {
i.seek(7);
MatcherAssert.assertThat(i.readByte(), equalTo((byte) 7));
}
}

public void testConcurrentAccess() throws Exception {
// Kick off multiple threads that all concurrently request the same resource
final ExecutorService testRunner = Executors.newFixedThreadPool(8);
try {
final List<Future<IndexInput>> futures = new ArrayList<>();
for (int i = 0; i < 8; i++) {
futures.add(testRunner.submit(this::fetchBlob));
}
// Wait for all threads to complete
for (Future<IndexInput> future : futures) {
future.get(1, TimeUnit.SECONDS);
}
// Assert that all IndexInputs are independently positioned by seeking
// to the end and closing each one. If not independent, then this would
// result in EOFExceptions and/or NPEs.
for (Future<IndexInput> future : futures) {
try (IndexInput i = future.get()) {
i.seek(7);
MatcherAssert.assertThat(i.readByte(), equalTo((byte) 7));
}
}
} finally {
testRunner.shutdown();
assertTrue(testRunner.awaitTermination(1, TimeUnit.SECONDS));
}
}

private IndexInput fetchBlob() throws ExecutionException, InterruptedException {
return transferManager.asyncFetchBlob(
BlobFetchRequest.builder().blobName("blob").position(0).fileName("file").directory(directory).length(8).build()
).get();
}
}
