Skip to content

Commit

Permalink
[Remote Store] Add Segment download stats to remotestore stats API (#…
Browse files Browse the repository at this point in the history
…8718) (#9089)

---------

Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
Signed-off-by: Shourya <114977491+shourya035@users.noreply.github.com>
Signed-off-by: Ashish Singh <ssashish@amazon.com>
Co-authored-by: Ashish Singh <ssashish@amazon.com>
(cherry picked from commit 77074b4)
  • Loading branch information
shourya035 committed Aug 3, 2023
1 parent 3fa9a08 commit aa59b15
Show file tree
Hide file tree
Showing 17 changed files with 1,657 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -92,7 +92,7 @@ private void validateBackpressure(
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
assertTrue(ex.getMessage().contains(breachMode));

RemoteRefreshSegmentTracker.Stats stats = stats();
RemoteSegmentTransferTracker.Stats stats = stats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
Expand All @@ -102,7 +102,7 @@ private void validateBackpressure(
.setRandomControlIOExceptionRate(0d);

assertBusy(() -> {
RemoteRefreshSegmentTracker.Stats finalStats = stats();
RemoteSegmentTransferTracker.Stats finalStats = stats();
assertEquals(0, finalStats.bytesLag);
assertEquals(0, finalStats.refreshTimeLagMs);
assertEquals(0, finalStats.localRefreshNumber - finalStats.remoteRefreshNumber);
Expand All @@ -115,11 +115,11 @@ private void validateBackpressure(
deleteRepo();
}

private RemoteRefreshSegmentTracker.Stats stats() {
private RemoteSegmentTransferTracker.Stats stats() {
String shardId = "0";
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;

import java.io.IOException;

Expand All @@ -24,72 +25,128 @@
*/
public class RemoteStoreStats implements Writeable, ToXContentFragment {

private final RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats;
private final RemoteSegmentTransferTracker.Stats remoteSegmentShardStats;

public RemoteStoreStats(RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats) {
this.remoteSegmentUploadShardStats = remoteSegmentUploadShardStats;
private final ShardRouting shardRouting;

public RemoteStoreStats(RemoteSegmentTransferTracker.Stats remoteSegmentUploadShardStats, ShardRouting shardRouting) {
this.remoteSegmentShardStats = remoteSegmentUploadShardStats;
this.shardRouting = shardRouting;
}

public RemoteStoreStats(StreamInput in) throws IOException {
remoteSegmentUploadShardStats = in.readOptionalWriteable(RemoteRefreshSegmentTracker.Stats::new);
this.remoteSegmentShardStats = in.readOptionalWriteable(RemoteSegmentTransferTracker.Stats::new);
this.shardRouting = new ShardRouting(in);
}

public RemoteSegmentTransferTracker.Stats getStats() {
return remoteSegmentShardStats;
}

public RemoteRefreshSegmentTracker.Stats getStats() {
return remoteSegmentUploadShardStats;
public ShardRouting getShardRouting() {
return shardRouting;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(Fields.SHARD_ID, remoteSegmentUploadShardStats.shardId)
.field(Fields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.localRefreshClockTimeMs)
.field(Fields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.remoteRefreshClockTimeMs)
.field(Fields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentUploadShardStats.refreshTimeLagMs)
.field(Fields.REFRESH_LAG, remoteSegmentUploadShardStats.localRefreshNumber - remoteSegmentUploadShardStats.remoteRefreshNumber)
.field(Fields.BYTES_LAG, remoteSegmentUploadShardStats.bytesLag)

.field(Fields.BACKPRESSURE_REJECTION_COUNT, remoteSegmentUploadShardStats.rejectionCount)
.field(Fields.CONSECUTIVE_FAILURE_COUNT, remoteSegmentUploadShardStats.consecutiveFailuresCount);

builder.startObject(Fields.TOTAL_REMOTE_REFRESH);
builder.field(SubFields.STARTED, remoteSegmentUploadShardStats.totalUploadsStarted)
.field(SubFields.SUCCEEDED, remoteSegmentUploadShardStats.totalUploadsSucceeded)
.field(SubFields.FAILED, remoteSegmentUploadShardStats.totalUploadsFailed);
builder.startObject();
buildShardRouting(builder);
builder.startObject(Fields.SEGMENT);
builder.startObject(SubFields.DOWNLOAD);
// Ensuring that we are not showing 0 metrics to the user
if (remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted != 0) {
buildDownloadStats(builder);
}
builder.endObject();
builder.startObject(SubFields.UPLOAD);
// Ensuring that we are not showing 0 metrics to the user
if (remoteSegmentShardStats.totalUploadsStarted != 0) {
buildUploadStats(builder);
}
builder.endObject();

builder.startObject(Fields.TOTAL_UPLOADS_IN_BYTES);
builder.field(SubFields.STARTED, remoteSegmentUploadShardStats.uploadBytesStarted)
.field(SubFields.SUCCEEDED, remoteSegmentUploadShardStats.uploadBytesSucceeded)
.field(SubFields.FAILED, remoteSegmentUploadShardStats.uploadBytesFailed);
builder.endObject();
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(remoteSegmentShardStats);
shardRouting.writeTo(out);
}

builder.startObject(Fields.REMOTE_REFRESH_SIZE_IN_BYTES);
builder.field(SubFields.LAST_SUCCESSFUL, remoteSegmentUploadShardStats.lastSuccessfulRemoteRefreshBytes);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadBytesMovingAverage);
private void buildUploadStats(XContentBuilder builder) throws IOException {
builder.field(UploadStatsFields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentShardStats.localRefreshClockTimeMs)
.field(UploadStatsFields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentShardStats.remoteRefreshClockTimeMs)
.field(UploadStatsFields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentShardStats.refreshTimeLagMs)
.field(UploadStatsFields.REFRESH_LAG, remoteSegmentShardStats.localRefreshNumber - remoteSegmentShardStats.remoteRefreshNumber)
.field(UploadStatsFields.BYTES_LAG, remoteSegmentShardStats.bytesLag)
.field(UploadStatsFields.BACKPRESSURE_REJECTION_COUNT, remoteSegmentShardStats.rejectionCount)
.field(UploadStatsFields.CONSECUTIVE_FAILURE_COUNT, remoteSegmentShardStats.consecutiveFailuresCount);
builder.startObject(UploadStatsFields.TOTAL_SYNCS_TO_REMOTE)
.field(SubFields.STARTED, remoteSegmentShardStats.totalUploadsStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.totalUploadsSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.totalUploadsFailed);
builder.endObject();
builder.startObject(UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)
.field(SubFields.STARTED, remoteSegmentShardStats.uploadBytesStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.uploadBytesSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.uploadBytesFailed);
builder.endObject();
builder.startObject(UploadStatsFields.REMOTE_REFRESH_SIZE_IN_BYTES)
.field(SubFields.LAST_SUCCESSFUL, remoteSegmentShardStats.lastSuccessfulRemoteRefreshBytes)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesMovingAverage);
builder.endObject();
builder.startObject(UploadStatsFields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesPerSecMovingAverage);
builder.endObject();
builder.startObject(UploadStatsFields.REMOTE_REFRESH_LATENCY_IN_MILLIS)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadTimeMovingAverage);
builder.endObject();
}

builder.startObject(Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadBytesPerSecMovingAverage);
private void buildDownloadStats(XContentBuilder builder) throws IOException {
builder.field(
DownloadStatsFields.LAST_SYNC_TIMESTAMP,
remoteSegmentShardStats.directoryFileTransferTrackerStats.lastTransferTimestampMs
);
builder.startObject(DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)
.field(SubFields.STARTED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesFailed);
builder.endObject();
builder.startObject(Fields.REMOTE_REFRESH_LATENCY_IN_MILLIS);
builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadTimeMovingAverage);
builder.startObject(DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES)
.field(SubFields.LAST_SUCCESSFUL, remoteSegmentShardStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesMovingAverage);
builder.endObject();
builder.startObject(DownloadStatsFields.DOWNLOAD_SPEED_IN_BYTES_PER_SEC)
.field(SubFields.MOVING_AVG, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage);
builder.endObject();
}

return builder;
private void buildShardRouting(XContentBuilder builder) throws IOException {
builder.startObject(Fields.ROUTING);
builder.field(RoutingFields.STATE, shardRouting.state());
builder.field(RoutingFields.PRIMARY, shardRouting.primary());
builder.field(RoutingFields.NODE_ID, shardRouting.currentNodeId());
builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(remoteSegmentUploadShardStats);
static final class Fields {
static final String ROUTING = "routing";
static final String SEGMENT = "segment";
static final String TRANSLOG = "translog";
}

static final class RoutingFields {
static final String STATE = "state";
static final String PRIMARY = "primary";
static final String NODE_ID = "node";
}

/**
* Fields for remote store stats response
*/
static final class Fields {
static final String SHARD_ID = "shard_id";

static final class UploadStatsFields {
/**
* Lag in terms of bytes b/w local and remote store
*/
Expand Down Expand Up @@ -128,7 +185,7 @@ static final class Fields {
/**
* Represents the number of remote refreshes
*/
static final String TOTAL_REMOTE_REFRESH = "total_remote_refresh";
static final String TOTAL_SYNCS_TO_REMOTE = "total_syncs_to_remote";

/**
* Represents the total uploads to remote store in bytes
Expand All @@ -151,21 +208,46 @@ static final class Fields {
static final String REMOTE_REFRESH_LATENCY_IN_MILLIS = "remote_refresh_latency_in_millis";
}

static final class DownloadStatsFields {
/**
* Last successful sync from remote in milliseconds
*/
static final String LAST_SYNC_TIMESTAMP = "last_sync_timestamp";

/**
* Total bytes of segment files downloaded from the remote store for a specific shard
*/
static final String TOTAL_DOWNLOADS_IN_BYTES = "total_downloads_in_bytes";

/**
* Size of each segment file downloaded from the remote store
*/
static final String DOWNLOAD_SIZE_IN_BYTES = "download_size_in_bytes";

/**
* Speed (in bytes/sec) for segment file downloads
*/
static final String DOWNLOAD_SPEED_IN_BYTES_PER_SEC = "download_speed_in_bytes_per_sec";
}

/**
* Reusable sub fields for {@link Fields}
* Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields}
*/
static final class SubFields {
static final String STARTED = "started";
static final String SUCCEEDED = "succeeded";
static final String FAILED = "failed";

static final String DOWNLOAD = "download";
static final String UPLOAD = "upload";

/**
* Moving avg over last N values stat for a {@link Fields}
* Moving avg over last N values stat
*/
static final String MOVING_AVG = "moving_avg";

/**
* Most recent successful attempt stat for a {@link Fields}
* Most recent successful attempt stat
*/
static final String LAST_SUCCESSFUL = "last_successful";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Remote Store stats response
Expand All @@ -26,49 +29,71 @@
*/
public class RemoteStoreStatsResponse extends BroadcastResponse {

private final RemoteStoreStats[] shards;
private final RemoteStoreStats[] remoteStoreStats;

public RemoteStoreStatsResponse(StreamInput in) throws IOException {
super(in);
shards = in.readArray(RemoteStoreStats::new, RemoteStoreStats[]::new);
remoteStoreStats = in.readArray(RemoteStoreStats::new, RemoteStoreStats[]::new);
}

public RemoteStoreStatsResponse(
RemoteStoreStats[] shards,
RemoteStoreStats[] remoteStoreStats,
int totalShards,
int successfulShards,
int failedShards,
List<DefaultShardOperationFailedException> shardFailures
) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shards = shards;
this.remoteStoreStats = remoteStoreStats;
}

public RemoteStoreStats[] getShards() {
return this.shards;
public RemoteStoreStats[] getRemoteStoreStats() {
return this.remoteStoreStats;
}

public RemoteStoreStats getAt(int position) {
return shards[position];
public Map<String, Map<Integer, List<RemoteStoreStats>>> groupByIndexAndShards() {
Map<String, Map<Integer, List<RemoteStoreStats>>> indexWiseStats = new HashMap<>();
for (RemoteStoreStats shardStat : remoteStoreStats) {
indexWiseStats.computeIfAbsent(shardStat.getShardRouting().getIndexName(), k -> new HashMap<>())
.computeIfAbsent(shardStat.getShardRouting().getId(), k -> new ArrayList<>())
.add(shardStat);
}
return indexWiseStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeArray(shards);
out.writeArray(remoteStoreStats);
}

@Override
protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
builder.startArray("stats");
for (RemoteStoreStats shard : shards) {
shard.toXContent(builder, params);
Map<String, Map<Integer, List<RemoteStoreStats>>> indexWiseStats = groupByIndexAndShards();
builder.startObject(Fields.INDICES);
for (String indexName : indexWiseStats.keySet()) {
builder.startObject(indexName);
builder.startObject(Fields.SHARDS);
for (int shardId : indexWiseStats.get(indexName).keySet()) {
builder.startArray(Integer.toString(shardId));
for (RemoteStoreStats shardStat : indexWiseStats.get(indexName).get(shardId)) {
shardStat.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();
builder.endObject();
}
builder.endArray();
builder.endObject();
}

@Override
public String toString() {
return Strings.toString(XContentType.JSON, this, true, false);
}

static final class Fields {
static final String SHARDS = "shards";
static final String INDICES = "indices";
}
}
Loading

0 comments on commit aa59b15

Please sign in to comment.