Skip to content

Commit

Permalink
[Segment Replication] Allow Cluster Setting to enable default Replica…
Browse files Browse the repository at this point in the history
…tion type (opensearch-project#7420)

* Add cluster setting to enable default replication type at cluster level.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* update opensearch.yml file

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix failing unit test.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix failing tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Apply spotlesscheck.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments on PR.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix Failing Tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix test index name

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* add logic for system indices to always use document replication.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* remove unnecessary logic.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* fix failing tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* fix failing tests

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Refactor and add additional assert for segRepEnabled.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* fix failing tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* refactor code.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix failing tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix failing tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix failing tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* remove unnecessary code logic and test

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Refactor back to having boolean value of system index,

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add unit test for system index check and some refactor.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* add changelog entry.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

---------

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
Signed-off-by: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com>
  • Loading branch information
Rishikesh1159 authored and austintlee committed Jun 2, 2023
1 parent 32956a1 commit 4470aa5
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Create NamedRoute to map extension routes to a shortened name ([#6870](https://github.com/opensearch-project/OpenSearch/pull/6870))
- Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665))
- [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526))
- Add new cluster setting to set default index replication type ([#7420](https://github.com/opensearch-project/OpenSearch/pull/7420))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564)
Expand Down
5 changes: 2 additions & 3 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,9 @@ ${path.logs}
#
# ---------------------------------- Experimental Features -----------------------------------
#
# Gates the visibility of the index setting that allows changing of replication type.
# Once the feature is ready for production release, this feature flag can be removed.
# Gates the visibility of the experimental segment replication features until they are production ready.
#
#opensearch.experimental.feature.replication_type.enabled: false
#opensearch.experimental.feature.segment_replication_experimental.enabled: false
#
#
# Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
Expand All @@ -24,8 +27,8 @@
import java.util.Collections;
import java.util.Arrays;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationClusterSettingIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -77,71 +80,119 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(SegmentReplicationClusterSettingIT.TestPlugin.class, MockTransportService.TestPlugin.class);
}

public void testReplicationWithSegmentReplicationClusterSetting() throws Exception {

boolean isSystemIndex = randomBoolean();
String indexName = isSystemIndex ? SYSTEM_INDEX_NAME : INDEX_NAME;
public void testSystemIndexWithSegmentReplicationClusterSetting() throws Exception {

// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startNode();
createIndex(indexName);
ensureYellowAndNoInitializingShards(indexName);
createIndex(SYSTEM_INDEX_NAME);
ensureYellowAndNoInitializingShards(SYSTEM_INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(indexName);

final int initialDocCount = scaledRandomIntBetween(20, 30);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(indexName).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

refresh(indexName);
assertBusy(() -> {
assertHitCount(client(replicaNode).prepareSearch(indexName).setSize(0).setPreference("_only_local").get(), initialDocCount);
});

SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
ensureGreen(SYSTEM_INDEX_NAME);
final GetSettingsResponse response = client().admin()
.indices()
.prepareSegmentReplicationStats(indexName)
.execute()
.getSettings(new GetSettingsRequest().indices(SYSTEM_INDEX_NAME).includeDefaults(true))
.actionGet();
if (isSystemIndex) {
// Verify that Segment Replication did not happen on the replica shard.
assertNull(segmentReplicationStatsResponse.getReplicationStats().get(indexName));
} else {
// Verify that Segment Replication happened on the replica shard.
assertFalse(segmentReplicationStatsResponse.getReplicationStats().get(indexName).get(0).getReplicaStats().isEmpty());
}
assertEquals(response.getSetting(SYSTEM_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());

// Verify index setting isSegRepEnabled is false.
Index index = resolveIndex(SYSTEM_INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
}

public void testIndexReplicationSettingOverridesClusterSetting() throws Exception {
public void testIndexReplicationSettingOverridesSegRepClusterSetting() throws Exception {
Settings settings = Settings.builder().put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
final String ANOTHER_INDEX = "test-index";

// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(settings);
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to override cluster replication setting by passing a index replication setting
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);
createIndex(ANOTHER_INDEX);
ensureYellowAndNoInitializingShards(INDEX_NAME, ANOTHER_INDEX);
final String replicaNode = internalCluster().startNode(settings);

// Randomly close and open index.
if (randomBoolean()) {
logger.info("--> Closing the index ");
client().admin().indices().prepareClose(INDEX_NAME).get();

final int initialDocCount = scaledRandomIntBetween(20, 30);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
logger.info("--> Opening the index");
client().admin().indices().prepareOpen(INDEX_NAME).get();
}
ensureGreen(INDEX_NAME, ANOTHER_INDEX);

refresh(INDEX_NAME);
assertBusy(() -> {
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
});
final GetSettingsResponse response = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(INDEX_NAME, ANOTHER_INDEX).includeDefaults(true))
.actionGet();
assertEquals(response.getSetting(INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());
assertEquals(response.getSetting(ANOTHER_INDEX, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString());

// Verify index setting isSegRepEnabled.
Index index = resolveIndex(INDEX_NAME);
Index anotherIndex = resolveIndex(ANOTHER_INDEX);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), true);
}

SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin()
public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Exception {
Settings settings = Settings.builder().put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();
final String ANOTHER_INDEX = "test-index";
final String primaryNode = internalCluster().startNode(settings);
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to override cluster replication setting by passing a index replication setting
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
).get();
createIndex(ANOTHER_INDEX);
ensureYellowAndNoInitializingShards(INDEX_NAME, ANOTHER_INDEX);
final String replicaNode = internalCluster().startNode(settings);
ensureGreen(INDEX_NAME, ANOTHER_INDEX);

final GetSettingsResponse response = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.getSettings(new GetSettingsRequest().indices(INDEX_NAME, ANOTHER_INDEX).includeDefaults(true))
.actionGet();
// Verify that Segment Replication did not happen on the replica shard.
assertNull(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME));
assertEquals(response.getSetting(INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.SEGMENT.toString());
assertEquals(response.getSetting(ANOTHER_INDEX, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());

// Verify index setting isSegRepEnabled.
Index index = resolveIndex(INDEX_NAME);
Index anotherIndex = resolveIndex(ANOTHER_INDEX);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), true);
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), false);
}

public void testHiddenIndicesWithReplicationStrategyClusterSetting() throws Exception {
final String primaryNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to set index as hidden
.put("index.hidden", true)
).get();
ensureGreen(INDEX_NAME);

// Verify that document replication strategy is used for hidden indices.
final GetSettingsResponse response = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(INDEX_NAME).includeDefaults(true))
.actionGet();
assertEquals(response.getSetting(INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());

// Verify index setting isSegRepEnabled.
Index index = resolveIndex(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
Expand Down Expand Up @@ -120,7 +121,15 @@ public void testRemoteStoreDisabledByUser() throws Exception {
.getIndex(new GetIndexRequest().indices("test-idx-1").includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get("test-idx-1");
verifyRemoteStoreIndexSettings(indexSettings, "false", null, null, null, null, null);
verifyRemoteStoreIndexSettings(
indexSettings,
"false",
null,
null,
null,
client().settings().get(CLUSTER_SETTING_REPLICATION_TYPE),
null
);
}

public void testRemoteStoreEnabledByUserWithoutRemoteRepoAndSegmentReplicationIllegalArgumentException() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.InternalTestCluster;
Expand All @@ -28,6 +31,8 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.IndicesService.CLUSTER_SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.replication.SegmentReplicationBaseIT.waitForSearchableDocs;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
Expand Down Expand Up @@ -261,4 +266,45 @@ public void testRestoreOnReplicaNode() throws Exception {
SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
assertHitCount(resp, DOC_COUNT);
}

public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exception {
Settings settings = Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.put(CLUSTER_SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();

// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startNode(settings);
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to override cluster replication setting by passing a index replication setting
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode(settings);
ensureGreen(INDEX_NAME);

createSnapshot();
// Delete index
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME));

RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null);

// Assertions
assertEquals(restoreSnapshotResponse.status(), RestStatus.ACCEPTED);
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
.getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME).includeDefaults(true))
.get();
assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, SETTING_REPLICATION_TYPE), ReplicationType.DOCUMENT.toString());

// Verify index setting isSegRepEnabled.
Index index = resolveIndex(RESTORED_INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
}
}
Loading

0 comments on commit 4470aa5

Please sign in to comment.