Commit 99726a7

Initial changes for remote store cleanup during snapshot optimizations.

Harish Bhakuni committed Feb 16, 2024
1 parent b19e427

Showing 4 changed files with 125 additions and 96 deletions.
RemoteSegmentStoreDirectory.java
@@ -849,33 +849,46 @@ public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener
     }
 
     /*
-    Tries to delete shard level directory if it is empty
+    Tries to delete shard level directory if it does not have any locks.
     Return true if it deleted it successfully
     */
-    private boolean deleteIfEmpty() throws IOException {
-        Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
-            MetadataFilenameUtils.METADATA_PREFIX,
-            1
-        );
-        if (metadataFiles.size() != 0) {
-            logger.info("Remote directory still has files, not deleting the path");
+    public boolean deleteIfEmpty() {
+        Set<String> allLockedFiles;
+        try {
+            allLockedFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX);
+        } catch (Exception e) {
+            logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
+            return false;
+        }
+        if (allLockedFiles.size() != 0) {
+            logger.info("Remote directory still has locked files, not deleting the path");
             return false;
         }
 
+        AtomicBoolean cleanupFailed = new AtomicBoolean(false);
         try {
-            remoteDataDirectory.delete();
-            remoteMetadataDirectory.delete();
-            mdLockManager.delete();
+            threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
+                try {
+                    remoteDataDirectory.delete();
+                    remoteMetadataDirectory.delete();
+                    mdLockManager.delete();
+                } catch (Exception e) {
+                    logger.error("Exception occurred while deleting directory, it will get retried during next call", e);
+                    cleanupFailed.set(true);
+                }
+            });
         } catch (Exception e) {
             logger.error("Exception occurred while deleting directory", e);
-            return false;
+            cleanupFailed.set(true);
        }
 
-        return true;
+        return !cleanupFailed.get();
     }
 
     @Override
     public void close() throws IOException {
-        deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
+        deleteIfEmpty();
     }
 }
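The reworked deleteIfEmpty offloads the actual deletions to the REMOTE_PURGE thread pool and tracks failure via an AtomicBoolean. Below is a minimal, self-contained sketch of that submit-and-flag pattern, with a plain ExecutorService standing in for OpenSearch's thread pool; all names here are illustrative, not OpenSearch APIs.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the submit-and-flag pattern used by the new deleteIfEmpty.
// A plain ExecutorService stands in for threadPool.executor(ThreadPool.Names.REMOTE_PURGE).
public final class AsyncCleanupSketch {
    private final ExecutorService purgeExecutor = Executors.newSingleThreadExecutor();

    public boolean deleteIfEmpty(Runnable deleteDirectories) {
        AtomicBoolean cleanupFailed = new AtomicBoolean(false);
        try {
            purgeExecutor.execute(() -> {
                try {
                    deleteDirectories.run(); // data directory, metadata directory, lock files
                } catch (Exception e) {
                    cleanupFailed.set(true); // surfaced only via logs; retried on the next call
                }
            });
        } catch (Exception e) { // e.g. RejectedExecutionException if the executor is shut down
            cleanupFailed.set(true);
        }
        // The flag is read immediately after submission, so a failure inside the
        // asynchronous task is generally not visible in this return value.
        return !cleanupFailed.get();
    }
}

Note that because the flag is read right after submission, a failure inside the asynchronous task usually cannot affect the returned value; as the committed log message says, such a failure is retried during the next call.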
RemoteStoreLockManager.java
@@ -11,6 +11,7 @@
 import org.opensearch.common.annotation.PublicApi;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * An Interface that defines Remote Store Lock Manager.
@@ -43,6 +44,8 @@ public interface RemoteStoreLockManager {
      */
     Boolean isAcquired(LockInfo lockInfo) throws IOException;
 
+    List<LockInfo> listLocks() throws IOException;
+
     /**
      * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo.
      * There can occur a race condition where the original file is deleted before we can use it to acquire lock for the new acquirer. Until we have a
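The new listLocks API gives callers a view of all outstanding metadata locks for a shard. A hedged usage sketch follows; the surrounding method is illustrative and not part of this commit, while RemoteStoreLockManager and LockInfo are the types from the diff above.

// Illustrative caller, not part of this commit: shard-level remote data may only
// be cleaned up once no snapshot holds a metadata lock on it.
boolean isShardSafeToCleanup(RemoteStoreLockManager lockManager) throws IOException {
    List<LockInfo> outstandingLocks = lockManager.listLocks();
    return outstandingLocks.isEmpty();
}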
RemoteStoreMetadataLockManager.java
@@ -14,6 +14,7 @@
 import org.apache.lucene.store.IndexOutput;
 import org.opensearch.common.annotation.PublicApi;
 import org.opensearch.index.store.RemoteBufferedOutputDirectory;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -107,6 +108,12 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException {
         return !lockFiles.isEmpty();
     }
 
+    @Override
+    public List<LockInfo> listLocks() throws IOException {
+        Collection<String> lockFiles = lockDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX);
+        return lockFiles.stream()
+            .map(lock -> FileLockInfo.getLockInfoBuilder()
+                .withFileToLock(FileLockInfo.LockFileUtils.getFileToLockNameFromLock(lock))
+                .withAcquirerId(FileLockInfo.LockFileUtils.getAcquirerIdFromLock(lock))
+                .build())
+            .collect(Collectors.toList());
+    }
+
     /**
      * Acquires lock on the file mentioned in originalLockInfo for acquirer mentioned in clonedLockInfo.
      * Snapshot layer enforces thread safety by having checks in place to ensure that the source snapshot is not being deleted before proceeding
BlobStoreRepository.java
@@ -117,12 +117,11 @@
 import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
 import org.opensearch.index.snapshots.blobstore.SlicedInputStream;
 import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
+import org.opensearch.index.store.RemoteSegmentStoreDirectory;
 import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
-import org.opensearch.index.store.lockmanager.FileLockInfo;
-import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
-import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
+import org.opensearch.index.store.lockmanager.*;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.repositories.IndexId;
@@ -1098,6 +1097,84 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
         }
     }
 
+    /**
+     * The result of removing a snapshot from a shard folder in the repository.
+     */
+    private static final class ShardRemoteStoreReleaseCleanupResult {
+        // snapshot shard blob file
+        private final String fileToDelete;
+
+        // whether the release/cleanup succeeded
+        private final Boolean cleanupSucceeded;
+
+        ShardRemoteStoreReleaseCleanupResult(String fileToDelete, boolean cleanupSucceeded) {
+            this.fileToDelete = fileToDelete;
+            this.cleanupSucceeded = cleanupSucceeded;
+        }
+    }
+
+    private boolean releaseRemoteStoreLockAndCleanup(String fileToDelete, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory) {
+        if (remoteStoreLockManagerFactory == null || !fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
+            return true;
+        }
+        String[] fileToDeletePath = fileToDelete.split("/");
+        String indexId = fileToDeletePath[1];
+        String shardId = fileToDeletePath[2];
+        String shallowSnapBlob = fileToDeletePath[3];
+        String snapshotUUID = shallowSnapBlob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), shallowSnapBlob.length() - ".dat".length());
+
+        try {
+            BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
+            RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot = REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
+                shardContainer,
+                snapshotUUID,
+                namedXContentRegistry
+            );
+            String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
+            String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
+            // Releasing the lock file before deleting the shallow-snap-UUID file, because in case of any failure while
+            // releasing the lock file, we would still have the shallow-snap-UUID file, and that would be used during the
+            // next delete operation for releasing this lock file
+            RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
+                remoteStoreRepoForIndex,
+                indexUUID,
+                shardId
+            );
+            remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build());
+            logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
+            if (!isIndexPresent(clusterService, indexUUID) && remoteStoreMetadataLockManager.listLocks().size() == 0) {
+                // this is a temporary solution where snapshot deletion triggers remote store side
+                // cleanup if the index is already deleted. We will add a poller in the future to take
+                // care of remote store side cleanup.
+                // see https://github.com/opensearch-project/OpenSearch/issues/8469
+                try (RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) new RemoteSegmentStoreDirectoryFactory(
+                    remoteStoreLockManagerFactory.getRepositoriesService(),
+                    threadPool
+                ).newDirectory(
+                    remoteStoreRepoForIndex,
+                    indexUUID,
+                    new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
+                )) {
+                    // TODO: if cleanup fails, we will still proceed with the corresponding shard metadata cleanup,
+                    // but this is a known scenario which needs to be taken care of separately.
+                    // related issue: https://github.com/opensearch-project/OpenSearch/issues/8469
+                    return remoteSegmentStoreDirectory.deleteIfEmpty();
+                }
+            }
+        } catch (Exception e) {
+            logger.warn(() -> new ParameterizedMessage("Failed to release lock or cleanup remote directory, skipping blob deletion for [{}]", shardId), e);
+        }
+        return false;
+    }
+
     // When remoteStoreLockManagerFactory is non-null, while deleting the files, lock files are also released before deletion of respective
     // shallow-snap-UUID files. And if it is null, we just delete the stale shard blobs.
     private void executeStaleShardDelete(
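The path handling in releaseRemoteStoreLockAndCleanup relies on a fixed blob layout to recover the index id, shard id, and snapshot UUID. Below is a self-contained sketch of the same split/substring logic, under the assumption that stale shard blobs look like indices/{indexId}/{shardId}/shallow-snap-{snapshotUUID}.dat; the "indices" root and the literal prefix are stand-ins for the values produced by indicesPath() and SHALLOW_SNAPSHOT_PREFIX.

// Hypothetical helper mirroring the split/substring logic above; not part of the commit.
// Path layout is assumed for illustration: indices/{indexId}/{shardId}/shallow-snap-{uuid}.dat
public final class ShallowSnapshotPaths {
    private static final String PREFIX = "shallow-snap-"; // stands in for SHALLOW_SNAPSHOT_PREFIX

    public static String extractSnapshotUUID(String fileToDelete) {
        String[] parts = fileToDelete.split("/");
        String blobName = parts[3]; // "shallow-snap-<uuid>.dat"
        return blobName.substring(PREFIX.length(), blobName.length() - ".dat".length());
    }

    public static void main(String[] args) {
        // prints "abc123"
        System.out.println(extractSnapshotUUID("indices/myIndexId/0/shallow-snap-abc123.dat"));
    }
}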
@@ -1109,56 +1186,11 @@ private void executeStaleShardDelete(
         if (filesToDelete != null) {
             threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
                 try {
-                    if (remoteStoreLockManagerFactory != null) {
-                        for (String fileToDelete : filesToDelete) {
-                            if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
-                                String[] fileToDeletePath = fileToDelete.split("/");
-                                String indexId = fileToDeletePath[1];
-                                String shardId = fileToDeletePath[2];
-                                String shallowSnapBlob = fileToDeletePath[3];
-                                String snapshotUUID = shallowSnapBlob.substring(
-                                    SHALLOW_SNAPSHOT_PREFIX.length(),
-                                    shallowSnapBlob.length() - ".dat".length()
-                                );
-                                BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
-                                RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
-                                    REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
-                                        shardContainer,
-                                        snapshotUUID,
-                                        namedXContentRegistry
-                                    );
-                                String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
-                                String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
-                                // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
-                                // releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
-                                // next delete operation for releasing this lock file
-                                RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
-                                    remoteStoreRepoForIndex,
-                                    indexUUID,
-                                    shardId
-                                );
-                                remoteStoreMetadataLockManager.release(
-                                    FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
-                                );
-                                if (!isIndexPresent(clusterService, indexUUID)) {
-                                    // this is a temporary solution where snapshot deletion triggers remote store side
-                                    // cleanup if index is already deleted. We will add a poller in future to take
-                                    // care of remote store side cleanup.
-                                    // see https://github.com/opensearch-project/OpenSearch/issues/8469
-                                    new RemoteSegmentStoreDirectoryFactory(
-                                        remoteStoreLockManagerFactory.getRepositoriesService(),
-                                        threadPool
-                                    ).newDirectory(
-                                        remoteStoreRepoForIndex,
-                                        indexUUID,
-                                        new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardId))
-                                    ).close();
-                                }
-                            }
-                        }
-                    }
+                    // Filter down to the files for which remote store lock release and cleanup succeeded;
+                    // the remaining files, for which it failed, will be retried in the next snapshot delete run.
+                    List<String> modifiedFilesToDelete = filesToDelete.stream()
+                        .filter(fileToDelete -> releaseRemoteStoreLockAndCleanup(fileToDelete, remoteStoreLockManagerFactory))
+                        .collect(Collectors.toList());
                     // Deleting the shard blobs
-                    deleteFromContainer(blobContainer(), filesToDelete);
+                    deleteFromContainer(blobContainer(), modifiedFilesToDelete);
                     l.onResponse(null);
                 } catch (Exception e) {
                     logger.warn(
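The retry contract of the new flow can be seen in isolation: files whose lock release or cleanup fails are simply excluded from the delete batch and picked up again later. A standalone sketch of the same predicate-driven split, with a plain predicate standing in for releaseRemoteStoreLockAndCleanup; all names are illustrative.

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Standalone sketch of the filter-then-delete contract: the 'true' bucket is
// deleted now, the 'false' bucket stays behind and is retried on the next
// snapshot delete run.
public final class StaleBlobPartition {
    public static Map<Boolean, List<String>> partition(List<String> filesToDelete, Predicate<String> cleanupSucceeded) {
        return filesToDelete.stream().collect(Collectors.partitioningBy(cleanupSucceeded));
    }

    public static void main(String[] args) {
        Map<Boolean, List<String>> buckets = partition(
            List.of("indices/a/0/shallow-snap-1.dat", "indices/b/0/shallow-snap-2.dat"),
            blob -> blob.startsWith("indices/a/") // illustrative stand-in predicate
        );
        System.out.println("delete now: " + buckets.get(true));   // [indices/a/0/shallow-snap-1.dat]
        System.out.println("retry later: " + buckets.get(false)); // [indices/b/0/shallow-snap-2.dat]
    }
}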
@@ -1594,35 +1626,9 @@ private void executeOneStaleIndexDelete(
                         String blob = shardLevelBlob.getKey();
                         String snapshotUUID = blob.substring(SHALLOW_SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length());
                         if (blob.startsWith(SHALLOW_SNAPSHOT_PREFIX) && blob.endsWith(".dat")) {
-                            RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot =
-                                REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(
-                                    shardBlob.getValue(),
-                                    snapshotUUID,
-                                    namedXContentRegistry
-                                );
-                            String indexUUID = remoteStoreShardShallowCopySnapshot.getIndexUUID();
-                            String remoteStoreRepoForIndex = remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository();
-                            // Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
-                            // while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
-                            // and that would be used during next delete operation for releasing this stale lock file
-                            RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
-                                .newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey());
-                            remoteStoreMetadataLockManager.release(
-                                FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
-                            );
-                            if (!isIndexPresent(clusterService, indexUUID)) {
-                                // this is a temporary solution where snapshot deletion triggers remote store side
-                                // cleanup if index is already deleted. We will add a poller in future to take
-                                // care of remote store side cleanup.
-                                // see https://github.com/opensearch-project/OpenSearch/issues/8469
-                                new RemoteSegmentStoreDirectoryFactory(
-                                    remoteStoreLockManagerFactory.getRepositoriesService(),
-                                    threadPool
-                                ).newDirectory(
-                                    remoteStoreRepoForIndex,
-                                    indexUUID,
-                                    new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.valueOf(shardBlob.getKey()))
-                                ).close();
-                            }
+                            if (!releaseRemoteStoreLockAndCleanup(blob, remoteStoreLockManagerFactory)) {
+                                // Throw to skip deletion of this unused index; it gets retried during the next snapshot deletion.
+                                throw new Exception("Failed to complete lock release and cleanup for one of the indices.");
+                            }
                         }
                     }
