From 5f2e66bfb11ebaea536ee62bc3764a848a6ab330 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Sat, 13 Aug 2022 22:48:16 +0530 Subject: [PATCH] [Remote Store] Add RemoteSegmentStoreDirectory to interact with remote segment store (#4020) * Add RemoteSegmentStoreDirectory to interact with remote segment store Signed-off-by: Sachin Kale --- .../index/store/RemoteDirectory.java | 12 +- .../store/RemoteSegmentStoreDirectory.java | 372 ++++++++++++++++++ .../index/store/RemoteDirectoryTests.java | 20 + .../RemoteSegmentStoreDirectoryTests.java | 339 ++++++++++++++++ 4 files changed, 742 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 855457f275122..62e2b12896411 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -33,7 +33,7 @@ * * @opensearch.internal */ -public final class RemoteDirectory extends Directory { +public class RemoteDirectory extends Directory { private final BlobContainer blobContainer; @@ -50,6 +50,16 @@ public String[] listAll() throws IOException { return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new); } + /** + * Returns names of files with given prefix in this directory. + * @param filenamePrefix The prefix to match against file names in the directory + * @return A list of the matching filenames in the directory + * @throws IOException if there were any failures in reading from the blob container + */ + public Collection listFilesByPrefix(String filenamePrefix) throws IOException { + return blobContainer.listBlobsByPrefix(filenamePrefix).keySet(); + } + /** * Removes an existing file in the directory. * diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java new file mode 100644 index 0000000000000..d7d6b29d08bfc --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -0,0 +1,372 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.UUIDs; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * A RemoteDirectory extension for remote segment store. We need to make sure we don't overwrite a segment file once uploaded. 
+ * In order to prevent segment overwrite which can occur due to two primary nodes for the same shard at the same time, + * a unique suffix is added to the uploaded segment file. This class keeps track of filename of segments stored + * in remote segment store vs filename in local filesystem and provides the consistent Directory interface so that + * caller will be accessing segment files in the same way as {@code FSDirectory}. Apart from storing actual segment files, + * remote segment store also keeps track of refresh checkpoints as metadata in a separate path which is handled by + * another instance of {@code RemoteDirectory}. + * @opensearch.internal + */ +public final class RemoteSegmentStoreDirectory extends FilterDirectory { + /** + * Each segment file is uploaded with unique suffix. + * For example, _0.cfe in local filesystem will be uploaded to remote segment store as _0.cfe__gX7bNIIBrs0AUNsR2yEG + */ + public static final String SEGMENT_NAME_UUID_SEPARATOR = "__"; + + public static final MetadataFilenameUtils.MetadataFilenameComparator METADATA_FILENAME_COMPARATOR = + new MetadataFilenameUtils.MetadataFilenameComparator(); + + /** + * remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data + */ + private final RemoteDirectory remoteDataDirectory; + /** + * remoteMetadataDirectory is used to store metadata files at path: cluster_UUID/index_UUID/shardId/segments/metadata + */ + private final RemoteDirectory remoteMetadataDirectory; + + /** + * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation + * This is achieved by uploading refresh metadata file with the same UUID suffix. + */ + private String metadataFileUniqueSuffix; + + /** + * Keeps track of local segment filename to uploaded filename along with other attributes like checksum. + * This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time. + * It is important to initialize this map on creation of RemoteSegmentStoreDirectory and update it on each upload and delete. + */ + private Map segmentsUploadedToRemoteStore; + + private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class); + + public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory) throws IOException { + super(remoteDataDirectory); + this.remoteDataDirectory = remoteDataDirectory; + this.remoteMetadataDirectory = remoteMetadataDirectory; + init(); + } + + /** + * Initializes the cache which keeps track of all the segment files uploaded to the remote segment store. + * As this cache is specific to an instance of RemoteSegmentStoreDirectory, it is possible that cache becomes stale + * if another instance of RemoteSegmentStoreDirectory is used to upload/delete segment files. + * It is caller's responsibility to call init() again to ensure that cache is properly updated. + * @throws IOException if there were any failures in reading the metadata file + */ + public void init() throws IOException { + this.metadataFileUniqueSuffix = UUIDs.base64UUID(); + this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readLatestMetadataFile()); + } + + /** + * Read the latest metadata file to get the list of segments uploaded to the remote segment store. + * We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit. 
+     * The format of refresh metadata filename is: refresh_metadata__PrimaryTerm__Generation__UUID
+     * Refresh metadata files keep track of active segments for the shard at the time of refresh.
+     * In order to get the list of segment files uploaded to the remote segment store, we need to read the latest metadata file.
+     * Each metadata file contains a map where
+     * Key is - Segment local filename and
+     * Value is - local filename::uploaded filename::checksum
+     * @return Map of segment filename to uploaded filename with checksum
+     * @throws IOException if there were any failures in reading the metadata file
+     */
+    private Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOException {
+        Map<String, UploadedSegmentMetadata> segmentMetadataMap = new HashMap<>();
+
+        Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
+        Optional<String> latestMetadataFile = metadataFiles.stream().max(METADATA_FILENAME_COMPARATOR);
+
+        if (latestMetadataFile.isPresent()) {
+            logger.info("Reading latest Metadata file {}", latestMetadataFile.get());
+            segmentMetadataMap = readMetadataFile(latestMetadataFile.get());
+        } else {
+            logger.info("No metadata file found, this can happen for new index with no data uploaded to remote segment store");
+        }
+
+        return segmentMetadataMap;
+    }
+
+    private Map<String, UploadedSegmentMetadata> readMetadataFile(String metadataFilename) throws IOException {
+        try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) {
+            Map<String, String> segmentMetadata = indexInput.readMapOfStrings();
+            return segmentMetadata.entrySet()
+                .stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> UploadedSegmentMetadata.fromString(entry.getValue())));
+        }
+    }
+
+    /**
+     * Metadata of a segment that is uploaded to remote segment store.
+     */
+    static class UploadedSegmentMetadata {
+        private static final String SEPARATOR = "::";
+        private final String originalFilename;
+        private final String uploadedFilename;
+        private final String checksum;
+
+        UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum) {
+            this.originalFilename = originalFilename;
+            this.uploadedFilename = uploadedFilename;
+            this.checksum = checksum;
+        }
+
+        @Override
+        public String toString() {
+            return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum);
+        }
+
+        public static UploadedSegmentMetadata fromString(String uploadedFilename) {
+            String[] values = uploadedFilename.split(SEPARATOR);
+            return new UploadedSegmentMetadata(values[0], values[1], values[2]);
+        }
+    }
+
+    /**
+     * Contains utility methods that provide various parts of a metadata filename, along with a comparator.
+     * Each metadata filename is of the format: PREFIX__PrimaryTerm__Generation__UUID
+     */
+    static class MetadataFilenameUtils {
+        public static final String SEPARATOR = "__";
+        public static final String METADATA_PREFIX = "metadata";
+
+        /**
+         * Comparator to sort the metadata filenames. The order of sorting is: Primary Term, Generation, UUID.
+         * Even though the UUID sort does not provide any info on recency, it provides a consistent way to sort the filenames.
+         */
+        static class MetadataFilenameComparator implements Comparator<String> {
+            @Override
+            public int compare(String first, String second) {
+                String[] firstTokens = first.split(SEPARATOR);
+                String[] secondTokens = second.split(SEPARATOR);
+                if (!firstTokens[0].equals(secondTokens[0])) {
+                    return firstTokens[0].compareTo(secondTokens[0]);
+                }
+                long firstPrimaryTerm = getPrimaryTerm(firstTokens);
+                long secondPrimaryTerm = getPrimaryTerm(secondTokens);
+                if (firstPrimaryTerm != secondPrimaryTerm) {
+                    return firstPrimaryTerm > secondPrimaryTerm ? 1 : -1;
+                } else {
+                    long firstGeneration = getGeneration(firstTokens);
+                    long secondGeneration = getGeneration(secondTokens);
+                    if (firstGeneration != secondGeneration) {
+                        return firstGeneration > secondGeneration ? 1 : -1;
+                    } else {
+                        return getUuid(firstTokens).compareTo(getUuid(secondTokens));
+                    }
+                }
+            }
+        }
+
+        // Visible for testing
+        static String getMetadataFilename(long primaryTerm, long generation, String uuid) {
+            return String.join(
+                SEPARATOR,
+                METADATA_PREFIX,
+                Long.toString(primaryTerm),
+                Long.toString(generation, Character.MAX_RADIX),
+                uuid
+            );
+        }
+
+        // Visible for testing
+        static long getPrimaryTerm(String[] filenameTokens) {
+            return Long.parseLong(filenameTokens[1]);
+        }
+
+        // Visible for testing
+        static long getGeneration(String[] filenameTokens) {
+            return Long.parseLong(filenameTokens[2], Character.MAX_RADIX);
+        }
+
+        // Visible for testing
+        static String getUuid(String[] filenameTokens) {
+            return filenameTokens[3];
+        }
+    }
+
+    /**
+     * Returns the list of all segment files uploaded to the remote segment store up to the last refresh checkpoint.
+     * Any segment file that is uploaded without a corresponding metadata file will not be visible as part of listAll().
+     * We chose not to return cache entries for listAll as the cache can have entries for stale segments as well.
+     * Even if we plan to delete stale segments from remote segment store, it will be a periodic operation.
+     * @return segment filenames stored in remote segment store
+     * @throws IOException if there were any failures in reading the metadata file
+     */
+    @Override
+    public String[] listAll() throws IOException {
+        return readLatestMetadataFile().keySet().toArray(new String[0]);
+    }
+
+    /**
+     * Delete segment file from remote segment store.
+     * @param name the name of an existing segment file in local filesystem.
+     * @throws IOException if the file exists but could not be deleted.
+     */
+    @Override
+    public void deleteFile(String name) throws IOException {
+        String remoteFilename = getExistingRemoteFilename(name);
+        if (remoteFilename != null) {
+            remoteDataDirectory.deleteFile(remoteFilename);
+            segmentsUploadedToRemoteStore.remove(name);
+        }
+    }
+
+    /**
+     * Returns the byte length of a segment file in the remote segment store.
+     * @param name the name of an existing segment file in local filesystem.
+     * @throws IOException in case of I/O error
+     * @throws NoSuchFileException if the file does not exist in the cache or remote segment store
+     */
+    @Override
+    public long fileLength(String name) throws IOException {
+        String remoteFilename = getExistingRemoteFilename(name);
+        if (remoteFilename != null) {
+            return remoteDataDirectory.fileLength(remoteFilename);
+        } else {
+            throw new NoSuchFileException(name);
+        }
+    }
+
+    /**
+     * Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote
+     * segment store.
+     * @param name the name of the file to create.
+     * @throws IOException in case of I/O error
+     */
+    @Override
+    public IndexOutput createOutput(String name, IOContext context) throws IOException {
+        return remoteDataDirectory.createOutput(getNewRemoteSegmentFilename(name), context);
+    }
+
+    /**
+     * Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream.
+     * @param name the name of an existing file.
+     * @throws IOException in case of I/O error
+     * @throws NoSuchFileException if the file does not exist either in cache or remote segment store
+     */
+    @Override
+    public IndexInput openInput(String name, IOContext context) throws IOException {
+        String remoteFilename = getExistingRemoteFilename(name);
+        if (remoteFilename != null) {
+            return remoteDataDirectory.openInput(remoteFilename, context);
+        } else {
+            throw new NoSuchFileException(name);
+        }
+    }
+
+    /**
+     * Copies an existing src file from directory from to a non-existent file dest in this directory.
+     * Once the segment is uploaded to remote segment store, update the cache accordingly.
+     */
+    @Override
+    public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
+        String remoteFilename = getNewRemoteSegmentFilename(dest);
+        remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
+        String checksum = getChecksumOfLocalFile(from, src);
+        UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum);
+        segmentsUploadedToRemoteStore.put(src, segmentMetadata);
+    }
+
+    /**
+     * Checks if the file exists in the uploadedSegments cache and the checksum matches.
+     * It is important to match the checksum as the same segment filename can be used for different
+     * segments due to a concurrency issue.
+     * @param localFilename filename of segment stored in local filesystem
+     * @param checksum checksum of the segment file
+     * @return true if file exists in cache and checksum matches.
+     */
+    public boolean containsFile(String localFilename, String checksum) {
+        return segmentsUploadedToRemoteStore.containsKey(localFilename)
+            && segmentsUploadedToRemoteStore.get(localFilename).checksum.equals(checksum);
+    }
+
+    /**
+     * Uploads a metadata file for the given primary term and commit generation.
+     * @param segmentFiles segment files that are part of the shard at the time of the latest refresh
+     * @param storeDirectory instance of local directory to temporarily create metadata file before upload
+     * @param primaryTerm primary term to be used in the name of metadata file
+     * @param generation commit generation
+     * @throws IOException in case of I/O error while uploading the metadata file
+     */
+    public void uploadMetadata(Collection<String> segmentFiles, Directory storeDirectory, long primaryTerm, long generation)
+        throws IOException {
+        synchronized (this) {
+            String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix);
+            IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
+            Map<String, String> uploadedSegments = new HashMap<>();
+            for (String file : segmentFiles) {
+                if (segmentsUploadedToRemoteStore.containsKey(file)) {
+                    uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
+                } else {
+                    throw new NoSuchFileException(file);
+                }
+            }
+            indexOutput.writeMapOfStrings(uploadedSegments);
+            indexOutput.close();
+            storeDirectory.sync(Collections.singleton(metadataFilename));
+            remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
+            storeDirectory.deleteFile(metadataFilename);
+        }
+    }
+
+    private String getChecksumOfLocalFile(Directory directory, String file) throws IOException {
+        try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) {
+            return Long.toString(CodecUtil.retrieveChecksum(indexInput));
+        }
+    }
+
+    private String getExistingRemoteFilename(String localFilename) {
+        if (segmentsUploadedToRemoteStore.containsKey(localFilename)) {
+            return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename;
+        } else {
+            return null;
+        }
+    }
+
+    private String getNewRemoteSegmentFilename(String localFilename) {
+        return localFilename + SEGMENT_NAME_UUID_SEPARATOR + UUIDs.base64UUID();
+    }
+
+    private String getLocalSegmentFilename(String remoteFilename) {
+        return remoteFilename.split(SEGMENT_NAME_UUID_SEPARATOR)[0];
+    }
+
+    // Visible for testing
+    Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
+        return this.segmentsUploadedToRemoteStore;
+    }
+}
diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java
index 2ded77d2cecfd..97575248b4ad3 100644
--- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java
+++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java
@@ -15,11 +15,13 @@
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.support.PlainBlobMetadata;
+import org.opensearch.common.collect.Set;
 import org.opensearch.test.OpenSearchTestCase;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.NoSuchFileException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -67,6 +69,24 @@ public void testListAllException() throws IOException {
         assertThrows(IOException.class, () -> remoteDirectory.listAll());
     }
 
+    public void
testListFilesByPrefix() throws IOException { + Map fileNames = Stream.of("abc", "abd", "abe", "abf", "abg") + .collect(Collectors.toMap(filename -> filename, filename -> new PlainBlobMetadata(filename, 100))); + + when(blobContainer.listBlobsByPrefix("ab")).thenReturn(fileNames); + + Collection actualFileNames = remoteDirectory.listFilesByPrefix("ab"); + Collection expectedFileName = Set.of("abc", "abd", "abe", "abf", "abg"); + assertEquals(expectedFileName, actualFileNames); + } + + public void testListFilesByPrefixException() throws IOException { + when(blobContainer.listBlobsByPrefix("abc")).thenThrow(new IOException("Error reading blob store")); + + assertThrows(IOException.class, () -> remoteDirectory.listFilesByPrefix("abc")); + verify(blobContainer).listBlobsByPrefix("abc"); + } + public void testDeleteFile() throws IOException { remoteDirectory.deleteFile("segment_1"); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java new file mode 100644 index 0000000000000..4eabfa74625f2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -0,0 +1,339 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.junit.Before; +import org.opensearch.common.collect.Set; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.startsWith; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteSegmentStoreDirectoryTests extends OpenSearchTestCase { + private RemoteDirectory remoteDataDirectory; + private RemoteDirectory remoteMetadataDirectory; + + private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; + + @Before + public void setup() throws IOException { + remoteDataDirectory = mock(RemoteDirectory.class); + remoteMetadataDirectory = mock(RemoteDirectory.class); + + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory); + } + + public void testUploadedSegmentMetadataToString() { + RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = new RemoteSegmentStoreDirectory.UploadedSegmentMetadata( + "abc", + "pqr", + "123456" + ); + assertEquals("abc::pqr::123456", metadata.toString()); + } + + public void testUploadedSegmentMetadataFromString() { + RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString( + "_0.cfe::_0.cfe__uuidxyz::4567" + ); + assertEquals("_0.cfe::_0.cfe__uuidxyz::4567", metadata.toString()); + } + + public 
void testGetMetadataFilename() { + // Generation 23 is replaced by n due to radix 32 + assertEquals( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX + "__12__n__uuid1", + RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, "uuid1") + ); + } + + public void testGetPrimaryTermGenerationUuid() { + String[] filenameTokens = "abc__12__n__uuid_xyz".split(RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR); + assertEquals(12, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getPrimaryTerm(filenameTokens)); + assertEquals(23, RemoteSegmentStoreDirectory.MetadataFilenameUtils.getGeneration(filenameTokens)); + assertEquals("uuid_xyz", RemoteSegmentStoreDirectory.MetadataFilenameUtils.getUuid(filenameTokens)); + } + + public void testMetadataFilenameComparator() { + List metadataFilenames = new ArrayList<>( + List.of( + "abc__10__20__uuid1", + "abc__12__2__uuid2", + "pqr__1__1__uuid0", + "abc__3__n__uuid3", + "abc__10__8__uuid8", + "abc__3__a__uuid4", + "abc__3__a__uuid5" + ) + ); + metadataFilenames.sort(RemoteSegmentStoreDirectory.METADATA_FILENAME_COMPARATOR); + assertEquals( + List.of( + "abc__3__a__uuid4", + "abc__3__a__uuid5", + "abc__3__n__uuid3", + "abc__10__8__uuid8", + "abc__10__20__uuid1", + "abc__12__2__uuid2", + "pqr__1__1__uuid0" + ), + metadataFilenames + ); + } + + public void testInitException() throws IOException { + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( + new IOException("Error") + ); + + assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.init()); + } + + public void testInitNoMetadataFile() throws IOException { + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + List.of() + ); + + remoteSegmentStoreDirectory.init(); + Map actualCache = remoteSegmentStoreDirectory + .getSegmentsUploadedToRemoteStore(); + + assertEquals(Set.of(), actualCache.keySet()); + } + + private Map getDummyMetadata(String prefix, int commitGeneration) { + Map metadata = new HashMap<>(); + metadata.put(prefix + ".cfe", prefix + ".cfe::" + prefix + ".cfe__qrt::" + randomIntBetween(1000, 5000)); + metadata.put(prefix + ".cfs", prefix + ".cfs::" + prefix + ".cfs__zxd::" + randomIntBetween(1000, 5000)); + metadata.put(prefix + ".si", prefix + ".si::" + prefix + ".si__yui::" + randomIntBetween(1000, 5000)); + metadata.put( + "segments_" + commitGeneration, + "segments_" + commitGeneration + "::segments_" + commitGeneration + "__exv::" + randomIntBetween(1000, 5000) + ); + return metadata; + } + + private void populateMetadata() throws IOException { + List metadataFiles = List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv"); + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + metadataFiles + ); + + IndexInput indexInput = mock(IndexInput.class); + Map dummyMetadata = getDummyMetadata("_0", 1); + when(indexInput.readMapOfStrings()).thenReturn(dummyMetadata); + when(remoteMetadataDirectory.openInput("metadata__2__1__zxv", IOContext.DEFAULT)).thenReturn(indexInput); + } + + public void testInit() throws IOException { + populateMetadata(); + + when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenReturn( + List.of("metadata__1__5__abc", "metadata__1__6__pqr", "metadata__2__1__zxv") + ); + + 
remoteSegmentStoreDirectory.init();
+
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> actualCache = remoteSegmentStoreDirectory
+            .getSegmentsUploadedToRemoteStore();
+
+        assertEquals(Set.of("_0.cfe", "_0.cfs", "_0.si", "segments_1"), actualCache.keySet());
+    }
+
+    public void testListAll() throws IOException {
+        populateMetadata();
+
+        assertEquals(Set.of("_0.cfe", "_0.cfs", "_0.si", "segments_1"), Set.of(remoteSegmentStoreDirectory.listAll()));
+    }
+
+    public void testDeleteFile() throws IOException {
+        populateMetadata();
+        remoteSegmentStoreDirectory.init();
+
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
+            .getSegmentsUploadedToRemoteStore();
+
+        assertTrue(uploadedSegments.containsKey("_0.si"));
+        assertFalse(uploadedSegments.containsKey("_100.si"));
+
+        remoteSegmentStoreDirectory.deleteFile("_0.si");
+        remoteSegmentStoreDirectory.deleteFile("_100.si");
+
+        verify(remoteDataDirectory).deleteFile(startsWith("_0.si"));
+        verify(remoteDataDirectory, times(0)).deleteFile(startsWith("_100.si"));
+        assertFalse(uploadedSegments.containsKey("_0.si"));
+    }
+
+    public void testDeleteFileException() throws IOException {
+        populateMetadata();
+        remoteSegmentStoreDirectory.init();
+
+        doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(any());
+        assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteFile("_0.si"));
+    }
+
+    public void testFileLength() throws IOException {
+        populateMetadata();
+        remoteSegmentStoreDirectory.init();
+
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
+            .getSegmentsUploadedToRemoteStore();
+
+        assertTrue(uploadedSegments.containsKey("_0.si"));
+
+        when(remoteDataDirectory.fileLength(startsWith("_0.si"))).thenReturn(1234L);
+
+        assertEquals(1234L, remoteSegmentStoreDirectory.fileLength("_0.si"));
+    }
+
+    public void testFileLengthNoSuchFile() throws IOException {
+        populateMetadata();
+        remoteSegmentStoreDirectory.init();
+
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
+            .getSegmentsUploadedToRemoteStore();
+
+        assertFalse(uploadedSegments.containsKey("_100.si"));
+        assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.fileLength("_100.si"));
+    }
+
+    public void testCreateOutput() throws IOException {
+        IndexOutput indexOutput = mock(IndexOutput.class);
+        when(remoteDataDirectory.createOutput(startsWith("abc"), eq(IOContext.DEFAULT))).thenReturn(indexOutput);
+
+        assertEquals(indexOutput, remoteSegmentStoreDirectory.createOutput("abc", IOContext.DEFAULT));
+    }
+
+    public void testCreateOutputException() throws IOException {
+        when(remoteDataDirectory.createOutput(startsWith("abc"), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error"));
+
+        assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.createOutput("abc", IOContext.DEFAULT));
+    }
+
+    public void testOpenInput() throws IOException {
+        populateMetadata();
+        remoteSegmentStoreDirectory.init();
+
+        IndexInput indexInput = mock(IndexInput.class);
+        when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenReturn(indexInput);
+
+        assertEquals(indexInput, remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT));
+    }
+
+    public void testOpenInputNoSuchFile() {
+        assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT));
+    }
+
+    public void testOpenInputException() throws IOException {
+        populateMetadata();
+        remoteSegmentStoreDirectory.init();
+
+        when(remoteDataDirectory.openInput(startsWith("_0.si"), eq(IOContext.DEFAULT))).thenThrow(new IOException("Error"));
+
+        assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.openInput("_0.si", IOContext.DEFAULT));
+    }
+
+    public void testCopyFrom() throws IOException {
+        String filename = "_100.si";
+        populateMetadata();
+        remoteSegmentStoreDirectory.init();
+
+        Directory storeDirectory = LuceneTestCase.newDirectory();
+        IndexOutput indexOutput = storeDirectory.createOutput(filename, IOContext.DEFAULT);
+        indexOutput.writeString("Hello World!");
+        CodecUtil.writeFooter(indexOutput);
+        indexOutput.close();
+        storeDirectory.sync(List.of(filename));
+
+        assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
+        remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT);
+        assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
+
+        storeDirectory.close();
+    }
+
+    public void testCopyFromException() throws IOException {
+        String filename = "_100.si";
+        Directory storeDirectory = LuceneTestCase.newDirectory();
+        assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
+        doThrow(new IOException("Error")).when(remoteDataDirectory).copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT);
+
+        assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, filename, IOContext.DEFAULT));
+        assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));
+
+        storeDirectory.close();
+    }
+
+    public void testContainsFile() throws IOException {
+        populateMetadata();
+        remoteSegmentStoreDirectory.init();
+
+        // This is not the correct way to add files but the other way is to open up access to fields in UploadedSegmentMetadata
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap = remoteSegmentStoreDirectory
+            .getSegmentsUploadedToRemoteStore();
+        uploadedSegmentMetadataMap.put(
+            "_100.si",
+            new RemoteSegmentStoreDirectory.UploadedSegmentMetadata("_100.si", "_100.si__uuid1", "1234")
+        );
+
+        assertTrue(remoteSegmentStoreDirectory.containsFile("_100.si", "1234"));
+        assertFalse(remoteSegmentStoreDirectory.containsFile("_100.si", "2345"));
+        assertFalse(remoteSegmentStoreDirectory.containsFile("_200.si", "1234"));
+    }
+
+    public void testUploadMetadataEmpty() throws IOException {
+        Directory storeDirectory = mock(Directory.class);
+        IndexOutput indexOutput = mock(IndexOutput.class);
+        when(storeDirectory.createOutput(startsWith("metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput);
+
+        Collection<String> segmentFiles = List.of("s1", "s2", "s3");
+        assertThrows(NoSuchFileException.class, () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, storeDirectory, 12L, 24L));
+    }
+
+    public void testUploadMetadataNonEmpty() throws IOException {
+        populateMetadata();
+        remoteSegmentStoreDirectory.init();
+
+        Directory storeDirectory = mock(Directory.class);
+        IndexOutput indexOutput = mock(IndexOutput.class);
+        when(storeDirectory.createOutput(startsWith("metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput);
+
+        Collection<String> segmentFiles = List.of("_0.si");
+        remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, storeDirectory, 12L, 24L);
+
+        verify(remoteMetadataDirectory).copyFrom(
+            eq(storeDirectory),
+            startsWith("metadata__12__o"),
+            startsWith("metadata__12__o"),
+            eq(IOContext.DEFAULT)
+        );
+        String metadataString = remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().get("_0.si").toString();
+        verify(indexOutput).writeMapOfStrings(Map.of("_0.si", metadataString));
+    }
+}
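
Note for reviewers: below is a minimal usage sketch of the new directory, based only on the API added in this patch. The wrapper class, the helper method, and the way the refreshed file list, local store directory, primary term, and generation are obtained are illustrative assumptions; the actual wiring of the two RemoteDirectory instances to the segments/data and segments/metadata blob paths, and the refresh-time integration, are not part of this change.

package org.opensearch.index.store;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

import java.io.IOException;
import java.util.Collection;

/**
 * Hypothetical caller, for illustration only: uploads the segment files of the latest refresh
 * and publishes them by writing a metadata file.
 */
public class RemoteSegmentUploaderSketch {

    public static void uploadRefreshedSegments(
        RemoteSegmentStoreDirectory remoteDirectory,   // wraps the .../segments/data and .../segments/metadata paths
        Directory localDirectory,                      // the shard's local store directory
        Collection<String> refreshedFiles,             // active segment files at refresh time
        long primaryTerm,
        long generation
    ) throws IOException {
        // Refresh the local cache in case another RemoteSegmentStoreDirectory instance
        // has uploaded or deleted segments since this one was created.
        remoteDirectory.init();

        for (String file : refreshedFiles) {
            // Skip files that are already uploaded with the same name and checksum;
            // copyFrom() appends a unique suffix, so re-uploading never overwrites a remote segment.
            String checksum = checksumOf(localDirectory, file);
            if (remoteDirectory.containsFile(file, checksum) == false) {
                remoteDirectory.copyFrom(localDirectory, file, file, IOContext.DEFAULT);
            }
        }

        // Publish the refresh point: until this succeeds, the new files are not visible via listAll().
        remoteDirectory.uploadMetadata(refreshedFiles, localDirectory, primaryTerm, generation);
    }

    private static String checksumOf(Directory directory, String file) throws IOException {
        try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) {
            return Long.toString(CodecUtil.retrieveChecksum(indexInput));
        }
    }
}

Because listAll() is driven purely by the latest metadata file, a failure between copyFrom() and uploadMetadata() leaves newly copied files uploaded but invisible; cleaning up such stale segments is expected to be a separate periodic operation, as noted in the listAll() javadoc.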