Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add capability to restrict async durability mode for remote indexes #10189

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
-
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.translog.Translog.Durability;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -323,6 +324,86 @@ public void testClusterBufferIntervalValidation() {
);
}

public void testRequestDurabilityWhenRestrictSettingExplicitFalse() throws ExecutionException, InterruptedException {
// Explicit node settings and request durability
testRestrictSettingFalse(true, Durability.REQUEST);
}

public void testAsyncDurabilityWhenRestrictSettingExplicitFalse() throws ExecutionException, InterruptedException {
// Explicit node settings and async durability
testRestrictSettingFalse(true, Durability.ASYNC);
}

public void testRequestDurabilityWhenRestrictSettingImplicitFalse() throws ExecutionException, InterruptedException {
// No node settings and request durability
testRestrictSettingFalse(false, Durability.REQUEST);
}

public void testAsyncDurabilityWhenRestrictSettingImplicitFalse() throws ExecutionException, InterruptedException {
// No node settings and async durability
testRestrictSettingFalse(false, Durability.ASYNC);
}

private void testRestrictSettingFalse(boolean setRestrictFalse, Durability durability) throws ExecutionException, InterruptedException {
String clusterManagerName;
if (setRestrictFalse) {
clusterManagerName = internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), false).build()
);
} else {
clusterManagerName = internalCluster().startClusterManagerOnlyNode();
}
String dataNode = internalCluster().startDataOnlyNodes(1).get(0);
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability)
.build();
createIndex(INDEX_NAME, indexSettings);
IndexShard indexShard = getIndexShard(dataNode);
assertEquals(durability, indexShard.indexSettings().getTranslogDurability());

durability = randomFrom(Durability.values());
client(clusterManagerName).admin()
.indices()
.updateSettings(
new UpdateSettingsRequest(INDEX_NAME).settings(
Settings.builder().put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability)
)
)
.get();
assertEquals(durability, indexShard.indexSettings().getTranslogDurability());
}

public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws ExecutionException, InterruptedException {
String expectedExceptionMsg =
"index setting [index.translog.durability=async] is not allowed as cluster setting [cluster.remote_store.index.restrict.async-durability=true]";
String clusterManagerName = internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build()
);
String dataNode = internalCluster().startDataOnlyNodes(1).get(0);

// Case 1 - Test create index fails
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
.build();
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> createIndex(INDEX_NAME, indexSettings));
assertEquals(expectedExceptionMsg, exception.getMessage());

// Case 2 - Test update index fails
createIndex(INDEX_NAME);
IndexShard indexShard = getIndexShard(dataNode);
assertEquals(Durability.REQUEST, indexShard.indexSettings().getTranslogDurability());
exception = assertThrows(
IllegalArgumentException.class,
() -> client(clusterManagerName).admin()
.indices()
.updateSettings(new UpdateSettingsRequest(INDEX_NAME).settings(indexSettings))
.actionGet()
);
assertEquals(expectedExceptionMsg, exception.getMessage());
}

private IndexShard getIndexShard(String dataNode) throws ExecutionException, InterruptedException {
String clusterManagerName = internalCluster().getClusterManagerName();
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.index.mapper.MapperService.MergeReason;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.shard.IndexSettingProvider;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndexCreationException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.InvalidIndexNameException;
Expand Down Expand Up @@ -922,6 +923,7 @@ static Settings aggregateIndexSettings(
validateTranslogRetentionSettings(indexSettings);
validateStoreTypeSettings(indexSettings);
validateRefreshIntervalSettings(request.settings(), clusterSettings);
validateTranslogDurabilitySettings(request.settings(), clusterSettings, settings);

return indexSettings;
}
Expand Down Expand Up @@ -1491,7 +1493,7 @@ public static void validateTranslogRetentionSettings(Settings indexSettings) {
/**
* Validates {@code index.refresh_interval} is equal or below the {@code cluster.minimum.index.refresh_interval}.
*
* @param requestSettings settings passed in during index create request
* @param requestSettings settings passed in during index create/update request
* @param clusterSettings cluster setting
*/
static void validateRefreshIntervalSettings(Settings requestSettings, ClusterSettings clusterSettings) {
Expand All @@ -1510,4 +1512,27 @@ static void validateRefreshIntervalSettings(Settings requestSettings, ClusterSet
);
}
}

/**
* Validates {@code index.translog.durability} is not async if the {@code cluster.remote_store.index.restrict.async-durability} is set to true.
*
* @param requestSettings settings passed in during index create/update request
* @param clusterSettings cluster setting
*/
static void validateTranslogDurabilitySettings(Settings requestSettings, ClusterSettings clusterSettings, Settings settings) {
if (isRemoteStoreAttributePresent(settings) == false
|| IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.exists(requestSettings) == false
|| clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING) == false) {
return;
}
Translog.Durability durability = IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.get(requestSettings);
if (durability.equals(Translog.Durability.ASYNC)) {
throw new IllegalArgumentException(
"index setting [index.translog.durability=async] is not allowed as cluster setting ["
+ IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey()
+ "=true]"
);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import java.util.Set;

import static org.opensearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings;
import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings;
import static org.opensearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX;
import static org.opensearch.index.IndexSettings.same;

Expand Down Expand Up @@ -127,7 +129,8 @@ public void updateSettings(
.normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX)
.build();

MetadataCreateIndexService.validateRefreshIntervalSettings(normalizedSettings, clusterService.getClusterSettings());
validateRefreshIntervalSettings(normalizedSettings, clusterService.getClusterSettings());
validateTranslogDurabilitySettings(normalizedSettings, clusterService.getClusterSettings(), clusterService.getSettings());

Settings.Builder settingsForClosedIndices = Settings.builder();
Settings.Builder settingsForOpenIndices = Settings.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,8 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING
)
)
);
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,18 @@ public class IndicesService extends AbstractLifecycleComponent
Property.Dynamic
);

/**
* This setting is used to restrict creation or updation of index where the `index.translog.durability` index setting
* is set as ASYNC if enabled. If disabled, any of the durability mode can be used and switched at any later time from
* one to another.
*/
public static final Setting<Boolean> CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING = Setting.boolSetting(
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
"cluster.remote_store.index.restrict.async-durability",
false,
Property.NodeScope,
Property.Final
);

/**
* The node's settings.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.InvalidAliasNameException;
import org.opensearch.indices.InvalidIndexNameException;
import org.opensearch.indices.ShardLimitValidator;
Expand Down Expand Up @@ -132,8 +134,10 @@
import static org.opensearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.ShardLimitValidatorTests.createTestShardLimitService;
import static org.opensearch.node.Node.NODE_ATTRIBUTES;
Expand Down Expand Up @@ -1725,6 +1729,87 @@ public void testRefreshIntervalValidationFailureWithIndexSetting() {
);
}

public void testAnyTranslogDurabilityWhenRestrictSettingFalse() {
// This checks that aggregateIndexSettings works for the case when the cluster setting
// cluster.remote_store.index.restrict.async-durability is false or not set, it allows all types of durability modes
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
Translog.Durability durability = randomFrom(Translog.Durability.values());
requestSettings.put(INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability);
request.settings(requestSettings.build());
if (randomBoolean()) {
Settings settings = Settings.builder().put(CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), false).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
}
Settings indexSettings = aggregateIndexSettings(
ClusterState.EMPTY_STATE,
request,
Settings.EMPTY,
null,
Settings.EMPTY,
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
clusterSettings
);
assertFalse(clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING));
assertEquals(durability, INDEX_TRANSLOG_DURABILITY_SETTING.get(indexSettings));
}

public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() {
// This checks that aggregateIndexSettings works for the case when the cluster setting
// cluster.remote_store.index.restrict.async-durability is false or not set, it allows all types of durability modes
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
requestSettings.put(INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC);
request.settings(requestSettings.build());
Settings settings = Settings.builder().put(CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> aggregateIndexSettings(
ClusterState.EMPTY_STATE,
request,
Settings.EMPTY,
null,
Settings.builder().put("node.attr.remote_store.setting", "test").build(),
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
clusterSettings
)
);
// verify that the message is as expected
assertEquals(
"index setting [index.translog.durability=async] is not allowed as cluster setting [cluster.remote_store.index.restrict.async-durability=true]",
exception.getMessage()
);
}

public void testRequestDurabilityWhenRestrictSettingTrue() {
// This checks that aggregateIndexSettings works for the case when the cluster setting
// cluster.remote_store.index.restrict.async-durability is false or not set, it allows all types of durability modes
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
final Settings.Builder requestSettings = Settings.builder();
requestSettings.put(INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.REQUEST);
request.settings(requestSettings.build());
Settings settings = Settings.builder().put(CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING.getKey(), true).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
Settings indexSettings = aggregateIndexSettings(
ClusterState.EMPTY_STATE,
request,
Settings.EMPTY,
null,
Settings.EMPTY,
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
randomShardLimitService(),
Collections.emptySet(),
clusterSettings
);
assertTrue(clusterSettings.get(IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING));
assertEquals(Translog.Durability.REQUEST, INDEX_TRANSLOG_DURABILITY_SETTING.get(indexSettings));
}

private IndexTemplateMetadata addMatchingTemplate(Consumer<IndexTemplateMetadata.Builder> configurator) {
IndexTemplateMetadata.Builder builder = templateMetadataBuilder("template1", "te*");
configurator.accept(builder);
Expand Down
Loading