Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup Unreferenced file on segment merge failure #9503

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))
- [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
- Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
IndexSettings.INDEX_SEARCH_THROTTLED,
IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
Expand Down
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,18 @@ public final class IndexSettings {
Property.Dynamic
);

/**
* This setting controls if unreferenced files will be cleaned up in case segment merge fails due to disk full.
*
* Defaults to true which means unreferenced files will be cleaned up in case segment merge fails.
*/
public static final Setting<Boolean> INDEX_UNREFERENCED_FILE_CLEANUP = Setting.boolSetting(
"index.unreferenced_file_cleanup.enabled",
true,
Property.IndexScope,
Property.Dynamic
);

/**
* Determines a balance between file-based and operations-based peer recoveries. The number of operations that will be used in an
* operations-based peer recovery is limited to this proportion of the total number of documents in the shard (including deleted
Expand Down Expand Up @@ -678,6 +690,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile String defaultPipeline;
private volatile String requiredPipeline;
private volatile boolean searchThrottled;
private volatile boolean shouldCleanupUnreferencedFiles;
private volatile long mappingNestedFieldsLimit;
private volatile long mappingNestedDocsLimit;
private volatile long mappingTotalFieldsLimit;
Expand Down Expand Up @@ -795,6 +808,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
}

this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.shouldCleanupUnreferencedFiles = INDEX_UNREFERENCED_FILE_CLEANUP.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
this.queryStringAllowLeadingWildcard = QUERY_STRING_ALLOW_LEADING_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -907,6 +921,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(FINAL_PIPELINE, this::setRequiredPipeline);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
scopedSettings.addSettingsUpdateConsumer(INDEX_UNREFERENCED_FILE_CLEANUP, this::setShouldCleanupUnreferencedFiles);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING, this::setMappingNestedFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING, this::setMappingNestedDocsLimit);
Expand Down Expand Up @@ -1539,6 +1554,18 @@ private void setSearchThrottled(boolean searchThrottled) {
this.searchThrottled = searchThrottled;
}

/**
* Returns true if unreferenced files should be cleaned up on merge failure for this index.
*
*/
public boolean shouldCleanupUnreferencedFiles() {
return shouldCleanupUnreferencedFiles;
}

private void setShouldCleanupUnreferencedFiles(boolean shouldCleanupUnreferencedFiles) {
this.shouldCleanupUnreferencedFiles = shouldCleanupUnreferencedFiles;
}

public long getMappingNestedFieldsLimit() {
return mappingNestedFieldsLimit;
}
Expand Down
43 changes: 43 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
Expand Down Expand Up @@ -134,6 +137,8 @@ public abstract class Engine implements LifecycleAware, Closeable {
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum?
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match";
public static final String FORCE_MERGE = "force merge";
public static final String MERGE_FAILED = "merge failed";

protected final ShardId shardId;
protected final Logger logger;
Expand Down Expand Up @@ -950,6 +955,10 @@ protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegm
}
}

boolean shouldCleanupUnreferencedFiles() {
return engineConfig.getIndexSettings().shouldCleanupUnreferencedFiles();
}

private Map<String, Long> getSegmentFileSizes(SegmentReader segmentReader) {
Directory directory = null;
SegmentCommitInfo segmentCommitInfo = segmentReader.getSegmentInfo();
Expand Down Expand Up @@ -1291,6 +1300,14 @@ public void failEngine(String reason, @Nullable Exception failure) {
);
}
}

// If cleanup of unreferenced flag is enabled and force merge or regular merge failed due to IOException,
// clean all unreferenced files on best effort basis created during failed merge and reset the
// shard state back to last Lucene Commit.
if (shouldCleanupUnreferencedFiles() && isMergeFailureDueToIOException(failure, reason)) {
cleanUpUnreferencedFiles();
}

eventListener.onFailedEngine(reason, failure);
}
} catch (Exception inner) {
Expand All @@ -1309,6 +1326,32 @@ public void failEngine(String reason, @Nullable Exception failure) {
}
}

/**
* Cleanup all unreferenced files generated during failed segment merge. This resets shard state to last Lucene
* commit.
*/
private void cleanUpUnreferencedFiles() {
RS146BIJAY marked this conversation as resolved.
Show resolved Hide resolved
try (
IndexWriter writer = new IndexWriter(
RS146BIJAY marked this conversation as resolved.
Show resolved Hide resolved
store.directory(),
new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND)
)
) {
// do nothing and close this will kick off IndexFileDeleter which will remove all unreferenced files.
} catch (Exception ex) {
logger.error("Error while deleting unreferenced file ", ex);
}
}

/** Check whether the merge failure happened due to IOException. */
private boolean isMergeFailureDueToIOException(Exception failure, String reason) {
return (reason.equals(FORCE_MERGE) || reason.equals(MERGE_FAILED))
&& ExceptionsHelper.unwrap(failure, IOException.class) instanceof IOException;
}

/** Check whether the engine should be failed */
protected boolean maybeFailEngine(String source, Exception e) {
if (Lucene.isCorruptionException(e)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2038,7 +2038,7 @@ public void forceMerge(
throw ex;
} catch (Exception e) {
try {
maybeFailEngine("force merge", e);
maybeFailEngine(FORCE_MERGE, e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
Expand Down Expand Up @@ -2488,7 +2488,7 @@ protected void doRun() throws Exception {
* confidence that the call stack does not contain catch statements that would cause the error that might be thrown
* here from being caught and never reaching the uncaught exception handler.
*/
failEngine("merge failed", new MergePolicy.MergeException(exc));
failEngine(MERGE_FAILED, new MergePolicy.MergeException(exc));
}
});
}
Expand Down
Loading
Loading