From dae1566c4ac7395bc4202cd0571ca31f72bbdeb1 Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Fri, 17 Feb 2023 10:03:02 -0800 Subject: [PATCH] Use java.lang.ref.Cleaner to close cloned IndexInputs (#6351) As detailed [in this issue][1], Lucene does not close the cloned IndexInput instances, so we are using the Cleaner mechanism from the JDK to close any unclosed clones. A single static Cleaner instance to ensure any unclosed clone of an IndexInput is closed. This instance creates a single daemon thread on which it performs the cleaning actions. For an already-closed IndexInput, the cleaning action is a no-op. For an open IndexInput, the close action will decrement a reference count. [1]: https://github.com/opensearch-project/OpenSearch/issues/5243#issuecomment-1399037364 Signed-off-by: Andrew Ross --- .../snapshots/SearchableSnapshotIT.java | 5 +- .../remote/file/OnDemandBlockIndexInput.java | 175 ++++++++++++------ .../file/OnDemandBlockSnapshotIndexInput.java | 6 +- .../file/CleanerDaemonThreadLeakFilter.java | 23 +++ ...OnDemandBlockIndexInputLifecycleTests.java | 173 +++++++++++++++++ .../OnDemandBlockSnapshotIndexInputTests.java | 23 +-- 6 files changed, 331 insertions(+), 74 deletions(-) create mode 100644 server/src/test/java/org/opensearch/index/store/remote/file/CleanerDaemonThreadLeakFilter.java create mode 100644 server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInputLifecycleTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index e06c220e2caf9..823609e57fca0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -4,6 +4,7 @@ */ package org.opensearch.snapshots; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.hamcrest.MatcherAssert; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; @@ -27,6 +28,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.Index; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.repositories.fs.FsRepository; @@ -34,14 +36,15 @@ import java.nio.file.Path; import java.util.List; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.notNullValue; import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS; import static org.opensearch.common.util.CollectionUtils.iterableAsArrayList; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public final class SearchableSnapshotIT extends AbstractSnapshotIntegTestCase { @Override diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java index c80c29dbc845a..e97f093bb7703 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java @@ -8,11 +8,17 @@ package org.opensearch.index.store.remote.file; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RandomAccessInput; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; +import java.lang.ref.Cleaner; +import java.util.Objects; /** * Class acts as a virtual file mechanism for the accessed files and only fetches the required blocks of the actual file. @@ -22,9 +28,27 @@ * This class delegate the responsibility of actually fetching the block when demanded to its subclasses using * {@link OnDemandBlockIndexInput#fetchBlock(int)}. * + * Like {@link IndexInput}, this class may only be used from one thread as it is not thread safe. + * However, a cleaning action may run from another thread triggered by the {@link Cleaner}, but + * this is okay because at that point the {@link OnDemandBlockIndexInput} instance is phantom + * reachable and therefore not able to be accessed by any other thread. + * * @opensearch.internal */ abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAccessInput { + private static final Logger logger = LogManager.getLogger(OnDemandBlockIndexInput.class); + + public static final String CLEANER_THREAD_NAME_PREFIX = "index-input-cleaner"; + + /** + * A single static Cleaner instance to ensure any unclosed clone of an + * IndexInput is closed. This instance creates a single daemon thread on + * which it performs the cleaning actions. For an already-closed IndexInput, + * the cleaning action is a no-op. For an open IndexInput, the close action + * will decrement a reference count. + */ + private static final Cleaner CLEANER = Cleaner.create(OpenSearchExecutors.daemonThreadFactory(CLEANER_THREAD_NAME_PREFIX)); + /** * Start offset of the virtual file : non-zero in the slice case */ @@ -55,16 +79,12 @@ abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAcces */ protected final int blockMask; - // Variables for actual held open block - /** - * Current block for read, it should be a cloned block always. In current implementation this will be a FileCachedIndexInput - */ - protected IndexInput currentBlock; - /** * ID of the current block */ - protected int currentBlockId; + private int currentBlockId; + + private final BlockHolder blockHolder = new BlockHolder(); OnDemandBlockIndexInput(Builder builder) { super(builder.resourceDescription); @@ -74,6 +94,7 @@ abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAcces this.blockSizeShift = builder.blockSizeShift; this.blockSize = builder.blockSize; this.blockMask = builder.blockMask; + CLEANER.register(this, blockHolder); } /** @@ -89,16 +110,7 @@ abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAcces protected abstract IndexInput fetchBlock(int blockId) throws IOException; @Override - public OnDemandBlockIndexInput clone() { - OnDemandBlockIndexInput clone = buildSlice("clone", offset, length()); - // Ensures that clones may be positioned at the same point as the blocked file they were cloned from - if (currentBlock != null) { - clone.currentBlock = currentBlock.clone(); - clone.currentBlockId = currentBlockId; - } - - return clone; - } + public abstract OnDemandBlockIndexInput clone(); @Override public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { @@ -123,17 +135,13 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw @Override public void close() throws IOException { - // current block - if (currentBlock != null) { - currentBlock.close(); - currentBlock = null; - currentBlockId = 0; - } + blockHolder.close(); + currentBlockId = 0; } @Override public long getFilePointer() { - if (currentBlock == null) return 0L; + if (blockHolder.block == null) return 0L; return currentBlockStart() + currentBlockPosition() - offset; } @@ -144,20 +152,20 @@ public long length() { @Override public byte readByte() throws IOException { - if (currentBlock == null) { + if (blockHolder.block == null) { // seek to the beginning seek(0); } else if (currentBlockPosition() >= blockSize) { int blockId = currentBlockId + 1; demandBlock(blockId); } - return currentBlock.readByte(); + return blockHolder.block.readByte(); } @Override public short readShort() throws IOException { - if (currentBlock != null && Short.BYTES <= (blockSize - currentBlockPosition())) { - return currentBlock.readShort(); + if (blockHolder.block != null && Short.BYTES <= (blockSize - currentBlockPosition())) { + return blockHolder.block.readShort(); } else { return super.readShort(); } @@ -165,8 +173,8 @@ public short readShort() throws IOException { @Override public int readInt() throws IOException { - if (currentBlock != null && Integer.BYTES <= (blockSize - currentBlockPosition())) { - return currentBlock.readInt(); + if (blockHolder.block != null && Integer.BYTES <= (blockSize - currentBlockPosition())) { + return blockHolder.block.readInt(); } else { return super.readInt(); } @@ -174,8 +182,8 @@ public int readInt() throws IOException { @Override public long readLong() throws IOException { - if (currentBlock != null && Long.BYTES <= (blockSize - currentBlockPosition())) { - return currentBlock.readLong(); + if (blockHolder.block != null && Long.BYTES <= (blockSize - currentBlockPosition())) { + return blockHolder.block.readLong(); } else { return super.readLong(); } @@ -183,8 +191,8 @@ public long readLong() throws IOException { @Override public final int readVInt() throws IOException { - if (currentBlock != null && 5 <= (blockSize - currentBlockPosition())) { - return currentBlock.readVInt(); + if (blockHolder.block != null && 5 <= (blockSize - currentBlockPosition())) { + return blockHolder.block.readVInt(); } else { return super.readVInt(); } @@ -192,8 +200,8 @@ public final int readVInt() throws IOException { @Override public final long readVLong() throws IOException { - if (currentBlock != null && 9 <= (blockSize - currentBlockPosition())) { - return currentBlock.readVLong(); + if (blockHolder.block != null && 9 <= (blockSize - currentBlockPosition())) { + return blockHolder.block.readVLong(); } else { return super.readVLong(); } @@ -212,14 +220,14 @@ public void seek(long pos) throws IOException { public final byte readByte(long pos) throws IOException { // adjust the pos if it's sliced pos = pos + offset; - if (currentBlock != null && isInCurrentBlockRange(pos)) { + if (blockHolder.block != null && isInCurrentBlockRange(pos)) { // the block contains the byte - return ((RandomAccessInput) currentBlock).readByte(getBlockOffset(pos)); + return ((RandomAccessInput) blockHolder.block).readByte(getBlockOffset(pos)); } else { // the block does not have the byte, seek to the pos first seekInternal(pos); // then read the byte - return currentBlock.readByte(); + return blockHolder.block.readByte(); } } @@ -227,9 +235,9 @@ public final byte readByte(long pos) throws IOException { public short readShort(long pos) throws IOException { // adjust the pos if it's sliced pos = pos + offset; - if (currentBlock != null && isInCurrentBlockRange(pos, Short.BYTES)) { + if (blockHolder.block != null && isInCurrentBlockRange(pos, Short.BYTES)) { // the block contains enough data to satisfy this request - return ((RandomAccessInput) currentBlock).readShort(getBlockOffset(pos)); + return ((RandomAccessInput) blockHolder.block).readShort(getBlockOffset(pos)); } else { // the block does not have enough data, seek to the pos first seekInternal(pos); @@ -242,9 +250,9 @@ public short readShort(long pos) throws IOException { public int readInt(long pos) throws IOException { // adjust the pos if it's sliced pos = pos + offset; - if (currentBlock != null && isInCurrentBlockRange(pos, Integer.BYTES)) { + if (blockHolder.block != null && isInCurrentBlockRange(pos, Integer.BYTES)) { // the block contains enough data to satisfy this request - return ((RandomAccessInput) currentBlock).readInt(getBlockOffset(pos)); + return ((RandomAccessInput) blockHolder.block).readInt(getBlockOffset(pos)); } else { // the block does not have enough data, seek to the pos first seekInternal(pos); @@ -257,9 +265,9 @@ public int readInt(long pos) throws IOException { public long readLong(long pos) throws IOException { // adjust the pos if it's sliced pos = pos + offset; - if (currentBlock != null && isInCurrentBlockRange(pos, Long.BYTES)) { + if (blockHolder.block != null && isInCurrentBlockRange(pos, Long.BYTES)) { // the block contains enough data to satisfy this request - return ((RandomAccessInput) currentBlock).readLong(getBlockOffset(pos)); + return ((RandomAccessInput) blockHolder.block).readLong(getBlockOffset(pos)); } else { // the block does not have enough data, seek to the pos first seekInternal(pos); @@ -270,7 +278,7 @@ public long readLong(long pos) throws IOException { @Override public final void readBytes(byte[] b, int offset, int len) throws IOException { - if (currentBlock == null) { + if (blockHolder.block == null) { // lazy seek to the beginning seek(0); } @@ -278,11 +286,11 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException { int available = blockSize - currentBlockPosition(); if (len <= available) { // the block contains enough data to satisfy this request - currentBlock.readBytes(b, offset, len); + blockHolder.block.readBytes(b, offset, len); } else { // the block does not have enough data. First serve all we've got. if (available > 0) { - currentBlock.readBytes(b, offset, available); + blockHolder.block.readBytes(b, offset, available); offset += available; len -= available; } @@ -293,7 +301,7 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException { int blockId = currentBlockId + 1; int toRead = Math.min(len, blockSize); demandBlock(blockId); - currentBlock.readBytes(b, offset, toRead); + blockHolder.block.readBytes(b, offset, toRead); offset += toRead; len -= toRead; } @@ -306,10 +314,10 @@ public final void readBytes(byte[] b, int offset, int len) throws IOException { * NOTE: the pos should be an adjusted position for slices */ private void seekInternal(long pos) throws IOException { - if (currentBlock == null || !isInCurrentBlockRange(pos)) { + if (blockHolder.block == null || !isInCurrentBlockRange(pos)) { demandBlock(getBlock(pos)); } - currentBlock.seek(getBlockOffset(pos)); + blockHolder.block.seek(getBlockOffset(pos)); } /** @@ -331,17 +339,22 @@ private boolean isInCurrentBlockRange(long pos, int len) { } private void demandBlock(int blockId) throws IOException { - if (currentBlock != null && currentBlockId == blockId) return; + if (blockHolder.block != null && currentBlockId == blockId) return; // close the current block before jumping to the new block - if (currentBlock != null) { - currentBlock.close(); - } + blockHolder.close(); - currentBlock = fetchBlock(blockId); + blockHolder.set(fetchBlock(blockId)); currentBlockId = blockId; } + protected void cloneBlock(OnDemandBlockIndexInput other) { + if (other.blockHolder.block != null) { + this.blockHolder.set(other.blockHolder.block.clone()); + this.currentBlockId = other.currentBlockId; + } + } + protected int getBlock(long pos) { return (int) (pos >>> blockSizeShift); } @@ -359,7 +372,7 @@ protected long currentBlockStart() { } protected int currentBlockPosition() { - return (int) currentBlock.getFilePointer(); + return (int) blockHolder.block.getFilePointer(); } public static Builder builder() { @@ -409,4 +422,52 @@ Builder blockSizeShift(int blockSizeShift) { return this; } } + + /** + * Simple class to hold the currently open IndexInput backing an instance + * of an {@link OnDemandBlockIndexInput}. Lucene may clone one of these + * instances, and per the contract[1], the clones will never be closed. + * However, closing the instances is critical for our reference counting. + * Therefore, we are using the {@link Cleaner} mechanism from the JDK to + * close these clones when they become phantom reachable. The clean action + * must not hold a reference to the {@link OnDemandBlockIndexInput} itself + * (otherwise it would never become phantom reachable!) so we need a wrapper + * instance to hold the current underlying IndexInput, while allowing it to + * be changed out with different instances as {@link OnDemandBlockIndexInput} + * reads through the data. + * + * This class implements {@link Runnable} so that it can be passed directly + * to the cleaner to run its close action. + * + * [1]: https://github.com/apache/lucene/blob/8340b01c3cc229f33584ce2178b07b8984daa6a9/lucene/core/src/java/org/apache/lucene/store/IndexInput.java#L32-L33 + */ + private static class BlockHolder implements Closeable, Runnable { + private volatile IndexInput block; + + private void set(IndexInput block) { + if (this.block != null) { + throw new IllegalStateException("Previous block was not closed!"); + } + this.block = Objects.requireNonNull(block); + } + + @Override + public void close() throws IOException { + if (block != null) { + block.close(); + block = null; + } + } + + @Override + public void run() { + try { + close(); + } catch (IOException e) { + // Exceptions thrown in the cleaning action are ignored, + // so log and swallow the exception here + logger.info("Exception thrown while closing block owned by phantom reachable instance", e); + } + } + } } diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java index a724597069ebc..f147835877ed1 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java @@ -158,11 +158,7 @@ protected IndexInput fetchBlock(int blockId) throws IOException { public OnDemandBlockSnapshotIndexInput clone() { OnDemandBlockSnapshotIndexInput clone = buildSlice("clone", 0L, this.length); // ensures that clones may be positioned at the same point as the blocked file they were cloned from - if (currentBlock != null) { - clone.currentBlock = currentBlock.clone(); - clone.currentBlockId = currentBlockId; - } - + clone.cloneBlock(this); return clone; } diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/CleanerDaemonThreadLeakFilter.java b/server/src/test/java/org/opensearch/index/store/remote/file/CleanerDaemonThreadLeakFilter.java new file mode 100644 index 0000000000000..8f7568a68f0d0 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/file/CleanerDaemonThreadLeakFilter.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.file; + +import com.carrotsearch.randomizedtesting.ThreadFilter; + +/** + * The {@link java.lang.ref.Cleaner} instance used by {@link OnDemandBlockSnapshotIndexInput} creates + * a daemon thread which is never stopped, nor do we have a handle to stop it. This filter + * excludes that thread from the leak detection logic. + */ +public final class CleanerDaemonThreadLeakFilter implements ThreadFilter { + @Override + public boolean reject(Thread t) { + return t.getName().startsWith(OnDemandBlockSnapshotIndexInput.CLEANER_THREAD_NAME_PREFIX); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInputLifecycleTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInputLifecycleTests.java new file mode 100644 index 0000000000000..4c6138d66d2f0 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInputLifecycleTests.java @@ -0,0 +1,173 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.file; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.apache.lucene.store.IndexInput; +import org.hamcrest.MatcherAssert; +import org.junit.After; +import org.opensearch.test.OpenSearchTestCase; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import static org.hamcrest.Matchers.hasSize; + +/** + * Unit test to ensure that {@link OnDemandBlockIndexInput} properly closes + * all of its backing IndexInput instances, as the reference counting logic + * relies on this behavior. + */ +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +public class OnDemandBlockIndexInputLifecycleTests extends OpenSearchTestCase { + private static final int ONE_MB_SHIFT = 20; + private static final int ONE_MB = 1 << ONE_MB_SHIFT; + private static final int TWO_MB = ONE_MB * 2; + + private final List allIndexInputs = new ArrayList<>(); + + @After + public void tearDown() throws Exception { + super.tearDown(); + + assertBusy(() -> { + System.gc(); // Do not rely on GC to be deterministic, hence the polling + assertTrue("Expected all IndexInputs to be closed", allIndexInputs.stream().allMatch(CloseTrackingIndexInput::isClosed)); + }, 5, TimeUnit.SECONDS); + } + + public void testClose() throws IOException { + try (OnDemandBlockIndexInput indexInput = createTestOnDemandBlockIndexInput()) { + indexInput.seek(0); + } + } + + public void testCloseWhenSeekingMultipleChunks() throws IOException { + try (OnDemandBlockIndexInput indexInput = createTestOnDemandBlockIndexInput()) { + indexInput.seek(0); + indexInput.seek(ONE_MB + 1); + } + MatcherAssert.assertThat("Expected to seek past first block and create a second block", allIndexInputs, hasSize(2)); + } + + public void testUnclosedCloneIsClosed() throws IOException { + try (OnDemandBlockIndexInput indexInput = createTestOnDemandBlockIndexInput()) { + indexInput.seek(0); + + // Clone is abandoned without closing + indexInput.clone().seek(0); + } + } + + public void testUnclosedSliceIsClosed() throws IOException { + try (OnDemandBlockIndexInput indexInput = createTestOnDemandBlockIndexInput()) { + indexInput.seek(0); + + // Clone is abandoned without closing + indexInput.slice("slice", 0, 100).seek(0); + } + } + + private OnDemandBlockIndexInput createTestOnDemandBlockIndexInput() { + return new TestOnDemandBlockIndexInput(this::createCloseTrackingIndexInput, false); + } + + private IndexInput createCloseTrackingIndexInput() { + final CloseTrackingIndexInput i = new CloseTrackingIndexInput(); + allIndexInputs.add(i); + return i; + } + + /** + * Concrete implementation of {@link OnDemandBlockIndexInput} that creates + * {@link CloseTrackingIndexInput} index inputs when it needs to fetch a + * new block. + */ + private static class TestOnDemandBlockIndexInput extends OnDemandBlockIndexInput { + private final Supplier indexInputSupplier; + + TestOnDemandBlockIndexInput(Supplier indexInputSupplier, boolean isClone) { + super( + builder().blockSizeShift(ONE_MB_SHIFT) + .offset(0) + .length(TWO_MB) + .isClone(isClone) + .resourceDescription(TestOnDemandBlockIndexInput.class.getName()) + ); + this.indexInputSupplier = indexInputSupplier; + } + + @Override + protected OnDemandBlockIndexInput buildSlice(String sliceDescription, long offset, long length) { + return new TestOnDemandBlockIndexInput(this.indexInputSupplier, true); + } + + @Override + protected IndexInput fetchBlock(int blockId) throws IOException { + return indexInputSupplier.get(); + } + + @Override + public OnDemandBlockIndexInput clone() { + return new TestOnDemandBlockIndexInput(this.indexInputSupplier, true); + } + } + + /** + * Simple implementation of an IndexInput that just tracks whether it has + * been closed. All other methods do nothing useful. + */ + private static class CloseTrackingIndexInput extends IndexInput { + + private boolean isClosed = false; + + protected CloseTrackingIndexInput() { + super("TestIndexInput"); + } + + public boolean isClosed() { + return isClosed; + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public long getFilePointer() { + return 0; + } + + @Override + public void seek(long pos) {} + + @Override + public long length() { + return 0; + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) { + return null; + } + + @Override + public byte readByte() throws IOException { + return 0; + } + + @Override + public void readBytes(byte[] b, int offset, int len) {} + } +} diff --git a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java index 6f90c2b464621..7bfe57a46d5dc 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java @@ -32,10 +32,13 @@ import java.nio.file.Path; import java.util.concurrent.CompletableFuture; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) public class OnDemandBlockSnapshotIndexInputTests extends OpenSearchTestCase { // params shared by all test cases private static final String RESOURCE_DESCRIPTION = "Test OnDemandBlockSnapshotIndexInput Block Size"; @@ -279,10 +282,10 @@ public static void testCurrentBlockPosition(OnDemandBlockSnapshotIndexInput bloc public static void testClone(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize) throws IOException { blockedSnapshotFile.seek(blockSize + 1); OnDemandBlockSnapshotIndexInput clonedFile = blockedSnapshotFile.clone(); - assertEquals(clonedFile.currentBlock.getFilePointer(), blockedSnapshotFile.currentBlock.getFilePointer()); + assertEquals(clonedFile.currentBlockPosition(), blockedSnapshotFile.currentBlockPosition()); assertEquals(clonedFile.getFilePointer(), blockedSnapshotFile.getFilePointer()); clonedFile.seek(blockSize + 11); - assertNotEquals(clonedFile.currentBlock.getFilePointer(), blockedSnapshotFile.currentBlock.getFilePointer()); + assertNotEquals(clonedFile.currentBlockPosition(), blockedSnapshotFile.currentBlockPosition()); } public static void testSlice(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize) throws IOException { @@ -301,12 +304,10 @@ public static void testSlice(OnDemandBlockSnapshotIndexInput blockedSnapshotFile newSlice.seek(0); assertEquals(0, newSlice.getFilePointer()); - assertEquals(0, newSlice.currentBlockId); - assertEquals(blockSize - 11, newSlice.currentBlock.getFilePointer()); + assertEquals(blockSize - 11, newSlice.currentBlockPosition()); newSlice.seek(21); assertEquals(21, newSlice.getFilePointer()); - assertEquals(1, newSlice.currentBlockId); - assertEquals(10, newSlice.currentBlock.getFilePointer()); + assertEquals(10, newSlice.currentBlockPosition()); try { newSlice.seek(23); @@ -318,11 +319,11 @@ public static void testSlice(OnDemandBlockSnapshotIndexInput blockedSnapshotFile public static void testGetFilePointer(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize) throws IOException { blockedSnapshotFile.seek(blockSize - 11); - assertEquals(blockSize - 11, blockedSnapshotFile.currentBlock.getFilePointer()); + assertEquals(blockSize - 11, blockedSnapshotFile.currentBlockPosition()); blockedSnapshotFile.seek(blockSize + 5); - assertEquals(5, blockedSnapshotFile.currentBlock.getFilePointer()); + assertEquals(5, blockedSnapshotFile.currentBlockPosition()); blockedSnapshotFile.seek(blockSize * 2); - assertEquals(0, blockedSnapshotFile.currentBlock.getFilePointer()); + assertEquals(0, blockedSnapshotFile.currentBlockPosition()); } public static void testReadByte(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize) throws IOException { @@ -406,10 +407,10 @@ public static void testReadVLong(OnDemandBlockSnapshotIndexInput blockedSnapshot public static void testSeek(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize, int fileSize) throws IOException { blockedSnapshotFile.seek(0); - assertEquals(0, blockedSnapshotFile.currentBlock.getFilePointer()); + assertEquals(0, blockedSnapshotFile.currentBlockPosition()); blockedSnapshotFile.seek(blockSize + 11); - assertEquals(11, blockedSnapshotFile.currentBlock.getFilePointer()); + assertEquals(11, blockedSnapshotFile.currentBlockPosition()); try { blockedSnapshotFile.seek(fileSize + 1);