From 2dc3f7416ec1c1f715c8d16b7e546c020ac52450 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 29 Apr 2024 19:08:08 +0530 Subject: [PATCH] Add version details in remote index path file with code enhancements (#13386) Signed-off-by: Ashish Singh --- CHANGELOG.md | 2 + .../RemoteStoreUploadIndexPathIT.java | 39 ++++++----- .../remote/IndexMetadataUploadListener.java | 15 +++- .../remote/RemoteClusterStateService.java | 45 ++++++------ .../org/opensearch/index/IndexSettings.java | 19 +---- .../index/remote/RemoteIndexPath.java | 11 ++- .../index/remote/RemoteIndexPathUploader.java | 67 ++++++++++++------ .../index/remote/RemoteStoreUtils.java | 21 ++++++ .../remote/RemoteIndexPathUploaderTests.java | 69 ++++++++++++------- 9 files changed, 183 insertions(+), 105 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7d3ebc7f65dc..a6711fb58f43f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992)) - [Tiered caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12941](https://github.com/opensearch-project/OpenSearch/pull/12941)) - Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746)) +- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225)) +- [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java index c39cec96aa476..9b30dacfced13 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreUploadIndexPathIT.java @@ -13,11 +13,13 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.util.FileSystemUtils; import org.opensearch.index.remote.RemoteIndexPath; +import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; -import java.util.Locale; +import java.nio.file.Path; +import java.util.Arrays; import java.util.concurrent.ExecutionException; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; @@ -81,28 +83,29 @@ public void testRemoteIndexPathFileCreation() throws ExecutionException, Interru } - private void validateRemoteIndexPathFile(boolean exists) { + private void validateRemoteIndexPathFile(boolean exists) throws IOException { String indexUUID = client().admin() .indices() .prepareGetSettings(INDEX_NAME) .get() .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); - + String fileName = generatePartFileName(indexUUID); assertEquals(exists, FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR))); - assertEquals( - exists, - FileSystemUtils.exists( - translogRepoPath.resolve(RemoteIndexPath.DIR) - .resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID)) - ) - ); - assertEquals(exists, FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR))); - assertEquals( - exists, - FileSystemUtils.exists( - segmentRepoPath.resolve(RemoteIndexPath.DIR) - .resolve(String.format(Locale.ROOT, RemoteIndexPath.FILE_NAME_FORMAT, indexUUID)) - ) - ); + if (exists) { + Path[] files = FileSystemUtils.files(translogRepoPath.resolve(RemoteIndexPath.DIR)); + assertEquals(1, files.length); + assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileName))); + String translogPathFile = files[0].toString(); + assertTrue(FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR))); + files = FileSystemUtils.files(segmentRepoPath.resolve(RemoteIndexPath.DIR)); + assertEquals(1, files.length); + assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileName))); + String segmentPathFile = files[0].toString(); + assertNotEquals(translogPathFile, segmentPathFile); + } + } + + private String generatePartFileName(String indexUUID) { + return String.join(RemoteIndexPathUploader.DELIMITER, indexUUID, "2", RemoteIndexPath.DEFAULT_VERSION); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java index f9158c9260747..7d5fe8140e1e1 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java +++ b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java @@ -13,6 +13,7 @@ import org.opensearch.threadpool.ThreadPool; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutorService; @@ -41,9 +42,17 @@ public IndexMetadataUploadListener(ThreadPool threadPool, String threadPoolName) * @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload). * @param actionListener listener to be invoked on success or failure. */ - public final void onNewIndexUpload(List indexMetadataList, ActionListener actionListener) { - executorService.execute(() -> doOnNewIndexUpload(indexMetadataList, actionListener)); + public final void onUpload( + List indexMetadataList, + Map prevIndexMetadataByName, + ActionListener actionListener + ) { + executorService.execute(() -> doOnUpload(indexMetadataList, prevIndexMetadataByName, actionListener)); } - protected abstract void doOnNewIndexUpload(List indexMetadataList, ActionListener actionListener); + protected abstract void doOnUpload( + List indexMetadataList, + Map prevIndexMetadataByName, + ActionListener actionListener + ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d2f927c827e5b..eaf607564185c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -242,7 +242,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri final List allUploadedIndexMetadata = writeIndexMetadataParallel( clusterState, toUpload, - ClusterState.UNKNOWN_UUID.equals(previousClusterUUID) ? toUpload : Collections.emptyList() + Collections.emptyMap() ); final ClusterMetadataManifest manifest = uploadManifest( clusterState, @@ -307,9 +307,9 @@ public ClusterMetadataManifest writeIncrementalMetadata( } // Write Index Metadata - final Map previousStateIndexMetadataVersionByName = new HashMap<>(); + final Map previousStateIndexMetadataByName = new HashMap<>(); for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { - previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion()); + previousStateIndexMetadataByName.put(indexMetadata.getIndex().getName(), indexMetadata); } int numIndicesUpdated = 0; @@ -319,9 +319,12 @@ public ClusterMetadataManifest writeIncrementalMetadata( .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity())); List toUpload = new ArrayList<>(); - List newIndexMetadataList = new ArrayList<>(); + // We prepare a map that contains the previous index metadata for the indexes for which version has changed. + Map prevIndexMetadataByName = new HashMap<>(); for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { - final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName()); + String indexName = indexMetadata.getIndex().getName(); + final IndexMetadata prevIndexMetadata = previousStateIndexMetadataByName.get(indexName); + Long previousVersion = prevIndexMetadata != null ? prevIndexMetadata.getVersion() : null; if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { logger.debug( "updating metadata for [{}], changing version from [{}] to [{}]", @@ -331,22 +334,19 @@ public ClusterMetadataManifest writeIncrementalMetadata( ); numIndicesUpdated++; toUpload.add(indexMetadata); + prevIndexMetadataByName.put(indexName, prevIndexMetadata); } else { numIndicesUnchanged++; } - previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); - // Adding the indexMetadata to newIndexMetadataList if there is no previous version present for the index. - if (previousVersion == null) { - newIndexMetadataList.add(indexMetadata); - } + previousStateIndexMetadataByName.remove(indexMetadata.getIndex().getName()); } - List uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList); + List uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, prevIndexMetadataByName); uploadedIndexMetadataList.forEach( uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata) ); - for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) { + for (String removedIndexName : previousStateIndexMetadataByName.keySet()) { allUploadedIndexMetadata.remove(removedIndexName); } final ClusterMetadataManifest manifest = uploadManifest( @@ -452,7 +452,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException private List writeIndexMetadataParallel( ClusterState clusterState, List toUpload, - List newIndexMetadataList + Map prevIndexMetadataByName ) throws IOException { assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null"; int latchCount = toUpload.size() + indexMetadataUploadListeners.size(); @@ -482,7 +482,7 @@ private List writeIndexMetadataParallel( writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener); } - invokeIndexMetadataUploadListeners(newIndexMetadataList, latch, exceptionList); + invokeIndexMetadataUploadListeners(toUpload, prevIndexMetadataByName, latch, exceptionList); try { if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { @@ -527,15 +527,17 @@ private List writeIndexMetadataParallel( * Invokes the index metadata upload listener but does not wait for the execution to complete. */ private void invokeIndexMetadataUploadListeners( - List newIndexMetadataList, + List updatedIndexMetadataList, + Map prevIndexMetadataByName, CountDownLatch latch, List exceptionList ) { for (IndexMetadataUploadListener listener : indexMetadataUploadListeners) { String listenerName = listener.getClass().getSimpleName(); - listener.onNewIndexUpload( - newIndexMetadataList, - getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName) + listener.onUpload( + updatedIndexMetadataList, + prevIndexMetadataByName, + getIndexMetadataUploadActionListener(updatedIndexMetadataList, prevIndexMetadataByName, latch, exceptionList, listenerName) ); } @@ -543,6 +545,7 @@ private void invokeIndexMetadataUploadListeners( private ActionListener getIndexMetadataUploadActionListener( List newIndexMetadataList, + Map prevIndexMetadataByName, CountDownLatch latch, List exceptionList, String listenerName @@ -552,18 +555,20 @@ private ActionListener getIndexMetadataUploadActionListener( ActionListener.wrap( ignored -> logger.trace( new ParameterizedMessage( - "{} : Invoked listener={} successfully tookTimeNs={}", + "listener={} : Invoked successfully with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}", listenerName, newIndexMetadataList, + prevIndexMetadataByName.values(), (System.nanoTime() - startTime) ) ), ex -> { logger.error( new ParameterizedMessage( - "{} : Exception during invocation of listener={} tookTimeNs={}", + "listener={} : Exception during invocation with indexMetadataList={} prevIndexMetadataList={} tookTimeNs={}", listenerName, newIndexMetadataList, + prevIndexMetadataByName.values(), (System.nanoTime() - startTime) ), ex diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 388de65ca58a1..9d8ab6815eecc 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -48,9 +48,8 @@ import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.Index; -import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; -import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.ingest.IngestService; @@ -62,8 +61,6 @@ import java.util.Collections; import java.util.List; import java.util.Locale; -import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -990,7 +987,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti */ widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0); assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings()); - remoteStorePathStrategy = determineRemoteStorePathStrategy(); + remoteStorePathStrategy = RemoteStoreUtils.determineRemoteStorePathStrategy(indexMetadata); setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING)); setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING)); @@ -1911,18 +1908,6 @@ public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePo this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability; } - private RemoteStorePathStrategy determineRemoteStorePathStrategy() { - Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - assert remoteCustomData == null || remoteCustomData.containsKey(PathType.NAME); - if (remoteCustomData != null && remoteCustomData.containsKey(PathType.NAME)) { - PathType pathType = PathType.parseString(remoteCustomData.get(PathType.NAME)); - String hashAlgoStr = remoteCustomData.get(PathHashAlgorithm.NAME); - PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) ? PathHashAlgorithm.parseString(hashAlgoStr) : null; - return new RemoteStorePathStrategy(pathType, hashAlgorithm); - } - return new RemoteStorePathStrategy(PathType.FIXED); - } - public RemoteStorePathStrategy getRemoteStorePathStrategy() { return remoteStorePathStrategy; } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java index 68cf6923bcf45..89b642b79df86 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java @@ -52,7 +52,7 @@ public class RemoteIndexPath implements ToXContentFragment { combinedPath.putAll(SEGMENT_PATH); COMBINED_PATH = Collections.unmodifiableMap(combinedPath); } - private static final String DEFAULT_VERSION = "1"; + public static final String DEFAULT_VERSION = "1"; public static final String DIR = "remote-index-path"; public static final String FILE_NAME_FORMAT = "remote_path_%s"; static final String KEY_VERSION = "version"; @@ -60,6 +60,8 @@ public class RemoteIndexPath implements ToXContentFragment { static final String KEY_SHARD_COUNT = "shard_count"; static final String KEY_PATH_CREATION_MAP = "path_creation_map"; static final String KEY_PATHS = "paths"; + + private final String version; private final String indexUUID; private final int shardCount; private final Iterable basePath; @@ -109,6 +111,7 @@ public RemoteIndexPath( .getFormattedMessage() ); } + this.version = DEFAULT_VERSION; this.indexUUID = indexUUID; this.shardCount = shardCount; this.basePath = basePath; @@ -119,7 +122,7 @@ public RemoteIndexPath( @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(KEY_VERSION, DEFAULT_VERSION); + builder.field(KEY_VERSION, version); builder.field(KEY_INDEX_UUID, indexUUID); builder.field(KEY_SHARD_COUNT, shardCount); builder.field(PathType.NAME, pathType.name()); @@ -156,4 +159,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static RemoteIndexPath fromXContent(XContentParser ignored) { throw new UnsupportedOperationException("RemoteIndexPath.fromXContent() is not supported"); } + + String getVersion() { + return version; + } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java index 1ac7e41014d23..d736a82d57a7c 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; @@ -23,6 +24,7 @@ import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.IndexMetadataUploadListener; import org.opensearch.gateway.remote.RemoteClusterStateService.RemoteStateTransferException; +import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; @@ -47,6 +49,7 @@ import static org.opensearch.index.remote.RemoteIndexPath.COMBINED_PATH; import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH; import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH; +import static org.opensearch.index.remote.RemoteStoreUtils.determineRemoteStorePathStrategy; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; @@ -59,6 +62,7 @@ @ExperimentalApi public class RemoteIndexPathUploader extends IndexMetadataUploadListener { + public static final String DELIMITER = "#"; public static final ConfigBlobStoreFormat REMOTE_INDEX_PATH_FORMAT = new ConfigBlobStoreFormat<>( RemoteIndexPath.FILE_NAME_FORMAT ); @@ -99,7 +103,11 @@ public RemoteIndexPathUploader( } @Override - protected void doOnNewIndexUpload(List indexMetadataList, ActionListener actionListener) { + protected void doOnUpload( + List indexMetadataList, + Map prevIndexMetadataByName, + ActionListener actionListener + ) { if (isRemoteDataAttributePresent == false) { logger.trace("Skipping beforeNewIndexUpload as there are no remote indexes"); actionListener.onResponse(null); @@ -108,7 +116,9 @@ protected void doOnNewIndexUpload(List indexMetadataList, ActionL long startTime = System.nanoTime(); boolean success = false; - List eligibleList = indexMetadataList.stream().filter(this::requiresPathUpload).collect(Collectors.toList()); + List eligibleList = indexMetadataList.stream() + .filter(idxMd -> requiresPathUpload(idxMd, prevIndexMetadataByName.get(idxMd.getIndex().getName()))) + .collect(Collectors.toList()); String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(",")); int latchCount = eligibleList.size() * (isTranslogSegmentRepoSame ? 1 : 2); CountDownLatch latch = new CountDownLatch(latchCount); @@ -182,7 +192,7 @@ private void writePathToRemoteStore( Map> pathCreationMap ) { Map remoteCustomData = idxMD.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.valueOf(remoteCustomData.get(RemoteStoreEnums.PathType.NAME)); + PathType pathType = PathType.valueOf(remoteCustomData.get(PathType.NAME)); RemoteStoreEnums.PathHashAlgorithm hashAlgorithm = RemoteStoreEnums.PathHashAlgorithm.valueOf( remoteCustomData.get(RemoteStoreEnums.PathHashAlgorithm.NAME) ); @@ -192,17 +202,22 @@ private void writePathToRemoteStore( BlobContainer blobContainer = repository.blobStore().blobContainer(basePath.add(RemoteIndexPath.DIR)); ActionListener actionListener = getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap); try { - REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority( - new RemoteIndexPath(indexUUID, shardCount, basePath, pathType, hashAlgorithm, pathCreationMap), - blobContainer, + RemoteIndexPath remoteIndexPath = new RemoteIndexPath( indexUUID, - actionListener + shardCount, + basePath, + pathType, + hashAlgorithm, + pathCreationMap ); + String fileName = generateFileName(indexUUID, idxMD.getVersion(), remoteIndexPath.getVersion()); + REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(remoteIndexPath, blobContainer, fileName, actionListener); } catch (IOException ioException) { RemoteStateTransferException ex = new RemoteStateTransferException( - String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(idxMD.getIndex().getName())) + String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(idxMD.getIndex().getName())), + ioException ); - actionListener.onFailure(ioException); + actionListener.onFailure(ex); } } @@ -225,6 +240,8 @@ public void start() { } private boolean isTranslogSegmentRepoSame() { + // TODO - The current comparison checks the repository name. But it is also possible that the repository are same + // by attributes, but different by name. We need to handle this. String translogRepoName = settings.get(TRANSLOG_REPO_NAME_KEY); String segmentRepoName = settings.get(SEGMENT_REPO_NAME_KEY); return Objects.equals(translogRepoName, segmentRepoName); @@ -261,21 +278,29 @@ private LatchedActionListener getUploadPathLatchedActionListener( * This method checks if the index metadata has attributes that calls for uploading the index path for remote store * uploads. It checks if the remote store path type is {@code HASHED_PREFIX} and returns true if so. */ - private boolean requiresPathUpload(IndexMetadata indexMetadata) { - // A cluster will have remote custom metadata only if the cluster is remote store enabled from data side. - Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); - if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) { - return false; - } - String pathTypeStr = remoteCustomData.get(RemoteStoreEnums.PathType.NAME); - if (Objects.isNull(pathTypeStr)) { - return false; - } - // We need to upload the path only if the path type for an index is hashed_prefix - return RemoteStoreEnums.PathType.HASHED_PREFIX == RemoteStoreEnums.PathType.parseString(pathTypeStr); + private boolean requiresPathUpload(IndexMetadata indexMetadata, IndexMetadata prevIndexMetadata) { + PathType pathType = determineRemoteStorePathStrategy(indexMetadata).getType(); + PathType prevPathType = Objects.nonNull(prevIndexMetadata) ? determineRemoteStorePathStrategy(prevIndexMetadata).getType() : null; + // If previous metadata is null or previous path type is not hashed_prefix, and along with new path type being + // hashed_prefix, then this can mean any of the following - + // 1. This is creation of remote index with hashed_prefix + // 2. We are enabling cluster state for the very first time with multiple indexes having hashed_prefix path type. + // 3. A docrep index is being migrated to being remote store index. + return pathType == PathType.HASHED_PREFIX && (Objects.isNull(prevPathType) || prevPathType != PathType.HASHED_PREFIX); } private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) { this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; } + + /** + * Creates a file name by combining index uuid, index metadata version and file version. # has been chosen as the + * delimiter since it does not collide with any possible letters in file name. The random base64 uuid is added to + * ensure that the file does not get overwritten. We do check if translog and segment repo are same by name, but + * it is possible that a user configures same repo by different name for translog and segment in which case, this + * will lead to file not being overwritten. + */ + private String generateFileName(String indexUUID, long indexMetadataVersion, String fileVersion) { + return String.join(DELIMITER, indexUUID, Long.toString(indexMetadataVersion), fileVersion, UUIDs.randomBase64UUID()); + } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 4d1d98334c3c4..7208dac162e1a 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -8,6 +8,7 @@ package org.opensearch.index.remote; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.collect.Tuple; import java.nio.ByteBuffer; @@ -17,6 +18,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.function.Function; /** @@ -146,4 +148,23 @@ static String longToCompositeBase64AndBinaryEncoding(long value, int len) { assert base64DecimalValue >= 0 && base64DecimalValue < 64; return URL_BASE64_CHARSET[base64DecimalValue] + binaryPart; } + + /** + * Determines the remote store path strategy by reading the custom data map in IndexMetadata class. + */ + public static RemoteStorePathStrategy determineRemoteStorePathStrategy(IndexMetadata indexMetadata) { + Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); + assert remoteCustomData == null || remoteCustomData.containsKey(RemoteStoreEnums.PathType.NAME); + if (remoteCustomData != null && remoteCustomData.containsKey(RemoteStoreEnums.PathType.NAME)) { + RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.parseString( + remoteCustomData.get(RemoteStoreEnums.PathType.NAME) + ); + String hashAlgoStr = remoteCustomData.get(RemoteStoreEnums.PathHashAlgorithm.NAME); + RemoteStoreEnums.PathHashAlgorithm hashAlgorithm = Objects.nonNull(hashAlgoStr) + ? RemoteStoreEnums.PathHashAlgorithm.parseString(hashAlgoStr) + : null; + return new RemoteStorePathStrategy(pathType, hashAlgorithm); + } + return new RemoteStorePathStrategy(RemoteStoreEnums.PathType.FIXED); + } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java index 2e4dd15ccb581..e539b382a5f3b 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java @@ -23,6 +23,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemoteClusterStateService.RemoteStateTransferException; +import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; @@ -36,11 +37,11 @@ import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import org.mockito.Mockito; @@ -101,8 +102,8 @@ public void setup() { Map remoteCustomData = Map.of( PathType.NAME, HASHED_PREFIX.name(), - RemoteStoreEnums.PathHashAlgorithm.NAME, - RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.name() + PathHashAlgorithm.NAME, + PathHashAlgorithm.FNV_1A_BASE64.name() ); Settings idxSettings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) @@ -136,7 +137,7 @@ public void testInterceptWithNoRemoteDataAttributes() { res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); verify(indexMetadataList, times(0)).stream(); @@ -154,7 +155,7 @@ public void testInterceptWithEmptyIndexMetadataList() { res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.doOnNewIndexUpload(Collections.emptyList(), actionListener); + remoteIndexPathUploader.doOnUpload(Collections.emptyList(), Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); } @@ -173,34 +174,54 @@ public void testInterceptWithEmptyEligibleIndexMetadataList() { ); // Case 1 - Null remoteCustomData - List indexMetadataList = new ArrayList<>(); - IndexMetadata indexMetadata = mock(IndexMetadata.class); - indexMetadataList.add(indexMetadata); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + List indexMetadataList = List.of(createIndexMetadata(null)); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); // Case 2 - Empty remoteCustomData - when(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)).thenReturn(new HashMap<>()); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); - assertEquals(2, successCount.get()); + assertThrows( + AssertionError.class, + () -> remoteIndexPathUploader.doOnUpload(List.of(createIndexMetadata(new HashMap<>())), Collections.emptyMap(), actionListener) + ); + assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); // Case 3 - RemoteStoreEnums.PathType.NAME not in remoteCustomData map - Map remoteCustomData = Map.of("test", "test"); - when(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)).thenReturn(remoteCustomData); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); - assertEquals(3, successCount.get()); + assertThrows( + AssertionError.class, + () -> remoteIndexPathUploader.doOnUpload( + List.of(createIndexMetadata(Map.of("test", "test"))), + Collections.emptyMap(), + actionListener + ) + ); + assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); // Case 4 - RemoteStoreEnums.PathType.NAME is not HASHED_PREFIX - remoteCustomData = Map.of(PathType.NAME, randomFrom(FIXED, HASHED_INFIX).name()); - when(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)).thenReturn(remoteCustomData); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); - assertEquals(4, successCount.get()); + String pathType = randomFrom(FIXED, HASHED_INFIX).name(); + String pathHashAlgorithm = FIXED.name().equals(pathType) ? null : randomFrom(PathHashAlgorithm.values()).name(); + Map remoteCustomData = new HashMap<>(); + remoteCustomData.put(PathType.NAME, pathType); + remoteCustomData.put(PathHashAlgorithm.NAME, pathHashAlgorithm); + indexMetadataList = List.of(createIndexMetadata(remoteCustomData)); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); + assertEquals(2, successCount.get()); assertEquals(0, failureCount.get()); } + private IndexMetadata createIndexMetadata(Map remoteCustomData) { + IndexMetadata.Builder builder = IndexMetadata.builder("test") + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0); + if (Objects.nonNull(remoteCustomData)) { + builder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, remoteCustomData); + } + return builder.build(); + } + public void testInterceptWithSameRepo() throws IOException { RemoteIndexPathUploader remoteIndexPathUploader = new RemoteIndexPathUploader( threadPool, @@ -213,7 +234,7 @@ public void testInterceptWithSameRepo() throws IOException { res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); verify(blobContainer, times(1)).writeBlob(anyString(), any(InputStream.class), anyLong(), anyBoolean()); @@ -236,7 +257,7 @@ public void testInterceptWithDifferentRepo() throws IOException { res -> successCount.incrementAndGet(), ex -> failureCount.incrementAndGet() ); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(1, successCount.get()); assertEquals(0, failureCount.get()); verify(blobContainer, times(2)).writeBlob(anyString(), any(InputStream.class), anyLong(), anyBoolean()); @@ -263,7 +284,7 @@ public void testInterceptWithLatchAwaitTimeout() throws IOException { failureCount.incrementAndGet(); exceptionSetOnce.set(ex); }); - remoteIndexPathUploader.doOnNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.doOnUpload(indexMetadataList, Collections.emptyMap(), actionListener); assertEquals(0, successCount.get()); assertEquals(1, failureCount.get()); assertTrue(exceptionSetOnce.get() instanceof RemoteStateTransferException); @@ -295,7 +316,7 @@ public void testInterceptWithInterruptedExceptionDuringLatchAwait() throws Excep }); Thread thread = new Thread(() -> { try { - remoteIndexPathUploader.onNewIndexUpload(indexMetadataList, actionListener); + remoteIndexPathUploader.onUpload(indexMetadataList, Collections.emptyMap(), actionListener); } catch (Exception e) { assertTrue(e instanceof InterruptedException); assertEquals("sleep interrupted", e.getMessage());