Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repository stats for remote store #10567

Merged
merged 10 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204))
- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839))
- Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261))
- [Remote Store] Add repository stats for remote store ([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
? amazonS3Reference.get().priorityClient()
: amazonS3Reference.get().client();
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext);
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
Expand Down Expand Up @@ -391,7 +391,7 @@ private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOEx
assert outstanding.isEmpty();
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
private DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Expand All @@ -400,6 +400,7 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.opensearch.repositories.s3.async.AsyncTransferManager;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

Expand Down Expand Up @@ -170,6 +172,16 @@ public Map<String, Long> stats() {
return statsMetricPublisher.getStats().toMap();
}

@Override
public Map<Metric, Map<String, Long>> extendedStats() {
    // Snapshot the publisher's extended stats once instead of re-reading the
    // accessor for each of the null check, the empty check, and the iteration.
    final Map<BlobStore.Metric, StatsMetricPublisher.Stats> source = statsMetricPublisher.getExtendedStats();
    if (source == null || source.isEmpty()) {
        // No extended metrics recorded yet; return an immutable empty map so
        // callers never observe a partially-populated result.
        return Collections.emptyMap();
    }
    // Convert each per-metric Stats holder into its serializable Map<String, Long>
    // form (one entry per S3 operation, e.g. GetObject / PutObject / DeleteObjects).
    final Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
    source.forEach((metric, stats) -> extendedStats.put(metric, stats.toMap()));
    return extendedStats;
}

public ObjectCannedACL getCannedACL() {
return cannedACL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

package org.opensearch.repositories.s3;

import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.MetricRecord;

import org.opensearch.common.blobstore.BlobStore;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -20,18 +23,67 @@ public class StatsMetricPublisher {

private final Stats stats = new Stats();
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

private final Map<BlobStore.Metric, Stats> extendedStats = new HashMap<>() {
{
put(BlobStore.Metric.REQUEST_LATENCY, new Stats());
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
put(BlobStore.Metric.REQUEST_SUCCESS, new Stats());
put(BlobStore.Metric.REQUEST_FAILURE, new Stats());
put(BlobStore.Metric.RETRY_COUNT, new Stats());
}
};

public MetricPublisher listObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
stats.listCount.addAndGet(
metricCollection.children()
.stream()
.filter(
metricRecords -> metricRecords.name().equals("ApiCallAttempt")
&& !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty()
)
.count()
);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).listMetrics.addAndGet(
((Duration) metricRecord.value()).toMillis()
);
break;
case "RetryCount":
extendedStats.get(BlobStore.Metric.RETRY_COUNT).listMetrics.addAndGet(((Integer) metricRecord.value()));
break;
case "ApiCallSuccessful":
if ((Boolean) metricRecord.value()) {
extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).listMetrics.addAndGet(1);
} else {
extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).listMetrics.addAndGet(1);
}
stats.listMetrics.addAndGet(1);
break;
}
}
}

@Override
public void close() {}
};

public MetricPublisher deleteObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).deleteMetrics.addAndGet(
((Duration) metricRecord.value()).toMillis()
);
break;
case "RetryCount":
extendedStats.get(BlobStore.Metric.RETRY_COUNT).deleteMetrics.addAndGet(((Integer) metricRecord.value()));
break;
case "ApiCallSuccessful":
if ((Boolean) metricRecord.value()) {
extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).deleteMetrics.addAndGet(1);
} else {
extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).deleteMetrics.addAndGet(1);
}
stats.deleteMetrics.addAndGet(1);
break;
}
}
}

@Override
Expand All @@ -41,15 +93,26 @@ public void close() {}
public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
stats.getCount.addAndGet(
metricCollection.children()
.stream()
.filter(
metricRecords -> metricRecords.name().equals("ApiCallAttempt")
&& !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty()
)
.count()
);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).getMetrics.addAndGet(
((Duration) metricRecord.value()).toMillis()
);
break;
case "RetryCount":
extendedStats.get(BlobStore.Metric.RETRY_COUNT).getMetrics.addAndGet(((Integer) metricRecord.value()));
break;
case "ApiCallSuccessful":
if ((Boolean) metricRecord.value()) {
extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).getMetrics.addAndGet(1);
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
} else {
extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).getMetrics.addAndGet(1);
}
stats.getMetrics.addAndGet(1);
break;
}
}
}

@Override
Expand All @@ -59,15 +122,26 @@ public void close() {}
public MetricPublisher putObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
stats.putCount.addAndGet(
metricCollection.children()
.stream()
.filter(
metricRecords -> metricRecords.name().equals("ApiCallAttempt")
&& !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty()
)
.count()
);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).putMetrics.addAndGet(
((Duration) metricRecord.value()).toMillis()
);
break;
case "RetryCount":
extendedStats.get(BlobStore.Metric.RETRY_COUNT).putMetrics.addAndGet(((Integer) metricRecord.value()));
break;
case "ApiCallSuccessful":
if ((Boolean) metricRecord.value()) {
extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).putMetrics.addAndGet(1);
} else {
extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).putMetrics.addAndGet(1);
}
stats.putMetrics.addAndGet(1);
break;
}
}
}

@Override
Expand All @@ -77,15 +151,26 @@ public void close() {}
public MetricPublisher multipartUploadMetricCollector = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
stats.postCount.addAndGet(
metricCollection.children()
.stream()
.filter(
metricRecords -> metricRecords.name().equals("ApiCallAttempt")
&& !metricRecords.metricValues(HttpMetric.HTTP_STATUS_CODE).isEmpty()
)
.count()
);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
extendedStats.get(BlobStore.Metric.REQUEST_LATENCY).multiPartPutMetrics.addAndGet(
((Duration) metricRecord.value()).toMillis()
);
break;
case "RetryCount":
extendedStats.get(BlobStore.Metric.RETRY_COUNT).multiPartPutMetrics.addAndGet(((Integer) metricRecord.value()));
break;
case "ApiCallSuccessful":
if ((Boolean) metricRecord.value()) {
extendedStats.get(BlobStore.Metric.REQUEST_SUCCESS).multiPartPutMetrics.addAndGet(1);
} else {
extendedStats.get(BlobStore.Metric.REQUEST_FAILURE).multiPartPutMetrics.addAndGet(1);
}
stats.multiPartPutMetrics.addAndGet(1);
break;
}
}
}

@Override
Expand All @@ -96,22 +181,29 @@ public Stats getStats() {
return stats;
}

/**
 * Returns the live per-{@link BlobStore.Metric} {@link Stats} map backing this publisher.
 * <p>
 * NOTE(review): this exposes the internal mutable map that the metric publishers
 * update concurrently — callers should treat it as read-only; consider wrapping in
 * {@code Collections.unmodifiableMap} if external mutation must be prevented.
 *
 * @return the extended stats map, keyed by metric category
 */
public Map<BlobStore.Metric, Stats> getExtendedStats() {
    return extendedStats;
}

static class Stats {

final AtomicLong listCount = new AtomicLong();
final AtomicLong listMetrics = new AtomicLong();

final AtomicLong getMetrics = new AtomicLong();

final AtomicLong getCount = new AtomicLong();
final AtomicLong putMetrics = new AtomicLong();

final AtomicLong putCount = new AtomicLong();
final AtomicLong deleteMetrics = new AtomicLong();

final AtomicLong postCount = new AtomicLong();
final AtomicLong multiPartPutMetrics = new AtomicLong();

Map<String, Long> toMap() {
final Map<String, Long> results = new HashMap<>();
results.put("GetObject", getCount.get());
results.put("ListObjects", listCount.get());
results.put("PutObject", putCount.get());
results.put("PutMultipartObject", postCount.get());
results.put("GetObject", getMetrics.get());
results.put("ListObjects", listMetrics.get());
results.put("PutObject", putMetrics.get());
results.put("DeleteObjects", deleteMetrics.get());
results.put("PutMultipartObject", multiPartPutMetrics.get());
return results;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.IOException;
Expand Down Expand Up @@ -86,16 +87,21 @@ public AsyncTransferManager(long minimumPartSize, ExecutorService executorServic
* @param streamContext The {@link StreamContext} to supply streams during upload
* @return A {@link CompletableFuture} to listen for upload completion
*/
public CompletableFuture<Void> uploadObject(S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, StreamContext streamContext) {
public CompletableFuture<Void> uploadObject(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
StreamContext streamContext,
StatsMetricPublisher statsMetricPublisher
) {

CompletableFuture<Void> returnFuture = new CompletableFuture<>();
try {
if (streamContext.getNumberOfParts() == 1) {
log.debug(() -> "Starting the upload as a single upload part request");
uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture);
uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture, statsMetricPublisher);
} else {
log.debug(() -> "Starting the upload as multipart upload request");
uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture);
uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher);
}
} catch (Throwable throwable) {
returnFuture.completeExceptionally(throwable);
Expand All @@ -108,12 +114,14 @@ private void uploadInParts(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
StreamContext streamContext,
CompletableFuture<Void> returnFuture
CompletableFuture<Void> returnFuture,
StatsMetricPublisher statsMetricPublisher
) {

CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder()
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey());
.key(uploadRequest.getKey())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));
if (uploadRequest.doRemoteDataIntegrityCheck()) {
createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
}
Expand Down Expand Up @@ -286,12 +294,14 @@ private void uploadInOneChunk(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
InputStreamContainer inputStreamContainer,
CompletableFuture<Void> returnFuture
CompletableFuture<Void> returnFuture,
StatsMetricPublisher statsMetricPublisher
) {
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.contentLength(uploadRequest.getContentLength());
.contentLength(uploadRequest.getContentLength())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));
if (uploadRequest.doRemoteDataIntegrityCheck()) {
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
Expand Down
Loading
Loading