From 794e497da4aa823560facf756ef63ab806235b95 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Fri, 7 Jul 2023 11:17:38 +0530 Subject: [PATCH] Add RTS stats tracker Signed-off-by: Bhumika Saini --- .../RemoteStoreBackpressureIT.java | 8 +- .../RemoteStoreRefreshListenerIT.java | 2 +- .../remotestore/RemoteStoreStatsIT.java | 8 +- .../remotestore/stats/RemoteStoreStats.java | 75 ++- .../TransportRemoteStoreStatsAction.java | 16 +- .../action/bulk/TransportShardBulkAction.java | 13 +- .../common/settings/ClusterSettings.java | 16 +- .../org/opensearch/index/IndexService.java | 6 +- .../remote/RemoteRefreshSegmentTracker.java | 24 + ...e.java => RemoteStorePressureService.java} | 91 +++- ....java => RemoteStorePressureSettings.java} | 6 +- .../index/remote/RemoteStoreUtils.java | 32 ++ .../index/remote/RemoteTranslogTracker.java | 462 ++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 11 +- ...emoteBlobStoreInternalTranslogFactory.java | 10 +- .../index/translog/RemoteFsTranslog.java | 103 +++- .../transfer/TranslogTransferManager.java | 9 +- .../listener/TranslogTransferListener.java | 5 + .../org/opensearch/indices/IndicesModule.java | 4 +- .../opensearch/indices/IndicesService.java | 15 +- .../cluster/IndicesClusterStateService.java | 18 +- .../main/java/org/opensearch/node/Node.java | 8 + .../stats/RemoteStoreStatsResponseTests.java | 11 +- .../stats/RemoteStoreStatsTestHelper.java | 166 +++++-- .../stats/RemoteStoreStatsTests.java | 38 +- .../TransportRemoteStoreStatsActionTests.java | 6 +- .../bulk/TransportShardBulkActionTests.java | 10 +- .../opensearch/index/IndexModuleTests.java | 4 +- .../RemoteRefreshSegmentTrackerTests.java | 47 +- ...a => RemoteStorePressureServiceTests.java} | 16 +- ... => RemoteStorePressureSettingsTests.java} | 86 ++-- .../remote/RemoteTranslogTrackerTests.java | 304 ++++++++++++ .../RemoteStoreRefreshListenerTests.java | 46 +- .../index/translog/RemoteFSTranslogTests.java | 10 +- .../TranslogTransferManagerTests.java | 3 + ...actIndicesClusterStateServiceTestCase.java | 4 +- .../snapshots/SnapshotResiliencyTests.java | 6 +- .../index/shard/IndexShardTestCase.java | 18 +- 38 files changed, 1441 insertions(+), 276 deletions(-) rename server/src/main/java/org/opensearch/index/remote/{RemoteRefreshSegmentPressureService.java => RemoteStorePressureService.java} (71%) rename server/src/main/java/org/opensearch/index/remote/{RemoteRefreshSegmentPressureSettings.java => RemoteStorePressureSettings.java} (98%) create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteTranslogTracker.java rename server/src/test/java/org/opensearch/index/remote/{RemoteRefreshSegmentPressureServiceTests.java => RemoteStorePressureServiceTests.java} (89%) rename server/src/test/java/org/opensearch/index/remote/{RemoteRefreshSegmentPressureSettingsTests.java => RemoteStorePressureSettingsTests.java} (65%) create mode 100644 server/src/test/java/org/opensearch/index/remote/RemoteTranslogTrackerTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java index 3fe7f3d553a1b..9b689c2f7526a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java @@ -29,8 +29,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT; -import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; +import static org.opensearch.index.remote.RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT; +import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { @@ -120,10 +120,10 @@ private RemoteRefreshSegmentTracker.Stats stats() { RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get(); final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); List matches = Arrays.stream(response.getShards()) - .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) + .filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString())) .collect(Collectors.toList()); assertEquals(1, matches.size()); - return matches.get(0).getStats(); + return matches.get(0).getSegmentStats(); } private void indexDocAndRefresh(BytesReference source, int iterations) { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 4005e6359a2f7..b97e93f323fb2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -22,7 +22,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; +import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 76ef153fab963..b1ae583211718 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -53,10 +53,10 @@ public void testStatsResponseFromAllNodes() { assertTrue(response.getShards() != null && response.getShards().length != 0); final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); List matches = Arrays.stream(response.getShards()) - .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) + .filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString())) .collect(Collectors.toList()); assertEquals(1, matches.size()); - RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats(); + RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getSegmentStats(); assertResponseStats(stats); } } @@ -81,7 +81,7 @@ public void testStatsResponseAllShards() { RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get(); assertTrue(response.getSuccessfulShards() == 3); assertTrue(response.getShards() != null && response.getShards().length == 3); - RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats(); + RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getSegmentStats(); assertResponseStats(stats); } @@ -107,7 +107,7 @@ public void testStatsResponseFromLocalNode() { RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get(); assertTrue(response.getSuccessfulShards() == 1); assertTrue(response.getShards() != null && response.getShards().length == 1); - RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats(); + RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getSegmentStats(); assertResponseStats(stats); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java index 5ac9c1cf5f74c..fff55630877c1 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java @@ -14,6 +14,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteTranslogTracker; import java.io.IOException; @@ -25,19 +26,29 @@ public class RemoteStoreStats implements Writeable, ToXContentFragment { private final RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats; + private final RemoteTranslogTracker.Stats remoteTranslogShardStats; - public RemoteStoreStats(RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats) { + public RemoteStoreStats( + RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats, + RemoteTranslogTracker.Stats remoteTranslogShardStats + ) { this.remoteSegmentUploadShardStats = remoteSegmentUploadShardStats; + this.remoteTranslogShardStats = remoteTranslogShardStats; } public RemoteStoreStats(StreamInput in) throws IOException { remoteSegmentUploadShardStats = in.readOptionalWriteable(RemoteRefreshSegmentTracker.Stats::new); + remoteTranslogShardStats = in.readOptionalWriteable(RemoteTranslogTracker.Stats::new); } - public RemoteRefreshSegmentTracker.Stats getStats() { + public RemoteRefreshSegmentTracker.Stats getSegmentStats() { return remoteSegmentUploadShardStats; } + public RemoteTranslogTracker.Stats getTranslogStats() { + return remoteTranslogShardStats; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject() @@ -74,6 +85,40 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(Fields.REMOTE_REFRESH_LATENCY_IN_MILLIS); builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadTimeMovingAverage); builder.endObject(); + + builder.startObject("translog"); + builder.startObject("upload"); + builder.field(Fields.LAST_UPLOAD_TIMESTAMP, remoteTranslogShardStats.lastUploadTimestamp); + + builder.startObject(Fields.TOTAL_UPLOADS); + builder.field(SubFields.STARTED, remoteTranslogShardStats.totalUploadsStarted) + .field(SubFields.FAILED, remoteTranslogShardStats.totalUploadsFailed) + .field(SubFields.SUCCEEDED, remoteTranslogShardStats.totalUploadsSucceeded); + builder.endObject(); + + builder.startObject(Fields.TOTAL_UPLOADS_IN_BYTES); + builder.field(SubFields.STARTED, remoteTranslogShardStats.uploadBytesStarted) + .field(SubFields.FAILED, remoteTranslogShardStats.uploadBytesFailed) + .field(SubFields.SUCCEEDED, remoteTranslogShardStats.uploadBytesSucceeded); + builder.endObject(); + + builder.field(Fields.TOTAL_UPLOAD_TIME_IN_MILLIS, remoteTranslogShardStats.totalUploadTimeInMillis); + + builder.startObject(Fields.UPLOAD_BYTES); + builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.uploadBytesMovingAverage); + builder.endObject(); + + builder.startObject(Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC); + builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.uploadBytesPerSecMovingAverage); + builder.endObject(); + + builder.startObject(Fields.UPLOAD_TIME_IN_MILLIS); + builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.uploadTimeMovingAverage); + builder.endObject(); + + builder.endObject(); // translog.upload + builder.endObject(); // translog + builder.endObject(); return builder; @@ -82,6 +127,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(remoteSegmentUploadShardStats); + out.writeOptionalWriteable(remoteTranslogShardStats); } /** @@ -149,6 +195,31 @@ static final class Fields { * Time taken by a single remote refresh */ static final String REMOTE_REFRESH_LATENCY_IN_MILLIS = "remote_refresh_latency_in_millis"; + + /** + * + */ + static final String LAST_UPLOAD_TIMESTAMP = "last_upload_timestamp"; + + /** + * + */ + static final String TOTAL_UPLOADS = "total_uploads"; + + /** + * + */ + static final String TOTAL_UPLOAD_TIME_IN_MILLIS = "total_upload_time_in_millis"; + + /** + * + */ + static final String UPLOAD_BYTES = "upload_bytes"; + + /** + * + */ + static final String UPLOAD_TIME_IN_MILLIS = "upload_time_in_millis"; } /** diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java index 434abd1207f50..669529d39fddd 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java @@ -23,8 +23,9 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.index.IndexService; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteTranslogTracker; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardNotFoundException; import org.opensearch.indices.IndicesService; @@ -49,7 +50,7 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct RemoteStoreStats> { private final IndicesService indicesService; - private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private final RemoteStorePressureService remoteStorePressureService; @Inject public TransportRemoteStoreStatsAction( @@ -58,7 +59,7 @@ public TransportRemoteStoreStatsAction( IndicesService indicesService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + RemoteStorePressureService remoteStorePressureService ) { super( RemoteStoreStatsAction.NAME, @@ -70,7 +71,7 @@ public TransportRemoteStoreStatsAction( ThreadPool.Names.MANAGEMENT ); this.indicesService = indicesService; - this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; + this.remoteStorePressureService = remoteStorePressureService; } /** @@ -153,11 +154,14 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard throw new ShardNotFoundException(indexShard.shardId()); } - RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker( + RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = remoteStorePressureService.getRemoteRefreshSegmentTracker( indexShard.shardId() ); assert Objects.nonNull(remoteRefreshSegmentTracker); - return new RemoteStoreStats(remoteRefreshSegmentTracker.stats()); + RemoteTranslogTracker remoteTranslogTracker = remoteStorePressureService.getRemoteTranslogTracker(indexShard.shardId()); + assert Objects.nonNull(remoteTranslogTracker); + + return new RemoteStoreStats(remoteRefreshSegmentTracker.stats(), remoteTranslogTracker.stats()); } } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 140c1320daa91..e908b77105ae5 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -89,7 +89,7 @@ import org.opensearch.index.mapper.MapperException; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.core.index.shard.ShardId; @@ -137,7 +137,7 @@ public class TransportShardBulkAction extends TransportWriteAction globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -509,7 +509,7 @@ public synchronized IndexShard createShard( translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, remoteStore, - remoteRefreshSegmentPressureService + remoteStorePressureService ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index 332b0d1698800..4ba55a0fd26b4 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -613,6 +613,30 @@ public void writeTo(StreamOutput out) throws IOException { out.writeDouble(uploadTimeMovingAverage); out.writeLong(bytesLag); } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + Stats other = (Stats) obj; + + return this.shardId.toString().equals(other.shardId.toString()) + && this.refreshTimeLagMs == other.refreshTimeLagMs + && this.localRefreshNumber == other.localRefreshNumber + && this.remoteRefreshNumber == other.remoteRefreshNumber + && this.uploadBytesStarted == other.uploadBytesStarted + && this.uploadBytesSucceeded == other.uploadBytesSucceeded + && this.uploadBytesFailed == other.uploadBytesFailed + && this.totalUploadsStarted == other.totalUploadsStarted + && this.totalUploadsFailed == other.totalUploadsFailed + && this.totalUploadsSucceeded == other.totalUploadsSucceeded + && this.rejectionCount == other.rejectionCount + && this.consecutiveFailuresCount == other.consecutiveFailuresCount + && Double.compare(this.uploadBytesMovingAverage, other.uploadBytesMovingAverage) == 0 + && Double.compare(this.uploadBytesPerSecMovingAverage, other.uploadBytesPerSecMovingAverage) == 0 + && Double.compare(this.uploadTimeMovingAverage, other.uploadTimeMovingAverage) == 0 + && this.bytesLag == other.bytesLag; + } } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java similarity index 71% rename from server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java rename to server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java index 3f1161f0c5e03..ebfd27c25fe87 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePressureService.java @@ -30,25 +30,30 @@ * * @opensearch.internal */ -public class RemoteRefreshSegmentPressureService implements IndexEventListener { +public class RemoteStorePressureService implements IndexEventListener { - private static final Logger logger = LogManager.getLogger(RemoteRefreshSegmentPressureService.class); + private static final Logger logger = LogManager.getLogger(RemoteStorePressureService.class); /** * Keeps map of remote-backed index shards and their corresponding backpressure tracker. */ - private final Map trackerMap = ConcurrentCollections.newConcurrentMap(); + private final Map trackerMapRemoteSegmentStore = ConcurrentCollections.newConcurrentMap(); + + /** + * Keeps map of remote-backed index shards and their corresponding backpressure tracker. + */ + private final Map trackerMapRemoteTranslogStore = ConcurrentCollections.newConcurrentMap(); /** * Remote refresh segment pressure settings which is used for creation of the backpressure tracker and as well as rejection. */ - private final RemoteRefreshSegmentPressureSettings pressureSettings; + private final RemoteStorePressureSettings pressureSettings; private final List lagValidators; @Inject - public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settings settings) { - pressureSettings = new RemoteRefreshSegmentPressureSettings(clusterService, settings, this); + public RemoteStorePressureService(ClusterService clusterService, Settings settings) { + pressureSettings = new RemoteStorePressureSettings(clusterService, settings, this); lagValidators = Arrays.asList( new ConsecutiveFailureValidator(pressureSettings), new BytesLagValidator(pressureSettings), @@ -63,7 +68,17 @@ public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settin * @return the tracker if index is remote-backed, else null. */ public RemoteRefreshSegmentTracker getRemoteRefreshSegmentTracker(ShardId shardId) { - return trackerMap.get(shardId); + return trackerMapRemoteSegmentStore.get(shardId); + } + + /** + * Get {@link RemoteTranslogTracker} only if the underlying Index has remote translog store enabled. + * + * @param shardId shard id + * @return the tracker if index is remote translog store-backed, else null. + */ + public RemoteTranslogTracker getRemoteTranslogTracker(ShardId shardId) { + return trackerMapRemoteTranslogStore.get(shardId); } @Override @@ -72,7 +87,7 @@ public void afterIndexShardCreated(IndexShard indexShard) { return; } ShardId shardId = indexShard.shardId(); - trackerMap.put( + trackerMapRemoteSegmentStore.put( shardId, new RemoteRefreshSegmentTracker( shardId, @@ -81,14 +96,31 @@ public void afterIndexShardCreated(IndexShard indexShard) { pressureSettings.getUploadTimeMovingAverageWindowSize() ) ); - logger.trace("Created tracker for shardId={}", shardId); + logger.trace("Created RemoteRefreshSegmentTracker for shardId={}", shardId); + if (indexShard.indexSettings().isRemoteTranslogStoreEnabled()) { + trackerMapRemoteTranslogStore.put( + shardId, + new RemoteTranslogTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ) + ); + logger.trace("Created RemoteTranslogTracker for shardId={}", shardId); + } } @Override public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) { - RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = trackerMap.remove(shardId); + RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = trackerMapRemoteSegmentStore.remove(shardId); if (remoteRefreshSegmentTracker != null) { - logger.trace("Deleted tracker for shardId={}", shardId); + logger.trace("Deleted RemoteRefreshSegmentTracker for shardId={}", shardId); + } + + RemoteTranslogTracker remoteTranslogTracker = trackerMapRemoteTranslogStore.remove(shardId); + if (remoteTranslogTracker != null) { + logger.trace("Deleted RemoteTranslogTracker for shardId={}", shardId); } } @@ -101,6 +133,16 @@ public boolean isSegmentsUploadBackpressureEnabled() { return pressureSettings.isRemoteRefreshSegmentPressureEnabled(); } + /** + * Check if remote translog backpressure is enabled. + * + * @return true if enabled, else false. + */ + public boolean isTranslogUploadBackpressureEnabled() { + // Note: This is not yet implemented. + return false; + } + /** * Validates if segments are lagging more than the limits. If yes, it would lead to rejections of the requests. * @@ -123,19 +165,26 @@ public void validateSegmentsUploadLag(ShardId shardId) { } void updateUploadBytesMovingAverageWindowSize(int updatedSize) { - updateMovingAverageWindowSize(RemoteRefreshSegmentTracker::updateUploadBytesMovingAverageWindowSize, updatedSize); + updateMovingAverageWindowSizeSegmentStats(RemoteRefreshSegmentTracker::updateUploadBytesMovingAverageWindowSize, updatedSize); + updateMovingAverageWindowSizeTranslogStats(RemoteTranslogTracker::updateUploadBytesMovingAverageWindowSize, updatedSize); } void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) { - updateMovingAverageWindowSize(RemoteRefreshSegmentTracker::updateUploadBytesPerSecMovingAverageWindowSize, updatedSize); + updateMovingAverageWindowSizeSegmentStats(RemoteRefreshSegmentTracker::updateUploadBytesPerSecMovingAverageWindowSize, updatedSize); + updateMovingAverageWindowSizeTranslogStats(RemoteTranslogTracker::updateUploadBytesPerSecMovingAverageWindowSize, updatedSize); } void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { - updateMovingAverageWindowSize(RemoteRefreshSegmentTracker::updateUploadTimeMsMovingAverageWindowSize, updatedSize); + updateMovingAverageWindowSizeSegmentStats(RemoteRefreshSegmentTracker::updateUploadTimeMsMovingAverageWindowSize, updatedSize); + updateMovingAverageWindowSizeTranslogStats(RemoteTranslogTracker::updateUploadTimeMsMovingAverageWindowSize, updatedSize); + } + + void updateMovingAverageWindowSizeSegmentStats(BiConsumer biConsumer, int updatedSize) { + trackerMapRemoteSegmentStore.values().forEach(tracker -> biConsumer.accept(tracker, updatedSize)); } - void updateMovingAverageWindowSize(BiConsumer biConsumer, int updatedSize) { - trackerMap.values().forEach(tracker -> biConsumer.accept(tracker, updatedSize)); + void updateMovingAverageWindowSizeTranslogStats(BiConsumer biConsumer, int updatedSize) { + trackerMapRemoteTranslogStore.values().forEach(tracker -> biConsumer.accept(tracker, updatedSize)); } /** @@ -145,9 +194,9 @@ void updateMovingAverageWindowSize(BiConsumer getUploadBlobsFromSnapshot( + TransferSnapshot transferSnapshot, + FileTransferTracker fileTransferTracker + ) { + Set toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount()); + toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); + toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); + + return toUpload; + } + + public static long getCurrentSystemNanoTime() { + return System.nanoTime(); + } + + public static long getTotalBytes(Set files) { + return files.stream().map(blob -> { + try { + return blob.getContentLength(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).reduce(0L, Long::sum); + } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTracker.java new file mode 100644 index 0000000000000..1cc8c0af69370 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTracker.java @@ -0,0 +1,462 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.common.util.MovingAverage; +import org.opensearch.core.index.shard.ShardId; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Stores Remote Translog Store-related stats for a given IndexShard. + */ +public class RemoteTranslogTracker { + public final ShardId shardId; + + /** + * Epoch timestamp of the last successful Remote Translog Store upload. + */ + private final AtomicLong lastUploadTimestamp; + + /** + * Total number of Remote Translog Store uploads that have been started. + */ + private final AtomicLong totalUploadsStarted; + + /** + * Total number of Remote Translog Store uploads that have failed. + */ + private final AtomicLong totalUploadsFailed; + + /** + * Total number of Remote Translog Store that have been successful. + */ + private final AtomicLong totalUploadsSucceeded; + + /** + * Total number of byte uploads to Remote Translog Store that have been started. + */ + private final AtomicLong uploadBytesStarted; + + /** + * Total number of byte uploads to Remote Translog Store that have failed. + */ + private final AtomicLong uploadBytesFailed; + + /** + * Total number of byte uploads to Remote Translog Store that have been successful. + */ + private final AtomicLong uploadBytesSucceeded; + + /** + * Total time spent on Remote Translog Store uploads. + */ + private final AtomicLong totalUploadTimeInMillis; + + /** + * Provides moving average over the last N total size in bytes of translog files uploaded as part of Remote Translog Store upload. + * N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size. + */ + private final AtomicReference uploadBytesMovingAverageReference; + + /** + * This lock object is used for making sure we do not miss any data. + */ + private final Object uploadBytesMutex; + + /** + * Provides moving average over the last N upload speed (in bytes/s) of translog files uploaded as part of Remote Translog Store upload. + * N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size. + */ + private final AtomicReference uploadBytesPerSecMovingAverageReference; + + /** + * This lock object is used for making sure we do not miss any data. + */ + private final Object uploadBytesPerSecMutex; + + /** + * Provides moving average over the last N overall upload time (in nanos) as part of Remote Translog Store upload. N is window size. + * Wrapped with {@code AtomicReference} for dynamic changes in window size. + */ + private final AtomicReference uploadTimeMsMovingAverageReference; + + /** + * This lock object is used for making sure we do not miss any data. + */ + private final Object uploadTimeMsMutex; + + public RemoteTranslogTracker( + ShardId shardId, + int uploadBytesMovingAverageWindowSize, + int uploadBytesPerSecMovingAverageWindowSize, + int uploadTimeMsMovingAverageWindowSize + ) { + this.shardId = shardId; + this.lastUploadTimestamp = new AtomicLong(System.currentTimeMillis()); + this.totalUploadsStarted = new AtomicLong(0); + this.totalUploadsFailed = new AtomicLong(0); + this.totalUploadsSucceeded = new AtomicLong(0); + this.uploadBytesStarted = new AtomicLong(0); + this.uploadBytesFailed = new AtomicLong(0); + this.uploadBytesSucceeded = new AtomicLong(0); + this.totalUploadTimeInMillis = new AtomicLong(0); + uploadBytesMutex = new Object(); + uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize)); + uploadBytesPerSecMutex = new Object(); + uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize)); + uploadTimeMsMutex = new Object(); + uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize)); + } + + public long getTotalUploadsStarted() { + return totalUploadsStarted.get(); + } + + public long getTotalUploadsFailed() { + return totalUploadsFailed.get(); + } + + public long getTotalUploadsSucceeded() { + return totalUploadsSucceeded.get(); + } + + public long getUploadBytesStarted() { + return uploadBytesStarted.get(); + } + + public long getUploadBytesFailed() { + return uploadBytesFailed.get(); + } + + public long getUploadBytesSucceeded() { + return uploadBytesSucceeded.get(); + } + + public long getTotalUploadTimeInMillis() { + return totalUploadTimeInMillis.get(); + } + + ShardId getShardId() { + return shardId; + } + + public void incrementUploadsStarted() { + totalUploadsStarted.incrementAndGet(); + } + + public void incrementUploadsFailed() { + checkTotal(totalUploadsStarted.get(), totalUploadsFailed.get(), totalUploadsSucceeded.get(), 1); + totalUploadsFailed.incrementAndGet(); + } + + public void incrementUploadsSucceeded() { + checkTotal(totalUploadsStarted.get(), totalUploadsFailed.get(), totalUploadsSucceeded.get(), 1); + totalUploadsSucceeded.incrementAndGet(); + } + + public void addUploadBytesStarted(long count) { + uploadBytesStarted.addAndGet(count); + } + + public void addUploadBytesFailed(long count) { + checkTotal(uploadBytesStarted.get(), uploadBytesFailed.get(), uploadBytesSucceeded.get(), count); + uploadBytesFailed.addAndGet(count); + } + + public void addUploadBytesSucceeded(long count) { + checkTotal(uploadBytesStarted.get(), uploadBytesFailed.get(), uploadBytesSucceeded.get(), count); + uploadBytesSucceeded.addAndGet(count); + } + + public void addUploadTimeInMillis(long duration) { + totalUploadTimeInMillis.addAndGet(duration); + } + + public long getLastUploadTimestamp() { + return lastUploadTimestamp.get(); + } + + public void setLastUploadTimestamp(long lastUploadTimestamp) { + this.lastUploadTimestamp.set(lastUploadTimestamp); + } + + boolean isUploadBytesMovingAverageReady() { + return uploadBytesMovingAverageReference.get().isReady(); + } + + public double getUploadBytesMovingAverage() { + return uploadBytesMovingAverageReference.get().getAverage(); + } + + public void updateUploadBytesMovingAverage(long count) { + synchronized (uploadBytesMutex) { + this.uploadBytesMovingAverageReference.get().record(count); + } + } + + boolean isUploadBytesPerSecMovingAverageReady() { + return uploadBytesPerSecMovingAverageReference.get().isReady(); + } + + public double getUploadBytesPerSecMovingAverage() { + return uploadBytesPerSecMovingAverageReference.get().getAverage(); + } + + public void updateUploadBytesPerSecMovingAverage(long speed) { + synchronized (uploadBytesPerSecMutex) { + this.uploadBytesPerSecMovingAverageReference.get().record(speed); + } + } + + boolean isUploadTimeMovingAverageReady() { + return uploadTimeMsMovingAverageReference.get().isReady(); + } + + public double getUploadTimeMovingAverage() { + return uploadTimeMsMovingAverageReference.get().getAverage(); + } + + public void updateUploadTimeMovingAverage(long duration) { + synchronized (uploadTimeMsMutex) { + this.uploadTimeMsMovingAverageReference.get().record(duration); + } + } + + /** + * Updates the window size for data collection of upload bytes. This also resets any data collected so far. + * + * @param updatedSize the updated size + */ + void updateUploadBytesMovingAverageWindowSize(int updatedSize) { + synchronized (uploadBytesMutex) { + this.uploadBytesMovingAverageReference.set(this.uploadBytesMovingAverageReference.get().copyWithSize(updatedSize)); + } + } + + /** + * Updates the window size for data collection of upload bytes per second. This also resets any data collected so far. + * + * @param updatedSize the updated size + */ + void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) { + synchronized (uploadBytesPerSecMutex) { + this.uploadBytesPerSecMovingAverageReference.set(this.uploadBytesPerSecMovingAverageReference.get().copyWithSize(updatedSize)); + } + } + + /** + * Updates the window size for data collection of upload time (ms). This also resets any data collected so far. + * + * @param updatedSize the updated size + */ + void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { + synchronized (uploadTimeMsMutex) { + this.uploadTimeMsMovingAverageReference.set(this.uploadTimeMsMovingAverageReference.get().copyWithSize(updatedSize)); + } + } + + public RemoteTranslogTracker.Stats stats() { + return new RemoteTranslogTracker.Stats( + shardId, + lastUploadTimestamp.get(), + totalUploadsStarted.get(), + totalUploadsSucceeded.get(), + totalUploadsFailed.get(), + uploadBytesStarted.get(), + uploadBytesSucceeded.get(), + uploadBytesFailed.get(), + totalUploadTimeInMillis.get(), + uploadBytesMovingAverageReference.get().getAverage(), + uploadBytesPerSecMovingAverageReference.get().getAverage(), + uploadTimeMsMovingAverageReference.get().getAverage() + ); + } + + private void checkTotal(long startedCount, long failedCount, long succeededCount, long countToAdd) { + long delta = startedCount - (failedCount + succeededCount + countToAdd); + assert delta >= 0 : "Sum of failure count (" + + failedCount + + "), success count (" + + succeededCount + + "), and count to add (" + + countToAdd + + ") cannot exceed started count (" + + startedCount + + ")"; + } + + /** + * Represents the tracker's state as seen in the stats API. + * + * @opensearch.internal + */ + public static class Stats implements Writeable { + + public final ShardId shardId; + + /** + * Epoch timestamp of the last successful Remote Translog Store upload. + */ + public final long lastUploadTimestamp; + + /** + * Total number of Remote Translog Store uploads that have been started. + */ + public final long totalUploadsStarted; + + /** + * Total number of Remote Translog Store uploads that have failed. + */ + public final long totalUploadsFailed; + + /** + * Total number of Remote Translog Store that have been successful. + */ + public final long totalUploadsSucceeded; + + /** + * Total number of byte uploads to Remote Translog Store that have been started. + */ + public final long uploadBytesStarted; + + /** + * Total number of byte uploads to Remote Translog Store that have failed. + */ + public final long uploadBytesFailed; + + /** + * Total number of byte uploads to Remote Translog Store that have been successful. + */ + public final long uploadBytesSucceeded; + + /** + * Total time spent on Remote Translog Store uploads. + */ + public final long totalUploadTimeInMillis; + + /** + * Size of a Remote Translog Store upload in bytes. + */ + public final double uploadBytesMovingAverage; + + /** + * Speed of a Remote Translog Store upload in bytes-per-second. + */ + public final double uploadBytesPerSecMovingAverage; + + /** + * Time taken by a Remote Translog Store upload. + */ + public final double uploadTimeMovingAverage; + + public Stats( + ShardId shardId, + long lastUploadTimestamp, + long totalUploadsStarted, + long totalUploadsSucceeded, + long totalUploadsFailed, + long uploadBytesStarted, + long uploadBytesSucceeded, + long uploadBytesFailed, + long totalUploadTimeInMillis, + double uploadBytesMovingAverage, + double uploadBytesPerSecMovingAverage, + double uploadTimeMovingAverage + ) { + this.shardId = shardId; + this.lastUploadTimestamp = lastUploadTimestamp; + this.totalUploadsStarted = totalUploadsStarted; + this.totalUploadsFailed = totalUploadsFailed; + this.totalUploadsSucceeded = totalUploadsSucceeded; + this.uploadBytesStarted = uploadBytesStarted; + this.uploadBytesFailed = uploadBytesFailed; + this.uploadBytesSucceeded = uploadBytesSucceeded; + this.totalUploadTimeInMillis = totalUploadTimeInMillis; + this.uploadBytesMovingAverage = uploadBytesMovingAverage; + this.uploadBytesPerSecMovingAverage = uploadBytesPerSecMovingAverage; + this.uploadTimeMovingAverage = uploadTimeMovingAverage; + } + + public Stats(StreamInput in) throws IOException { + try { + this.shardId = new ShardId(in); + this.lastUploadTimestamp = in.readLong(); + this.totalUploadsStarted = in.readLong(); + this.totalUploadsFailed = in.readLong(); + this.totalUploadsSucceeded = in.readLong(); + this.uploadBytesStarted = in.readLong(); + this.uploadBytesFailed = in.readLong(); + this.uploadBytesSucceeded = in.readLong(); + this.totalUploadTimeInMillis = in.readLong(); + this.uploadBytesMovingAverage = in.readDouble(); + this.uploadBytesPerSecMovingAverage = in.readDouble(); + this.uploadTimeMovingAverage = in.readDouble(); + } catch (IOException e) { + throw e; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + out.writeLong(lastUploadTimestamp); + out.writeLong(totalUploadsStarted); + out.writeLong(totalUploadsFailed); + out.writeLong(totalUploadsSucceeded); + out.writeLong(uploadBytesStarted); + out.writeLong(uploadBytesFailed); + out.writeLong(uploadBytesSucceeded); + out.writeLong(totalUploadTimeInMillis); + out.writeDouble(uploadBytesMovingAverage); + out.writeDouble(uploadBytesPerSecMovingAverage); + out.writeDouble(uploadTimeMovingAverage); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + RemoteTranslogTracker.Stats other = (RemoteTranslogTracker.Stats) obj; + + return this.shardId.toString().equals(other.shardId.toString()) + && this.lastUploadTimestamp == other.lastUploadTimestamp + && this.totalUploadsStarted == other.totalUploadsStarted + && this.totalUploadsFailed == other.totalUploadsFailed + && this.totalUploadsSucceeded == other.totalUploadsSucceeded + && this.uploadBytesStarted == other.uploadBytesStarted + && this.uploadBytesFailed == other.uploadBytesFailed + && this.uploadBytesSucceeded == other.uploadBytesSucceeded + && this.totalUploadTimeInMillis == other.totalUploadTimeInMillis + && Double.compare(this.uploadBytesMovingAverage, other.uploadBytesMovingAverage) == 0 + && Double.compare(this.uploadBytesPerSecMovingAverage, other.uploadBytesPerSecMovingAverage) == 0 + && Double.compare(this.uploadTimeMovingAverage, other.uploadTimeMovingAverage) == 0; + } + } + + boolean hasSameStatsAs(RemoteTranslogTracker.Stats other) { + return this.getShardId().toString().equals(other.shardId.toString()) + && this.getLastUploadTimestamp() == other.lastUploadTimestamp + && this.getTotalUploadsStarted() == other.totalUploadsStarted + && this.getTotalUploadsFailed() == other.totalUploadsFailed + && this.getTotalUploadsSucceeded() == other.totalUploadsSucceeded + && this.getUploadBytesStarted() == other.uploadBytesStarted + && this.getUploadBytesFailed() == other.uploadBytesFailed + && this.getUploadBytesSucceeded() == other.uploadBytesSucceeded + && this.getTotalUploadTimeInMillis() == other.totalUploadTimeInMillis + && Double.compare(this.getUploadBytesMovingAverage(), other.uploadBytesMovingAverage) == 0 + && Double.compare(this.getUploadBytesPerSecMovingAverage(), other.uploadBytesPerSecMovingAverage) == 0 + && Double.compare(this.getUploadTimeMovingAverage(), other.uploadTimeMovingAverage) == 0; + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index e43b9773cc1e0..20351b5033894 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -145,7 +145,7 @@ import org.opensearch.index.merge.MergeStats; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.search.stats.ShardSearchStats; import org.opensearch.index.seqno.ReplicationTracker; @@ -335,8 +335,7 @@ Runnable getGlobalCheckpointSyncer() { private final Store remoteStore; private final BiFunction translogFactorySupplier; private final boolean isTimeSeriesIndex; - private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; - + private final RemoteStorePressureService remoteStorePressureService; private final List internalRefreshListener = new ArrayList<>(); public IndexShard( @@ -363,7 +362,7 @@ public IndexShard( final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -458,7 +457,7 @@ public boolean shouldCache(Query query) { this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null) ? false : mapperService.documentMapper().mappers().containsTimeStampField(); - this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; + this.remoteStorePressureService = remoteStorePressureService; } public ThreadPool getThreadPool() { @@ -3679,7 +3678,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro this, // Add the checkpoint publisher if the Segment Replciation via remote store is enabled. indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId()) + remoteStorePressureService.getRemoteRefreshSegmentTracker(shardId()) ) ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index 339e16db6f360..dc5d5994c956b 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.opensearch.index.remote.RemoteTranslogTracker; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; @@ -31,10 +32,13 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory { private final ThreadPool threadPool; + private final RemoteTranslogTracker remoteTranslogTracker; + public RemoteBlobStoreInternalTranslogFactory( Supplier repositoriesServiceSupplier, ThreadPool threadPool, - String repositoryName + String repositoryName, + RemoteTranslogTracker remoteTranslogTracker ) { Repository repository; try { @@ -44,6 +48,7 @@ public RemoteBlobStoreInternalTranslogFactory( } this.repository = repository; this.threadPool = threadPool; + this.remoteTranslogTracker = remoteTranslogTracker; } @Override @@ -68,7 +73,8 @@ public Translog newTranslog( persistedSequenceNumberConsumer, blobStoreRepository, threadPool, - primaryModeSupplier + primaryModeSupplier, + remoteTranslogTracker ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 87fadce1d834c..b250b699dce32 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -17,12 +17,15 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.remote.RemoteTranslogTracker; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.FileTransferTracker; import org.opensearch.index.translog.transfer.TransferSnapshot; import org.opensearch.index.translog.transfer.TranslogCheckpointTransferSnapshot; import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.index.translog.transfer.FileSnapshot; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -55,6 +58,7 @@ public class RemoteFsTranslog extends Translog { private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; private final BooleanSupplier primaryModeSupplier; + private final RemoteTranslogTracker remoteTranslogTracker; private volatile long maxRemoteTranslogGenerationUploaded; private volatile long minSeqNoToKeep; @@ -80,7 +84,8 @@ public RemoteFsTranslog( LongConsumer persistedSequenceNumberConsumer, BlobStoreRepository blobStoreRepository, ThreadPool threadPool, - BooleanSupplier primaryModeSupplier + BooleanSupplier primaryModeSupplier, + RemoteTranslogTracker remoteTranslogTracker ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); @@ -116,6 +121,7 @@ public RemoteFsTranslog( IOUtils.closeWhileHandlingException(readers); } } + this.remoteTranslogTracker = remoteTranslogTracker; } catch (Exception e) { // close the opened translog files if we fail to create a new translog... IOUtils.closeWhileHandlingException(current); @@ -266,9 +272,55 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException { ) { Releasable transferReleasable = Releasables.wrap(deletionPolicy.acquireTranslogGen(getMinFileGeneration())); return translogTransferManager.transferSnapshot(transferSnapshotProvider, new TranslogTransferListener() { + /** + * + */ + final RemoteTranslogTracker remoteTranslogTracker = RemoteFsTranslog.this.remoteTranslogTracker; + + /** + * + */ + Set toUpload; + + /** + * + */ + long uploadBytes; + + /** + * + */ + long uploadStartTime; + + /** + * + */ + long uploadEndTime; + + /** + * + * @param transferSnapshot the transfer snapshot + * @throws IOException + */ @Override + public void beforeUpload(TransferSnapshot transferSnapshot) throws IOException { + toUpload = RemoteStoreUtils.getUploadBlobsFromSnapshot(transferSnapshot, fileTransferTracker); + uploadBytes = RemoteStoreUtils.getTotalBytes(toUpload); + uploadStartTime = RemoteStoreUtils.getCurrentSystemNanoTime(); + captureStatsBeforeUpload(); + } + + /** + * + * @param transferSnapshot the transfer snapshot + * @throws IOException + */ + @Override public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException { + uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime(); + captureStatsOnUploadSuccess(); + transferReleasable.close(); closeFilesIfNoPendingRetentionLocks(); maxRemoteTranslogGenerationUploaded = generation; @@ -276,16 +328,65 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti logger.trace("uploaded translog for {} {} ", primaryTerm, generation); } + /** + * + * @param transferSnapshot the transfer snapshot + * @param ex the exception while processing the {@link TransferSnapshot} + * @throws IOException + */ @Override public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException { + uploadEndTime = RemoteStoreUtils.getCurrentSystemNanoTime(); + captureStatsOnUploadFailure(); + transferReleasable.close(); closeFilesIfNoPendingRetentionLocks(); + if (ex instanceof IOException) { throw (IOException) ex; } else { throw (RuntimeException) ex; } } + + private void captureStatsBeforeUpload() { + remoteTranslogTracker.incrementUploadsStarted(); + remoteTranslogTracker.addUploadBytesStarted(uploadBytes); + } + + private void captureStatsOnUploadSuccess() { + long uploadDurationInMillis = (uploadEndTime - uploadStartTime) / 1_000_000L; + remoteTranslogTracker.incrementUploadsSucceeded(); + remoteTranslogTracker.addUploadBytesSucceeded(uploadBytes); + remoteTranslogTracker.addUploadTimeInMillis(uploadDurationInMillis); + remoteTranslogTracker.setLastUploadTimestamp(System.currentTimeMillis()); + + remoteTranslogTracker.updateUploadBytesMovingAverage(uploadBytes); + if (uploadDurationInMillis > 0) { + remoteTranslogTracker.updateUploadBytesPerSecMovingAverage((uploadBytes * 1_000L) / uploadDurationInMillis); + } + + remoteTranslogTracker.updateUploadTimeMovingAverage(uploadDurationInMillis); + } + + private void captureStatsOnUploadFailure() { + remoteTranslogTracker.incrementUploadsFailed(); + remoteTranslogTracker.addUploadTimeInMillis((uploadEndTime - uploadStartTime) / 1_000_000L); + + Set uploadedFiles = fileTransferTracker.allUploaded(); + Set successfulUploads = new HashSet<>(); + Set failedUploads = new HashSet<>(); + for (FileSnapshot.TransferFileSnapshot file : toUpload) { + if (uploadedFiles.contains(file.getName())) { + successfulUploads.add(file); + } else { + failedUploads.add(file); + } + } + + remoteTranslogTracker.addUploadBytesSucceeded(RemoteStoreUtils.getTotalBytes(successfulUploads)); + remoteTranslogTracker.addUploadBytesFailed(RemoteStoreUtils.getTotalBytes(failedUploads)); + } }); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index e2bb5f74df234..de6187df1c741 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -24,6 +24,7 @@ import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.threadpool.ThreadPool; @@ -34,7 +35,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -93,13 +93,14 @@ public ShardId getShardId() { public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) throws IOException { + // TODO: Should we be incrementing stats if there is nothing to upload? + translogTransferListener.beforeUpload(transferSnapshot); List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount()); - Set toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount()); + Set toUpload = RemoteStoreUtils.getUploadBlobsFromSnapshot(transferSnapshot, fileTransferTracker); try { - toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); - toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); if (toUpload.isEmpty()) { logger.trace("Nothing to upload for transfer"); + // TODO: Should we be incrementing stats if there is nothing to upload? translogTransferListener.onUploadComplete(transferSnapshot); return true; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java index c09fd8798e505..83cf55340c5aa 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -18,6 +18,11 @@ * @opensearch.internal */ public interface TranslogTransferListener { + /** + * Invoked before the transfer of {@link TransferSnapshot} + * @param transferSnapshot the transfer snapshot + */ + void beforeUpload(TransferSnapshot transferSnapshot) throws IOException; /** * Invoked when the transfer of {@link TransferSnapshot} succeeds diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index 9d2eef5f67a86..a10c93d9c1580 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -70,7 +70,7 @@ import org.opensearch.index.mapper.SourceFieldMapper; import org.opensearch.index.mapper.TextFieldMapper; import org.opensearch.index.mapper.VersionFieldMapper; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -289,7 +289,7 @@ protected void configure() { bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); bind(SegmentReplicationPressureService.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { - bind(RemoteRefreshSegmentPressureService.class).asEagerSingleton(); + bind(RemoteStorePressureService.class).asEagerSingleton(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b574ffd1006c0..25532f6f8dcf0 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -60,6 +60,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; +import org.opensearch.common.SetOnce; import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.component.AbstractLifecycleComponent; @@ -120,7 +121,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.RetentionLeaseStats; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -320,6 +321,7 @@ public class IndicesService extends AbstractLifecycleComponent private final BiFunction translogFactorySupplier; private final FileCacheCleaner fileCacheCleaner; + private static final SetOnce pressureServiceSetOnce = new SetOnce<>(); @Override protected void doStart() { @@ -451,7 +453,8 @@ private static BiFunction getTrans return new RemoteBlobStoreInternalTranslogFactory( repositoriesServiceSupplier, threadPool, - indexSettings.getRemoteStoreTranslogRepository() + indexSettings.getRemoteStoreTranslogRepository(), + pressureServiceSetOnce.get().getRemoteTranslogTracker(shardRouting.shardId()) ); } return new InternalTranslogFactory(); @@ -928,7 +931,7 @@ public IndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); @@ -940,7 +943,7 @@ public IndexShard createShard( globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher, - remoteRefreshSegmentPressureService + remoteStorePressureService ); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { @@ -1859,4 +1862,8 @@ public boolean allPendingDanglingIndicesWritten() { return nodeWriteDanglingIndicesInfo == false || (danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0); } + + public void setPressureService(RemoteStorePressureService pressureService) { + pressureServiceSetOnce.set(pressureService); + } } diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index b3fc070d62e58..ade512aae9491 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -65,7 +65,7 @@ import org.opensearch.index.IndexComponent; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -149,7 +149,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private final RemoteStorePressureService remoteStorePressureService; @Inject public IndicesClusterStateService( @@ -170,7 +170,7 @@ public IndicesClusterStateService( final GlobalCheckpointSyncAction globalCheckpointSyncAction, final RetentionLeaseSyncer retentionLeaseSyncer, final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) { this( settings, @@ -190,7 +190,7 @@ public IndicesClusterStateService( primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, retentionLeaseSyncer, - remoteRefreshSegmentPressureService + remoteStorePressureService ); } @@ -213,7 +213,7 @@ public IndicesClusterStateService( final PrimaryReplicaSyncer primaryReplicaSyncer, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; @@ -225,7 +225,7 @@ public IndicesClusterStateService( indexEventListeners.add(segmentReplicationSourceService); // if remote store feature is not enabled, do not wire the remote upload pressure service as an IndexEventListener. if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { - indexEventListeners.add(remoteRefreshSegmentPressureService); + indexEventListeners.add(remoteStorePressureService); } this.segmentReplicationTargetService = segmentReplicationTargetService; this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners); @@ -240,7 +240,7 @@ public IndicesClusterStateService( this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); - this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService; + this.remoteStorePressureService = remoteStorePressureService; } @Override @@ -683,7 +683,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR retentionLeaseSyncer, nodes.getLocalNode(), sourceNode, - remoteRefreshSegmentPressureService + remoteStorePressureService ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1042,7 +1042,7 @@ T createShard( RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + RemoteStorePressureService remoteStorePressureService ) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d768165451a5a..7666327adda55 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexingPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; import org.opensearch.index.store.remote.filecache.FileCacheFactory; @@ -259,6 +260,7 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE; import static org.opensearch.common.util.FeatureFlags.TELEMETRY; import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; @@ -1258,6 +1260,12 @@ public Node start() throws NodeValidationException { nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); + if (FeatureFlags.isEnabled(REMOTE_STORE)) { + IndicesService indicesService = injector.getInstance(IndicesService.class); + RemoteStorePressureService pressureService = injector.getInstance(RemoteStorePressureService.class); + indicesService.setPressureService(pressureService); + } + final ClusterService clusterService = injector.getInstance(ClusterService.class); final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java index a476b66719d3f..47295639e52df 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java @@ -15,6 +15,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteTranslogTracker; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -23,7 +24,8 @@ import java.util.Map; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.compareStatsResponse; -import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerStats; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerSegmentStats; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerTranslogStats; import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; public class RemoteStoreStatsResponseTests extends OpenSearchTestCase { @@ -44,8 +46,9 @@ public void tearDown() throws Exception { } public void testSerialization() throws Exception { - RemoteRefreshSegmentTracker.Stats pressureTrackerStats = createPressureTrackerStats(shardId); - RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerStats); + RemoteRefreshSegmentTracker.Stats pressureTrackerSegmentStats = createPressureTrackerSegmentStats(shardId); + RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createPressureTrackerTranslogStats(shardId); + RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerSegmentStats, pressureTrackerTranslogStats); RemoteStoreStatsResponse statsResponse = new RemoteStoreStatsResponse( new RemoteStoreStats[] { stats }, 1, @@ -67,6 +70,6 @@ public void testSerialization() throws Exception { assertEquals(shardsObject.get("total"), 1); assertEquals(shardsObject.get("successful"), 1); assertEquals(shardsObject.get("failed"), 0); - compareStatsResponse(statsObject, pressureTrackerStats); + compareStatsResponse(statsObject, pressureTrackerSegmentStats, pressureTrackerTranslogStats); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java index 747dc692b1d5d..684f5e1d4e9dc 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java @@ -10,6 +10,7 @@ import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteTranslogTracker; import java.util.Map; @@ -19,69 +20,168 @@ * Helper utilities for Remote Store stats tests */ public class RemoteStoreStatsTestHelper { - static RemoteRefreshSegmentTracker.Stats createPressureTrackerStats(ShardId shardId) { + static RemoteRefreshSegmentTracker.Stats createPressureTrackerSegmentStats(ShardId shardId) { return new RemoteRefreshSegmentTracker.Stats(shardId, 101, 102, 100, 3, 2, 10, 5, 5, 10, 5, 5, 3, 2, 5, 2, 3, 4, 9); } - static void compareStatsResponse(Map statsObject, RemoteRefreshSegmentTracker.Stats pressureTrackerStats) { - assertEquals(statsObject.get(RemoteStoreStats.Fields.SHARD_ID), pressureTrackerStats.shardId.toString()); - assertEquals(statsObject.get(RemoteStoreStats.Fields.LOCAL_REFRESH_TIMESTAMP), (int) pressureTrackerStats.localRefreshClockTimeMs); + static RemoteTranslogTracker.Stats createPressureTrackerTranslogStats(ShardId shardId) { + return new RemoteTranslogTracker.Stats(shardId, 1L, 2L, 3L, 4, 5L, 6L, 7L, 8L, 9D, 10D, 11D); + } + + static void compareStatsResponse( + Map statsObject, + RemoteRefreshSegmentTracker.Stats pressureTrackerSegmentStats, + RemoteTranslogTracker.Stats pressureTrackerTranslogStats + ) { + // Compare Remote Segment Store stats + assertEquals(pressureTrackerSegmentStats.shardId.toString(), statsObject.get(RemoteStoreStats.Fields.SHARD_ID)); + assertEquals( + (int) pressureTrackerSegmentStats.localRefreshClockTimeMs, + statsObject.get(RemoteStoreStats.Fields.LOCAL_REFRESH_TIMESTAMP) + ); + assertEquals( + (int) pressureTrackerSegmentStats.remoteRefreshClockTimeMs, + statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_TIMESTAMP) + ); + assertEquals( + (int) pressureTrackerSegmentStats.refreshTimeLagMs, + statsObject.get(RemoteStoreStats.Fields.REFRESH_TIME_LAG_IN_MILLIS) + ); + assertEquals( + (int) (pressureTrackerSegmentStats.localRefreshNumber - pressureTrackerSegmentStats.remoteRefreshNumber), + statsObject.get(RemoteStoreStats.Fields.REFRESH_LAG) + ); + assertEquals((int) pressureTrackerSegmentStats.bytesLag, statsObject.get(RemoteStoreStats.Fields.BYTES_LAG)); + + assertEquals( + (int) pressureTrackerSegmentStats.rejectionCount, + statsObject.get(RemoteStoreStats.Fields.BACKPRESSURE_REJECTION_COUNT) + ); + assertEquals( + (int) pressureTrackerSegmentStats.consecutiveFailuresCount, + statsObject.get(RemoteStoreStats.Fields.CONSECUTIVE_FAILURE_COUNT) + ); + + assertEquals( + (int) pressureTrackerSegmentStats.uploadBytesStarted, + ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.STARTED) + ); assertEquals( - statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_TIMESTAMP), - (int) pressureTrackerStats.remoteRefreshClockTimeMs + (int) pressureTrackerSegmentStats.uploadBytesSucceeded, + ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.SUCCEEDED) ); - assertEquals(statsObject.get(RemoteStoreStats.Fields.REFRESH_TIME_LAG_IN_MILLIS), (int) pressureTrackerStats.refreshTimeLagMs); assertEquals( - statsObject.get(RemoteStoreStats.Fields.REFRESH_LAG), - (int) (pressureTrackerStats.localRefreshNumber - pressureTrackerStats.remoteRefreshNumber) + (int) pressureTrackerSegmentStats.uploadBytesFailed, + ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.FAILED) + ); + assertEquals( + pressureTrackerSegmentStats.uploadBytesMovingAverage, + ((Map) statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_SIZE_IN_BYTES)).get(RemoteStoreStats.SubFields.MOVING_AVG) + ); + assertEquals( + (int) pressureTrackerSegmentStats.lastSuccessfulRemoteRefreshBytes, + ((Map) statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_SIZE_IN_BYTES)).get( + RemoteStoreStats.SubFields.LAST_SUCCESSFUL + ) + ); + assertEquals( + pressureTrackerSegmentStats.uploadBytesPerSecMovingAverage, + ((Map) statsObject.get(RemoteStoreStats.Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ) + ); + assertEquals( + (int) pressureTrackerSegmentStats.totalUploadsStarted, + ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_REMOTE_REFRESH)).get(RemoteStoreStats.SubFields.STARTED) + ); + assertEquals( + (int) pressureTrackerSegmentStats.totalUploadsSucceeded, + ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_REMOTE_REFRESH)).get(RemoteStoreStats.SubFields.SUCCEEDED) + ); + assertEquals( + (int) pressureTrackerSegmentStats.totalUploadsFailed, + ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_REMOTE_REFRESH)).get(RemoteStoreStats.SubFields.FAILED) + ); + assertEquals( + pressureTrackerSegmentStats.uploadTimeMovingAverage, + ((Map) statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_LATENCY_IN_MILLIS)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ) ); - assertEquals(statsObject.get(RemoteStoreStats.Fields.BYTES_LAG), (int) pressureTrackerStats.bytesLag); - assertEquals(statsObject.get(RemoteStoreStats.Fields.BACKPRESSURE_REJECTION_COUNT), (int) pressureTrackerStats.rejectionCount); + // Compare Remote Translog Store stats + Map tlogStatsObj = (Map) statsObject.get("translog"); + Map tlogUploadStatsObj = (Map) tlogStatsObj.get("upload"); + + assertEquals(pressureTrackerTranslogStats.shardId.toString(), statsObject.get(RemoteStoreStats.Fields.SHARD_ID)); assertEquals( - statsObject.get(RemoteStoreStats.Fields.CONSECUTIVE_FAILURE_COUNT), - (int) pressureTrackerStats.consecutiveFailuresCount + pressureTrackerTranslogStats.lastUploadTimestamp, + Long.parseLong(tlogUploadStatsObj.get(RemoteStoreStats.Fields.LAST_UPLOAD_TIMESTAMP).toString()) ); assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.STARTED), - (int) pressureTrackerStats.uploadBytesStarted + pressureTrackerTranslogStats.totalUploadsStarted, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.Fields.TOTAL_UPLOADS)).get(RemoteStoreStats.SubFields.STARTED) + .toString() + ) ); assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.SUCCEEDED), - (int) pressureTrackerStats.uploadBytesSucceeded + pressureTrackerTranslogStats.totalUploadsSucceeded, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.Fields.TOTAL_UPLOADS)).get(RemoteStoreStats.SubFields.SUCCEEDED) + .toString() + ) ); assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.FAILED), - (int) pressureTrackerStats.uploadBytesFailed + pressureTrackerTranslogStats.totalUploadsFailed, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.Fields.TOTAL_UPLOADS)).get(RemoteStoreStats.SubFields.FAILED) + .toString() + ) ); + assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_SIZE_IN_BYTES)).get(RemoteStoreStats.SubFields.MOVING_AVG), - pressureTrackerStats.uploadBytesMovingAverage + pressureTrackerTranslogStats.uploadBytesStarted, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.STARTED) + .toString() + ) ); assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_SIZE_IN_BYTES)).get(RemoteStoreStats.SubFields.LAST_SUCCESSFUL), - (int) pressureTrackerStats.lastSuccessfulRemoteRefreshBytes + pressureTrackerTranslogStats.uploadBytesSucceeded, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.SUCCEEDED + ).toString() + ) ); assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)).get(RemoteStoreStats.SubFields.MOVING_AVG), - pressureTrackerStats.uploadBytesPerSecMovingAverage + pressureTrackerTranslogStats.uploadBytesFailed, + Long.parseLong( + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.FAILED) + .toString() + ) ); + assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_REMOTE_REFRESH)).get(RemoteStoreStats.SubFields.STARTED), - (int) pressureTrackerStats.totalUploadsStarted + pressureTrackerTranslogStats.totalUploadTimeInMillis, + Long.parseLong(tlogUploadStatsObj.get(RemoteStoreStats.Fields.TOTAL_UPLOAD_TIME_IN_MILLIS).toString()) ); + assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_REMOTE_REFRESH)).get(RemoteStoreStats.SubFields.SUCCEEDED), - (int) pressureTrackerStats.totalUploadsSucceeded + pressureTrackerTranslogStats.uploadBytesMovingAverage, + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.Fields.UPLOAD_BYTES)).get(RemoteStoreStats.SubFields.MOVING_AVG) ); assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_REMOTE_REFRESH)).get(RemoteStoreStats.SubFields.FAILED), - (int) pressureTrackerStats.totalUploadsFailed + pressureTrackerTranslogStats.uploadBytesPerSecMovingAverage, + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ) ); assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_LATENCY_IN_MILLIS)).get(RemoteStoreStats.SubFields.MOVING_AVG), - pressureTrackerStats.uploadTimeMovingAverage + pressureTrackerTranslogStats.uploadTimeMovingAverage, + ((Map) tlogUploadStatsObj.get(RemoteStoreStats.Fields.UPLOAD_TIME_IN_MILLIS)).get(RemoteStoreStats.SubFields.MOVING_AVG) ); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java index fc057b71b15f8..6fe7bc5a475e2 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java @@ -16,6 +16,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteTranslogTracker; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -24,7 +25,8 @@ import java.util.Map; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.compareStatsResponse; -import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerStats; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerSegmentStats; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerTranslogStats; import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; public class RemoteStoreStatsTests extends OpenSearchTestCase { @@ -45,42 +47,26 @@ public void tearDown() throws Exception { } public void testXContentBuilder() throws IOException { - RemoteRefreshSegmentTracker.Stats pressureTrackerStats = createPressureTrackerStats(shardId); - RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerStats); + RemoteRefreshSegmentTracker.Stats pressureTrackerSegmentStats = createPressureTrackerSegmentStats(shardId); + RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createPressureTrackerTranslogStats(shardId); + RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerSegmentStats, pressureTrackerTranslogStats); XContentBuilder builder = XContentFactory.jsonBuilder(); stats.toXContent(builder, EMPTY_PARAMS); Map jsonObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); - compareStatsResponse(jsonObject, pressureTrackerStats); + compareStatsResponse(jsonObject, pressureTrackerSegmentStats, pressureTrackerTranslogStats); } public void testSerialization() throws Exception { - RemoteRefreshSegmentTracker.Stats pressureTrackerStats = createPressureTrackerStats(shardId); - RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerStats); + RemoteRefreshSegmentTracker.Stats pressureTrackerSegmentStats = createPressureTrackerSegmentStats(shardId); + RemoteTranslogTracker.Stats pressureTrackerTranslogStats = createPressureTrackerTranslogStats(shardId); + RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerSegmentStats, pressureTrackerTranslogStats); try (BytesStreamOutput out = new BytesStreamOutput()) { stats.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { RemoteStoreStats deserializedStats = new RemoteStoreStats(in); - assertEquals(deserializedStats.getStats().shardId.toString(), stats.getStats().shardId.toString()); - assertEquals(deserializedStats.getStats().refreshTimeLagMs, stats.getStats().refreshTimeLagMs); - assertEquals(deserializedStats.getStats().localRefreshNumber, stats.getStats().localRefreshNumber); - assertEquals(deserializedStats.getStats().remoteRefreshNumber, stats.getStats().remoteRefreshNumber); - assertEquals(deserializedStats.getStats().uploadBytesStarted, stats.getStats().uploadBytesStarted); - assertEquals(deserializedStats.getStats().uploadBytesSucceeded, stats.getStats().uploadBytesSucceeded); - assertEquals(deserializedStats.getStats().uploadBytesFailed, stats.getStats().uploadBytesFailed); - assertEquals(deserializedStats.getStats().totalUploadsStarted, stats.getStats().totalUploadsStarted); - assertEquals(deserializedStats.getStats().totalUploadsFailed, stats.getStats().totalUploadsFailed); - assertEquals(deserializedStats.getStats().totalUploadsSucceeded, stats.getStats().totalUploadsSucceeded); - assertEquals(deserializedStats.getStats().rejectionCount, stats.getStats().rejectionCount); - assertEquals(deserializedStats.getStats().consecutiveFailuresCount, stats.getStats().consecutiveFailuresCount); - assertEquals(deserializedStats.getStats().uploadBytesMovingAverage, stats.getStats().uploadBytesMovingAverage, 0); - assertEquals( - deserializedStats.getStats().uploadBytesPerSecMovingAverage, - stats.getStats().uploadBytesPerSecMovingAverage, - 0 - ); - assertEquals(deserializedStats.getStats().uploadTimeMovingAverage, stats.getStats().uploadTimeMovingAverage, 0); - assertEquals(deserializedStats.getStats().bytesLag, stats.getStats().bytesLag); + assertEquals(stats.getSegmentStats(), deserializedStats.getSegmentStats()); + assertEquals(stats.getTranslogStats(), deserializedStats.getTranslogStats()); } } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java index 25e44884814a5..ca272479d1dac 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java @@ -28,7 +28,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.IndicesService; @@ -50,7 +50,7 @@ public class TransportRemoteStoreStatsActionTests extends IndexShardTestCase { private IndicesService indicesService; - private RemoteRefreshSegmentPressureService pressureService; + private RemoteStorePressureService pressureService; private IndexMetadata remoteStoreIndexMetadata; private TransportService transportService; private ClusterService clusterService; @@ -64,7 +64,7 @@ public void setUp() throws Exception { indicesService = mock(IndicesService.class); IndexService indexService = mock(IndexService.class); clusterService = mock(ClusterService.class); - pressureService = mock(RemoteRefreshSegmentPressureService.class); + pressureService = mock(RemoteStorePressureService.class); MockTransport mockTransport = new MockTransport(); localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); remoteStoreIndexMetadata = IndexMetadata.builder(INDEX.getName()) diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index c0c35e8c22f4d..9b6aa62bdecd6 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -77,7 +77,7 @@ import org.opensearch.index.mapper.Mapping; import org.opensearch.index.mapper.MetadataFieldMapper; import org.opensearch.index.mapper.RootObjectMapper; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -1073,7 +1073,7 @@ public void testHandlePrimaryTermValidationRequestWithDifferentAllocationId() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1104,7 +1104,7 @@ public void testHandlePrimaryTermValidationRequestWithOlderPrimaryTerm() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1135,7 +1135,7 @@ public void testHandlePrimaryTermValidationRequestSuccess() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); action.handlePrimaryTermValidationRequest( @@ -1177,7 +1177,7 @@ private TransportShardBulkAction createAction() { mock(ActionFilters.class), mock(IndexingPressureService.class), mock(SegmentReplicationPressureService.class), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), mock(SystemIndices.class) ); } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 95d0ae706b286..cd0f85c54dce6 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -84,6 +84,7 @@ import org.opensearch.index.fielddata.IndexFieldDataCache; import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.mapper.Uid; +import org.opensearch.index.remote.RemoteTranslogTracker; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.SearchOperationListener; @@ -231,7 +232,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { return new RemoteBlobStoreInternalTranslogFactory( repositoriesServiceReference::get, threadPool, - indexSettings.getRemoteStoreTranslogRepository() + indexSettings.getRemoteStoreTranslogRepository(), + new RemoteTranslogTracker(shardRouting.shardId(), 10, 10, 10) ); } return new InternalTranslogFactory(); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java index badfeb0d67c05..ccc5e3008c1d4 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -26,7 +26,7 @@ public class RemoteRefreshSegmentTrackerTests extends OpenSearchTestCase { - private RemoteRefreshSegmentPressureSettings pressureSettings; + private RemoteStorePressureSettings pressureSettings; private ClusterService clusterService; @@ -45,11 +45,7 @@ public void setUp() throws Exception { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - pressureSettings = new RemoteRefreshSegmentPressureSettings( - clusterService, - Settings.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) - ); + pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, mock(RemoteStorePressureService.class)); shardId = new ShardId("index", "uuid", 0); } @@ -324,84 +320,87 @@ public void testComputeBytesLag() { } public void testIsUploadBytesAverageReady() { + int uploadBytesMovingAverageWindowSize = 20; pressureTracker = new RemoteRefreshSegmentTracker( shardId, - pressureSettings.getUploadBytesMovingAverageWindowSize(), + uploadBytesMovingAverageWindowSize, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() ); assertFalse(pressureTracker.isUploadBytesAverageReady()); long sum = 0; - for (int i = 1; i < 20; i++) { + for (int i = 1; i < uploadBytesMovingAverageWindowSize; i++) { pressureTracker.addUploadBytes(i); sum += i; assertFalse(pressureTracker.isUploadBytesAverageReady()); assertEquals((double) sum / i, pressureTracker.getUploadBytesAverage(), 0.0d); } - pressureTracker.addUploadBytes(20); - sum += 20; + pressureTracker.addUploadBytes(uploadBytesMovingAverageWindowSize); + sum += uploadBytesMovingAverageWindowSize; assertTrue(pressureTracker.isUploadBytesAverageReady()); - assertEquals((double) sum / 20, pressureTracker.getUploadBytesAverage(), 0.0d); + assertEquals((double) sum / uploadBytesMovingAverageWindowSize, pressureTracker.getUploadBytesAverage(), 0.0d); pressureTracker.addUploadBytes(100); sum = sum + 100 - 1; - assertEquals((double) sum / 20, pressureTracker.getUploadBytesAverage(), 0.0d); + assertEquals((double) sum / uploadBytesMovingAverageWindowSize, pressureTracker.getUploadBytesAverage(), 0.0d); } public void testIsUploadBytesPerSecAverageReady() { + int uploadBytesPerSecMovingAverageWindowSize = 20; pressureTracker = new RemoteRefreshSegmentTracker( shardId, pressureSettings.getUploadBytesMovingAverageWindowSize(), - pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + uploadBytesPerSecMovingAverageWindowSize, pressureSettings.getUploadTimeMovingAverageWindowSize() ); assertFalse(pressureTracker.isUploadBytesPerSecAverageReady()); long sum = 0; - for (int i = 1; i < 20; i++) { + for (int i = 1; i < uploadBytesPerSecMovingAverageWindowSize; i++) { pressureTracker.addUploadBytesPerSec(i); sum += i; assertFalse(pressureTracker.isUploadBytesPerSecAverageReady()); assertEquals((double) sum / i, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); } - pressureTracker.addUploadBytesPerSec(20); - sum += 20; + pressureTracker.addUploadBytesPerSec(uploadBytesPerSecMovingAverageWindowSize); + sum += uploadBytesPerSecMovingAverageWindowSize; assertTrue(pressureTracker.isUploadBytesPerSecAverageReady()); - assertEquals((double) sum / 20, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + assertEquals((double) sum / uploadBytesPerSecMovingAverageWindowSize, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); pressureTracker.addUploadBytesPerSec(100); sum = sum + 100 - 1; - assertEquals((double) sum / 20, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); + assertEquals((double) sum / uploadBytesPerSecMovingAverageWindowSize, pressureTracker.getUploadBytesPerSecAverage(), 0.0d); } public void testIsUploadTimeMsAverageReady() { + int uploadTimeMovingAverageWindowSize = 20; pressureTracker = new RemoteRefreshSegmentTracker( shardId, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), - pressureSettings.getUploadTimeMovingAverageWindowSize() + uploadTimeMovingAverageWindowSize ); assertFalse(pressureTracker.isUploadTimeMsAverageReady()); long sum = 0; - for (int i = 1; i < 20; i++) { + for (int i = 1; i < uploadTimeMovingAverageWindowSize; i++) { pressureTracker.addUploadTimeMs(i); sum += i; assertFalse(pressureTracker.isUploadTimeMsAverageReady()); assertEquals((double) sum / i, pressureTracker.getUploadTimeMsAverage(), 0.0d); } - pressureTracker.addUploadTimeMs(20); - sum += 20; + pressureTracker.addUploadTimeMs(uploadTimeMovingAverageWindowSize); + sum += uploadTimeMovingAverageWindowSize; assertTrue(pressureTracker.isUploadTimeMsAverageReady()); - assertEquals((double) sum / 20, pressureTracker.getUploadTimeMsAverage(), 0.0d); + assertEquals((double) sum / uploadTimeMovingAverageWindowSize, pressureTracker.getUploadTimeMsAverage(), 0.0d); pressureTracker.addUploadTimeMs(100); sum = sum + 100 - 1; - assertEquals((double) sum / 20, pressureTracker.getUploadTimeMsAverage(), 0.0d); + assertEquals((double) sum / uploadTimeMovingAverageWindowSize, pressureTracker.getUploadTimeMsAverage(), 0.0d); } /** diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java similarity index 89% rename from server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java rename to server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java index 5ccacd4048596..dee2de9507df3 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java @@ -29,7 +29,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class RemoteRefreshSegmentPressureServiceTests extends OpenSearchTestCase { +public class RemoteStorePressureServiceTests extends OpenSearchTestCase { private ClusterService clusterService; @@ -37,7 +37,7 @@ public class RemoteRefreshSegmentPressureServiceTests extends OpenSearchTestCase private ShardId shardId; - private RemoteRefreshSegmentPressureService pressureService; + private RemoteStorePressureService pressureService; @Override public void setUp() throws Exception { @@ -58,11 +58,11 @@ public void tearDown() throws Exception { } public void testIsSegmentsUploadBackpressureEnabled() { - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); assertFalse(pressureService.isSegmentsUploadBackpressureEnabled()); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), "true") + .put(RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), "true") .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -71,21 +71,21 @@ public void testIsSegmentsUploadBackpressureEnabled() { public void testAfterIndexShardCreatedForRemoteBackedIndex() { IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNotNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); } public void testAfterIndexShardCreatedForNonRemoteBackedIndex() { IndexShard indexShard = createIndexShard(shardId, false); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNull(pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())); } public void testAfterIndexShardClosed() { IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); assertNotNull(pressureService.getRemoteRefreshSegmentTracker(shardId)); @@ -96,7 +96,7 @@ public void testAfterIndexShardClosed() { public void testValidateSegmentUploadLag() { // Create the pressure tracker IndexShard indexShard = createIndexShard(shardId, true); - pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); + pressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); RemoteRefreshSegmentTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java similarity index 65% rename from server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java rename to server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java index 75b5b946e8bf8..9c5ec69cf6be9 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureSettingsTests.java @@ -21,7 +21,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -public class RemoteRefreshSegmentPressureSettingsTests extends OpenSearchTestCase { +public class RemoteStorePressureSettingsTests extends OpenSearchTestCase { private ClusterService clusterService; @@ -45,10 +45,10 @@ public void tearDown() throws Exception { } public void testGetDefaultSettings() { - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, Settings.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); // Check remote refresh segment pressure enabled is false @@ -75,18 +75,18 @@ public void testGetDefaultSettings() { public void testGetConfiguredSettings() { Settings settings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) - .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .put(RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) + .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) .build(); - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, settings, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); // Check remote refresh segment pressure enabled is true @@ -112,20 +112,20 @@ public void testGetConfiguredSettings() { } public void testUpdateAfterGetDefaultSettings() { - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, Settings.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) - .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .put(RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) + .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -153,27 +153,27 @@ public void testUpdateAfterGetDefaultSettings() { public void testUpdateAfterGetConfiguredSettings() { Settings settings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) - .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) - .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .put(RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 60.0) + .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) .build(); - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings( clusterService, settings, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 40.0) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 50.0) - .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 112) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 113) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 114) + .put(RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR.getKey(), 40.0) + .put(RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR.getKey(), 50.0) + .put(RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 112) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 113) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 114) .build(); clusterService.getClusterSettings().applySettings(newSettings); @@ -208,7 +208,7 @@ public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { AtomicInteger updatedUploadBytesPerSecWindowSize = new AtomicInteger(); AtomicInteger updatedUploadTimeWindowSize = new AtomicInteger(); - RemoteRefreshSegmentPressureService pressureService = mock(RemoteRefreshSegmentPressureService.class); + RemoteStorePressureService pressureService = mock(RemoteStorePressureService.class); // Upload bytes doAnswer(invocation -> { @@ -228,15 +228,11 @@ public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { return null; }).when(pressureService).updateUploadTimeMsMovingAverageWindowSize(anyInt()); - RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( - clusterService, - Settings.EMPTY, - pressureService - ); + RemoteStorePressureSettings pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, pressureService); Settings newSettings = Settings.builder() - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal1) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal2) - .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal3) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal1) + .put(RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal2) + .put(RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal3) .build(); clusterService.getClusterSettings().applySettings(newSettings); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteTranslogTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteTranslogTrackerTests.java new file mode 100644 index 0000000000000..fb878673477dd --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteTranslogTrackerTests.java @@ -0,0 +1,304 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.junit.Before; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; + +public class RemoteTranslogTrackerTests extends OpenSearchTestCase { + private RemoteStorePressureSettings pressureSettings; + + private ClusterService clusterService; + + private ThreadPool threadPool; + + private ShardId shardId; + + private RemoteTranslogTracker tracker; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("remote_store_pressure_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + pressureSettings = new RemoteStorePressureSettings(clusterService, Settings.EMPTY, mock(RemoteStorePressureService.class)); + shardId = new ShardId("index", "uuid", 0); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + @Before + public void initTracker() { + tracker = new RemoteTranslogTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + } + + public void testGetShardId() { + assertEquals(shardId, tracker.getShardId()); + } + + public void testIncrementUploadsStarted() { + populateUploadsStarted(); + } + + public void testIncrementUploadsFailed() { + populateUploadsStarted(); + populateUploadsFailed(); + } + + public void testInvalidIncrementUploadsFailed() { + populateUploadsStarted(); + populateUploadsSucceeded(); + AssertionError error = assertThrows(AssertionError.class, () -> tracker.incrementUploadsFailed()); + assertTrue(error.getMessage().contains("Sum of failure count (")); + } + + public void testIncrementUploadsSucceeded() { + populateUploadsStarted(); + populateUploadsSucceeded(); + } + + public void testInvalidIncrementUploadsSucceeded() { + populateUploadsStarted(); + populateUploadsFailed(); + AssertionError error = assertThrows(AssertionError.class, this::populateUploadsSucceeded); + assertTrue(error.getMessage().contains("Sum of failure count (")); + } + + public void testSetUploadBytesStarted() { + populateUploadBytesStarted(); + } + + public void testSetUploadBytesFailed() { + populateUploadBytesStarted(); + assertEquals(0L, tracker.getUploadBytesFailed()); + long count1 = randomIntBetween(1, (int) tracker.getUploadBytesStarted() / 4); + tracker.addUploadBytesFailed(count1); + assertEquals(count1, tracker.getUploadBytesFailed()); + long count2 = randomIntBetween(1, (int) tracker.getUploadBytesStarted() / 4); + tracker.addUploadBytesFailed(count2); + assertEquals(count1 + count2, tracker.getUploadBytesFailed()); + } + + public void testInvalidSetUploadBytesFailed() { + populateUploadBytesStarted(); + tracker.addUploadBytesSucceeded(tracker.getUploadBytesStarted()); + AssertionError error = assertThrows(AssertionError.class, () -> tracker.addUploadBytesFailed(1L)); + assertTrue(error.getMessage().contains("Sum of failure count (")); + } + + public void testSetUploadBytesSucceeded() { + populateUploadBytesStarted(); + assertEquals(0L, tracker.getUploadBytesSucceeded()); + long count1 = randomIntBetween(1, (int) tracker.getUploadBytesStarted() / 4); + tracker.addUploadBytesSucceeded(count1); + assertEquals(count1, tracker.getUploadBytesSucceeded()); + long count2 = randomIntBetween(1, (int) tracker.getUploadBytesStarted() / 4); + tracker.addUploadBytesSucceeded(count2); + assertEquals(count1 + count2, tracker.getUploadBytesSucceeded()); + } + + public void testInvalidSetUploadBytesSucceeded() { + populateUploadBytesStarted(); + tracker.addUploadBytesFailed(tracker.getUploadBytesStarted()); + AssertionError error = assertThrows(AssertionError.class, () -> tracker.addUploadBytesSucceeded(1L)); + assertTrue(error.getMessage().contains("Sum of failure count (")); + } + + public void testAddUploadTimeInMillis() { + assertEquals(0L, tracker.getTotalUploadTimeInMillis()); + int duration1 = randomIntBetween(10, 50); + tracker.addUploadTimeInMillis(duration1); + assertEquals(duration1, tracker.getTotalUploadTimeInMillis()); + int duration2 = randomIntBetween(10, 50); + tracker.addUploadTimeInMillis(duration2); + assertEquals(duration1 + duration2, tracker.getTotalUploadTimeInMillis()); + } + + public void testSetLastUploadTimestamp() { + long lastUploadTimestamp = System.nanoTime() / 1_000_000L + randomIntBetween(10, 100); + tracker.setLastUploadTimestamp(lastUploadTimestamp); + assertEquals(lastUploadTimestamp, tracker.getLastUploadTimestamp()); + } + + public void testUpdateUploadBytesMovingAverage() { + int uploadBytesMovingAverageWindowSize = 20; + tracker = new RemoteTranslogTracker( + shardId, + uploadBytesMovingAverageWindowSize, + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + assertFalse(tracker.isUploadBytesMovingAverageReady()); + + long sum = 0; + for (int i = 1; i < uploadBytesMovingAverageWindowSize; i++) { + tracker.updateUploadBytesMovingAverage(i); + sum += i; + assertFalse(tracker.isUploadBytesMovingAverageReady()); + assertEquals((double) sum / i, tracker.getUploadBytesMovingAverage(), 0.0d); + } + + tracker.updateUploadBytesMovingAverage(uploadBytesMovingAverageWindowSize); + sum += uploadBytesMovingAverageWindowSize; + assertTrue(tracker.isUploadBytesMovingAverageReady()); + assertEquals((double) sum / uploadBytesMovingAverageWindowSize, tracker.getUploadBytesMovingAverage(), 0.0d); + + tracker.updateUploadBytesMovingAverage(100); + sum = sum + 100 - 1; + assertEquals((double) sum / uploadBytesMovingAverageWindowSize, tracker.getUploadBytesMovingAverage(), 0.0d); + } + + public void testUpdateUploadBytesPerSecMovingAverage() { + int uploadBytesPerSecMovingAverageWindowSize = 20; + tracker = new RemoteTranslogTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + uploadBytesPerSecMovingAverageWindowSize, + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + assertFalse(tracker.isUploadBytesPerSecMovingAverageReady()); + + long sum = 0; + for (int i = 1; i < uploadBytesPerSecMovingAverageWindowSize; i++) { + tracker.updateUploadBytesPerSecMovingAverage(i); + sum += i; + assertFalse(tracker.isUploadBytesPerSecMovingAverageReady()); + assertEquals((double) sum / i, tracker.getUploadBytesPerSecMovingAverage(), 0.0d); + } + + tracker.updateUploadBytesPerSecMovingAverage(uploadBytesPerSecMovingAverageWindowSize); + sum += uploadBytesPerSecMovingAverageWindowSize; + assertTrue(tracker.isUploadBytesPerSecMovingAverageReady()); + assertEquals((double) sum / uploadBytesPerSecMovingAverageWindowSize, tracker.getUploadBytesPerSecMovingAverage(), 0.0d); + + tracker.updateUploadBytesPerSecMovingAverage(100); + sum = sum + 100 - 1; + assertEquals((double) sum / uploadBytesPerSecMovingAverageWindowSize, tracker.getUploadBytesPerSecMovingAverage(), 0.0d); + } + + public void testUpdateUploadTimeMovingAverage() { + int uploadTimeMovingAverageWindowSize = 20; + tracker = new RemoteTranslogTracker( + shardId, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + uploadTimeMovingAverageWindowSize + ); + assertFalse(tracker.isUploadTimeMovingAverageReady()); + + long sum = 0; + for (int i = 1; i < uploadTimeMovingAverageWindowSize; i++) { + tracker.updateUploadTimeMovingAverage(i); + sum += i; + assertFalse(tracker.isUploadTimeMovingAverageReady()); + assertEquals((double) sum / i, tracker.getUploadTimeMovingAverage(), 0.0d); + } + + tracker.updateUploadTimeMovingAverage(uploadTimeMovingAverageWindowSize); + sum += uploadTimeMovingAverageWindowSize; + assertTrue(tracker.isUploadTimeMovingAverageReady()); + assertEquals((double) sum / uploadTimeMovingAverageWindowSize, tracker.getUploadTimeMovingAverage(), 0.0d); + + tracker.updateUploadTimeMovingAverage(100); + sum = sum + 100 - 1; + assertEquals((double) sum / uploadTimeMovingAverageWindowSize, tracker.getUploadTimeMovingAverage(), 0.0d); + } + + public void testStatsObjectCreation() { + populateDummyStats(); + RemoteTranslogTracker.Stats actualStats = tracker.stats(); + assertTrue(tracker.hasSameStatsAs(actualStats)); + } + + public void testStatsObjectCreationViaStream() throws IOException { + populateDummyStats(); + RemoteTranslogTracker.Stats expectedStats = tracker.stats(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + expectedStats.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + RemoteTranslogTracker.Stats deserializedStats = new RemoteTranslogTracker.Stats(in); + assertTrue(tracker.hasSameStatsAs(deserializedStats)); + } + } + } + + private void populateUploadsStarted() { + assertEquals(0L, tracker.getTotalUploadsStarted()); + tracker.incrementUploadsStarted(); + assertEquals(1L, tracker.getTotalUploadsStarted()); + tracker.incrementUploadsStarted(); + assertEquals(2L, tracker.getTotalUploadsStarted()); + } + + private void populateUploadsFailed() { + assertEquals(0L, tracker.getTotalUploadsFailed()); + tracker.incrementUploadsFailed(); + assertEquals(1L, tracker.getTotalUploadsFailed()); + tracker.incrementUploadsFailed(); + assertEquals(2L, tracker.getTotalUploadsFailed()); + } + + private void populateUploadsSucceeded() { + assertEquals(0L, tracker.getTotalUploadsSucceeded()); + tracker.incrementUploadsSucceeded(); + assertEquals(1L, tracker.getTotalUploadsSucceeded()); + tracker.incrementUploadsSucceeded(); + assertEquals(2L, tracker.getTotalUploadsSucceeded()); + } + + private void populateUploadBytesStarted() { + assertEquals(0L, tracker.getUploadBytesStarted()); + long count1 = randomIntBetween(500, 1000); + tracker.addUploadBytesStarted(count1); + assertEquals(count1, tracker.getUploadBytesStarted()); + long count2 = randomIntBetween(500, 1000); + tracker.addUploadBytesStarted(count2); + assertEquals(count1 + count2, tracker.getUploadBytesStarted()); + } + + private void populateDummyStats() { + tracker.setLastUploadTimestamp(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); + tracker.incrementUploadsStarted(); + tracker.incrementUploadsStarted(); + tracker.incrementUploadsFailed(); + tracker.incrementUploadsSucceeded(); + int startedBytes = randomIntBetween(10, 100); + int failedBytes = randomIntBetween(1, startedBytes / 2); + int succeededBytes = randomIntBetween(1, startedBytes / 2); + tracker.addUploadBytesStarted(startedBytes); + tracker.addUploadBytesFailed(failedBytes); + tracker.addUploadBytesSucceeded(succeededBytes); + tracker.addUploadTimeInMillis(randomIntBetween(10, 100)); + } +} diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 7c119bfbbc573..4e05356c3630c 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -25,7 +25,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.InternalEngineFactory; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; @@ -51,7 +51,7 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; private ClusterService clusterService; private RemoteStoreRefreshListener remoteStoreRefreshListener; - private RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private RemoteStorePressureService remoteStorePressureService; public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard = newStartedShard( @@ -75,12 +75,12 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); - remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + remoteStorePressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); + remoteStorePressureService.afterIndexShardCreated(indexShard); remoteStoreRefreshListener = new RemoteStoreRefreshListener( indexShard, SegmentReplicationCheckpointPublisher.EMPTY, - remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) + remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) ); } @@ -247,14 +247,14 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 0); } @@ -268,14 +268,14 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 1); } @@ -314,14 +314,14 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down CountDownLatch successLatch = new CountDownLatch(3); - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( succeedOnAttempt, refreshCountLatch, successLatch ); assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 2); } @@ -336,9 +336,9 @@ private void assertNoLagAndTotalUploadsFailed(RemoteRefreshSegmentTracker segmen } public void testTrackerData() throws Exception { - Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); RemoteStoreRefreshListener listener = tuple.v1(); - RemoteRefreshSegmentPressureService pressureService = tuple.v2(); + RemoteStorePressureService pressureService = tuple.v2(); RemoteRefreshSegmentTracker tracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLag(tracker); indexDocs(100, randomIntBetween(100, 200)); @@ -361,13 +361,12 @@ private void assertNoLag(RemoteRefreshSegmentTracker tracker) { assertEquals(0, tracker.getTotalUploadsFailed()); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( - int succeedOnAttempt - ) throws IOException { + private Tuple mockIndexShardWithRetryAndScheduleRefresh(int succeedOnAttempt) + throws IOException { return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, null, null); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch @@ -376,7 +375,7 @@ private Tuple m return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch); } - private Tuple mockIndexShardWithRetryAndScheduleRefresh( + private Tuple mockIndexShardWithRetryAndScheduleRefresh( int succeedOnAttempt, CountDownLatch refreshCountLatch, CountDownLatch successLatch, @@ -463,20 +462,17 @@ private Tuple m new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService( - clusterService, - Settings.EMPTY - ); + RemoteStorePressureService remoteStorePressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); when(shard.shardId()).thenReturn(indexShard.shardId()); - remoteRefreshSegmentPressureService.afterIndexShardCreated(shard); + remoteStorePressureService.afterIndexShardCreated(shard); RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener( shard, emptyCheckpointPublisher, - remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) + remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()) ); refreshListener.afterRefresh(true); - return Tuple.tuple(refreshListener, remoteRefreshSegmentPressureService); + return Tuple.tuple(refreshListener, remoteStorePressureService); } public static class TestFilterDirectory extends FilterDirectory { diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index d26379eaefa5c..8d84e58815cb2 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -41,6 +41,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.MissingHistoryOperationsException; +import org.opensearch.index.remote.RemoteTranslogTracker; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.LocalCheckpointTrackerTests; import org.opensearch.index.seqno.SequenceNumbers; @@ -171,7 +172,8 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin getPersistedSeqNoConsumer(), repository, threadPool, - primaryMode::get + primaryMode::get, + new RemoteTranslogTracker(shardId, 10, 10, 10) ); } @@ -1221,7 +1223,8 @@ public int write(ByteBuffer src) throws IOException { persistedSeqNos::add, repository, threadPool, - () -> Boolean.TRUE + () -> Boolean.TRUE, + new RemoteTranslogTracker(shardId, 10, 10, 10) ) { @Override ChannelFactory getChannelFactory() { @@ -1327,7 +1330,8 @@ public void force(boolean metaData) throws IOException { persistedSeqNos::add, repository, threadPool, - () -> Boolean.TRUE + () -> Boolean.TRUE, + new RemoteTranslogTracker(shardId, 10, 10, 10) ) { @Override ChannelFactory getChannelFactory() { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index b7091f3f4f8a6..97a2b612ed27d 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -125,6 +125,9 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void beforeUpload(TransferSnapshot transferSnapshot) throws IOException {} + @Override public void onUploadComplete(TransferSnapshot transferSnapshot) { translogTransferSucceeded.incrementAndGet(); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 65a70b9f059f7..b35463e0385f9 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -46,7 +46,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -264,7 +264,7 @@ public MockIndexShard createShard( final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, final DiscoveryNode sourceNode, - final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService + final RemoteStorePressureService remoteStorePressureService ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index a121a190096b4..58bfdb2d35f1c 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -171,7 +171,7 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.analysis.AnalysisRegistry; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; @@ -1902,7 +1902,7 @@ public void onFailure(final Exception e) { ), RetentionLeaseSyncer.EMPTY, SegmentReplicationCheckpointPublisher.EMPTY, - mock(RemoteRefreshSegmentPressureService.class) + mock(RemoteStorePressureService.class) ); Map actions = new HashMap<>(); final SystemIndices systemIndices = new SystemIndices(emptyMap()); @@ -1953,7 +1953,7 @@ public void onFailure(final Exception e) { mock(ShardStateAction.class), mock(ThreadPool.class) ), - mock(RemoteRefreshSegmentPressureService.class), + mock(RemoteStorePressureService.class), new SystemIndices(emptyMap()) ); actions.put( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 93b9742ada0da..9d0e4da7b5294 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -93,7 +93,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; -import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; +import org.opensearch.index.remote.RemoteStorePressureService; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -579,7 +579,7 @@ protected IndexShard newShard( clusterSettings ); - RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = null; + RemoteStorePressureService remoteStorePressureService; if (indexSettings.isRemoteStoreEnabled()) { if (remoteStore == null) { Path remoteStorePath; @@ -591,15 +591,19 @@ protected IndexShard newShard( } remoteStore = createRemoteStore(remoteStorePath, routing, indexMetadata); } - remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, indexSettings.getSettings()); + remoteStorePressureService = new RemoteStorePressureService(clusterService, indexSettings.getSettings()); + } else { + remoteStorePressureService = null; } final BiFunction translogFactorySupplier = (settings, shardRouting) -> { if (settings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + assert remoteStorePressureService != null; return new RemoteBlobStoreInternalTranslogFactory( this::createRepositoriesService, threadPool, - settings.getRemoteStoreTranslogRepository() + settings.getRemoteStoreTranslogRepository(), + remoteStorePressureService.getRemoteTranslogTracker(shardRouting.shardId()) ); } return new InternalTranslogFactory(); @@ -628,11 +632,11 @@ protected IndexShard newShard( translogFactorySupplier, checkpointPublisher, remoteStore, - remoteRefreshSegmentPressureService + remoteStorePressureService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); - if (remoteRefreshSegmentPressureService != null) { - remoteRefreshSegmentPressureService.afterIndexShardCreated(indexShard); + if (remoteStorePressureService != null) { + remoteStorePressureService.afterIndexShardCreated(indexShard); } success = true; } finally {