From b3ee17da3ff4b7e00b596d88a9fba8c491fd7653 Mon Sep 17 00:00:00 2001 From: Nagaraj Tantri Date: Wed, 4 Jan 2023 17:09:09 +0000 Subject: [PATCH] Added support for feature flags in opensearch.yml (#4959) This change introduces a static store "settings" in FeatureFlags.java file to enable isEnabled method to fetch flag settings defined in opensearch.yml. Signed-off-by: Nagaraj Tantri (cherry picked from commit 241bd42ab4d0bff0c6d232df0076d2568e326667) --- CHANGELOG.md | 16 +- distribution/src/config/opensearch.yml | 28 ++ .../replication/SegmentReplicationIT.java | 129 +++++--- .../SegmentReplicationSnapshotIT.java | 278 ++++++++++++++++++ .../common/settings/FeatureFlagSettings.java | 45 +++ .../common/settings/SettingsModule.java | 3 + .../opensearch/common/util/FeatureFlags.java | 36 ++- .../main/java/org/opensearch/node/Node.java | 3 + .../test/OpenSearchIntegTestCase.java | 15 + 9 files changed, 516 insertions(+), 37 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java create mode 100644 server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java diff --git a/CHANGELOG.md b/CHANGELOG.md index c4676779f0fa7..8bd9ac08a7178 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,13 +13,27 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Update to Gradle 7.6 ([#5382](https://github.com/opensearch-project/OpenSearch/pull/5382)) - Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299)) - Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229)) -- Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366)) +- Added jackson dependency to server ([#5366](https://github.com/opensearch-project/OpenSearch/pull/5366)) - Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495)) - Adding auto release workflow ([#5582](https://github.com/opensearch-project/OpenSearch/pull/5582)) - Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518)), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615))) - Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) - Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668)) - Experimental support for extended backward compatiblity in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429)) +- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211)) +- Add support for s390x architecture ([#4001](https://github.com/opensearch-project/OpenSearch/pull/4001)) +- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085)) +- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064)) +- Add failover support with Segment Replication enabled. ([#4325](https://github.com/opensearch-project/OpenSearch/pull/4325) +- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388)) +- Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342)) +- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343)) +- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344)) +- BWC version 2.2.2 ([#4383](https://github.com/opensearch-project/OpenSearch/pull/4383)) +- Support for labels on version bump PRs, skip label support for changelog verifier ([#4391](https://github.com/opensearch-project/OpenSearch/pull/4391)) +- Add a new node role 'search' which is dedicated to provide search capability ([#4689](https://github.com/opensearch-project/OpenSearch/pull/4689)) +- Introduce experimental searchable snapshot API ([#4680](https://github.com/opensearch-project/OpenSearch/pull/4680)) +- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959)) ### Dependencies - Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148)) diff --git a/distribution/src/config/opensearch.yml b/distribution/src/config/opensearch.yml index 2188fbe600cbf..07cf06ad04b4a 100644 --- a/distribution/src/config/opensearch.yml +++ b/distribution/src/config/opensearch.yml @@ -86,3 +86,31 @@ ${path.logs} # Require explicit names when deleting indices: # #action.destructive_requires_name: true +# +# ---------------------------------- Experimental Features ----------------------------------- +# +# Gates the visibility of the index setting that allows changing of replication type. +# Once the feature is ready for production release, this feature flag can be removed. +# +#opensearch.experimental.feature.replication_type.enabled: false +# +# +# Gates the visibility of the index setting that allows persisting data to remote store along with local disk. +# Once the feature is ready for production release, this feature flag can be removed. +# +#opensearch.experimental.feature.remote_store.enabled: false +# +# +# Gates the functionality of a new parameter to the snapshot restore API +# that allows for creation of a new index type that searches a snapshot +# directly in a remote repository without restoring all index data to disk +# ahead of time. +# +#opensearch.experimental.feature.searchable_snapshot.enabled: false +# +# +# Gates the ability for Searchable Snapshots to read snapshots that are older than the +# guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis. +# +#opensearch.experimental.feature.searchable_snapshot.extended_compatibility.enabled: false +# diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index e0fa0dbe1e82a..98b69c0ca0d22 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -9,7 +9,6 @@ package org.opensearch.indices.replication; import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.junit.BeforeClass; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.indices.segments.IndexShardSegments; @@ -17,6 +16,8 @@ import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -56,8 +57,10 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.index.query.QueryBuilders.matchQuery; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationIT extends OpenSearchIntegTestCase { @@ -66,11 +69,6 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase { private static final int SHARD_COUNT = 1; private static final int REPLICA_COUNT = 1; - @BeforeClass - public static void assumeFeatureFlag() { - assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); - } - @Override protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class); @@ -92,11 +90,16 @@ protected boolean addMockInternalEngine() { return false; } + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); + } + public void testPrimaryStopped_ReplicaPromoted() throws Exception { - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); @@ -123,7 +126,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); // start another node, index another doc and replicate. - String nodeC = internalCluster().startNode(); + String nodeC = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get(); refresh(INDEX_NAME); @@ -134,10 +137,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { } public void testRestartPrimary() throws Exception { - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); assertEquals(getNodeContainingPrimaryShard().getName(), primary); @@ -163,10 +166,10 @@ public void testRestartPrimary() throws Exception { public void testCancelPrimaryAllocation() throws Exception { // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica. - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); final int initialDocCount = 1; @@ -197,10 +200,13 @@ public void testCancelPrimaryAllocation() throws Exception { /** * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. + * + * TODO: Ignoring this test as its flaky and needs separate fix */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testAddNewReplicaFailure() throws Exception { logger.info("--> starting [Primary Node] ..."); - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); logger.info("--> creating test index ..."); prepareCreate( @@ -223,7 +229,7 @@ public void testAddNewReplicaFailure() throws Exception { assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); logger.info("--> start empty node to add replica shard"); - final String replicaNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); // Mock transport service to add behaviour of throwing corruption exception during segment replication process. MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( @@ -265,8 +271,8 @@ public void testAddNewReplicaFailure() throws Exception { } public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -304,10 +310,9 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { assertSegmentStats(REPLICA_COUNT); } } - public void testIndexReopenClose() throws Exception { - final String primary = internalCluster().startNode(); - final String replica = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); + final String replica = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -351,8 +356,8 @@ public void testMultipleShards() throws Exception { .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, indexSettings); ensureGreen(INDEX_NAME); @@ -392,8 +397,8 @@ public void testMultipleShards() throws Exception { } public void testReplicationAfterForceMerge() throws Exception { - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -437,11 +442,11 @@ public void testReplicationAfterForceMerge() throws Exception { } public void testCancellation() throws Exception { - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); ensureYellow(INDEX_NAME); - final String replicaNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance( SegmentReplicationSourceService.class, @@ -496,7 +501,7 @@ public void testCancellation() throws Exception { } public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); ensureGreen(INDEX_NAME); @@ -519,7 +524,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { .prepareUpdateSettings(INDEX_NAME) .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) ); - final String replicaNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); @@ -534,8 +539,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { } public void testDeleteOperations() throws Exception { - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -596,6 +601,60 @@ public void testDeleteOperations() throws Exception { } } + public void testUpdateOperations() throws Exception { + final String primary = internalCluster().startNode(featureFlagSettings()); + createIndex(INDEX_NAME); + ensureYellow(INDEX_NAME); + final String replica = internalCluster().startNode(featureFlagSettings()); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + // wait a short amount of time to give replication a chance to complete. + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + waitForReplicaUpdate(); + + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + Set ids = indexer.getIds(); + String id = ids.toArray()[0].toString(); + UpdateResponse updateResponse = client(primary).prepareUpdate(INDEX_NAME, id) + .setDoc(Requests.INDEX_CONTENT_TYPE, "foo", "baz") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh()); + assertEquals(2, updateResponse.getVersion()); + + refresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id); + + } + } + private void assertSegmentStats(int numberOfReplicas) throws IOException { final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet(); @@ -647,10 +706,10 @@ public void testDropPrimaryDuringReplication() throws Exception { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY); + final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(featureFlagSettings()); + final String primaryNode = internalCluster().startDataOnlyNode(featureFlagSettings()); createIndex(INDEX_NAME, settings); - internalCluster().startDataOnlyNodes(6); + internalCluster().startDataOnlyNodes(6, featureFlagSettings()); ensureGreen(INDEX_NAME); int initialDocCount = scaledRandomIntBetween(100, 200); @@ -673,7 +732,7 @@ public void testDropPrimaryDuringReplication() throws Exception { ensureYellow(INDEX_NAME); // start another replica. - internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(featureFlagSettings()); ensureGreen(INDEX_NAME); // index another doc and refresh - without this the new replica won't catch up. diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java new file mode 100644 index 0000000000000..11c8cf21ac66e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -0,0 +1,278 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.snapshots; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.BackgroundIndexer; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase { + private static final String INDEX_NAME = "test-segrep-idx"; + private static final String RESTORED_INDEX_NAME = INDEX_NAME + "-restored"; + private static final int SHARD_COUNT = 1; + private static final int REPLICA_COUNT = 1; + private static final int DOC_COUNT = 1010; + + private static final String REPOSITORY_NAME = "test-segrep-repo"; + private static final String SNAPSHOT_NAME = "test-segrep-snapshot"; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); + } + + public Settings segRepEnableIndexSettings() { + return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + } + + public Settings docRepEnableIndexSettings() { + return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + } + + public Settings.Builder getShardSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT); + } + + public Settings restoreIndexSegRepSettings() { + return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + } + + public Settings restoreIndexDocRepSettings() { + return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void ingestData(int docCount, String indexName) throws Exception { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + indexName, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + refresh(indexName); + } + } + + // Start cluster with provided settings and return the node names as list + public List startClusterWithSettings(Settings indexSettings, int replicaCount) throws Exception { + // Start primary + final String primaryNode = internalCluster().startNode(featureFlagSettings()); + List nodeNames = new ArrayList<>(); + nodeNames.add(primaryNode); + for (int i = 0; i < replicaCount; i++) { + nodeNames.add(internalCluster().startNode(featureFlagSettings())); + } + createIndex(INDEX_NAME, indexSettings); + ensureGreen(INDEX_NAME); + // Ingest data + ingestData(DOC_COUNT, INDEX_NAME); + return nodeNames; + } + + public void createSnapshot() { + // Snapshot declaration + Path absolutePath = randomRepoPath().toAbsolutePath(); + // Create snapshot + createRepository(REPOSITORY_NAME, "fs", absolutePath); + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME) + .setWaitForCompletion(true) + .setIndices(INDEX_NAME) + .get(); + assertThat( + createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + } + + public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) { + RestoreSnapshotRequestBuilder builder = client().admin() + .cluster() + .prepareRestoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME) + .setWaitForCompletion(false) + .setRenamePattern(INDEX_NAME) + .setRenameReplacement(RESTORED_INDEX_NAME); + if (indexSettings != null) { + builder.setIndexSettings(indexSettings); + } + return builder.get(); + } + + public void testRestoreOnSegRep() throws Exception { + // Start cluster with one primary and one replica node + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception { + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ingestData(5000, RESTORED_INDEX_NAME); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT + 5000); + } + + public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception { + startClusterWithSettings(docRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception { + // Start a cluster with one primary and one replica + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "DOCUMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception { + startClusterWithSettings(docRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "DOCUMENT"); + + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testRestoreOnReplicaNode() throws Exception { + List nodeNames = startClusterWithSettings(segRepEnableIndexSettings(), 1); + final String primaryNode = nodeNames.get(0); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + // stop the primary node so that restoration happens on replica node + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + internalCluster().startNode(featureFlagSettings()); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java new file mode 100644 index 0000000000000..9354b72f9823d --- /dev/null +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.settings; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.util.FeatureFlags; + +/** + * Encapsulates all valid feature flag level settings. + * + * @opensearch.internal + */ +public class FeatureFlagSettings extends AbstractScopedSettings { + + protected FeatureFlagSettings( + Settings settings, + Set> settingsSet, + Set> settingUpgraders, + Property scope + ) { + super(settings, settingsSet, settingUpgraders, scope); + } + + public static final Set> BUILT_IN_FEATURE_FLAGS = Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + FeatureFlags.REPLICATION_TYPE_SETTING, + FeatureFlags.REMOTE_STORE_SETTING, + FeatureFlags.SEARCHABLE_SNAPSHOT_SETTING, + FeatureFlags.EXTENSIONS_SETTING, + FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_SETTING + ) + ) + ); +} diff --git a/server/src/main/java/org/opensearch/common/settings/SettingsModule.java b/server/src/main/java/org/opensearch/common/settings/SettingsModule.java index df16c5a499ebe..8000cde6f81c1 100644 --- a/server/src/main/java/org/opensearch/common/settings/SettingsModule.java +++ b/server/src/main/java/org/opensearch/common/settings/SettingsModule.java @@ -87,6 +87,9 @@ public SettingsModule( for (Setting setting : IndexScopedSettings.BUILT_IN_INDEX_SETTINGS) { registerSetting(setting); } + for (Setting setting : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) { + registerSetting(setting); + } for (Map.Entry> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) { if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) { diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 9f1a9a43ff54e..2ab52a87b2240 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -8,6 +8,10 @@ package org.opensearch.common.util; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; + /** * Utility class to manage feature flags. Feature flags are system properties that must be set on the JVM. * These are used to gate the visibility/availability of incomplete features. Fore more information, see @@ -50,12 +54,42 @@ public class FeatureFlags { */ public static final String EXTENSIONS = "opensearch.experimental.feature.extensions.enabled"; + /** + * Should store the settings from opensearch.yml. + */ + private static Settings settings; + + /** + * This method is responsible to map settings from opensearch.yml to local stored + * settings value. That is used for the existing isEnabled method. + * + * @param openSearchSettings The settings stored in opensearch.yml. + */ + public static void initializeFeatureFlags(Settings openSearchSettings) { + settings = openSearchSettings; + } + /** * Used to test feature flags whose values are expected to be booleans. * This method returns true if the value is "true" (case-insensitive), * and false otherwise. */ public static boolean isEnabled(String featureFlagName) { - return "true".equalsIgnoreCase(System.getProperty(featureFlagName)); + if ("true".equalsIgnoreCase(System.getProperty(featureFlagName))) { + // TODO: Remove the if condition once FeatureFlags are only supported via opensearch.yml + return true; + } + return settings != null && settings.getAsBoolean(featureFlagName, false); } + + public static final Setting REPLICATION_TYPE_SETTING = Setting.boolSetting(REPLICATION_TYPE, false, Property.NodeScope); + public static final Setting REMOTE_STORE_SETTING = Setting.boolSetting(REMOTE_STORE, false, Property.NodeScope); + public static final Setting SEARCHABLE_SNAPSHOT_SETTING = Setting.boolSetting(SEARCHABLE_SNAPSHOT, false, Property.NodeScope); + + public static final Setting SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_SETTING = Setting.boolSetting( + SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY, + false, + Property.NodeScope + ); + public static final Setting EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 69816db248c66..0d867b4a24561 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -438,6 +438,9 @@ protected Node( final Settings settings = pluginsService.updatedSettings(); + // Ensure to initialize Feature Flags via the settings from opensearch.yml + FeatureFlags.initializeFeatureFlags(settings); + final Set additionalRoles = pluginsService.filterPlugins(Plugin.class) .stream() .map(Plugin::getRoles) diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index f210b0076d186..e27e461a58a05 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -106,6 +106,7 @@ import org.opensearch.common.network.NetworkAddress; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.regex.Regex; +import org.opensearch.common.settings.FeatureFlagSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; @@ -769,6 +770,20 @@ public Settings indexSettings() { return builder.build(); } + /** + * Setting all feature flag settings at base IT, which can be overridden later by individual + * IT classes. + * + * @return Feature flag settings. + */ + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) { + featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY)); + } + return featureSettings.build(); + } + /** * Creates one or more indices and asserts that the indices are acknowledged. If one of the indices * already exists this method will fail and wipe all the indices created so far.