Skip to content

Commit

Permalink
Delay the request size calculation until required by the indexing pre…
Browse files Browse the repository at this point in the history
…ssure framework. (#1560)

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
  • Loading branch information
getsaurabh02 committed Nov 19, 2021
1 parent bcfb57c commit 14cbd72
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,8 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(bulkRequest, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
Expand Down Expand Up @@ -631,7 +630,7 @@ protected void doRun() {
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressureService.markCoordinatingOperationStarted(
shardId,
bulkShardRequest.ramBytesUsed(),
bulkShardRequest,
isOnlySystem
);
shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener<BulkShardResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.index;

import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
Expand All @@ -25,16 +27,18 @@ public IndexingPressureService(Settings settings, ClusterService clusterService)
shardIndexingPressure = new ShardIndexingPressure(settings, clusterService);
}

public Releasable markCoordinatingOperationStarted(long bytes, boolean forceExecution) {
public Releasable markCoordinatingOperationStarted(BulkRequest bulkRequest, boolean forceExecution) {
if (isShardIndexingPressureEnabled() == false) {
final long bytes = bulkRequest.ramBytesUsed();
return shardIndexingPressure.markCoordinatingOperationStarted(bytes, forceExecution);
} else {
return () -> {};
}
}

public Releasable markCoordinatingOperationStarted(ShardId shardId, long bytes, boolean forceExecution) {
public Releasable markCoordinatingOperationStarted(ShardId shardId, BulkShardRequest bulkShardRequest, boolean forceExecution) {
if (isShardIndexingPressureEnabled()) {
final long bytes = bulkShardRequest.ramBytesUsed();
return shardIndexingPressure.markCoordinatingOperationStarted(shardId, bytes, forceExecution);
} else {
return () -> {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
package org.opensearch.index;

import org.junit.Before;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.bulk.BulkItemRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Requests;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -43,11 +50,18 @@ public void testCoordinatingOperationForShardIndexingPressure() {
IndexingPressureService service = new IndexingPressureService(settings, clusterService);
Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);

Releasable releasable = service.markCoordinatingOperationStarted(shardId, 1024, false);
BulkItemRequest[] items = new BulkItemRequest[1];
DocWriteRequest<IndexRequest> writeRequest = new IndexRequest("index", "_doc", "id").source(
Requests.INDEX_CONTENT_TYPE,
"foo",
"bar"
);
items[0] = new BulkItemRequest(0, writeRequest);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, WriteRequest.RefreshPolicy.NONE, items);
Releasable releasable = service.markCoordinatingOperationStarted(shardId, bulkShardRequest, false);

IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertEquals(1024, shardStats.getCurrentCoordinatingBytes());
assertEquals(bulkShardRequest.ramBytesUsed(), shardStats.getCurrentCoordinatingBytes());
releasable.close();
}

Expand All @@ -64,11 +78,12 @@ public void testCoordinatingOperationForIndexingPressure() {
);
clusterSettings.applySettings(updated.build());

Releasable releasable = service.markCoordinatingOperationStarted(1024, false);
BulkRequest bulkRequest = new BulkRequest();
Releasable releasable = service.markCoordinatingOperationStarted(bulkRequest, false);
IndexingPressurePerShardStats shardStats = service.shardStats(CommonStatsFlags.ALL).getIndexingPressureShardStats(shardId);
assertNull(shardStats);
IndexingPressureStats nodeStats = service.nodeStats();
assertEquals(1024, nodeStats.getCurrentCoordinatingBytes());
assertEquals(bulkRequest.ramBytesUsed(), nodeStats.getCurrentCoordinatingBytes());
releasable.close();
}

Expand Down

0 comments on commit 14cbd72

Please sign in to comment.