Skip to content

Commit

Permalink
Minor refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <bansvaru@amazon.com>
  • Loading branch information
linuxpi committed Sep 4, 2023
1 parent 7b806b1 commit 04d220d
Showing 1 changed file with 76 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public class RemoteClusterStateService implements Closeable {

public static final int RETAINED_MANIFESTS = 3;

public static final String DELIMITER = "__";

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
Expand Down Expand Up @@ -97,10 +101,6 @@ public class RemoteClusterStateService implements Closeable {
Property.NodeScope,
Property.Final
);
private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

public static final String DELIMITER = "__";

private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
private final Settings settings;
Expand Down Expand Up @@ -245,6 +245,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
allUploadedIndexMetadata.values().stream().collect(Collectors.toList()),
false
);
// TODO: Do we need to trigger delete less frequently?
deleteClusterMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
Expand Down Expand Up @@ -517,82 +518,82 @@ public void onFailure(Exception e) {
public void deleteClusterMetadataMarker(String clusterName, String clusterUUID, int manifestsToRetain) {
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool);

synchronized (this) {
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
getManifestFolderPath(clusterName, clusterUUID),
"manifest",
Integer.MAX_VALUE,
new ActionListener<>() {
int checkedManifestCount = 1;

@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
Set<String> filesToKeep = new HashSet<>();
BlobPath staleBlobPath = new BlobPath();
blobMetadata.forEach(manifestBlobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
manifestBlobMetadata.name()
// TODO: do we need a synchronized block to avoid concurrent executions
transferService.listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
getManifestFolderPath(clusterName, clusterUUID),
"manifest",
Integer.MAX_VALUE,
new ActionListener<>() {
int checkedManifestCount = 1;

@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
Set<String> filesToKeep = new HashSet<>();
BlobPath staleBlobPath = new BlobPath();
blobMetadata.forEach(manifestBlobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
manifestBlobMetadata.name()
);
if (checkedManifestCount <= manifestsToRetain) {
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
} else {
staleBlobPath.add(
getManifestFolderPath(clusterName, clusterUUID).add(
getManifestFileNamePrefix(
clusterMetadataManifest.getClusterTerm(),
clusterMetadataManifest.getStateVersion()
)
).toString()
);
if (checkedManifestCount <= manifestsToRetain) {
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> { filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()); });
} else {
staleBlobPath.add(
getManifestFolderPath(clusterName, clusterUUID).add(
getManifestFileNamePrefix(
clusterMetadataManifest.getClusterTerm(),
clusterMetadataManifest.getStateVersion()
)
).toString()
);
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
staleBlobPath.add(uploadedIndexMetadata.getUploadedFilename());
}
});
}
logger.trace(String.format(Locale.ROOT, "Deleting stale files from remote - %s", staleBlobPath));
checkedManifestCount += 1;
});

if (staleBlobPath.toArray().length == 0) {
return;
clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
staleBlobPath.add(uploadedIndexMetadata.getUploadedFilename());
}
});
}
checkedManifestCount += 1;
});

transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, staleBlobPath, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info(
String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", staleBlobPath.size())
);
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting stale Remote Cluster Metadata files - {}",
staleBlobPath
)
);
}
});
logger.trace(String.format(Locale.ROOT, "Deleting stale files from remote - %s", staleBlobPath));
if (staleBlobPath.toArray().length == 0) {
logger.trace("No stale Remote Cluster Metadata files found");
return;
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUID
)
);
}
transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, staleBlobPath, new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info(
String.format(Locale.ROOT, "Deleted [%s] stale Remote Cluster Metadata files", staleBlobPath.size())
);
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting stale Remote Cluster Metadata files - {}",
staleBlobPath
)
);
}
});
}
);
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUID
)
);
}
}
);
}
}

0 comments on commit 04d220d

Please sign in to comment.