Skip to content

Commit

Permalink
Fix ThreadPerTaskScheduler busyness accounting (#3859)
Browse files Browse the repository at this point in the history
This change prevents the same `BoundedElasticThreadPerTaskScheduler` being
picked up when the maximum number of Virtual Threads are already being
executed in parallel. The consequence of improper busyness accounting
was that tasks were executed sequentially instead of being run in
parallel because the same `Worker` was being picked by operators.

Resolves #3857
  • Loading branch information
chemicL committed Aug 1, 2024
1 parent 63c0de6 commit bcab229
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2023-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -882,7 +882,7 @@ static long retain(SequentialThreadPerTaskExecutor instance) {

static long incrementRefCnt(long state) {
	// The reference count lives in the bits selected by REF_CNT_MASK, stored
	// shifted left by 31 within the packed state word.
	long rawRefCnt = state & REF_CNT_MASK;
	if (rawRefCnt == REF_CNT_MASK) {
		// Saturated: leave the state untouched rather than overflowing the
		// ref-count field into the neighbouring bits.
		return state;
	}
	// Extract the count, increment, shift it back into place, and merge it
	// over the non-refcount bits. NOTE: the parentheses around the shift are
	// essential — `rawRefCnt >> 31 + 1` would parse as `rawRefCnt >> 32`
	// because `+` binds tighter than `>>` in Java (this was bug #3857).
	return ((rawRefCnt >> 31) + 1) << 31 | (state & ~REF_CNT_MASK);
}

static long release(SequentialThreadPerTaskExecutor instance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -775,4 +777,82 @@ void ensuresTotalTasksMathIsDoneCorrectlyInEdgeCase() {
// in the capacity counting since they don't occupy a queue
Assertions.assertThat(scheduler.estimateRemainingTaskCapacity()).isEqualTo(10L * (Integer.MAX_VALUE / 10 + 1) - 1000 + 10);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void allTasksExecuteInParallelWhenMaxProvided(boolean useWorker) throws Exception {
	// Schedules `total` tasks on a scheduler whose maximum thread count equals
	// `total`; every task blocks on a shared latch, so the latch can only open
	// if all tasks really run concurrently. Guards against the busyness
	// accounting regression (#3857) where the same Worker was picked
	// repeatedly and tasks executed sequentially.
	int total = 2 * Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;

	BoundedElasticThreadPerTaskScheduler scheduler = newScheduler(total, total);
	scheduler.init();

	CountDownLatch latch = new CountDownLatch(total);

	try {
		for (int i = 0; i < total; i++) {
			Runnable task = () -> {
				try {
					latch.countDown();
					latch.await();
				}
				catch (InterruptedException e) {
					// Restore the interrupt flag instead of swallowing it so
					// scheduler.dispose() can still cancel this task cleanly.
					Thread.currentThread().interrupt();
				}
			};
			if (useWorker) {
				scheduler.createWorker().schedule(task);
			} else {
				scheduler.schedule(task);
			}
		}

		// Times out (and fails) unless all `total` tasks reached the latch in parallel.
		Assertions.assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
	} finally {
		scheduler.dispose();
	}
}

@ParameterizedTest
@ValueSource(booleans = { false, true })
void allTasksExecuteInParallelWhenUsingDefaultMax(boolean useWorker) throws Exception {
	// With the default boundedElastic() cap, schedules twice the cap worth of
	// tasks. The first batch occupies threads until `allScheduled` opens; the
	// second batch then rendezvouses on `secondBatch`, which can only open if
	// the scheduler keeps dispatching tasks in parallel instead of serializing
	// them on a single reused Worker (regression #3857).
	int parallelTasks = Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
	int total = 2 * parallelTasks;

	Scheduler scheduler = Schedulers.boundedElastic();
	scheduler.init();

	CountDownLatch allScheduled = new CountDownLatch(1);

	CountDownLatch secondBatch = new CountDownLatch(parallelTasks);

	try {
		for (int i = 0; i < total; i++) {
			final int taskIndex = i;
			Runnable task = () -> {
				try {
					allScheduled.await();
					// Only the second half of the tasks participate in the
					// rendezvous that proves parallel execution.
					if (taskIndex >= parallelTasks) {
						secondBatch.countDown();
						secondBatch.await();
					}
				}
				catch (InterruptedException e) {
					// Restore the interrupt flag instead of swallowing it so
					// scheduler.dispose() can still cancel this task cleanly.
					Thread.currentThread().interrupt();
				}
			};
			if (useWorker) {
				scheduler.createWorker().schedule(task);
			} else {
				scheduler.schedule(task);
			}
		}

		allScheduled.countDown();

		// Times out (and fails) unless the second batch all ran concurrently.
		Assertions.assertThat(secondBatch.await(1, TimeUnit.SECONDS)).isTrue();
	} finally {
		scheduler.dispose();
	}
}
}

0 comments on commit bcab229

Please sign in to comment.