Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Added support for feature flags in opensearch.yml (#4959) #5941

Merged
merged 1 commit into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
- Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))

### Added
- Added cluster manager throttling stats in nodes/stats API ([#5790](https://github.com/opensearch-project/OpenSearch/pull/5790))
Expand Down
23 changes: 23 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,26 @@ ${path.logs}
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
#
# ---------------------------------- Experimental Features -----------------------------------
#
# Gates the visibility of the index setting that allows changing of replication type.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.replication_type.enabled: false
#
#
# Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.remote_store.enabled: false
#
#
# Gates the functionality of a new parameter to the snapshot restore API
# that allows for creation of a new index type that searches a snapshot
# directly in a remote repository without restoring all index data to disk
# ahead of time.
#
#opensearch.experimental.feature.searchable_snapshot.enabled: false
#
#
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
Expand Down Expand Up @@ -66,11 +65,6 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
Expand All @@ -92,11 +86,16 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
Expand All @@ -123,7 +122,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
String nodeC = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
Expand All @@ -134,10 +133,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
}

public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);
Expand All @@ -163,10 +162,10 @@ public void testRestartPrimary() throws Exception {

public void testCancelPrimaryAllocation() throws Exception {
// this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

final int initialDocCount = 1;
Expand Down Expand Up @@ -197,10 +196,13 @@ public void testCancelPrimaryAllocation() throws Exception {

/**
* This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as its flaky and needs separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());

logger.info("--> creating test index ...");
prepareCreate(
Expand All @@ -223,7 +225,7 @@ public void testAddNewReplicaFailure() throws Exception {
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
Expand Down Expand Up @@ -265,8 +267,8 @@ public void testAddNewReplicaFailure() throws Exception {
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -306,8 +308,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}

public void testIndexReopenClose() throws Exception {
final String primary = internalCluster().startNode();
final String replica = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
final String replica = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -351,8 +353,8 @@ public void testMultipleShards() throws Exception {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -392,8 +394,8 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -437,11 +439,11 @@ public void testReplicationAfterForceMerge() throws Exception {
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
Expand Down Expand Up @@ -496,7 +498,7 @@ public void testCancellation() throws Exception {
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

Expand All @@ -519,7 +521,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
Expand All @@ -534,8 +536,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -647,10 +649,10 @@ public void testDropPrimaryDuringReplication() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(featureFlagSettings());
final String primaryNode = internalCluster().startDataOnlyNode(featureFlagSettings());
createIndex(INDEX_NAME, settings);
internalCluster().startDataOnlyNodes(6);
internalCluster().startDataOnlyNodes(6, featureFlagSettings());
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
Expand All @@ -673,7 +675,7 @@ public void testDropPrimaryDuringReplication() throws Exception {
ensureYellow(INDEX_NAME);

// start another replica.
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

// index another doc and refresh - without this the new replica won't catch up.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.settings;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.FeatureFlags;

/**
* Encapsulates all valid feature flag level settings.
*
* @opensearch.internal
*/
public class FeatureFlagSettings extends AbstractScopedSettings {

protected FeatureFlagSettings(
Settings settings,
Set<Setting<?>> settingsSet,
Set<SettingUpgrader<?>> settingUpgraders,
Property scope
) {
super(settings, settingsSet, settingUpgraders, scope);
}

public static final Set<Setting<?>> BUILT_IN_FEATURE_FLAGS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
FeatureFlags.REPLICATION_TYPE_SETTING,
FeatureFlags.REMOTE_STORE_SETTING,
FeatureFlags.SEARCHABLE_SNAPSHOT_SETTING,
FeatureFlags.EXTENSIONS_SETTING
)
)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public SettingsModule(
for (Setting<?> setting : IndexScopedSettings.BUILT_IN_INDEX_SETTINGS) {
registerSetting(setting);
}
for (Setting<?> setting : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
registerSetting(setting);
}

for (Map.Entry<String, List<Setting>> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) {
if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.common.util;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;

/**
* Utility class to manage feature flags. Feature flags are system properties that must be set on the JVM.
* These are used to gate the visibility/availability of incomplete features. Fore more information, see
Expand Down Expand Up @@ -50,12 +54,39 @@ public class FeatureFlags {
*/
public static final String EXTENSIONS = "opensearch.experimental.feature.extensions.enabled";

/**
* Should store the settings from opensearch.yml.
*/
private static Settings settings;

/**
* This method is responsible to map settings from opensearch.yml to local stored
* settings value. That is used for the existing isEnabled method.
*
* @param openSearchSettings The settings stored in opensearch.yml.
*/
public static void initializeFeatureFlags(Settings openSearchSettings) {
settings = openSearchSettings;
}

/**
* Used to test feature flags whose values are expected to be booleans.
* This method returns true if the value is "true" (case-insensitive),
* and false otherwise.
*/
public static boolean isEnabled(String featureFlagName) {
return "true".equalsIgnoreCase(System.getProperty(featureFlagName));
if ("true".equalsIgnoreCase(System.getProperty(featureFlagName))) {
// TODO: Remove the if condition once FeatureFlags are only supported via opensearch.yml
return true;
}
return settings != null && settings.getAsBoolean(featureFlagName, false);
}

public static final Setting<Boolean> REPLICATION_TYPE_SETTING = Setting.boolSetting(REPLICATION_TYPE, false, Property.NodeScope);

public static final Setting<Boolean> REMOTE_STORE_SETTING = Setting.boolSetting(REMOTE_STORE, false, Property.NodeScope);

public static final Setting<Boolean> SEARCHABLE_SNAPSHOT_SETTING = Setting.boolSetting(SEARCHABLE_SNAPSHOT, false, Property.NodeScope);

public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);
}
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,9 @@ protected Node(

final Settings settings = pluginsService.updatedSettings();

// Ensure to initialize Feature Flags via the settings from opensearch.yml
FeatureFlags.initializeFeatureFlags(settings);

final Set<DiscoveryNodeRole> additionalRoles = pluginsService.filterPlugins(Plugin.class)
.stream()
.map(Plugin::getRoles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.opensearch.common.network.NetworkAddress;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -769,6 +770,20 @@ public Settings indexSettings() {
return builder.build();
}

/**
* Setting all feature flag settings at base IT, which can be overridden later by individual
* IT classes.
*
* @return Feature flag settings.
*/
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
return featureSettings.build();
}

/**
* Creates one or more indices and asserts that the indices are acknowledged. If one of the indices
* already exists this method will fail and wipe all the indices created so far.
Expand Down