Skip to content

Commit

Permalink
Update merge on refresh and merge on commit defaults in OpenSearch (Lucene 9.3) (#3561) (#4013)
Browse files Browse the repository at this point in the history

* Update merge on refresh and merge on commit defaults in OpenSearch (Lucene 9.3)

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Addressing code review comments, adding more tests

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
(cherry picked from commit a34a4c0)
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Jul 26, 2022
1 parent 888d4bf commit 72674d4
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
50 changes: 46 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@
package org.opensearch.index;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
Expand All @@ -53,9 +54,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
Expand All @@ -73,6 +76,9 @@
* @opensearch.internal
*/
public final class IndexSettings {
// Accepted values of the "index.merge_on_flush.policy" setting; setMergeOnFlushPolicy compares
// them case-insensitively.
// "default": no specialized merge-on-flush policy is installed (also the setting's default value).
private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default";
// "merge-on-flush": the merge policy is wrapped via the MergeOnFlushMergePolicy constructor.
private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush";

public static final Setting<List<String>> DEFAULT_FIELD_SETTING = Setting.listSetting(
"index.query.default_field",
Collections.singletonList("*"),
Expand Down Expand Up @@ -515,14 +521,21 @@ public final class IndexSettings {
public static final Setting<TimeValue> INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME = Setting.timeSetting(
    "index.merge_on_flush.max_full_flush_merge_wait_time",
    // default value
    new TimeValue(10, TimeUnit.SECONDS),
    // minimum allowed value: a single lower bound of 1 ms (the stale 0 ms bound is removed, so the
    // wait time can no longer be configured to 0)
    new TimeValue(1, TimeUnit.MILLISECONDS),
    Property.Dynamic,
    Property.IndexScope
);

/**
 * Index-scoped, dynamically updatable boolean setting {@code index.merge_on_flush.enabled}.
 * Enabled by default; see https://issues.apache.org/jira/browse/LUCENE-10078 for the upstream
 * change that motivated this default.
 */
public static final Setting<Boolean> INDEX_MERGE_ON_FLUSH_ENABLED = Setting.boolSetting(
    "index.merge_on_flush.enabled",
    true, /* https://issues.apache.org/jira/browse/LUCENE-10078 */
    Property.IndexScope,
    Property.Dynamic
);

/**
 * Index-scoped, dynamically updatable string setting {@code index.merge_on_flush.policy}.
 * Defaults to {@code "default"}. The value is interpreted by {@code setMergeOnFlushPolicy},
 * which accepts {@code "default"} and {@code "merge-on-flush"} (case-insensitively) and rejects
 * anything else with an {@link IllegalArgumentException}.
 */
public static final Setting<String> INDEX_MERGE_ON_FLUSH_POLICY = Setting.simpleString(
"index.merge_on_flush.policy",
MERGE_ON_FLUSH_DEFAULT_POLICY,
Property.IndexScope,
Property.Dynamic
);
Expand Down Expand Up @@ -617,6 +630,10 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
* Is merge on flush enabled or not
*/
private volatile boolean mergeOnFlushEnabled;
/**
* Specialized merge-on-flush policy if provided
*/
private volatile UnaryOperator<MergePolicy> mergeOnFlushPolicy;

/**
* Returns the default search fields for this index.
Expand Down Expand Up @@ -733,6 +750,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
mappingFieldNameLengthLimit = scopedSettings.get(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING);
maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(
Expand Down Expand Up @@ -804,6 +822,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down Expand Up @@ -874,7 +893,7 @@ public String getUUID() {
* Returns <code>true</code> if the index has a custom data path
*/
public boolean hasCustomDataPath() {
    // Single return: the index has a custom data path exactly when Strings.isEmpty reports
    // false for customDataPath(). The duplicate (unreachable) return statement is removed.
    return !Strings.isEmpty(customDataPath());
}

/**
Expand Down Expand Up @@ -1390,4 +1409,27 @@ public TimeValue getMaxFullFlushMergeWaitTime() {
public boolean isMergeOnFlushEnabled() {
    // Current value of the volatile flag backing INDEX_MERGE_ON_FLUSH_ENABLED.
    return this.mergeOnFlushEnabled;
}

/**
 * Translates the textual merge-on-flush policy name into the operator stored in
 * {@code mergeOnFlushPolicy}.
 * <p>
 * An empty value or {@code "default"} clears the specialized policy; {@code "merge-on-flush"}
 * installs the {@link MergeOnFlushMergePolicy} constructor as the wrapping operator. Names are
 * compared case-insensitively.
 *
 * @param policy the configured policy name
 * @throws IllegalArgumentException if the name is not one of the supported policies
 */
private void setMergeOnFlushPolicy(String policy) {
    final boolean useDefault = Strings.isEmpty(policy) || MERGE_ON_FLUSH_DEFAULT_POLICY.equalsIgnoreCase(policy);
    if (useDefault) {
        // No specialized policy: getMergeOnFlushPolicy() will report an empty Optional.
        this.mergeOnFlushPolicy = null;
        return;
    }

    if (MERGE_ON_FLUSH_MERGE_POLICY.equalsIgnoreCase(policy)) {
        this.mergeOnFlushPolicy = MergeOnFlushMergePolicy::new;
        return;
    }

    // Anything else is rejected, listing the supported names in the message.
    final String message = "The "
        + IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey()
        + " has unsupported policy specified: "
        + policy
        + ". Please use one of: "
        + MERGE_ON_FLUSH_DEFAULT_POLICY
        + ", "
        + MERGE_ON_FLUSH_MERGE_POLICY;
    throw new IllegalArgumentException(message);
}

/**
 * Returns the specialized merge-on-flush policy operator, if one is configured.
 *
 * @return an {@link Optional} holding the operator, or an empty {@link Optional} when the
 *         backing field is {@code null}
 */
public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
    final UnaryOperator<MergePolicy> configured = this.mergeOnFlushPolicy;
    return Optional.ofNullable(configured);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
Expand Down Expand Up @@ -133,6 +132,7 @@
import java.util.function.BiFunction;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -2448,14 +2448,14 @@ private IndexWriterConfig getIndexWriterConfig() {
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
mergePolicy = new MergeOnFlushMergePolicy(mergePolicy);
} else {
logger.warn(
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
);
final Optional<UnaryOperator<MergePolicy>> mergeOnFlushPolicy = config().getIndexSettings().getMergeOnFlushPolicy();
if (mergeOnFlushPolicy.isPresent()) {
mergePolicy = mergeOnFlushPolicy.get().apply(mergePolicy);
}
}
} else {
// Disable merge on refresh
iwc.setMaxFullFlushMergeWaitMillis(0);
}

iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,8 @@ public void testWrapAllDocsLive() throws Exception {
public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, MatchAllDocsQuery::new, newMergePolicy()));
// override 500ms default introduced in
// https://issues.apache.org/jira/browse/LUCENE-10078
config.setMaxFullFlushMergeWaitMillis(0);
.setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, MatchAllDocsQuery::new, newMergePolicy()))
.setMaxFullFlushMergeWaitMillis(0);
IndexWriter writer = new IndexWriter(dir, config);
int numDocs = between(1, 10);
List<String> liveDocs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,7 @@ public void testMergeSegmentsOnCommitIsDisabled() throws Exception {

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(0))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Expand Down Expand Up @@ -572,7 +571,7 @@ public void testMergeSegmentsOnCommit() throws Exception {
final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush");
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Expand Down Expand Up @@ -634,14 +633,52 @@ public void testMergeSegmentsOnCommit() throws Exception {
}
}

/**
 * Exercises an engine built from the default index settings (no merge-on-flush setting is
 * overridden) with a {@link TieredMergePolicy} limited to two segments per tier, and checks
 * the segment count after each refresh: empty at start, one segment after the first document,
 * and two segments after the second and third documents have been indexed and refreshed.
 */
public void testMergeSegmentsOnCommitDefault() throws Exception {
    final AtomicLong checkpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

    // Copy of the default settings only; nothing merge-related is customized here.
    final Settings.Builder defaultsOnly = Settings.builder().put(defaultSettings.getSettings());
    final IndexMetadata metadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(defaultsOnly).build();
    final IndexSettings settingsUnderTest = IndexSettingsModule.newIndexSettings(metadata);

    final TieredMergePolicy tieredPolicy = new TieredMergePolicy();
    tieredPolicy.setSegmentsPerTier(2);

    try (
        Store store = createStore();
        InternalEngine engine = createEngine(
            config(settingsUnderTest, store, createTempDir(), tieredPolicy, null, null, checkpoint::get)
        )
    ) {
        // A freshly created engine has no segments.
        assertThat(engine.segments(true).isEmpty(), equalTo(true));

        // First document + refresh -> exactly one segment.
        engine.index(indexForDoc(testParsedDocument("1", null, testDocumentWithTextField(), B_1, null)));
        engine.refresh("test");
        assertThat(engine.segments(true).size(), equalTo(1));

        // Two more documents, each followed by a refresh.
        engine.index(indexForDoc(testParsedDocument("2", null, testDocumentWithTextField(), B_2, null)));
        engine.refresh("test");
        engine.index(indexForDoc(testParsedDocument("3", null, testDocumentWithTextField(), B_3, null)));
        engine.refresh("test");

        // Three refreshes are expected to leave two segments, not three.
        assertThat(engine.segments(true).size(), equalTo(2));
    }
}

// this test writes documents to the engine while concurrently flushing/commit
public void testConcurrentMergeSegmentsOnCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush");
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public void testGetForUpdate() throws IOException {
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)

.build();
IndexMetadata metadata = IndexMetadata.builder("test")
.putMapping("{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
Expand Down

0 comments on commit 72674d4

Please sign in to comment.