
Commit

Cleanup Unreferenced file on segment merge failure (#9503) (#9808)
(cherry picked from commit 37bdb6b)

Signed-off-by: Rishav Sagar <rissag@amazon.com>
RS146BIJAY committed Sep 6, 2023
1 parent bae4e76 commit 12ac2a8
Showing 8 changed files with 440 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -116,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))
- [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263))
- Removing the vec file extension from INDEX_STORE_HYBRID_NIO_EXTENSIONS, to ensure the no performance degradation for vector search via Lucene Engine.([#9528](https://github.com/opensearch-project/OpenSearch/pull/9528)))
- Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503))

### Deprecated

@@ -171,6 +171,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexSettings.INDEX_SEARCH_THROTTLED,
IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -514,6 +514,18 @@ public final class IndexSettings {
Property.Dynamic
);

/**
* This setting controls whether unreferenced files will be cleaned up when a segment merge fails because the disk is full.
*
* Defaults to true, which means unreferenced files will be cleaned up if a segment merge fails.
*/
public static final Setting<Boolean> INDEX_UNREFERENCED_FILE_CLEANUP = Setting.boolSetting(
"index.unreferenced_file_cleanup.enabled",
true,
Property.IndexScope,
Property.Dynamic
);

/**
* Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an
* operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted
@@ -678,6 +690,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile String defaultPipeline;
private volatile String requiredPipeline;
private volatile boolean searchThrottled;
private volatile boolean shouldCleanupUnreferencedFiles;
private volatile long mappingNestedFieldsLimit;
private volatile long mappingNestedDocsLimit;
private volatile long mappingTotalFieldsLimit;
@@ -794,6 +807,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
extendedCompatibilitySnapshotVersion = Version.CURRENT.minimumIndexCompatibilityVersion();
}
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.shouldCleanupUnreferencedFiles = INDEX_UNREFERENCED_FILE_CLEANUP.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings);
@@ -906,6 +920,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(FINAL_PIPELINE, this::setRequiredPipeline);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
scopedSettings.addSettingsUpdateConsumer(INDEX_UNREFERENCED_FILE_CLEANUP, this::setShouldCleanupUnreferencedFiles);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING, this::setMappingNestedFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING, this::setMappingNestedDocsLimit);
@@ -1539,6 +1554,18 @@ private void setSearchThrottled(boolean searchThrottled) {
this.searchThrottled = searchThrottled;
}

/**
* Returns true if unreferenced files should be cleaned up on merge failure for this index.
*/
public boolean shouldCleanupUnreferencedFiles() {
return shouldCleanupUnreferencedFiles;
}

private void setShouldCleanupUnreferencedFiles(boolean shouldCleanupUnreferencedFiles) {
this.shouldCleanupUnreferencedFiles = shouldCleanupUnreferencedFiles;
}

public long getMappingNestedFieldsLimit() {
return mappingNestedFieldsLimit;
}
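Because the new flag is registered as a dynamic, index-scoped setting, it can be resolved through `IndexSettings` and overridden per index without a node restart. The snippet below is a minimal sketch (not part of this commit) of how the setting might be built and read; the `Settings` builder and `Setting` accessors are the standard OpenSearch server APIs already used in this diff, while the class name and concrete values are illustrative only.

import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;

public class UnreferencedFileCleanupSettingExample {
    public static void main(String[] args) {
        // Build index settings that disable the cleanup for a particular index (illustrative value).
        Settings indexSettings = Settings.builder()
            .put(IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP.getKey(), false) // "index.unreferenced_file_cleanup.enabled"
            .build();

        // Resolve the effective value the same way the IndexSettings constructor does.
        boolean cleanupEnabled = IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP.get(indexSettings);
        System.out.println("unreferenced file cleanup enabled: " + cleanupEnabled); // prints false here
    }
}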
43 changes: 43 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -38,7 +38,10 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
@@ -135,6 +138,8 @@ public abstract class Engine implements Closeable {
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum?
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match";
public static final String FORCE_MERGE = "force merge";
public static final String MERGE_FAILED = "merge failed";

protected final ShardId shardId;
protected final Logger logger;
@@ -983,6 +988,10 @@ protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegm
}
}

boolean shouldCleanupUnreferencedFiles() {
return engineConfig.getIndexSettings().shouldCleanupUnreferencedFiles();
}

private Map<String, Long> getSegmentFileSizes(SegmentReader segmentReader) {
Directory directory = null;
SegmentCommitInfo segmentCommitInfo = segmentReader.getSegmentInfo();
@@ -1343,6 +1352,14 @@ public void failEngine(String reason, @Nullable Exception failure) {
);
}
}

// If the unreferenced file cleanup setting is enabled and a force merge or regular merge failed due to an
// IOException, clean up, on a best-effort basis, all unreferenced files created during the failed merge and
// reset the shard state back to the last Lucene commit.
if (shouldCleanupUnreferencedFiles() && isMergeFailureDueToIOException(failure, reason)) {
cleanUpUnreferencedFiles();
}

eventListener.onFailedEngine(reason, failure);
}
} catch (Exception inner) {
@@ -1361,6 +1378,32 @@ public void failEngine(String reason, @Nullable Exception failure) {
}
}

/**
* Clean up all unreferenced files generated during a failed segment merge. This resets the shard state to the
* last Lucene commit.
*/
private void cleanUpUnreferencedFiles() {
try (
IndexWriter writer = new IndexWriter(
store.directory(),
new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND)
)
) {
// Do nothing and close; this will kick off IndexFileDeleter, which will remove all unreferenced files.
} catch (Exception ex) {
logger.error("Error while deleting unreferenced file ", ex);
}
}

/** Check whether the merge failure happened due to IOException. */
private boolean isMergeFailureDueToIOException(Exception failure, String reason) {
return (reason.equals(FORCE_MERGE) || reason.equals(MERGE_FAILED))
&& ExceptionsHelper.unwrap(failure, IOException.class) instanceof IOException;
}

/** Check whether the engine should be failed */
protected boolean maybeFailEngine(String source, Exception e) {
if (Lucene.isCorruptionException(e)) {
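The cleanup itself relies on standard Lucene behavior: opening an IndexWriter on an existing directory in APPEND mode and closing it without committing runs Lucene's IndexFileDeleter, which deletes every file not referenced by the last commit point. Below is a minimal standalone sketch (not part of this commit) of that pattern against a hypothetical index path; the NoMergePolicy, APPEND mode, and commitOnClose=false choices mirror cleanUpUnreferencedFiles above, while the soft-deletes field configuration is omitted for brevity.

import java.nio.file.Paths;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class UnreferencedFileCleanupSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical location of an existing Lucene index left behind by a failed merge.
        try (Directory directory = FSDirectory.open(Paths.get("/tmp/example-shard-index"))) {
            IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer())
                .setOpenMode(IndexWriterConfig.OpenMode.APPEND) // reopen the last commit point
                .setCommitOnClose(false)                        // do not write a new commit on close
                .setMergePolicy(NoMergePolicy.INSTANCE);        // avoid triggering new merges

            // Opening and immediately closing the writer makes Lucene's IndexFileDeleter
            // remove every file in the directory that the last commit does not reference.
            new IndexWriter(directory, config).close();
        }
    }
}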
@@ -2189,7 +2189,7 @@ public void forceMerge(
throw ex;
} catch (Exception e) {
try {
maybeFailEngine("force merge", e);
maybeFailEngine(FORCE_MERGE, e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
@@ -2639,7 +2639,7 @@ protected void doRun() throws Exception {
* confidence that the call stack does not contain catch statements that would cause the error that might be thrown
* here from being caught and never reaching the uncaught exception handler.
*/
failEngine("merge failed", new MergePolicy.MergeException(exc));
failEngine(MERGE_FAILED, new MergePolicy.MergeException(exc));
}
});
}
(The remaining changed files are not shown in this view.)
