Enhance debug, trace & info logs for remote store flows
Signed-off-by: Ashish Singh <ssashish@amazon.com>
ashking94 committed Sep 22, 2023
1 parent 9f0e017 commit d254c51
Showing 21 changed files with 140 additions and 63 deletions.
@@ -59,6 +59,7 @@ public class Index implements Writeable, ToXContentObject {
public static final Index[] EMPTY_ARRAY = new Index[0];
private static final String INDEX_UUID_KEY = "index_uuid";
private static final String INDEX_NAME_KEY = "index_name";
public static final String UNKNOWN_INDEX_NAME = "unknown";

private static final ObjectParser<Builder, Void> INDEX_PARSER = new ObjectParser<>("index", Builder::new);
static {
@@ -294,6 +294,11 @@ public void updateLatestLocalFileNameLengthMap(
Collection<String> segmentFiles,
CheckedFunction<String, Long, IOException> fileSizeFunction
) {
logger.debug(
"segmentFilesPostRefresh={} latestLocalFileNamesBeforeMapUpdate={}",
segmentFiles,
latestLocalFileNameLengthMap.keySet()
);
// Update the map
segmentFiles.stream()
.filter(file -> EXCLUDE_FILES.contains(file) == false)
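The rest of this hunk is collapsed in the diff view. Purely as a rough illustration (not the actual implementation), the pipeline above could complete along these lines, assuming latestLocalFileNameLengthMap maps segment file names to their lengths and fileSizeFunction returns the current on-disk size:

    // Illustrative sketch only; the real method body may differ.
    segmentFiles.stream()
        .filter(file -> EXCLUDE_FILES.contains(file) == false)
        .filter(file -> latestLocalFileNameLengthMap.containsKey(file) == false)
        .forEach(file -> {
            try {
                latestLocalFileNameLengthMap.put(file, fileSizeFunction.apply(file));
            } catch (IOException e) {
                logger.warn("Failed to compute size for segment file " + file, e);
            }
        });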
@@ -104,6 +104,7 @@ protected TimeValue getNextRetryInterval() {
private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boolean didRefresh) {
// If the underlying listener has closed, then we do not allow even the retry to be scheduled
if (closed.get() || isRetryEnabled() == false) {
getLogger().debug("skip retry on closed={} isRetryEnabled={}", closed.get(), isRetryEnabled());
return;
}

@@ -112,6 +113,7 @@ private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boolean didRefresh) {
// If the retryScheduled is already true, then we return from here itself. If not, then we proceed with scheduling
// the retry.
if (retryScheduled.getAndSet(true)) {
getLogger().debug("skip retry on retryScheduled=true");
return;
}
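The getAndSet(true) check above is the usual single-flight guard: only the caller that flips the flag from false to true proceeds, so at most one retry can be pending at a time. A minimal, self-contained sketch of the pattern; the class and method names are illustrative and not part of the listener's API:

    import java.util.concurrent.atomic.AtomicBoolean;

    final class SingleFlightRetrySketch {
        private final AtomicBoolean retryScheduled = new AtomicBoolean(false);

        // Only the first caller schedules work; later callers back off until reset() runs.
        boolean trySchedule(Runnable retryTask) {
            if (retryScheduled.getAndSet(true)) {
                return false; // a retry is already pending
            }
            retryTask.run(); // the real listener hands this to a thread pool instead
            return true;
        }

        void reset() {
            retryScheduled.set(false);
        }
    }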

@@ -188,7 +190,7 @@ public final void close() throws IOException {
if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) {
boolean result = closed.compareAndSet(false, true);
assert result && semaphore.availablePermits() == 0;
getLogger().info("Closed");
getLogger().info("All permits are acquired and refresh listener is closed");
} else {
throw new TimeoutException("timeout while closing gated refresh listener");
}
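close() succeeds only after acquiring every permit, so no refresh-time work can still be in flight when the closed flag flips. A rough sketch of this drain-then-close idea; the field names, permit count, and 10-minute timeout mirror the snippet above, while the surrounding class and runWithPermit are illustrative:

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicBoolean;

    final class GatedCloseSketch {
        private static final int TOTAL_PERMITS = 1;
        private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);
        private final AtomicBoolean closed = new AtomicBoolean(false);

        // Each refresh-time task holds a permit while it runs.
        void runWithPermit(Runnable task) throws InterruptedException {
            semaphore.acquire();
            try {
                if (closed.get() == false) {
                    task.run();
                }
            } finally {
                semaphore.release();
            }
        }

        // Close drains all permits (and keeps them) before marking the listener closed.
        void close() throws InterruptedException, TimeoutException {
            if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) {
                closed.set(true);
            } else {
                throw new TimeoutException("timeout while closing gated refresh listener");
            }
        }
    }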
@@ -200,7 +202,6 @@ public final void close() throws IOException {
protected abstract Logger getLogger();

// Visible for testing

/**
* Returns if the retry is scheduled or not.
*
@@ -3506,6 +3506,7 @@ public void startRecovery(
// }
// }}
// }
logger.debug("startRecovery type={}", recoveryState.getRecoverySource().getType());
assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE:
@@ -127,6 +127,7 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
segmentTracker.updateLocalRefreshTimeAndSeqNo();
try {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
}
@@ -150,8 +151,6 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
@Override
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
boolean successful;
// The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it
// is important to upload the zero state segments so that the restore does not break.
if (shouldSync(didRefresh)) {
successful = syncSegments();
} else {
@@ -161,27 +160,27 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
}

private boolean shouldSync(boolean didRefresh) {
// The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it
// is important to upload the zero state segments so that the restore does not break.
return this.primaryTerm != indexShard.getOperationPrimaryTerm()
|| didRefresh
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty();
}

private boolean syncSegments() {
if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) {
logger.trace(
"Skipped syncing segments with primaryMode={} indexShardState={}",
logger.debug(
"Skipped syncing segments with primaryMode={} indexShardState={} engine={}",
indexShard.getReplicationTracker().isPrimaryMode(),
indexShard.state()
indexShard.state(),
indexShard.getEngine().getClass().getSimpleName()
);
// Following check is required to enable retry and make sure that we do not lose this refresh event
// When primary shard is restored from remote store, the recovery happens first followed by changing
// primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through
// if following condition does not exist. The segments created as part of translog replay will not be present
// in the remote store.
if (indexShard.state() == IndexShardState.STARTED && indexShard.getEngine() instanceof InternalEngine) {
return false;
}
return true;
return indexShard.state() != IndexShardState.STARTED || !(indexShard.getEngine() instanceof InternalEngine);
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
beforeSegmentsSync();
@@ -217,8 +216,10 @@ private boolean syncSegments() {
@Override
public void onResponse(Void unused) {
try {
logger.debug("New segments upload successful");
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos, checkpoint);
logger.debug("Metadata upload successful");
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(
refreshTimeMs,
@@ -262,6 +263,7 @@ public void onFailure(Exception e) {
updateFinalStatusInSegmentTracker(successful.get(), bytesBeforeUpload, startTimeInNS);
// If there are failures in uploading segments, then we should retry as search idle can lead to
// refresh not occurring until write happens.
logger.debug("syncSegments runStatus={}", successful.get());
return successful.get();
}

@@ -352,10 +354,12 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, ActionListener<Void> listener) {
Collection<String> filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList());
if (filteredFiles.size() == 0) {
logger.debug("No new segments to upload in uploadNewSegments");
listener.onResponse(null);
return;
}

logger.debug("Effective new segments files to upload {}", filteredFiles);
ActionListener<Collection<Void>> mappedListener = ActionListener.map(listener, resp -> null);
GroupedActionListener<Void> batchUploadListener = new GroupedActionListener<>(mappedListener, filteredFiles.size());

@@ -399,7 +399,7 @@ void recoverFromSnapshotAndRemoteStore(
RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory(
remoteStoreRepository,
indexUUID,
String.valueOf(shardId.id())
shardId
);
indexShard.syncSegmentsFromGivenRemoteSegmentStore(true, sourceRemoteDirectory, primaryTerm, commitGeneration);
final Store store = indexShard.store();
@@ -25,8 +25,10 @@
import org.apache.lucene.util.Version;
import org.opensearch.common.UUIDs;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
@@ -40,6 +42,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -97,7 +100,9 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
RemoteSegmentMetadata.METADATA_CODEC
);

private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class);
private static final Logger staticLogger = LogManager.getLogger(RemoteSegmentStoreDirectory.class);

private final Logger logger;

/**
* AtomicBoolean that ensures only one staleCommitDeletion activity is scheduled at a time.
@@ -111,13 +116,15 @@ public RemoteSegmentStoreDirectory(
RemoteDirectory remoteDataDirectory,
RemoteDirectory remoteMetadataDirectory,
RemoteStoreLockManager mdLockManager,
ThreadPool threadPool
ThreadPool threadPool,
ShardId shardId
) throws IOException {
super(remoteDataDirectory);
this.remoteDataDirectory = remoteDataDirectory;
this.remoteMetadataDirectory = remoteMetadataDirectory;
this.mdLockManager = mdLockManager;
this.threadPool = threadPool;
this.logger = Loggers.getLogger(getClass(), shardId);
init();
}
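The directory now carries two loggers: staticLogger for the static fromString helper, where no shard context exists, and an instance logger built with Loggers.getLogger(getClass(), shardId) so every instance-level message is prefixed with the shard it belongs to. A minimal sketch of obtaining such a shard-scoped logger in any shard-level component; the surrounding class is illustrative:

    import org.apache.logging.log4j.Logger;
    import org.opensearch.common.logging.Loggers;
    import org.opensearch.core.index.shard.ShardId;

    class ShardScopedComponent {
        private final Logger logger;

        ShardScopedComponent(ShardId shardId) {
            // Messages logged through this instance are prefixed with the shard id,
            // which makes interleaved multi-shard debug output easy to attribute.
            this.logger = Loggers.getLogger(getClass(), shardId);
        }

        void onInit() {
            logger.debug("initialised for shard");
        }
    }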

@@ -130,12 +137,14 @@ public RemoteSegmentStoreDirectory(
* @throws IOException if there were any failures in reading the metadata file
*/
public RemoteSegmentMetadata init() throws IOException {
logger.debug("Start initialisation of remote segment metadata");
RemoteSegmentMetadata remoteSegmentMetadata = readLatestMetadataFile();
if (remoteSegmentMetadata != null) {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());
} else {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>();
}
logger.debug("Initialisation of remote segment metadata completed");
return remoteSegmentMetadata;
}

@@ -248,7 +257,7 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) {
String[] values = uploadedFilename.split(SEPARATOR);
UploadedSegmentMetadata metadata = new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3]));
if (values.length < 5) {
logger.error("Lucene version is missing for UploadedSegmentMetadata: " + uploadedFilename);
staticLogger.error("Lucene version is missing for UploadedSegmentMetadata: " + uploadedFilename);
}

metadata.setWrittenByMajor(Integer.parseInt(values[4]));
@@ -641,7 +650,7 @@ private Map<String, Integer> getSegmentToLuceneVersion(Collection<String> segmen
*/
private void tryAndDeleteLocalFile(String filename, Directory directory) {
try {
logger.trace("Deleting file: " + filename);
logger.debug("Deleting file: " + filename);
directory.deleteFile(filename);
} catch (NoSuchFileException | FileNotFoundException e) {
logger.trace("Exception while deleting. Missing file : " + filename, e);
@@ -691,7 +700,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
Integer.MAX_VALUE
);
if (sortedMetadataFileList.size() <= lastNMetadataFilesToKeep) {
logger.trace(
logger.debug(
"Number of commits in remote segment store={}, lastNMetadataFilesToKeep={}",
sortedMetadataFileList.size(),
lastNMetadataFilesToKeep
@@ -719,6 +728,11 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}).collect(Collectors.toList());

sortedMetadataFileList.removeAll(metadataFilesToBeDeleted);
logger.debug(
"metadataFilesEligibleToDelete={} metadataFilesToBeDeleted={}",
metadataFilesEligibleToDelete,
metadataFilesToBeDeleted
);

Map<String, UploadedSegmentMetadata> activeSegmentFilesMetadataMap = new HashMap<>();
Set<String> activeSegmentRemoteFilenames = new HashSet<>();
@@ -736,9 +750,11 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
.map(metadata -> metadata.uploadedFilename)
.collect(Collectors.toSet());
AtomicBoolean deletionSuccessful = new AtomicBoolean(true);
List<String> nonActiveDeletedSegmentFiles = new ArrayList<>();
staleSegmentRemoteFilenames.stream().filter(file -> !activeSegmentRemoteFilenames.contains(file)).forEach(file -> {
try {
remoteDataDirectory.deleteFile(file);
nonActiveDeletedSegmentFiles.add(file);
if (!activeSegmentFilesMetadataMap.containsKey(getLocalSegmentFilename(file))) {
segmentsUploadedToRemoteStore.remove(getLocalSegmentFilename(file));
}
@@ -753,8 +769,9 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
);
}
});
logger.debug("nonActiveDeletedSegmentFiles={}", nonActiveDeletedSegmentFiles);
if (deletionSuccessful.get()) {
logger.trace("Deleting stale metadata file {} from remote segment store", metadataFile);
logger.debug("Deleting stale metadata file {} from remote segment store", metadataFile);
remoteMetadataDirectory.deleteFile(metadataFile);
}
}
@@ -805,7 +822,7 @@ private boolean deleteIfEmpty() throws IOException {
1
);
if (metadataFiles.size() != 0) {
logger.info("Remote directory still has files , not deleting the path");
logger.info("Remote directory still has files, not deleting the path");
return false;
}

@@ -821,6 +838,7 @@ private boolean deleteIfEmpty() throws IOException {
return true;
}

@Override
public void close() throws IOException {
deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
}
@@ -10,6 +10,7 @@

import org.apache.lucene.store.Directory;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
@@ -45,17 +46,15 @@ public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> reposito
public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException {
String repositoryName = indexSettings.getRemoteStoreRepository();
String indexUUID = indexSettings.getIndex().getUUID();
String shardId = String.valueOf(path.getShardId().getId());

return newDirectory(repositoryName, indexUUID, shardId);
return newDirectory(repositoryName, indexUUID, path.getShardId());
}

public Directory newDirectory(String repositoryName, String indexUUID, String shardId) throws IOException {
public Directory newDirectory(String repositoryName, String indexUUID, ShardId shardId) throws IOException {
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository);
BlobPath commonBlobPath = blobStoreRepository.basePath();
commonBlobPath = commonBlobPath.add(indexUUID).add(shardId).add(SEGMENTS);
commonBlobPath = commonBlobPath.add(indexUUID).add(String.valueOf(shardId.id())).add(SEGMENTS);

RemoteDirectory dataDirectory = new RemoteDirectory(
blobStoreRepository.blobStore().blobContainer(commonBlobPath.add("data")),
@@ -69,10 +68,10 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh
repositoriesService.get(),
repositoryName,
indexUUID,
shardId
String.valueOf(shardId.id())
);

return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool);
return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e);
}
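With ShardId threaded through, the factory resolves a shard's remote segment path as basePath/indexUUID/shardId/segments/{data,metadata} and can pass the same ShardId on to the directory's shard-scoped logger. A hedged usage sketch; the repository name, index name, and UUID are placeholders, and the import locations are assumed for this OpenSearch version:

    import java.io.IOException;
    import org.apache.lucene.store.Directory;
    import org.opensearch.core.index.Index;
    import org.opensearch.core.index.shard.ShardId;
    import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;

    final class RemoteDirectoryUsageSketch {
        // Sketch only: resolve the remote segment directory for shard 0 of an index.
        static Directory resolve(RemoteSegmentStoreDirectoryFactory factory) throws IOException {
            ShardId shardId = new ShardId(new Index("my-index", "some-index-uuid"), 0);
            return factory.newDirectory(
                "my-remote-repo",   // repository assumed to be registered with the cluster
                "some-index-uuid",  // index UUID, used as a path segment under the repository base path
                shardId             // now a ShardId instead of a raw String
            );
        }
    }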
@@ -183,6 +183,7 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
ex = e;
}
}
logger.debug("Exhausted all download retries during translog/checkpoint file download");
throw ex;
}
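download(...) retries the whole transfer a bounded number of times and rethrows the last failure only once every attempt is exhausted, which is when the new debug line fires. A compact, self-contained sketch of that retry-then-rethrow shape; the attempt count, exception type, and names are assumptions rather than the real constants:

    import java.io.IOException;

    final class RetryThenRethrowSketch {
        interface Attempt {
            void run() throws IOException;
        }

        // Try the operation up to maxAttempts times; rethrow the last failure if all attempts fail.
        static void downloadWithRetries(Attempt attempt, int maxAttempts) throws IOException {
            IOException ex = null;
            for (int i = 0; i < maxAttempts; i++) {
                try {
                    attempt.run();
                    return; // success, nothing left to retry
                } catch (IOException e) {
                    ex = e; // remember the latest failure and try again
                }
            }
            throw ex; // all attempts failed; surface the last exception
        }
    }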

@@ -207,6 +208,11 @@ static private void downloadOnce(TranslogTransferManager translogTransferManager
String generation = Long.toString(i);
translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location);
}
logger.info(
"Downloaded translog and checkpoint files from={} to={}",
translogMetadata.getMinTranslogGeneration(),
translogMetadata.getGeneration()
);

statsTracker.recordDownloadStats(prevDownloadBytesSucceeded, prevDownloadTimeInMillis);

@@ -217,7 +223,7 @@ static private void downloadOnce(TranslogTransferManager translogTransferManager
location.resolve(Translog.CHECKPOINT_FILE_NAME)
);
}
logger.trace("Downloaded translog files from remote");
logger.debug("downloadOnce execution completed");
}

public static TranslogTransferManager buildTranslogTransferManager(
@@ -333,7 +339,7 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException {
// primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns
// downloads all the translogs from remote store and does a flush before the relocation finishes.
if (primaryModeSupplier.getAsBoolean() == false) {
logger.trace("skipped uploading translog for {} {}", primaryTerm, generation);
logger.debug("skipped uploading translog for {} {}", primaryTerm, generation);
// NO-OP
return true;
}