diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index e8384d8e2f4a6..8f71a9f3ff658 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -164,14 +164,13 @@ public void beforeRefresh() throws IOException {}
 
     @Override
     public void afterRefresh(boolean didRefresh) {
-        if (didRefresh) {
+        if (didRefresh || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
             updateLocalRefreshTimeAndSeqNo();
-        }
-
-        try {
-            indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
-        } catch (InterruptedException | ExecutionException e) {
-            logger.info("Exception occurred while scheduling syncSegments", e);
+            try {
+                indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
+            } catch (InterruptedException | ExecutionException e) {
+                logger.info("Exception occurred while scheduling syncSegments", e);
+            }
         }
     }
 
@@ -232,9 +231,7 @@ private synchronized void syncSegments(boolean isRetry) {
                     // Start metadata file upload
                     uploadMetadata(localSegmentsPostRefresh, segmentInfos);
                     clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
-                    onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo);
-                    indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
-                    checkpointPublisher.publish(indexShard, checkpoint);
+                    onSuccessfulSegmentsSync(refreshTimeMs, refreshSeqNo, lastRefreshedCheckpoint, checkpoint);
                     // At this point since we have uploaded new segments, segment infos and segment metadata file,
                     // along with marking minSeqNoToKeep, upload has succeeded completely.
                     shouldRetry = false;
@@ -278,7 +275,12 @@ private void beforeSegmentsSync(boolean isRetry) {
         segmentTracker.incrementTotalUploadsStarted();
     }
 
-    private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) {
+    private void onSuccessfulSegmentsSync(
+        long refreshTimeMs,
+        long refreshSeqNo,
+        long lastRefreshedCheckpoint,
+        ReplicationCheckpoint checkpoint
+    ) {
         // Update latest uploaded segment files name in segment tracker
         segmentTracker.setLatestUploadedFiles(latestFileNameSizeOnLocalMap.keySet());
         // Update the remote refresh time and refresh seq no
@@ -287,6 +289,10 @@ private void onSuccessfulSegmentsSync(long refreshTimeMs, long refreshSeqNo) {
         resetBackOffDelayIterator();
         // Cancel the scheduled cancellable retry if possible and set it to null
         cancelAndResetScheduledCancellableRetry();
+        // Set the minimum sequence number for keeping translog
+        indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
+        // Publishing the new checkpoint which is used for remote store + segrep indexes
+        checkpointPublisher.publish(indexShard, checkpoint);
     }
 
     /**
diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java
index d76afca51e354..7a2f67e206f74 100644
--- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java
@@ -120,7 +120,7 @@ public void testNoTranslogHistoryTransferred() throws Exception {
         shards.flush();
         List docIdAndSeqNosAfterFlush = getDocIdAndSeqNos(primary);
         int moreDocs = shards.indexDocs(randomIntBetween(20, 100));
-        assertEquals(moreDocs, getTranslog(primary).totalOperations());
+        assertEquals(numDocs + moreDocs, getTranslog(primary).totalOperations());
 
         // Step 2 - Start replica, recovery happens, check docs recovered till last flush
         final IndexShard replica = shards.addReplica();