Skip to content

Commit

Permalink
Repository stats for remote store (opensearch-project#10567)
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
Bukhtawar authored and gbbafna committed Oct 18, 2023
1 parent 3085734 commit 353cb00
Show file tree
Hide file tree
Showing 30 changed files with 480 additions and 209 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Admission control] Add Resource usage collector service and resource usage tracker ([#10695](https://github.com/opensearch-project/OpenSearch/pull/10695))
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10696](https://github.com/opensearch-project/OpenSearch/pull/10696))
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage;

import fixture.s3.S3HttpHandler;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
Expand All @@ -57,30 +57,39 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryData;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.RepositoryStats;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase;
import org.opensearch.repositories.s3.utils.AwsRequestSigner;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.snapshots.mockstore.BlobStoreWrapper;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import software.amazon.awssdk.core.internal.http.pipeline.stages.ApplyTransactionIdStage;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import fixture.s3.S3HttpHandler;
import java.util.Objects;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
// Need to set up a new cluster for each test because cluster settings use randomized authentication settings
Expand Down Expand Up @@ -216,6 +225,66 @@ public void testEnforcedCooldownPeriod() throws IOException {
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
}

@Override
public void testRequestStats() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(
index,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);

final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}

flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

final String snapshot = "snapshot";
assertSuccessfulSnapshot(
client().admin().cluster().prepareCreateSnapshot(repository, snapshot).setWaitForCompletion(true).setIndices(index)
);

assertAcked(client().admin().indices().prepareDelete(index));

assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());

final RepositoryStats repositoryStats = StreamSupport.stream(
internalCluster().getInstances(RepositoriesService.class).spliterator(),
false
).map(repositoriesService -> {
try {
return repositoriesService.repository(repository);
} catch (RepositoryMissingException e) {
return null;
}
}).filter(Objects::nonNull).map(Repository::stats).reduce(RepositoryStats::merge).get();

Map<BlobStore.Metric, Map<String, Long>> extendedStats = repositoryStats.extendedStats;
Map<String, Long> aggregatedStats = new HashMap<>();
extendedStats.forEach((k, v) -> {
if (k == BlobStore.Metric.RETRY_COUNT || k == BlobStore.Metric.REQUEST_SUCCESS || k == BlobStore.Metric.REQUEST_FAILURE) {
for (Map.Entry<String, Long> entry : v.entrySet()) {
aggregatedStats.merge(entry.getKey(), entry.getValue(), Math::addExact);
}
}

});
final Map<String, Long> mockCalls = getMockRequestCounts();

String assertionErrorMsg = String.format("SDK sent [%s] calls and handler measured [%s] calls", aggregatedStats, mockCalls);

assertEquals(assertionErrorMsg, mockCalls, aggregatedStats);
}

/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
Expand Down Expand Up @@ -327,6 +396,8 @@ public void maybeTrack(final String request, Headers requestHeaders) {
trackRequest("PutMultipartObject");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
trackRequest("PutObject");
} else if (Regex.simpleMatch("POST /*?delete*", request)) {
trackRequest("DeleteObjects");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
? amazonS3Reference.get().priorityClient()
: amazonS3Reference.get().client();
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext);
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
Expand Down Expand Up @@ -384,7 +384,7 @@ private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOEx
assert outstanding.isEmpty();
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
private DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Expand All @@ -393,6 +393,7 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.opensearch.repositories.s3.async.AsyncTransferManager;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

Expand Down Expand Up @@ -180,6 +182,16 @@ public Map<String, Long> stats() {
return statsMetricPublisher.getStats().toMap();
}

@Override
public Map<Metric, Map<String, Long>> extendedStats() {
if (statsMetricPublisher.getExtendedStats() == null || statsMetricPublisher.getExtendedStats().isEmpty()) {
return Collections.emptyMap();
}
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
return extendedStats;
}

public ObjectCannedACL getCannedACL() {
return cannedACL;
}
Expand Down
Loading

0 comments on commit 353cb00

Please sign in to comment.