Skip to content

Commit

Permalink
add taskCompletionCount in search_backpressure stats (opensearch-proj…
Browse files Browse the repository at this point in the history
…ect#10028)

* add taskCompletionCount in search_backpressure

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments to use final objects

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* do spotless check run

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* use primitive long to store completionCount

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* use primitive long to store completionCount in searchTaskStats

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add pr link against the change

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add taskCompletionCount in search_backpressure

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments to use final objects

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* do spotless check run

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* use primitive long to store completionCount

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* rebase with upstream main

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* rebase with upstream main

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

---------

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 authored and austintlee committed Oct 23, 2023
1 parent f266a97 commit da1c531
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))
- Add task completion count in search backpressure stats API ([#10028](https://github.com/opensearch-project/OpenSearch/pull/10028/))
- Performance improvement for Datetime field caching ([#4558](https://github.com/opensearch-project/OpenSearch/issues/4558))


### Deprecated

### Removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ public SearchBackpressureStats nodeStats() {
SearchTaskStats searchTaskStats = new SearchTaskStats(
searchBackpressureStates.get(SearchTask.class).getCancellationCount(),
searchBackpressureStates.get(SearchTask.class).getLimitReachedCount(),
searchBackpressureStates.get(SearchTask.class).getCompletionCount(),
taskTrackers.get(SearchTask.class)
.stream()
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchTasks)))
Expand All @@ -407,6 +408,7 @@ public SearchBackpressureStats nodeStats() {
SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats(
searchBackpressureStates.get(SearchShardTask.class).getCancellationCount(),
searchBackpressureStates.get(SearchShardTask.class).getLimitReachedCount(),
searchBackpressureStates.get(SearchShardTask.class).getCompletionCount(),
taskTrackers.get(SearchShardTask.class)
.stream()
.collect(Collectors.toUnmodifiableMap(t -> TaskResourceUsageTrackerType.fromName(t.name()), t -> t.stats(searchShardTasks)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.search.backpressure.stats;

import org.opensearch.Version;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -30,21 +31,29 @@
public class SearchShardTaskStats implements ToXContentObject, Writeable {
private final long cancellationCount;
private final long limitReachedCount;
private final long completionCount;
private final Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats;

public SearchShardTaskStats(
long cancellationCount,
long limitReachedCount,
long completionCount,
Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats
) {
this.cancellationCount = cancellationCount;
this.limitReachedCount = limitReachedCount;
this.completionCount = completionCount;
this.resourceUsageTrackerStats = resourceUsageTrackerStats;
}

public SearchShardTaskStats(StreamInput in) throws IOException {
this.cancellationCount = in.readVLong();
this.limitReachedCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
completionCount = in.readVLong();
} else {
completionCount = -1;
}

MapBuilder<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> builder = new MapBuilder<>();
builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new));
Expand All @@ -62,6 +71,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(entry.getKey().getName(), entry.getValue());
}
builder.endObject();
if (completionCount != -1) {
builder.field("completion_count", completionCount);
}

builder.startObject("cancellation_stats")
.field("cancellation_count", cancellationCount)
Expand All @@ -75,6 +87,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(cancellationCount);
out.writeVLong(limitReachedCount);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(completionCount);
}

out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER));
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER));
Expand All @@ -88,11 +103,12 @@ public boolean equals(Object o) {
SearchShardTaskStats that = (SearchShardTaskStats) o;
return cancellationCount == that.cancellationCount
&& limitReachedCount == that.limitReachedCount
&& completionCount == that.completionCount
&& resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats);
}

@Override
public int hashCode() {
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats);
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats, completionCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.search.backpressure.stats;

import org.opensearch.Version;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -31,21 +32,29 @@
public class SearchTaskStats implements ToXContentObject, Writeable {
private final long cancellationCount;
private final long limitReachedCount;
private final long completionCount;
private final Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats;

public SearchTaskStats(
long cancellationCount,
long limitReachedCount,
long completionCount,
Map<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> resourceUsageTrackerStats
) {
this.cancellationCount = cancellationCount;
this.limitReachedCount = limitReachedCount;
this.completionCount = completionCount;
this.resourceUsageTrackerStats = resourceUsageTrackerStats;
}

public SearchTaskStats(StreamInput in) throws IOException {
this.cancellationCount = in.readVLong();
this.limitReachedCount = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.completionCount = in.readVLong();
} else {
this.completionCount = -1;
}

MapBuilder<TaskResourceUsageTrackerType, TaskResourceUsageTracker.Stats> builder = new MapBuilder<>();
builder.put(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, in.readOptionalWriteable(CpuUsageTracker.Stats::new));
Expand All @@ -63,6 +72,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(entry.getKey().getName(), entry.getValue());
}
builder.endObject();
if (completionCount != -1) {
builder.field("completion_count", completionCount);
}

builder.startObject("cancellation_stats")
.field("cancellation_count", cancellationCount)
Expand All @@ -76,6 +88,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(cancellationCount);
out.writeVLong(limitReachedCount);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(completionCount);
}

out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER));
out.writeOptionalWriteable(resourceUsageTrackerStats.get(TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER));
Expand All @@ -89,11 +104,12 @@ public boolean equals(Object o) {
SearchTaskStats that = (SearchTaskStats) o;
return cancellationCount == that.cancellationCount
&& limitReachedCount == that.limitReachedCount
&& completionCount == that.completionCount
&& resourceUsageTrackerStats.equals(that.resourceUsageTrackerStats);
}

@Override
public int hashCode() {
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats);
return Objects.hash(cancellationCount, limitReachedCount, resourceUsageTrackerStats, completionCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,11 @@ public void testSearchTaskInFlightCancellation() {
verify(mockTaskManager, times(10)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(3, service.getSearchBackpressureState(SearchTask.class).getLimitReachedCount());

// Verify search backpressure stats.
// Verify search backpressure stats. Since we are not marking any task as completed the completionCount will be 0
// for SearchTaskStats here.
SearchBackpressureStats expectedStats = new SearchBackpressureStats(
new SearchTaskStats(10, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(10))),
new SearchShardTaskStats(0, 0, Collections.emptyMap()),
new SearchTaskStats(10, 3, 0, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(10))),
new SearchShardTaskStats(0, 0, 0, Collections.emptyMap()),
SearchBackpressureMode.ENFORCED
);
SearchBackpressureStats actualStats = service.nodeStats();
Expand Down Expand Up @@ -323,10 +324,11 @@ public void testSearchShardTaskInFlightCancellation() {
verify(mockTaskManager, times(12)).cancelTaskAndDescendants(any(), anyString(), anyBoolean(), any());
assertEquals(3, service.getSearchBackpressureState(SearchShardTask.class).getLimitReachedCount());

// Verify search backpressure stats.
// Verify search backpressure stats. We are marking 20 SearchShardTasks as completed this should get
// reflected in SearchShardTaskStats.
SearchBackpressureStats expectedStats = new SearchBackpressureStats(
new SearchTaskStats(0, 0, Collections.emptyMap()),
new SearchShardTaskStats(12, 3, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(12))),
new SearchTaskStats(0, 0, 0, Collections.emptyMap()),
new SearchShardTaskStats(12, 3, 20, Map.of(TaskResourceUsageTrackerType.CPU_USAGE_TRACKER, new MockStats(12))),
SearchBackpressureMode.ENFORCED
);
SearchBackpressureStats actualStats = service.nodeStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public static SearchShardTaskStats randomInstance() {
new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())
);

return new SearchShardTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats);
return new SearchShardTaskStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
resourceUsageTrackerStats
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public static SearchTaskStats randomInstance() {
new ElapsedTimeTracker.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong())
);

return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats);
return new SearchTaskStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), resourceUsageTrackerStats);
}
}

0 comments on commit da1c531

Please sign in to comment.