Skip to content

Commit

Permalink
minor fixes and improve tests
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <bansvaru@amazon.com>
  • Loading branch information
linuxpi committed Sep 5, 2023
1 parent 788856a commit 82a4ee8
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@
package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.junit.Before;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
Expand All @@ -22,6 +28,11 @@ public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";

@Before
public void setup() {
setupRepo();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
Expand Down Expand Up @@ -55,8 +66,7 @@ public void testFullClusterRestoreStaleDelete() throws Exception {
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

// Step - 1 index some data to generate files in remote directory
Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1);
initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
setReplicaCount(0);
setReplicaCount(1);
setReplicaCount(0);
Expand All @@ -70,21 +80,34 @@ public void testFullClusterRestoreStaleDelete() throws Exception {
RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);

BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
BlobPath baseMetadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());

assertEquals(repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size(), 4);

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
);
assertEquals(indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas(), 0);
assertEquals(indexMetadataMap.values().stream().findFirst().get().getNumberOfShards(), shardCount);
}

private void setReplicaCount(int replicaCount) {
// Step 3 - Reduce shard limits to hit shard limit with less no of shards
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
.get();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,4 @@ public String toString() {
}
return sb.toString();
}

public int size() {
return paths.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,9 @@ private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
* @param clusterName name of the cluster
* @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map
*/
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException {
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) {
ensureRepositorySet();

Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
ClusterMetadataManifest clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
Expand Down Expand Up @@ -484,29 +486,34 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste
* @param clusterName name of the cluster
* @param clusterUUIDs clusterUUID's for which remote cluster metadata needs to be purged.
*/
public void deleteMetadataAsync(String clusterName, List<String> clusterUUIDs) {
(new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool)).deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
clusterUUIDs.stream()
.map(clusterUUID -> getCusterMetadataBasePath(clusterName, clusterUUID).toString())
.collect(Collectors.toList()),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info("Deleted Remote Cluster Metadata for clusterUUIDs {}", clusterUUIDs);
}
private void deleteMetadataAsync(String clusterName, List<String> clusterUUIDs) {
// TODO: add tests and make public
ensureRepositorySet();

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUIDs
)
);
BlobStoreTransferService blobStoreTransferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool);
clusterUUIDs.forEach(clusterUUID -> {
blobStoreTransferService.deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
getCusterMetadataBasePath(clusterName, clusterUUID),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info("Deleted Remote Cluster Metadata for clusterUUIDs {}", clusterUUIDs);
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUIDs
),
e
);
}
}
}
);
);
});
}

/**
Expand All @@ -518,85 +525,86 @@ public void onFailure(Exception e) {
public void deleteClusterMetadataMarker(String clusterName, String clusterUUID, int manifestsToRetain) {
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool);

// TODO: do we need a synchronized block to avoid concurrent executions
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
getManifestFolderPath(clusterName, clusterUUID),
"manifest",
Integer.MAX_VALUE,
new ActionListener<>() {
int evaluatedManifestCount = 1;

@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
Set<String> filesToKeep = new HashSet<>();
List<String> stalePaths = new ArrayList<>();
blobMetadata.forEach(manifestBlobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
manifestBlobMetadata.name()
);
if (evaluatedManifestCount <= manifestsToRetain) {
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
} else {
stalePaths.add(new BlobPath().add("manifest").buildAsString() + manifestBlobMetadata.name());
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
stalePaths.add(
new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+ uploadedIndexMetadata.getUploadedFilename()
+ ".dat"
);
}
});
}
evaluatedManifestCount += 1;
});
synchronized (this) {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
getManifestFolderPath(clusterName, clusterUUID),
"manifest",
Integer.MAX_VALUE,
new ActionListener<>() {
int evaluatedManifestCount = 1;

@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
Set<String> filesToKeep = new HashSet<>();
List<String> stalePaths = new ArrayList<>();
blobMetadata.forEach(manifestBlobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
manifestBlobMetadata.name()
);
if (evaluatedManifestCount <= manifestsToRetain) {
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
} else {
stalePaths.add(new BlobPath().add("manifest").buildAsString() + manifestBlobMetadata.name());
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
stalePaths.add(
new BlobPath().add("index").add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+ uploadedIndexMetadata.getUploadedFilename()
+ ".dat"
);
}
});
}
evaluatedManifestCount += 1;
});

logger.info(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
logger.info(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));

if (stalePaths.toArray().length == 0) {
logger.trace("No stale Remote Cluster Metadata files found");
return;
}
if (stalePaths.toArray().length == 0) {
logger.trace("No stale Remote Cluster Metadata files found");
return;
}

transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
getCusterMetadataBasePath(clusterName, clusterUUID),
stalePaths,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info(
String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size())
);
}
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
getCusterMetadataBasePath(clusterName, clusterUUID),
stalePaths,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info(
String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", stalePaths.size())
);
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting stale Remote Cluster Metadata files - {}",
stalePaths
)
);
@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting stale Remote Cluster Metadata files - {}",
stalePaths
)
);
}
}
}
);
}
);
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUID
)
);
@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUID
)
);
}
}
}
);
);
}
}
}

0 comments on commit 82a4ee8

Please sign in to comment.