Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Introduce cluster default remote translog buffer interval setting (#9584) #9723

Merged
merged 1 commit into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Core crypto library to perform encryption and decryption of source content ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466))
- Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479))
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))
- Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,8 @@ public static final IndexShard newIndexShard(
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null
null,
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -26,9 +36,11 @@
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.comparesEqualTo;
Expand Down Expand Up @@ -184,4 +196,161 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
}

/**
* Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster
* default.
*/
public void testDefaultBufferInterval() throws ExecutionException, InterruptedException {
setupRepo();
String clusterManagerName = internalCluster().getClusterManagerName();
String dataNode = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
assertClusterRemoteBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, dataNode);

IndexShard indexShard = getIndexShard(dataNode);
assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor);
assertBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, indexShard);

// Next, we change the default buffer interval and the same should reflect in the buffer interval of the index created
TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200));
client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval))
.get();
assertBufferInterval(clusterBufferInterval, indexShard);
clearClusterBufferIntervalSetting(clusterManagerName);
}

/**
* This tests multiple cases where the index setting is passed during the index creation with multiple combinations
* with and without cluster default.
*/
public void testOverriddenBufferInterval() throws ExecutionException, InterruptedException {
setupRepo();
String clusterManagerName = internalCluster().getClusterManagerName();
String dataNode = internalCluster().startDataOnlyNodes(1).get(0);

TimeValue bufferInterval = TimeValue.timeValueSeconds(randomIntBetween(0, 100));
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval)
.build();
createIndex(INDEX_NAME, indexSettings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

IndexShard indexShard = getIndexShard(dataNode);
assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor);
assertBufferInterval(bufferInterval, indexShard);

// Set the cluster default with a different value, validate that the buffer interval is still the overridden value
TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200));
client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval))
.get();
assertBufferInterval(bufferInterval, indexShard);

// Set the index setting (index.remote_store.translog.buffer_interval) with a different value and validate that
// the buffer interval is updated
bufferInterval = TimeValue.timeValueSeconds(bufferInterval.seconds() + randomIntBetween(1, 100));
client(clusterManagerName).admin()
.indices()
.updateSettings(
new UpdateSettingsRequest(INDEX_NAME).settings(
Settings.builder().put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval)
)
)
.get();
assertBufferInterval(bufferInterval, indexShard);

// Set the index setting (index.remote_store.translog.buffer_interval) with null and validate the buffer interval
// which will be the cluster default now.
client(clusterManagerName).admin()
.indices()
.updateSettings(
new UpdateSettingsRequest(INDEX_NAME).settings(
Settings.builder().putNull(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey())
)
)
.get();
assertBufferInterval(clusterBufferInterval, indexShard);
clearClusterBufferIntervalSetting(clusterManagerName);
}

/**
* This tests validation which kicks in during index creation failing creation if the value is less than minimum allowed value.
*/
public void testOverriddenBufferIntervalValidation() {
setupRepo();
TimeValue bufferInterval = TimeValue.timeValueSeconds(-1);
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval)
.build();
IllegalArgumentException exceptionDuringCreateIndex = assertThrows(
IllegalArgumentException.class,
() -> createIndex(INDEX_NAME, indexSettings)
);
assertEquals(
"failed to parse value [-1] for setting [index.remote_store.translog.buffer_interval], must be >= [0ms]",
exceptionDuringCreateIndex.getMessage()
);
}

/**
* This tests validation of the cluster setting when being set.
*/
public void testClusterBufferIntervalValidation() {
String clusterManagerName = internalCluster().startClusterManagerOnlyNode();
setupRepo(false);
IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(-1))
)
.get()
);
assertEquals(
"failed to parse value [-1] for setting [cluster.remote_store.translog.buffer_interval], must be >= [0ms]",
exception.getMessage()
);
}

private IndexShard getIndexShard(String dataNode) throws ExecutionException, InterruptedException {
String clusterManagerName = internalCluster().getClusterManagerName();
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get();
String uuid = getIndexResponse.getSettings().get(INDEX_NAME).get(IndexMetadata.SETTING_INDEX_UUID);
IndexService indexService = indicesService.indexService(new Index(INDEX_NAME, uuid));
return indexService.getShard(0);
}

private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
assertEquals(expectedBufferInterval, indicesService.getClusterRemoteTranslogBufferInterval());
}

private void assertBufferInterval(TimeValue expectedBufferInterval, IndexShard indexShard) {
assertEquals(
expectedBufferInterval,
((BufferedAsyncIOProcessor<?>) indexShard.getTranslogSyncProcessor()).getBufferIntervalSupplier().get()
);
}

private void clearClusterBufferIntervalSetting(String clusterManagerName) {
client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey()))
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ public void apply(Settings value, Settings current, Settings previous) {
List.of(
IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING,
IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
),
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@

protected abstract String getBufferProcessThreadPoolName();

// Exclusively for testing, please do not use it elsewhere.
public Supplier<TimeValue> getBufferIntervalSupplier() {
return bufferIntervalSupplier;

Check warning on line 97 in server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java#L97

Added line #L97 was not covered by tests
}
}
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,8 @@ public IndexService newIndexService(
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -659,7 +660,8 @@ public IndexService newIndexService(
valuesSourceRegistry,
recoveryStateFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier
clusterDefaultRefreshIntervalSupplier,
clusterRemoteTranslogBufferIntervalSupplier
);
success = true;
return indexService;
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final ValuesSourceRegistry valuesSourceRegistry;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
private final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -210,7 +211,8 @@ public IndexService(
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -284,6 +286,7 @@ public IndexService(
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.translogFactorySupplier = translogFactorySupplier;
this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -512,7 +515,8 @@ public synchronized IndexShard createShard(
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
remoteStoreStatsTrackerFactory
remoteStoreStatsTrackerFactory,
clusterRemoteTranslogBufferIntervalSupplier
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,13 @@ public TimeValue getRemoteTranslogUploadBufferInterval() {
return remoteTranslogUploadBufferInterval;
}

/**
* Returns true iff the remote translog buffer interval setting exists or in other words is explicitly set.
*/
public boolean isRemoteTranslogBufferIntervalExplicit() {
return INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.exists(settings);
}

public void setRemoteTranslogUploadBufferInterval(TimeValue remoteTranslogUploadBufferInterval) {
this.remoteTranslogUploadBufferInterval = remoteTranslogUploadBufferInterval;
}
Expand Down
20 changes: 18 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand All @@ -384,7 +385,7 @@
threadPool,
this::getEngine,
indexSettings.isRemoteTranslogStoreEnabled(),
indexSettings::getRemoteTranslogUploadBufferInterval
() -> getRemoteTranslogUploadBufferInterval(clusterRemoteTranslogBufferIntervalSupplier)
);
this.mapperService = mapperService;
this.indexCache = indexCache;
Expand Down Expand Up @@ -4130,6 +4131,8 @@
boolean bufferAsyncIoProcessor,
Supplier<TimeValue> bufferIntervalSupplier
) {
assert bufferAsyncIoProcessor == false || Objects.nonNull(bufferIntervalSupplier)
: "If bufferAsyncIoProcessor is true, then the bufferIntervalSupplier needs to be non null";
ThreadContext threadContext = threadPool.getThreadContext();
CheckedConsumer<List<Tuple<Translog.Location, Consumer<Exception>>>, IOException> writeConsumer = candidates -> {
try {
Expand Down Expand Up @@ -4922,4 +4925,17 @@
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

private TimeValue getRemoteTranslogUploadBufferInterval(Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier) {
assert Objects.nonNull(clusterRemoteTranslogBufferIntervalSupplier) : "remote translog buffer interval supplier is null";
if (indexSettings().isRemoteTranslogBufferIntervalExplicit()) {
return indexSettings().getRemoteTranslogUploadBufferInterval();
}
return clusterRemoteTranslogBufferIntervalSupplier.get();
}

// Exclusively for testing, please do not use it elsewhere.
public AsyncIOProcessor<Translog.Location> getTranslogSyncProcessor() {
return translogSyncProcessor;

Check warning on line 4939 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L4939

Added line #L4939 was not covered by tests
}
}
Loading
Loading