Skip to content

Commit

Permalink
[Remote Store] Optimize segments metadata upload (#7905)
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Jun 6, 2023
1 parent 8dd5461 commit e135e1e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,13 @@ public void beforeRefresh() throws IOException {}
@Override
public void afterRefresh(boolean didRefresh) {

if (didRefresh) {
if (didRefresh || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
updateLocalRefreshTimeAndSeqNo();
}

try {
indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
} catch (InterruptedException | ExecutionException e) {
logger.info("Exception occurred while scheduling syncSegments", e);
try {
indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
} catch (InterruptedException | ExecutionException e) {
logger.info("Exception occurred while scheduling syncSegments", e);
}
}
}

Expand Down Expand Up @@ -232,9 +231,7 @@ private synchronized void syncSegments(boolean isRetry) {
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo);
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
checkpointPublisher.publish(indexShard, checkpoint);
onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo, lastRefreshedCheckpoint, checkpoint);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
shouldRetry = false;
Expand Down Expand Up @@ -278,7 +275,12 @@ private void beforeSegmentsSync(boolean isRetry) {
segmentTracker.incrementTotalUploadsStarted();
}

private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) {
private void onSuccessfulSegmentsSync(
long refreshTimeMs,
long refreshSeqNo,
long lastRefreshedCheckpoint,
ReplicationCheckpoint checkpoint
) {
// Update latest uploaded segment files name in segment tracker
segmentTracker.setLatestUploadedFiles(latestFileNameSizeOnLocalMap.keySet());
// Update the remote refresh time and refresh seq no
Expand All @@ -287,6 +289,10 @@ private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) {
resetBackOffDelayIterator();
// Cancel the scheduled cancellable retry if possible and set it to null
cancelAndResetScheduledCancellableRetry();
// Set the minimum sequence number for keeping translog
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
// Publishing the new checkpoint which is used for remote store + segrep indexes
checkpointPublisher.publish(indexShard, checkpoint);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void testNoTranslogHistoryTransferred() throws Exception {
shards.flush();
List<DocIdSeqNoAndSource> docIdAndSeqNosAfterFlush = getDocIdAndSeqNos(primary);
int moreDocs = shards.indexDocs(randomIntBetween(20, 100));
assertEquals(moreDocs, getTranslog(primary).totalOperations());
assertEquals(numDocs + moreDocs, getTranslog(primary).totalOperations());

// Step 2 - Start replica, recovery happens, check docs recovered till last flush
final IndexShard replica = shards.addReplica();
Expand Down

0 comments on commit e135e1e

Please sign in to comment.