Skip to content

Commit

Permalink
Add RTS stats tracker
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <sabhumik@amazon.com>
  • Loading branch information
Bhumika Saini committed Jul 27, 2023
1 parent c25c175 commit 794e497
Show file tree
Hide file tree
Showing 38 changed files with 1,441 additions and 276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT;
import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
import static org.opensearch.index.remote.RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT;
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
Expand Down Expand Up @@ -120,10 +120,10 @@ private RemoteRefreshSegmentTracker.Stats stats() {
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
return matches.get(0).getStats();
return matches.get(0).getSegmentStats();
}

private void indexDocAndRefresh(BytesReference source, int iterations) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ public void testStatsResponseFromAllNodes() {
assertTrue(response.getShards() != null && response.getShards().length != 0);
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats();
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getSegmentStats();
assertResponseStats(stats);
}
}
Expand All @@ -81,7 +81,7 @@ public void testStatsResponseAllShards() {
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertTrue(response.getSuccessfulShards() == 3);
assertTrue(response.getShards() != null && response.getShards().length == 3);
RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats();
RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getSegmentStats();
assertResponseStats(stats);
}

Expand All @@ -107,7 +107,7 @@ public void testStatsResponseFromLocalNode() {
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertTrue(response.getSuccessfulShards() == 1);
assertTrue(response.getShards() != null && response.getShards().length == 1);
RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats();
RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getSegmentStats();
assertResponseStats(stats);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.remote.RemoteTranslogTracker;

import java.io.IOException;

Expand All @@ -25,19 +26,29 @@
public class RemoteStoreStats implements Writeable, ToXContentFragment {

private final RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats;
private final RemoteTranslogTracker.Stats remoteTranslogShardStats;

public RemoteStoreStats(RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats) {
public RemoteStoreStats(
RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats,
RemoteTranslogTracker.Stats remoteTranslogShardStats
) {
this.remoteSegmentUploadShardStats = remoteSegmentUploadShardStats;
this.remoteTranslogShardStats = remoteTranslogShardStats;
}

/**
* Deserializes remote store stats from a stream.
* Segment upload stats are read first, followed by translog stats; this order has to
* match the order in which {@code writeTo} writes them. Both values are read through
* {@code readOptionalWriteable}, so either field may end up absent after deserialization.
*
* @param in stream to read the stats from
* @throws IOException if reading from the stream fails
*/
public RemoteStoreStats(StreamInput in) throws IOException {
remoteSegmentUploadShardStats = in.readOptionalWriteable(RemoteRefreshSegmentTracker.Stats::new);
remoteTranslogShardStats = in.readOptionalWriteable(RemoteTranslogTracker.Stats::new);
}

public RemoteRefreshSegmentTracker.Stats getStats() {
public RemoteRefreshSegmentTracker.Stats getSegmentStats() {
return remoteSegmentUploadShardStats;
}

/**
* Returns the remote translog stats held by this object.
* The field is deserialized as an optional value, so callers should be prepared
* for an absent ({@code null}) result.
*
* @return the translog stats for the shard
*/
public RemoteTranslogTracker.Stats getTranslogStats() {
return remoteTranslogShardStats;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
Expand Down Expand Up @@ -74,6 +85,40 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject(Fields.REMOTE_REFRESH_LATENCY_IN_MILLIS);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadTimeMovingAverage);
builder.endObject();

builder.startObject("translog");
builder.startObject("upload");
builder.field(Fields.LAST_UPLOAD_TIMESTAMP, remoteTranslogShardStats.lastUploadTimestamp);

builder.startObject(Fields.TOTAL_UPLOADS);
builder.field(SubFields.STARTED, remoteTranslogShardStats.totalUploadsStarted)
.field(SubFields.FAILED, remoteTranslogShardStats.totalUploadsFailed)
.field(SubFields.SUCCEEDED, remoteTranslogShardStats.totalUploadsSucceeded);
builder.endObject();

builder.startObject(Fields.TOTAL_UPLOADS_IN_BYTES);
builder.field(SubFields.STARTED, remoteTranslogShardStats.uploadBytesStarted)
.field(SubFields.FAILED, remoteTranslogShardStats.uploadBytesFailed)
.field(SubFields.SUCCEEDED, remoteTranslogShardStats.uploadBytesSucceeded);
builder.endObject();

builder.field(Fields.TOTAL_UPLOAD_TIME_IN_MILLIS, remoteTranslogShardStats.totalUploadTimeInMillis);

builder.startObject(Fields.UPLOAD_BYTES);
builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.uploadBytesMovingAverage);
builder.endObject();

builder.startObject(Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC);
builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.uploadBytesPerSecMovingAverage);
builder.endObject();

builder.startObject(Fields.UPLOAD_TIME_IN_MILLIS);
builder.field(SubFields.MOVING_AVG, remoteTranslogShardStats.uploadTimeMovingAverage);
builder.endObject();

builder.endObject(); // translog.upload
builder.endObject(); // translog

builder.endObject();

return builder;
Expand All @@ -82,6 +127,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
/**
* Serializes the stats to a stream: segment upload stats first, then translog stats.
* The stream-based constructor reads the fields back in this same order, so the two
* must be changed together.
*
* @param out stream to write the stats to
* @throws IOException if writing to the stream fails
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(remoteSegmentUploadShardStats);
out.writeOptionalWriteable(remoteTranslogShardStats);
}

/**
Expand Down Expand Up @@ -149,6 +195,31 @@ static final class Fields {
* Time taken by a single remote refresh
*/
static final String REMOTE_REFRESH_LATENCY_IN_MILLIS = "remote_refresh_latency_in_millis";

/**
* Response key for the timestamp of the last remote translog upload
*/
static final String LAST_UPLOAD_TIMESTAMP = "last_upload_timestamp";

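/**
* Response key for the object holding the number of remote translog uploads started, failed and succeeded
*/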
static final String TOTAL_UPLOADS = "total_uploads";

/**
* Response key for the total time spent on remote translog uploads, in milliseconds
*/
static final String TOTAL_UPLOAD_TIME_IN_MILLIS = "total_upload_time_in_millis";

/**
* Response key for the object holding the moving average of remote translog upload size in bytes
*/
static final String UPLOAD_BYTES = "upload_bytes";

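/**
* Response key for the object holding the moving average of remote translog upload time, in milliseconds
*/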
static final String UPLOAD_TIME_IN_MILLIS = "upload_time_in_millis";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.IndexService;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.remote.RemoteTranslogTracker;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.indices.IndicesService;
Expand All @@ -49,7 +50,7 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct
RemoteStoreStats> {

private final IndicesService indicesService;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
private final RemoteStorePressureService remoteStorePressureService;

@Inject
public TransportRemoteStoreStatsAction(
Expand All @@ -58,7 +59,7 @@ public TransportRemoteStoreStatsAction(
IndicesService indicesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
RemoteStorePressureService remoteStorePressureService
) {
super(
RemoteStoreStatsAction.NAME,
Expand All @@ -70,7 +71,7 @@ public TransportRemoteStoreStatsAction(
ThreadPool.Names.MANAGEMENT
);
this.indicesService = indicesService;
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
this.remoteStorePressureService = remoteStorePressureService;
}

/**
Expand Down Expand Up @@ -153,11 +154,14 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard
throw new ShardNotFoundException(indexShard.shardId());
}

RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(
RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(
indexShard.shardId()
);
assert Objects.nonNull(remoteRefreshSegmentTracker);

return new RemoteStoreStats(remoteRefreshSegmentTracker.stats());
RemoteTranslogTracker remoteTranslogTracker = remoteStorePressureService.getRemoteTranslogTracker(indexShard.shardId());
assert Objects.nonNull(remoteTranslogTracker);

return new RemoteStoreStats(remoteRefreshSegmentTracker.stats(), remoteTranslogTracker.stats());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -137,7 +137,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
private final SegmentReplicationPressureService segmentReplicationPressureService;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
private final RemoteStorePressureService remoteStorePressureService;

/**
* This action is used for performing primary term validation. With remote translog enabled, the translogs would
Expand All @@ -161,7 +161,7 @@ public TransportShardBulkAction(
ActionFilters actionFilters,
IndexingPressureService indexingPressureService,
SegmentReplicationPressureService segmentReplicationPressureService,
RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService,
RemoteStorePressureService remoteStorePressureService,
SystemIndices systemIndices
) {
super(
Expand All @@ -183,7 +183,7 @@ public TransportShardBulkAction(
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
this.segmentReplicationPressureService = segmentReplicationPressureService;
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
this.remoteStorePressureService = remoteStorePressureService;

this.transportPrimaryTermValidationAction = ACTION_NAME + "[validate_primary_term]";

Expand Down Expand Up @@ -539,9 +539,8 @@ protected Releasable checkPrimaryLimits(BulkShardRequest request, boolean rerout
}
// TODO - While removing remote store flag, this can be encapsulated to single class with common interface for backpressure
// service
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)
&& remoteRefreshSegmentPressureService.isSegmentsUploadBackpressureEnabled()) {
remoteRefreshSegmentPressureService.validateSegmentsUploadLag(request.shardId());
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE) && remoteStorePressureService.isSegmentsUploadBackpressureEnabled()) {
remoteStorePressureService.validateSegmentsUploadLag(request.shardId());
}
}
return super.checkPrimaryLimits(request, rerouteWasLocal, localRerouteInitiatedByNodeClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings;
import org.opensearch.index.remote.RemoteStorePressureSettings;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
Expand Down Expand Up @@ -645,13 +645,13 @@ public void apply(Settings value, Settings current, Settings previous) {
Node.NODE_SEARCH_CACHE_SIZE_SETTING,

// Settings related to Remote Refresh Segment Pressure
RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_FACTOR,
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR,
RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
RemoteStorePressureSettings.BYTES_LAG_VARIANCE_FACTOR,
RemoteStorePressureSettings.UPLOAD_TIME_LAG_VARIANCE_FACTOR,
RemoteStorePressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
RemoteStorePressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteStorePressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE,

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
Expand Down
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.SearchIndexNameMatcher;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -440,7 +440,7 @@ public synchronized IndexShard createShard(
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
final RemoteStorePressureService remoteStorePressureService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -509,7 +509,7 @@ public synchronized IndexShard createShard(
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
remoteRefreshSegmentPressureService
remoteStorePressureService
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,30 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeDouble(uploadTimeMovingAverage);
out.writeLong(bytesLag);
}

/**
 * Compares this stats snapshot with another object field by field.
 * Two snapshots are equal when they are of the same class, their shard ids have the
 * same string form, and every counter, lag value and moving average matches.
 * Moving averages are compared with {@link Double#compare(double, double)}, so
 * {@code NaN} equals {@code NaN} and {@code 0.0} differs from {@code -0.0}.
 *
 * @param obj the object to compare with, may be {@code null}
 * @return {@code true} if {@code obj} is a {@code Stats} carrying identical values
 */
@Override
public boolean equals(Object obj) {
    if (this == obj) return true;
    if (obj == null || getClass() != obj.getClass()) return false;
    Stats other = (Stats) obj;

    // NOTE(review): shard ids are compared through their string form rather than
    // ShardId#equals; kept as-is to preserve behaviour - confirm this is intended.
    return this.shardId.toString().equals(other.shardId.toString())
        && this.refreshTimeLagMs == other.refreshTimeLagMs
        && this.localRefreshNumber == other.localRefreshNumber
        && this.remoteRefreshNumber == other.remoteRefreshNumber
        && this.uploadBytesStarted == other.uploadBytesStarted
        && this.uploadBytesSucceeded == other.uploadBytesSucceeded
        && this.uploadBytesFailed == other.uploadBytesFailed
        && this.totalUploadsStarted == other.totalUploadsStarted
        && this.totalUploadsFailed == other.totalUploadsFailed
        && this.totalUploadsSucceeded == other.totalUploadsSucceeded
        && this.rejectionCount == other.rejectionCount
        && this.consecutiveFailuresCount == other.consecutiveFailuresCount
        && Double.compare(this.uploadBytesMovingAverage, other.uploadBytesMovingAverage) == 0
        && Double.compare(this.uploadBytesPerSecMovingAverage, other.uploadBytesPerSecMovingAverage) == 0
        && Double.compare(this.uploadTimeMovingAverage, other.uploadTimeMovingAverage) == 0
        && this.bytesLag == other.bytesLag;
}

/**
 * Computes a hash code from exactly the fields that {@link #equals(Object)} compares,
 * so that equal snapshots always produce equal hash codes and instances behave
 * correctly in hash-based collections.
 *
 * @return hash code consistent with {@link #equals(Object)}
 */
@Override
public int hashCode() {
    // The shard id is hashed through its string form to mirror the comparison in equals.
    // Fully-qualified Objects avoids depending on the file's import list.
    return java.util.Objects.hash(
        shardId.toString(),
        refreshTimeLagMs,
        localRefreshNumber,
        remoteRefreshNumber,
        uploadBytesStarted,
        uploadBytesSucceeded,
        uploadBytesFailed,
        totalUploadsStarted,
        totalUploadsFailed,
        totalUploadsSucceeded,
        rejectionCount,
        consecutiveFailuresCount,
        uploadBytesMovingAverage,
        uploadBytesPerSecMovingAverage,
        uploadTimeMovingAverage,
        bytesLag
    );
}
}

}
Loading

0 comments on commit 794e497

Please sign in to comment.