Skip to content

Commit

Permalink
Add deleteStaleSegments
Browse files Browse the repository at this point in the history
  • Loading branch information
Sachin Kale committed Aug 12, 2022
1 parent 54aabc2 commit b0d95c9
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public void beforeRefresh() throws IOException {
* Upload new segment files created as part of the last refresh to the remote segment store.
* This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded.
* @param didRefresh true if the refresh opened a new reference
* @throws IOException in case of I/O error in reading list of local files
*/
@Override
public void afterRefresh(boolean didRefresh) {
Expand All @@ -85,6 +84,7 @@ public void afterRefresh(boolean didRefresh) {
if (uploadStatus) {
remoteDirectory.copyFrom(storeDirectory, lastCommittedLocalSegmentFileName, lastCommittedLocalSegmentFileName, IOContext.DEFAULT);
remoteDirectory.uploadCommitMapping(committedLocalFiles, storeDirectory, indexShard.getOperationPrimaryTerm(), commitSegmentInfos.getGeneration());
deleteStaleCommits();
}
} else {
logger.info("Latest commit point {} is present in remote store", lastCommittedLocalSegmentFileName);
Expand Down Expand Up @@ -146,4 +146,12 @@ private String getChecksumOfLocalFile(String file) throws IOException {
return Long.toString(CodecUtil.retrieveChecksum(indexInput));
}
}

/** Number of most recent commits passed to the remote directory as the retention count. */
private static final int LAST_N_COMMITS_TO_KEEP = 5;

/**
 * Asks the remote directory to delete stale commits, retaining the last
 * {@link #LAST_N_COMMITS_TO_KEEP} commits.
 *
 * <p>This is best-effort: an {@link IOException} is logged together with its cause and is not
 * rethrown, so a failed cleanup never fails the caller.
 */
private void deleteStaleCommits() {
    try {
        remoteDirectory.deleteStaleCommits(LAST_N_COMMITS_TO_KEEP);
    } catch (IOException e) {
        // Keep the exception in the log so the reason for the failed cleanup is not lost.
        logger.warn("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.index.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
Expand All @@ -16,12 +18,14 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.opensearch.common.UUIDs;
import org.opensearch.index.shard.RemoteStoreRefreshListener;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -42,6 +46,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory {
private String refreshMappingFileUniqueSuffix;
private Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore;

private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class);

public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory) throws IOException {
super(remoteDataDirectory);
this.remoteDataDirectory = remoteDataDirectory;
Expand Down Expand Up @@ -173,12 +179,12 @@ public long fileLength(String name) throws IOException {

/**
 * Creates an output in the remote data directory. The file is not created under {@code name}
 * itself but under a newly generated remote segment filename derived from it.
 *
 * @param name    local filename used as the base of the remote filename
 * @param context IO context forwarded to the remote data directory
 * @return the output created by the remote data directory
 * @throws IOException if the remote data directory fails to create the output
 */
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
    return remoteDataDirectory.createOutput(getNewRemoteSegmentFilename(name), context);
}

/**
 * Creates a temporary output in the remote data directory, using a newly generated remote
 * segment filename derived from {@code prefix} as the prefix.
 *
 * @param prefix  local prefix used as the base of the remote prefix
 * @param suffix  suffix forwarded unchanged to the remote data directory
 * @param context IO context forwarded to the remote data directory
 * @return the temporary output created by the remote data directory
 * @throws IOException if the remote data directory fails to create the output
 */
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
    return remoteDataDirectory.createTempOutput(getNewRemoteSegmentFilename(prefix), suffix, context);
}

@Override
Expand All @@ -189,7 +195,7 @@ public void sync(Collection<String> names) throws IOException {

/**
 * Renames a file in the remote data directory. The source is resolved to the remote filename
 * it was previously uploaded under, and the destination gets a newly generated remote segment
 * filename derived from {@code dest}.
 *
 * @param source local name of the file to rename
 * @param dest   local name the file should be renamed to
 * @throws IOException if the remote data directory fails to rename the file
 */
@Override
public void rename(String source, String dest) throws IOException {
    remoteDataDirectory.rename(getExistingRemoteFilename(source), getNewRemoteSegmentFilename(dest));
}

@Override
Expand All @@ -208,7 +214,7 @@ public Set<String> getPendingDeletions() throws IOException {
}

public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
String remoteFilename = getNewRemoteFilename(dest);
String remoteFilename = getNewRemoteSegmentFilename(dest);
remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
String checksum = getChecksumOfLocalFile(from, src);
UploadedSegmentMetadata metadata = new UploadedSegmentMetadata(src, remoteFilename, checksum);
Expand Down Expand Up @@ -249,9 +255,12 @@ private String getExistingRemoteFilename(String localFilename) {
return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename;
}

private String getNewRemoteFilename(String localFilename) {
private String getNewRemoteSegmentFilename(String localFilename) {
return localFilename + SEPARATOR + UUIDs.base64UUID();
}
/**
 * Extracts the local segment filename from a remote filename, i.e. everything before the
 * first occurrence of {@code SEPARATOR}.
 *
 * <p>The separator is matched literally (not as a regular expression), mirroring how remote
 * filenames are built by plain concatenation. A name without any separator is returned as is.
 *
 * @param remoteFilename remote name of the segment file
 * @return the part of {@code remoteFilename} preceding the first separator
 */
private String getLocalSegmentFilename(String remoteFilename) {
    final int separatorIndex = remoteFilename.indexOf(SEPARATOR);
    return separatorIndex < 0 ? remoteFilename : remoteFilename.substring(0, separatorIndex);
}

private String getNewRemoteFilename(String localFilename, long primaryTerm, long generation) {
return getNewRemoteFilename(localFilename, primaryTerm, generation, UUIDs.base64UUID());
Expand All @@ -260,4 +269,37 @@ private String getNewRemoteFilename(String localFilename, long primaryTerm, long
/**
 * Assembles a remote filename from its four parts, joined by {@code SEPARATOR} in this order:
 * local filename, primary term, generation (rendered in radix {@link Character#MAX_RADIX}),
 * and uuid.
 *
 * @param localFilename local name of the file
 * @param primaryTerm   primary term to embed in the name
 * @param generation    generation to embed in the name
 * @param uuid          unique suffix of the name
 * @return the assembled remote filename
 */
private String getNewRemoteFilename(String localFilename, long primaryTerm, long generation, String uuid) {
    final String encodedGeneration = Long.toString(generation, Character.MAX_RADIX);
    return new StringBuilder().append(localFilename)
        .append(SEPARATOR)
        .append(primaryTerm)
        .append(SEPARATOR)
        .append(encodedGeneration)
        .append(SEPARATOR)
        .append(uuid)
        .toString();
}

/**
 * Deletes stale commits from the remote store, keeping the last {@code lastNCommitsToKeep}
 * commits in the order defined by {@code MappingFilenameComparator}.
 *
 * <p>For every stale commit mapping file, the remote segment files it references are deleted
 * unless a retained commit references the same remote file. The commit mapping file and its
 * corresponding refresh mapping file are deleted only after all of those segment files were
 * deleted (or were already missing); otherwise the mapping files are kept so that the cleanup
 * can be attempted again on a later call.
 *
 * @param lastNCommitsToKeep number of most recent commits to retain; must not be negative
 * @throws IOException              if listing or reading the commit mapping files fails
 * @throws IllegalArgumentException if {@code lastNCommitsToKeep} is negative
 */
public void deleteStaleCommits(int lastNCommitsToKeep) throws IOException {
    if (lastNCommitsToKeep < 0) {
        throw new IllegalArgumentException("lastNCommitsToKeep must not be negative, got " + lastNCommitsToKeep);
    }

    Collection<String> commitMappingFiles = remoteMetadataDirectory.listFilesByPrefix(COMMIT_MAPPING_PREFIX);
    List<String> sortedMappingFileList = commitMappingFiles.stream()
        .sorted(new MappingFilenameComparator())
        .collect(Collectors.toList());
    final int staleCommitCount = sortedMappingFileList.size() - lastNCommitsToKeep;
    if (staleCommitCount <= 0) {
        logger.info("Number of commits in remote segment store={}, lastNCommitsToKeep={}", sortedMappingFileList.size(), lastNCommitsToKeep);
        return;
    }

    // Gather everything the retained commits reference. Remote filenames are collected per
    // commit: the merged map is keyed by local filename, so it alone would lose a remote file
    // when two retained commits map the same local name to different remote names.
    Map<String, UploadedSegmentMetadata> activeSegmentFilesMetadataMap = new HashMap<>();
    Set<String> activeSegmentRemoteFilenames = new java.util.HashSet<>();
    for (String commitFile : sortedMappingFileList.subList(staleCommitCount, sortedMappingFileList.size())) {
        Map<String, UploadedSegmentMetadata> commitSegmentFilesMetadataMap = new HashMap<>();
        readMappingFile(commitFile, commitSegmentFilesMetadataMap);
        for (UploadedSegmentMetadata metadata : commitSegmentFilesMetadataMap.values()) {
            activeSegmentRemoteFilenames.add(metadata.uploadedFilename);
        }
        activeSegmentFilesMetadataMap.putAll(commitSegmentFilesMetadataMap);
    }

    for (String commitFile : sortedMappingFileList.subList(0, staleCommitCount)) {
        Map<String, UploadedSegmentMetadata> staleSegmentFilesMetadataMap = new HashMap<>();
        readMappingFile(commitFile, staleSegmentFilesMetadataMap);
        Set<String> staleSegmentRemoteFilenames = staleSegmentFilesMetadataMap.values()
            .stream()
            .map(metadata -> metadata.uploadedFilename)
            .collect(Collectors.toSet());

        boolean allSegmentFilesDeleted = true;
        for (String remoteFilename : staleSegmentRemoteFilenames) {
            if (activeSegmentRemoteFilenames.contains(remoteFilename)) {
                continue; // still referenced by a retained commit
            }
            try {
                // remoteFilename is already the name in the remote store; it must not be
                // looked up again through the local-name keyed segmentsUploadedToRemoteStore.
                remoteDataDirectory.deleteFile(remoteFilename);
            } catch (java.nio.file.NoSuchFileException | java.io.FileNotFoundException e) {
                // Already gone (e.g. removed by an earlier, partially failed attempt): treat as deleted.
                logger.info("Segment file {} related to commit file {} does not exist in remote store", remoteFilename, commitFile);
            } catch (IOException e) {
                allSegmentFilesDeleted = false;
                logger.warn("Exception while deleting segment files related to commit file {}. Deletion will be re-tried", commitFile, e);
                continue;
            }
            String localFilename = getLocalSegmentFilename(remoteFilename);
            if (!activeSegmentFilesMetadataMap.containsKey(localFilename)) {
                segmentsUploadedToRemoteStore.remove(localFilename);
            }
        }

        // Drop the mapping files only once nothing they reference is left behind; they are the
        // only record of which segment files belong to this commit.
        if (allSegmentFilesDeleted) {
            try {
                remoteMetadataDirectory.deleteFile(commitFile);
                remoteMetadataDirectory.deleteFile(REFRESH_MAPPING_PREFIX + commitFile.substring(COMMIT_MAPPING_PREFIX.length()));
            } catch (IOException e) {
                logger.warn("Exception while deleting segment files related to commit file {}. Deletion will be re-tried", commitFile, e);
            }
        }
    }
}
}

0 comments on commit b0d95c9

Please sign in to comment.