Skip to content

Commit

Permalink
Repository stats for remote store
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
Bukhtawar committed Oct 11, 2023
1 parent 8bb11a6 commit c9688ee
Show file tree
Hide file tree
Showing 23 changed files with 304 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.repositories.s3;

import org.opensearch.core.common.util.CollectionUtils;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.StorageClass;

Expand All @@ -47,6 +48,8 @@
import org.opensearch.repositories.s3.async.AsyncTransferManager;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

Expand Down Expand Up @@ -170,6 +173,16 @@ public Map<String, Long> stats() {
return statsMetricPublisher.getStats().toMap();
}

/**
 * Returns a per-metric view of the stats collected by the metric publisher.
 * Each {@code Stats} value is converted with {@code toMap()} at call time.
 *
 * @return an empty map when the publisher has no extended stats, otherwise a new map from
 *         metric to that metric's {@code toMap()} result
 */
@Override
public Map<Metric, Map<String, Long>> extendedStats() {
    final Map<Metric, StatsMetricPublisher.Stats> perMetricStats = statsMetricPublisher.getExtendedStats();
    if (perMetricStats == null || perMetricStats.isEmpty()) {
        return Collections.emptyMap();
    }
    final Map<Metric, Map<String, Long>> snapshot = new HashMap<>();
    for (Map.Entry<Metric, StatsMetricPublisher.Stats> entry : perMetricStats.entrySet()) {
        snapshot.put(entry.getKey(), entry.getValue().toMap());
    }
    return snapshot;
}

/**
 * Returns the {@link ObjectCannedACL} held by this blob store.
 *
 * @return the value of the {@code cannedACL} field
 */
public ObjectCannedACL getCannedACL() {
return cannedACL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@

package org.opensearch.repositories.s3;

import software.amazon.awssdk.http.HttpMetric;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.blobstore.BlobStore;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.MetricRecord;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -20,18 +24,36 @@ public class StatsMetricPublisher {

private final Stats stats = new Stats();

// One Stats holder per BlobStore.Metric; the publishers below add to the holder for the
// metric they observe. Built with Map.of instead of double-brace initialization, which
// would create an anonymous HashMap subclass holding a reference to the enclosing
// publisher. The resulting map is unmodifiable; only the Stats counters inside it change.
private final Map<BlobStore.Metric, Stats> extendedStats = Map.of(
    BlobStore.Metric.REQUEST_LATENCY, new Stats(),
    BlobStore.Metric.REQUEST_SUCCESS, new Stats(),
    BlobStore.Metric.REQUEST_FAILURE, new Stats(),
    BlobStore.Metric.RETRY_COUNT, new Stats()
);

private static final Logger LOG = LogManager.getLogger(StatsMetricPublisher.class);

public MetricPublisher listObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
stats.listCount.addAndGet(
metricCollection.children()
.stream()
.filter(
metricRecords -> metricRecords.name().equals("ApiCallAttempt")
&& !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty()
)
.count()
);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).listMetrics.addAndGet(((Duration) metricRecord.value()).toMillis());
break;
case "RetryCount":
extendedStats.get(BlobStore.Metric.RETRY_COUNT).listMetrics.addAndGet(((Integer) metricRecord.value()));
break;
case "ApiCallSuccessful":
if((Boolean) metricRecord.value()) {
extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).listMetrics.addAndGet(1);
} else {
extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).listMetrics.addAndGet(1);
}
stats.listMetrics.addAndGet(1);
break;
}
}
}

@Override
Expand All @@ -41,15 +63,24 @@ public void close() {}
public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
stats.getCount.addAndGet(
metricCollection.children()
.stream()
.filter(
metricRecords -> metricRecords.name().equals("ApiCallAttempt")
&& !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty()
)
.count()
);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).getMetrics.addAndGet(((Duration) metricRecord.value()).toMillis());
break;
case "RetryCount":
extendedStats.get(BlobStore.Metric.RETRY_COUNT).getMetrics.addAndGet(((Integer) metricRecord.value()));
break;
case "ApiCallSuccessful":
if((Boolean) metricRecord.value()) {
extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).getMetrics.addAndGet(1);
} else {
extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).getMetrics.addAndGet(1);
}
stats.getMetrics.addAndGet(1);
break;
}
}
}

@Override
Expand All @@ -59,15 +90,24 @@ public void close() {}
public MetricPublisher putObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
stats.putCount.addAndGet(
metricCollection.children()
.stream()
.filter(
metricRecords -> metricRecords.name().equals("ApiCallAttempt")
&& !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty()
)
.count()
);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).putMetrics.addAndGet(((Duration) metricRecord.value()).toMillis());
break;
case "RetryCount":
extendedStats.get(BlobStore.Metric.RETRY_COUNT).putMetrics.addAndGet(((Integer) metricRecord.value()));
break;
case "ApiCallSuccessful":
if((Boolean) metricRecord.value()) {
extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).putMetrics.addAndGet(1);
} else {
extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).putMetrics.addAndGet(1);
}
stats.putMetrics.addAndGet(1);
break;
}
}
}

@Override
Expand All @@ -77,15 +117,24 @@ public void close() {}
public MetricPublisher multipartUploadMetricCollector = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
stats.postCount.addAndGet(
metricCollection.children()
.stream()
.filter(
metricRecords -> metricRecords.name().equals("ApiCallAttempt")
&& !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty()
)
.count()
);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).multiPartPutMetrics.addAndGet(((Duration) metricRecord.value()).toMillis());
break;
case "RetryCount":
extendedStats.get(BlobStore.Metric.RETRY_COUNT).multiPartPutMetrics.addAndGet(((Integer) metricRecord.value()));
break;
case "ApiCallSuccessful":
if((Boolean) metricRecord.value()) {
extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).multiPartPutMetrics.addAndGet(1);
} else {
extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).multiPartPutMetrics.addAndGet(1);
}
stats.multiPartPutMetrics.addAndGet(1);
break;
}
}
}

@Override
Expand All @@ -96,22 +145,27 @@ public Stats getStats() {
return stats;
}

/**
 * Returns the per-metric stats map held by this publisher.
 *
 * @return the {@code extendedStats} map, keyed by {@link BlobStore.Metric}; the same map
 *         instance is returned on every call (no copy is made)
 */
public Map<BlobStore.Metric, Stats> getExtendedStats() {
return extendedStats;
}

static class Stats {

final AtomicLong listCount = new AtomicLong();
final AtomicLong listMetrics = new AtomicLong();

final AtomicLong getMetrics = new AtomicLong();

final AtomicLong getCount = new AtomicLong();
final AtomicLong putMetrics = new AtomicLong();

final AtomicLong putCount = new AtomicLong();
final AtomicLong multiPartPutMetrics = new AtomicLong();

final AtomicLong postCount = new AtomicLong();

Map<String, Long> toMap() {
final Map<String, Long> results = new HashMap<>();
results.put("GetObject", getCount.get());
results.put("ListObjects", listCount.get());
results.put("PutObject", putCount.get());
results.put("PutMultipartObject", postCount.get());
results.put("GetObject", getMetrics.get());
results.put("ListObjects", listMetrics.get());
results.put("PutObject", putMetrics.get());
results.put("PutMultipartObject", multiPartPutMetrics.get());
return results;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.repositories.RepositoriesStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
Expand Down Expand Up @@ -142,6 +143,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private SearchPipelineStats searchPipelineStats;

@Nullable
private RepositoriesStats repositoriesStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -224,7 +228,8 @@ public NodeStats(
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable RepositoriesStats repositoriesStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -250,6 +255,7 @@ public NodeStats(
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
this.repositoriesStats = repositoriesStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -387,6 +393,11 @@ public SearchPipelineStats getSearchPipelineStats() {
return searchPipelineStats;
}

/**
 * Returns the repositories stats carried by this node stats object.
 *
 * @return the {@link RepositoriesStats} passed to the constructor, or {@code null} if none was supplied
 */
@Nullable
public RepositoriesStats getRepositoriesStats() {
return repositoriesStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -430,6 +441,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(searchPipelineStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(repositoriesStats);
}
}

@Override
Expand Down Expand Up @@ -520,6 +534,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getSearchPipelineStats() != null) {
getSearchPipelineStats().toXContent(builder, params);
}
if (getRepositoriesStats() != null) {
getRepositoriesStats().toXContent(builder, params);
}

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ public enum Metric {
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline");
SEARCH_PIPELINE("search_pipeline"),
REPOSITORIES("repositories");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics)
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,46 @@ public interface BlobStore extends Closeable {
*/
BlobContainer blobContainer(BlobPath path);

/**
* Returns statistics on the count of operations that have been performed on this blob store
*/
/**
* Returns statistics on the count of operations that have been performed on this blob store
*/
default Map<String, Long> stats() {
return Collections.emptyMap();
}

/**
* Returns detailed statistics of operations that have been performed on this blob store
*/
default Map<Metric, Map<String, Long>> extendedStats() {
return Collections.emptyMap();
}

/**
* Reload the blob store in place
*/
default void reload(RepositoryMetadata repositoryMetadata) {}

/**
 * Metrics for BlobStore interactions. Each constant carries a string name that is
 * returned by {@link #metricName()}.
 */
enum Metric {
    /** Requests that completed successfully. */
    REQUEST_SUCCESS("request_success_total"),
    /** Requests that failed. */
    REQUEST_FAILURE("request_failures_total"),
    /** Time spent on requests, in milliseconds. */
    REQUEST_LATENCY("request_time_in_millis"),
    /** Request retries. */
    RETRY_COUNT("request_retry_count_total");

    // Enum constants are shared singletons, so the name is fixed at construction.
    private final String metricName;

    Metric(String name) {
        this.metricName = name;
    }

    /**
     * Returns the string name associated with this metric.
     *
     * @return the name passed to the constant's constructor
     */
    public String metricName() {
        return this.metricName;
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ public Map<String, Long> stats() {
return blobStore.stats();
}

/**
 * Retrieves extended statistics about the BlobStore. Delegates the call to the underlying BlobStore's extendedStats() method.
 *
 * @return the map returned by the underlying BlobStore, keyed by {@link Metric}; this wrapper does not modify it.
 */
@Override
public Map<Metric, Map<String, Long>> extendedStats() {
return blobStore.extendedStats();
}

/**
* Closes the EncryptedBlobStore by decrementing the reference count of the CryptoManager and closing the
* underlying BlobStore. This ensures proper cleanup of resources.
Expand Down
Loading

0 comments on commit c9688ee

Please sign in to comment.