Skip to content

Commit

Permalink
refactor node level threshold
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
kiranprakash154 committed Aug 29, 2024
1 parent cbb51bd commit 4e846e2
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
import org.opensearch.wlm.QueryGroupLevelResourceUsageView;
import org.opensearch.wlm.WorkloadManagementSettings;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -47,6 +48,7 @@
public class DefaultTaskCancellation {
private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes();

protected final WorkloadManagementSettings workloadManagementSettings;
protected final DefaultTaskSelectionStrategy defaultTaskSelectionStrategy;
// a map of QueryGroupId to its corresponding QueryGroupLevelResourceUsageView object
protected final Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews;
Expand All @@ -55,12 +57,14 @@ public class DefaultTaskCancellation {
protected BooleanSupplier isNodeInDuress;

public DefaultTaskCancellation(
WorkloadManagementSettings workloadManagementSettings,
DefaultTaskSelectionStrategy defaultTaskSelectionStrategy,
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews,
Collection<QueryGroup> activeQueryGroups,
Collection<QueryGroup> deletedQueryGroups,
BooleanSupplier isNodeInDuress
) {
this.workloadManagementSettings = workloadManagementSettings;
this.defaultTaskSelectionStrategy = defaultTaskSelectionStrategy;
this.queryGroupLevelResourceUsageViews = queryGroupLevelResourceUsageViews;
this.activeQueryGroups = activeQueryGroups;
Expand Down Expand Up @@ -181,7 +185,7 @@ private List<TaskCancellation> getTaskCancellations(QueryGroup queryGroup, Resou
resourceType
);
List<TaskCancellation> taskCancellations = new ArrayList<>();
for(Task task : selectedTasksToCancel) {
for (Task task : selectedTasksToCancel) {
String cancellationReason = createCancellationReason(queryGroup, task, resourceType);
taskCancellations.add(createTaskCancellation((CancellableTask) task, cancellationReason));
}
Expand Down Expand Up @@ -213,7 +217,7 @@ protected List<TaskCancellation> getTaskCancellationsForDeletedQueryGroup(QueryG
queryGroupLevelResourceUsageViews.get(queryGroup.get_id()).getActiveTasks()
);
List<TaskCancellation> taskCancellations = new ArrayList<>();
for(Task task : tasks) {
for (Task task : tasks) {
String cancellationReason = "[Workload Management] Cancelling Task ID : "
+ task.getId()
+ " from QueryGroup ID : "
Expand All @@ -235,12 +239,16 @@ private Long convertThresholdIntoLong(ResourceType resourceType, Double resource
Long threshold = null;
if (resourceType == ResourceType.MEMORY) {
// Check if resource usage is breaching the threshold
threshold = (long) (resourceThresholdInPercentage * HEAP_SIZE_BYTES);
double nodeLevelCancellationThreshold = this.workloadManagementSettings.getNodeLevelMemoryCancellationThreshold()
* HEAP_SIZE_BYTES;
threshold = (long) (resourceThresholdInPercentage * nodeLevelCancellationThreshold);
} else if (resourceType == ResourceType.CPU) {
// Get the total CPU time of the process in milliseconds
long cpuTotalTimeInMillis = ProcessProbe.getInstance().getProcessCpuTotalTime();
double nodeLevelCancellationThreshold = this.workloadManagementSettings.getNodeLevelCpuCancellationThreshold()
* cpuTotalTimeInMillis;
// Check if resource usage is breaching the threshold
threshold = (long) (resourceThresholdInPercentage * cpuTotalTimeInMillis);
threshold = (long) (resourceThresholdInPercentage * nodeLevelCancellationThreshold);
}
return threshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ public Comparator<Task> sortingCondition() {
* @return The list of selected tasks
* @throws IllegalArgumentException If the limit is less than zero
*/
public List<Task> selectTasksForCancellation(
List<Task> tasks,
long limit,
ResourceType resourceType
) {
public List<Task> selectTasksForCancellation(List<Task> tasks, long limit, ResourceType resourceType) {
if (limit < 0) {
throw new IllegalArgumentException("limit has to be greater than zero");
}
Expand Down Expand Up @@ -84,9 +80,6 @@ public List<Task> selectTasksForCancellation(
* @return A list of {@link TaskCancellation} objects representing the tasks selected for cancellation.
*/
public List<Task> selectTasksFromDeletedQueryGroup(List<Task> tasks) {
return tasks
.stream()
.filter(task -> task instanceof CancellableTask)
.collect(Collectors.toList());
return tasks.stream().filter(task -> task instanceof CancellableTask).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.tasks.TaskCancellation;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.wlm.QueryGroupLevelResourceUsageView;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.junit.Before;

import java.util.Collection;
Expand All @@ -39,27 +40,38 @@ public class DefaultTaskCancellationTests extends OpenSearchTestCase {
private static class TestTaskCancellationImpl extends DefaultTaskCancellation {

public TestTaskCancellationImpl(
WorkloadManagementSettings workloadManagementSettings,
DefaultTaskSelectionStrategy defaultTaskSelectionStrategy,
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelViews,
Set<QueryGroup> activeQueryGroups,
Set<QueryGroup> deletedQueryGroups,
BooleanSupplier isNodeInDuress
) {
super(defaultTaskSelectionStrategy, queryGroupLevelViews, activeQueryGroups, deletedQueryGroups, isNodeInDuress);
super(
workloadManagementSettings,
defaultTaskSelectionStrategy,
queryGroupLevelViews,
activeQueryGroups,
deletedQueryGroups,
isNodeInDuress
);
}
}

private Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelViews;
private Set<QueryGroup> activeQueryGroups;
private Set<QueryGroup> deletedQueryGroups;
private DefaultTaskCancellation taskCancellation;
private WorkloadManagementSettings workloadManagementSettings;

@Before
public void setup() {
workloadManagementSettings = mock(WorkloadManagementSettings.class);
queryGroupLevelViews = new HashMap<>();
activeQueryGroups = new HashSet<>();
deletedQueryGroups = new HashSet<>();
taskCancellation = new TestTaskCancellationImpl(
workloadManagementSettings,
new DefaultTaskSelectionStrategy(),
queryGroupLevelViews,
activeQueryGroups,
Expand Down Expand Up @@ -156,6 +168,7 @@ public void testGetCancellableTasksFrom_returnsNoTasksWhenNotBreachingThreshold(
QueryGroupLevelResourceUsageView mockView = createResourceUsageViewMock(resourceType, usage);
queryGroupLevelViews.put(queryGroupId1, mockView);
activeQueryGroups.add(queryGroup1);
when(workloadManagementSettings.getNodeLevelCpuCancellationThreshold()).thenReturn(0.90);

List<TaskCancellation> cancellableTasksFrom = taskCancellation.getCancellableTasksFrom(queryGroup1);
assertTrue(cancellableTasksFrom.isEmpty());
Expand All @@ -179,6 +192,7 @@ public void testGetCancellableTasksFrom_filtersQueryGroupCorrectly() {
activeQueryGroups.add(queryGroup1);

TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl(
workloadManagementSettings,
new DefaultTaskSelectionStrategy(),
queryGroupLevelViews,
activeQueryGroups,
Expand Down Expand Up @@ -208,6 +222,7 @@ public void testCancelTasks_cancelsGivenTasks() {
activeQueryGroups.add(queryGroup1);

TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl(
workloadManagementSettings,
new DefaultTaskSelectionStrategy(),
queryGroupLevelViews,
activeQueryGroups,
Expand Down Expand Up @@ -254,6 +269,7 @@ public void testCancelTasks_cancelsTasksFromDeletedQueryGroups() {
deletedQueryGroups.add(deletedQueryGroup);

TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl(
workloadManagementSettings,
new DefaultTaskSelectionStrategy(),
queryGroupLevelViews,
activeQueryGroups,
Expand Down Expand Up @@ -310,6 +326,7 @@ public void testCancelTasks_does_not_cancelTasksFromDeletedQueryGroups_whenNodeN
deletedQueryGroups.add(deletedQueryGroup);

TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl(
workloadManagementSettings,
new DefaultTaskSelectionStrategy(),
queryGroupLevelViews,
activeQueryGroups,
Expand Down Expand Up @@ -365,6 +382,7 @@ public void testCancelTasks_cancelsGivenTasks_WhenNodeInDuress() {
Collections.addAll(activeQueryGroups, queryGroup1, queryGroup2);

TestTaskCancellationImpl taskCancellation = new TestTaskCancellationImpl(
workloadManagementSettings,
new DefaultTaskSelectionStrategy(),
queryGroupLevelViews,
activeQueryGroups,
Expand Down Expand Up @@ -470,18 +488,10 @@ private QueryGroupLevelResourceUsageView createResourceUsageViewMock(ResourceTyp
return mockView;
}

private QueryGroupLevelResourceUsageView createResourceUsageViewMock(
ResourceType resourceType,
Long usage,
Collection<Integer> ids
) {
private QueryGroupLevelResourceUsageView createResourceUsageViewMock(ResourceType resourceType, Long usage, Collection<Integer> ids) {
QueryGroupLevelResourceUsageView mockView = mock(QueryGroupLevelResourceUsageView.class);
when(mockView.getResourceUsageData()).thenReturn(Collections.singletonMap(resourceType, usage));
when(mockView.getActiveTasks()).thenReturn(
ids.stream()
.map(this::getRandomSearchTask)
.collect(Collectors.toList())
);
when(mockView.getActiveTasks()).thenReturn(ids.stream().map(this::getRandomSearchTask).collect(Collectors.toList()));
return mockView;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,17 @@

import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchTask;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
import org.opensearch.core.tasks.resourcetracker.ResourceStatsType;
import org.opensearch.core.tasks.resourcetracker.ResourceUsageMetric;
import org.opensearch.search.ResourceType;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class DefaultTaskSelectionStrategyTests extends OpenSearchTestCase {

Expand All @@ -43,11 +40,7 @@ public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsGrea
long reduceBy = 50L;
ResourceType resourceType = ResourceType.MEMORY;
List<Task> tasks = getListOfTasks(thresholdInLong);
List<Task> selectedTasks = testDefaultTaskSelectionStrategy.selectTasksForCancellation(
tasks,
reduceBy,
resourceType
);
List<Task> selectedTasks = testDefaultTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType);
assertFalse(selectedTasks.isEmpty());
assertTrue(tasksUsageMeetsThreshold(selectedTasks, reduceBy));
}
Expand All @@ -72,11 +65,7 @@ public void testSelectTasksToCancelSelectsTasksMeetingThreshold_ifReduceByIsEqua
long reduceBy = 0;
ResourceType resourceType = ResourceType.MEMORY;
List<Task> tasks = getListOfTasks(thresholdInLong);
List<Task> selectedTasks = testDefaultTaskSelectionStrategy.selectTasksForCancellation(
tasks,
reduceBy,
resourceType
);
List<Task> selectedTasks = testDefaultTaskSelectionStrategy.selectTasksForCancellation(tasks, reduceBy, resourceType);
assertTrue(selectedTasks.isEmpty());
}

Expand Down

0 comments on commit 4e846e2

Please sign in to comment.