diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 1eeadfe228a45..114d07589b0c0 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -48,4 +48,25 @@ public static long invertLong(String str) { } return Long.MAX_VALUE - num; } + + /** + * Extracts the segment name from the provided segment file name + * @param filename Segment file name to parse + * @return Name of the segment that the segment file belongs to + */ + public static String getSegmentName(String filename) { + // Segment file names follow patterns like "_0.cfe" or "_0_1_Lucene90_0.dvm". + // Here, the segment name is "_0", which is the set of characters + // starting with "_" until the next "_" or first ".". + int endIdx = filename.indexOf('_', 1); + if (endIdx == -1) { + endIdx = filename.indexOf('.'); + } + + if (endIdx == -1) { + throw new IllegalArgumentException("Unable to infer segment name for segment file " + filename); + } + + return filename.substring(0, endIdx); + } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index d3e8d961337cc..8ee267cb67e68 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -14,6 +14,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; @@ -22,6 +24,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.common.UUIDs; @@ -217,6 +220,14 @@ public static class UploadedSegmentMetadata { private final String checksum; private final long length; + /** + * The Lucene major version that wrote the original segment files. + * As part of the Lucene version compatibility check, this version information stored in the metadata + * will be used to skip downloading the segment files unnecessarily + * if they were written by an incompatible Lucene version. + */ + private int writtenByMajor; + UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum, long length) { this.originalFilename = originalFilename; this.uploadedFilename = uploadedFilename; @@ -226,7 +237,14 @@ public static class UploadedSegmentMetadata { @Override public String toString() { - return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum, String.valueOf(length)); + return String.join( + SEPARATOR, + originalFilename, + uploadedFilename, + checksum, + String.valueOf(length), + String.valueOf(writtenByMajor) + ); } public String getChecksum() { @@ -239,12 +257,35 @@ public long getLength() { public static UploadedSegmentMetadata fromString(String uploadedFilename) { String[] values = uploadedFilename.split(SEPARATOR); - return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3])); + UploadedSegmentMetadata metadata = new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3])); + if (values.length < 5) { + logger.error("Lucene version is missing for UploadedSegmentMetadata: " + uploadedFilename); + } + + metadata.setWrittenByMajor(Integer.parseInt(values[4])); + + return metadata; } public String getOriginalFilename() { return originalFilename; } + + public void setWrittenByMajor(int writtenByMajor) { + if (writtenByMajor <= Version.LATEST.major && writtenByMajor >= Version.MIN_SUPPORTED_MAJOR) { + this.writtenByMajor = writtenByMajor; + } else { + throw new IllegalArgumentException( + "Lucene major version supplied (" + + writtenByMajor + + ") is incorrect. Should be between Version.LATEST (" + + Version.LATEST.major + + ") and Version.MIN_SUPPORTED_MAJOR (" + + Version.MIN_SUPPORTED_MAJOR + + ")." + ); + } + } } /** @@ -582,10 +623,13 @@ public void uploadMetadata( ); try { try (IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT)) { + Map segmentToLuceneVersion = getSegmentToLuceneVersion(segmentFiles, segmentInfosSnapshot); Map uploadedSegments = new HashMap<>(); for (String file : segmentFiles) { if (segmentsUploadedToRemoteStore.containsKey(file)) { - uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString()); + UploadedSegmentMetadata metadata = segmentsUploadedToRemoteStore.get(file); + metadata.setWrittenByMajor(segmentToLuceneVersion.get(metadata.originalFilename)); + uploadedSegments.put(file, metadata.toString()); } else { throw new NoSuchFileException(file); } @@ -615,6 +659,38 @@ public void uploadMetadata( } } + /** + * Parses the provided SegmentInfos to retrieve a mapping of the provided segment files to + * the respective Lucene major version that wrote the segments + * @param segmentFiles List of segment files for which the Lucene major version is needed + * @param segmentInfosSnapshot SegmentInfos instance to parse + * @return Map of the segment file to its Lucene major version + */ + private Map getSegmentToLuceneVersion(Collection segmentFiles, SegmentInfos segmentInfosSnapshot) { + Map segmentToLuceneVersion = new HashMap<>(); + for (SegmentCommitInfo segmentCommitInfo : segmentInfosSnapshot) { + SegmentInfo info = segmentCommitInfo.info; + Set segFiles = info.files(); + for (String file : segFiles) { + segmentToLuceneVersion.put(file, info.getVersion().major); + } + } + + for (String file : segmentFiles) { + if (segmentToLuceneVersion.containsKey(file) == false) { + if (file.equals(segmentInfosSnapshot.getSegmentsFileName())) { + segmentToLuceneVersion.put(file, segmentInfosSnapshot.getCommitLuceneVersion().major); + } else { + // Fallback to the Lucene major version of the respective segment's .si file + String segmentInfoFileName = RemoteStoreUtils.getSegmentName(file) + ".si"; + segmentToLuceneVersion.put(file, segmentToLuceneVersion.get(segmentInfoFileName)); + } + } + } + + return segmentToLuceneVersion; + } + /** * Try to delete file from local store. Fails silently on failures * @param filename: name of the file to be deleted diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java index 5b9135afb66f3..9afa75dd601b2 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java @@ -38,4 +38,26 @@ public void testinvert() { assertEquals(num, RemoteStoreUtils.invertLong(RemoteStoreUtils.invertLong(num))); } } + + public void testGetSegmentNameForCfeFile() { + assertEquals("_foo", RemoteStoreUtils.getSegmentName("_foo.cfe")); + } + + public void testGetSegmentNameForDvmFile() { + assertEquals("_bar", RemoteStoreUtils.getSegmentName("_bar_1_Lucene90_0.dvm")); + } + + public void testGetSegmentNameWeirdSegmentNameOnlyUnderscore() { + // Validate behaviour when segment name contains delimiters only + assertEquals("_", RemoteStoreUtils.getSegmentName("_.dvm")); + } + + public void testGetSegmentNameUnderscoreDelimiterOverrides() { + // Validate behaviour when segment name contains delimiters only + assertEquals("_", RemoteStoreUtils.getSegmentName("___.dvm")); + } + + public void testGetSegmentNameException() { + assertThrows(IllegalArgumentException.class, () -> RemoteStoreUtils.getSegmentName("dvd")); + } } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 72d8c234d71c8..7c765cf5df0be 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.Version; import org.junit.After; import org.junit.Before; import org.mockito.Mockito; @@ -124,14 +125,42 @@ public void testUploadedSegmentMetadataToString() { "123456", 1234 ); - assertEquals("abc::pqr::123456::1234", metadata.toString()); + metadata.setWrittenByMajor(Version.LATEST.major); + assertEquals("abc::pqr::123456::1234::" + Version.LATEST.major, metadata.toString()); + } + + public void testUploadedSegmentMetadataToStringExceptionTooNew() { + RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = new RemoteSegmentStoreDirectory.UploadedSegmentMetadata( + "abc", + "pqr", + "123456", + 1234 + ); + assertThrows(IllegalArgumentException.class, () -> metadata.setWrittenByMajor(Version.LATEST.major + 1)); + } + + public void testUploadedSegmentMetadataToStringExceptionTooOld() { + RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = new RemoteSegmentStoreDirectory.UploadedSegmentMetadata( + "abc", + "pqr", + "123456", + 1234 + ); + assertThrows(IllegalArgumentException.class, () -> metadata.setWrittenByMajor(Version.LATEST.major - 2)); } public void testUploadedSegmentMetadataFromString() { RemoteSegmentStoreDirectory.UploadedSegmentMetadata metadata = RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString( - "_0.cfe::_0.cfe__uuidxyz::4567::372000" + "_0.cfe::_0.cfe__uuidxyz::4567::372000::" + Version.LATEST.major + ); + assertEquals("_0.cfe::_0.cfe__uuidxyz::4567::372000::" + Version.LATEST.major, metadata.toString()); + } + + public void testUploadedSegmentMetadataFromStringException() { + assertThrows( + ArrayIndexOutOfBoundsException.class, + () -> RemoteSegmentStoreDirectory.UploadedSegmentMetadata.fromString("_0.cfe::_0.cfe__uuidxyz::4567::372000") ); - assertEquals("_0.cfe::_0.cfe__uuidxyz::4567::372000", metadata.toString()); } public void testGetPrimaryTermGenerationUuid() { @@ -176,6 +205,8 @@ private Map getDummyMetadata(String prefix, int commitGeneration + randomIntBetween(1000, 5000) + "::" + randomIntBetween(512000, 1024000) + + "::" + + Version.MIN_SUPPORTED_MAJOR ); metadata.put( prefix + ".cfs", @@ -188,6 +219,8 @@ private Map getDummyMetadata(String prefix, int commitGeneration + randomIntBetween(1000, 5000) + "::" + randomIntBetween(512000, 1024000) + + "::" + + Version.MIN_SUPPORTED_MAJOR ); metadata.put( prefix + ".si", @@ -200,6 +233,8 @@ private Map getDummyMetadata(String prefix, int commitGeneration + randomIntBetween(1000, 5000) + "::" + randomIntBetween(512000, 1024000) + + "::" + + Version.LATEST.major ); metadata.put( "segments_" + commitGeneration, @@ -213,6 +248,8 @@ private Map getDummyMetadata(String prefix, int commitGeneration + randomIntBetween(1000, 5000) + "::" + randomIntBetween(1024, 5120) + + "::" + + Version.LATEST.major ); return metadata; } @@ -611,8 +648,8 @@ public void testContainsFile() throws IOException { ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); - metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512"); - metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024"); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024::" + Version.LATEST.major); when(remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)).thenReturn(createMetadataFileBytes(metadata, 1, 5)); @@ -641,7 +678,7 @@ public void testUploadMetadataEmpty() throws IOException { IndexOutput indexOutput = mock(IndexOutput.class); when(storeDirectory.createOutput(startsWith("metadata__12__o"), eq(IOContext.DEFAULT))).thenReturn(indexOutput); - Collection segmentFiles = List.of("s1", "s2", "s3"); + Collection segmentFiles = List.of("_s1.si", "_s1.cfe", "_s3.cfs"); assertThrows( NoSuchFileException.class, () -> remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L) @@ -649,29 +686,45 @@ public void testUploadMetadataEmpty() throws IOException { } public void testUploadMetadataNonEmpty() throws IOException { - populateMetadata(); + indexDocs(142364, 5); + flushShard(indexShard, true); + SegmentInfos segInfos = indexShard.store().readLastCommittedSegmentsInfo(); + long primaryTerm = 12; + String primaryTermLong = RemoteStoreUtils.invertLong(primaryTerm); + long generation = segInfos.getGeneration(); + String generationLong = RemoteStoreUtils.invertLong(generation); + String latestMetadataFileName = "metadata__" + primaryTermLong + "__" + generationLong + "__abc"; + List metadataFiles = List.of(latestMetadataFileName); + when( + remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder( + RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX, + 1 + ) + ).thenReturn(metadataFiles); + Map> metadataFilenameContentMapping = Map.of( + latestMetadataFileName, + getDummyMetadata("_0", (int) generation) + ); + when(remoteMetadataDirectory.openInput(latestMetadataFileName, IOContext.DEFAULT)).thenReturn( + createMetadataFileBytes(metadataFilenameContentMapping.get(latestMetadataFileName), generation, primaryTerm) + ); + remoteSegmentStoreDirectory.init(); Directory storeDirectory = mock(Directory.class); BytesStreamOutput output = new BytesStreamOutput(); IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); + when(storeDirectory.createOutput(startsWith("metadata__" + primaryTermLong + "__" + generationLong), eq(IOContext.DEFAULT))) + .thenReturn(indexOutput); - String generation = RemoteStoreUtils.invertLong(segmentInfos.getGeneration()); - String primaryTerm = RemoteStoreUtils.invertLong(12); - when(storeDirectory.createOutput(startsWith("metadata__" + primaryTerm + "__" + generation), eq(IOContext.DEFAULT))).thenReturn( - indexOutput - ); - - Collection segmentFiles = List.of("_0.si", "_0.cfe", "_0.cfs", "segments_1"); - remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, segmentInfos, storeDirectory, 12L, 34L); + remoteSegmentStoreDirectory.uploadMetadata(segInfos.files(true), segInfos, storeDirectory, primaryTerm, generation); verify(remoteMetadataDirectory).copyFrom( eq(storeDirectory), - startsWith("metadata__" + primaryTerm + "__" + generation), - startsWith("metadata__" + primaryTerm + "__" + generation), + startsWith("metadata__" + primaryTermLong + "__" + generationLong), + startsWith("metadata__" + primaryTermLong + "__" + generationLong), eq(IOContext.DEFAULT) ); - VersionedCodecStreamWrapper streamWrapper = new VersionedCodecStreamWrapper<>( new RemoteSegmentMetadataHandler(), RemoteSegmentMetadata.CURRENT_VERSION, @@ -680,16 +733,25 @@ public void testUploadMetadataNonEmpty() throws IOException { RemoteSegmentMetadata remoteSegmentMetadata = streamWrapper.readStream( new ByteArrayIndexInput("expected", BytesReference.toBytes(output.bytes())) ); - Map actual = remoteSegmentStoreDirectory .getSegmentsUploadedToRemoteStore(); Map expected = remoteSegmentMetadata.getMetadata(); - for (String filename : expected.keySet()) { assertEquals(expected.get(filename).toString(), actual.get(filename).toString()); } } + public void testUploadMetadataNoSegmentCommitInfos() throws IOException { + SegmentInfos segInfos = indexShard.store().readLastCommittedSegmentsInfo(); + int numSegCommitInfos = segInfos.size(); + assertEquals( + "For a fresh index, the number of SegmentCommitInfo instances associated with the SegmentInfos instance should be 0, but were found to be " + + numSegCommitInfos, + 0, + numSegCommitInfos + ); + } + public void testNoMetadataHeaderCorruptIndexException() throws IOException { List metadataFiles = List.of(metadataFilename); when( @@ -700,8 +762,8 @@ public void testNoMetadataHeaderCorruptIndexException() throws IOException { ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); - metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); - metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::" + Version.LATEST.major); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::" + Version.LATEST.major); BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); @@ -723,8 +785,8 @@ public void testInvalidCodecHeaderCorruptIndexException() throws IOException { ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); - metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); - metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::" + Version.LATEST.major); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::" + Version.LATEST.major); BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); @@ -748,8 +810,8 @@ public void testHeaderMinVersionCorruptIndexException() throws IOException { ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); - metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); - metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::" + Version.LATEST.major); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::" + Version.LATEST.major); BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); @@ -773,8 +835,8 @@ public void testHeaderMaxVersionCorruptIndexException() throws IOException { ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); - metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234"); - metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345"); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::" + Version.LATEST.major); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::" + Version.LATEST.major); BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); @@ -798,8 +860,8 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException { ).thenReturn(metadataFiles); Map metadata = new HashMap<>(); - metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512"); - metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024"); + metadata.put("_0.cfe", "_0.cfe::_0.cfe__" + UUIDs.base64UUID() + "::1234::512::" + Version.LATEST.major); + metadata.put("_0.cfs", "_0.cfs::_0.cfs__" + UUIDs.base64UUID() + "::2345::1024::" + Version.LATEST.major); BytesStreamOutput output = new BytesStreamOutput(); IndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); @@ -947,6 +1009,12 @@ public void testSegmentMetadataCurrentVersion() { assertEquals(RemoteSegmentMetadata.CURRENT_VERSION, 1); } + private void indexDocs(int startDocId, int numberOfDocs) throws IOException { + for (int i = startDocId; i < startDocId + numberOfDocs; i++) { + indexDoc(indexShard, "_doc", Integer.toString(i)); + } + } + public void testMetadataFileNameOrder() { String file1 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 21, 23, 1, 1); String file2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(15, 38, 38, 1, 1); diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java index dc7940529f024..2fee77ab563c0 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; import org.apache.lucene.store.OutputStreamIndexOutput; +import org.apache.lucene.util.Version; import org.junit.After; import org.junit.Before; import org.opensearch.cluster.metadata.IndexMetadata; @@ -134,6 +135,8 @@ private Map getDummyData() { + randomIntBetween(1000, 5000) + "::" + randomIntBetween(1024, 2048) + + "::" + + Version.LATEST.major ); expectedOutput.put( prefix + ".cfs", @@ -146,6 +149,8 @@ private Map getDummyData() { + randomIntBetween(1000, 5000) + "::" + randomIntBetween(1024, 2048) + + "::" + + Version.LATEST.major ); return expectedOutput; }