Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add support download latest index metadata from remote #9713

Merged
merged 1 commit into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,17 +349,13 @@
}

@Override
public void listBlobsByPrefixInSortedOrder(
String blobNamePrefix,
int limit,
BlobNameSortOrder blobNameSortOrder,
ActionListener<List<BlobMetadata>> listener
) {
public List<BlobMetadata> listBlobsByPrefixInSortedOrder(String blobNamePrefix, int limit, BlobNameSortOrder blobNameSortOrder)
throws IOException {
// As AWS S3 returns list of keys in Lexicographic order, we don't have to fetch all the keys in order to sort them
// We fetch only keys as per the given limit to optimize the fetch. If provided sort order is not Lexicographic,
// we fall-back to default implementation of fetching all the keys and sorting them.
if (blobNameSortOrder != BlobNameSortOrder.LEXICOGRAPHIC) {
super.listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder, listener);
return super.listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder);

Check warning on line 358 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L358

Added line #L358 was not covered by tests
} else {
if (limit < 0) {
throw new IllegalArgumentException("limit should not be a negative value");
Expand All @@ -370,9 +366,9 @@
.flatMap(listing -> listing.contents().stream())
.map(s3Object -> new PlainBlobMetadata(s3Object.key().substring(keyPath.length()), s3Object.size()))
.collect(Collectors.toList());
listener.onResponse(blobs.subList(0, Math.min(limit, blobs.size())));
return blobs.subList(0, Math.min(limit, blobs.size()));
} catch (final Exception e) {
listener.onFailure(new IOException("Exception when listing blobs by prefix [" + prefix + "]", e));
throw new IOException("Exception when listing blobs by prefix [" + prefix + "]", e);

Check warning on line 371 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L371

Added line #L371 was not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,19 @@
throw new IllegalArgumentException("limit should not be a negative value");
}
try {
List<BlobMetadata> blobNames = new ArrayList<>(listBlobsByPrefix(blobNamePrefix).values());
blobNames.sort(blobNameSortOrder.comparator());
listener.onResponse(blobNames.subList(0, Math.min(blobNames.size(), limit)));
listener.onResponse(listBlobsByPrefixInSortedOrder(blobNamePrefix, limit, blobNameSortOrder));
} catch (Exception e) {
listener.onFailure(e);
}
}

default List<BlobMetadata> listBlobsByPrefixInSortedOrder(String blobNamePrefix, int limit, BlobNameSortOrder blobNameSortOrder)
throws IOException {
if (limit < 0) {
throw new IllegalArgumentException("limit should not be a negative value");

Check warning on line 243 in server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java#L243

Added line #L243 was not covered by tests
}
List<BlobMetadata> blobNames = new ArrayList<>(listBlobsByPrefix(blobNamePrefix).values());
blobNames.sort(blobNameSortOrder.comparator());
return blobNames.subList(0, Math.min(blobNames.size(), limit));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
Expand All @@ -35,7 +36,9 @@
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -87,7 +90,7 @@ public class RemoteClusterStateService implements Closeable {
);
private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

private static final String DELIMITER = "__";
public static final String DELIMITER = "__";

private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -367,4 +370,101 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
}

/**
* Fetch latest index metadata from remote cluster state
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map
*/
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException {
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
ClusterMetadataManifest clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
: "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch.";
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) {
IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata);
remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata);
}
return remoteIndexMetadata;
}

/**
* Fetch index metadata from remote cluster state
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @param uploadedIndexMetadata {@link UploadedIndexMetadata} contains details about remote location of index metadata
* @return {@link IndexMetadata}
*/
private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, UploadedIndexMetadata uploadedIndexMetadata) {
try {
return INDEX_METADATA_FORMAT.read(
indexMetadataContainer(clusterName, clusterUUID, uploadedIndexMetadata.getIndexUUID()),
uploadedIndexMetadata.getUploadedFilename(),
blobStoreRepository.getNamedXContentRegistry()
);
} catch (IOException e) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Error while downloading IndexMetadata - %s", uploadedIndexMetadata.getUploadedFilename()),
e
);
}
}

/**
* Fetch latest ClusterMetadataManifest from remote state store
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return ClusterMetadataManifest
*/
public ClusterMetadataManifest getLatestClusterMetadataManifest(String clusterName, String clusterUUID) {
String latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID);
return fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName);
}

/**
* Fetch latest ClusterMetadataManifest file from remote state store
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return latest ClusterMetadataManifest filename
*/
private String getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
try {
/**
* {@link BlobContainer#listBlobsByPrefixInSortedOrder} will get the latest manifest file
* as the manifest file name generated via {@link RemoteClusterStateService#getManifestFileName} ensures
* when sorted in LEXICOGRAPHIC order the latest uploaded manifest file comes on top.
*/
List<BlobMetadata> manifestFilesMetadata = manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder(
"manifest" + DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
);
if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) {
return manifestFilesMetadata.get(0).name();
}
} catch (IOException e) {
throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e);
}

throw new IllegalStateException(String.format(Locale.ROOT, "Remote Cluster State not found - %s", clusterUUID));
}

/**
* Fetch ClusterMetadataManifest from remote state store
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return ClusterMetadataManifest
*/
private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename)
throws IllegalStateException {
try {
return RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.read(
manifestContainer(clusterName, clusterUUID),
filename,
blobStoreRepository.getNamedXContentRegistry()
);
} catch (IOException e) {
throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,10 @@
return metadata;
}

public NamedXContentRegistry getNamedXContentRegistry() {
return namedXContentRegistry;

Check warning on line 809 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L809

Added line #L809 was not covered by tests
}

public Compressor getCompressor() {
return compressor;
}
Expand Down
Loading
Loading