Skip to content

Commit

Permalink
Fix deadlock in ForkJoinPoolHierarchicalTestExecutorService (#3981)
Browse files Browse the repository at this point in the history
The service now checks if the `ExclusiveTask` that should run
is executed on a thread that is already executing another task.
If this is scenario is detected, it checks if the lock is compatible to
the enclosing locks.
1. If compatible, it is executed and marked done
2. If incompatible, it is added to a list of deferred tasks and left
   unfinished. The deferred tasks will be re-forked afterwards.

Fixes #3945.

---------

Co-authored-by: Marc Philipp <mail@marcphilipp.de>
  • Loading branch information
leonard84 and marcphilipp committed Sep 17, 2024
1 parent f5cea9e commit 88f8859
Show file tree
Hide file tree
Showing 10 changed files with 549 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.Lock;

import org.junit.platform.commons.util.Preconditions;

/**
* @since 1.3
*/
Expand All @@ -23,7 +25,7 @@ class CompositeLock implements ResourceLock {
private final List<Lock> locks;

CompositeLock(List<Lock> locks) {
this.locks = locks;
this.locks = Preconditions.notEmpty(locks, "Locks must not be empty");
}

// for tests only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Constructor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -26,7 +28,6 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -51,7 +52,9 @@
public class ForkJoinPoolHierarchicalTestExecutorService implements HierarchicalTestExecutorService {

private final ForkJoinPool forkJoinPool;
private final TaskEventListener taskEventListener;
private final int parallelism;
private final ThreadLocal<ThreadLock> threadLocks = ThreadLocal.withInitial(ThreadLock::new);

/**
* Create a new {@code ForkJoinPoolHierarchicalTestExecutorService} based on
Expand All @@ -71,7 +74,13 @@ public ForkJoinPoolHierarchicalTestExecutorService(ConfigurationParameters confi
*/
@API(status = STABLE, since = "1.10")
public ForkJoinPoolHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration) {
this(configuration, TaskEventListener.NOOP);
}

ForkJoinPoolHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration,
TaskEventListener taskEventListener) {
forkJoinPool = createForkJoinPool(configuration);
this.taskEventListener = taskEventListener;
parallelism = forkJoinPool.getParallelism();
LoggerFactory.getLogger(getClass()).config(() -> "Using ForkJoinPool with parallelism of " + parallelism);
}
Expand Down Expand Up @@ -132,7 +141,7 @@ public Future<Void> submit(TestTask testTask) {
if (testTask.getExecutionMode() == CONCURRENT && ForkJoinTask.getSurplusQueuedTaskCount() < parallelism) {
return exclusiveTask.fork();
}
exclusiveTask.compute();
exclusiveTask.execSync();
return completedFuture(null);
}

Expand All @@ -143,7 +152,7 @@ private boolean isAlreadyRunningInForkJoinPool() {
@Override
public void invokeAll(List<? extends TestTask> tasks) {
if (tasks.size() == 1) {
new ExclusiveTask(tasks.get(0)).compute();
new ExclusiveTask(tasks.get(0)).execSync();
return;
}
Deque<ExclusiveTask> nonConcurrentTasks = new LinkedList<>();
Expand All @@ -169,15 +178,26 @@ private void forkConcurrentTasks(List<? extends TestTask> tasks, Deque<Exclusive

private void executeNonConcurrentTasks(Deque<ExclusiveTask> nonConcurrentTasks) {
for (ExclusiveTask task : nonConcurrentTasks) {
task.compute();
task.execSync();
}
}

private void joinConcurrentTasksInReverseOrderToEnableWorkStealing(
Deque<ExclusiveTask> concurrentTasksInReverseOrder) {
for (ExclusiveTask forkedTask : concurrentTasksInReverseOrder) {
forkedTask.join();
resubmitDeferredTasks();
}
}

private void resubmitDeferredTasks() {
List<ExclusiveTask> deferredTasks = threadLocks.get().deferredTasks;
for (ExclusiveTask deferredTask : deferredTasks) {
if (!deferredTask.isDone()) {
deferredTask.fork();
}
}
deferredTasks.clear();
}

@Override
Expand All @@ -186,26 +206,70 @@ public void close() {
}

// this class cannot not be serialized because TestTask is not Serializable
@SuppressWarnings("serial")
static class ExclusiveTask extends RecursiveAction {
@SuppressWarnings({ "serial", "RedundantSuppression" })
class ExclusiveTask extends ForkJoinTask<Void> {

private final TestTask testTask;

ExclusiveTask(TestTask testTask) {
this.testTask = testTask;
}

/**
* Always returns {@code null}.
*
* @return {@code null} always
*/
public final Void getRawResult() {
return null;
}

/**
* Requires null completion value.
*/
protected final void setRawResult(Void mustBeNull) {
}

void execSync() {
boolean completed = exec();
if (!completed) {
throw new IllegalStateException(
"Task was deferred but should have been executed synchronously: " + testTask);
}
}

@SuppressWarnings("try")
@Override
public void compute() {
try (ResourceLock lock = testTask.getResourceLock().acquire()) {
public boolean exec() {
// Check if this task is compatible with the current resource lock, if there is any.
// If not, we put this task in the thread local as a deferred task
// and let the worker thread fork it once it is done with the current task.
ResourceLock resourceLock = testTask.getResourceLock();
ThreadLock threadLock = threadLocks.get();
if (!threadLock.areAllHeldLocksCompatibleWith(resourceLock)) {
threadLock.addDeferredTask(this);
taskEventListener.deferred(testTask);
// Return false to indicate that this task is not done yet
// this means that .join() will wait.
return false;
}
try (ResourceLock lock = resourceLock.acquire()) {
threadLock.incrementNesting(lock);
testTask.execute();
return true;
}
catch (InterruptedException e) {
throw ExceptionUtils.throwAsUncheckedException(e);
}
finally {
threadLock.decrementNesting();
}
}

@Override
public String toString() {
return "ExclusiveTask [" + testTask + "]";
}
}

static class WorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {
Expand All @@ -228,4 +292,34 @@ static class WorkerThread extends ForkJoinWorkerThread {

}

static class ThreadLock {
private final Deque<ResourceLock> locks = new ArrayDeque<>(2);
private final List<ExclusiveTask> deferredTasks = new ArrayList<>();

void addDeferredTask(ExclusiveTask task) {
deferredTasks.add(task);
}

void incrementNesting(ResourceLock lock) {
locks.push(lock);
}

@SuppressWarnings("resource")
void decrementNesting() {
locks.pop();
}

boolean areAllHeldLocksCompatibleWith(ResourceLock lock) {
return locks.stream().allMatch(l -> l.isCompatible(lock));
}
}

interface TaskEventListener {

TaskEventListener NOOP = __ -> {
};

void deferred(TestTask testTask);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@

package org.junit.platform.engine.support.hierarchical;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Comparator.comparing;
import static java.util.Comparator.naturalOrder;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static org.junit.platform.commons.util.CollectionUtils.getOnlyElement;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_KEY;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_READ;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_READ_WRITE;
import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.LockMode.READ;

import java.util.Collection;
Expand All @@ -28,6 +32,9 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.junit.platform.engine.support.hierarchical.SingleLock.GlobalReadLock;
import org.junit.platform.engine.support.hierarchical.SingleLock.GlobalReadWriteLock;

/**
* @since 1.3
*/
Expand All @@ -42,20 +49,29 @@ private static Comparator<String> globalKeyFirst() {
}

private final Map<String, ReadWriteLock> locksByKey = new ConcurrentHashMap<>();
private final GlobalReadLock globalReadLock;
private final GlobalReadWriteLock globalReadWriteLock;

public LockManager() {
globalReadLock = new GlobalReadLock(toLock(GLOBAL_READ));
globalReadWriteLock = new GlobalReadWriteLock(toLock(GLOBAL_READ_WRITE));
}

ResourceLock getLockForResources(Collection<ExclusiveResource> resources) {
if (resources.size() == 1) {
return getLockForResource(getOnlyElement(resources));
}
List<Lock> locks = getDistinctSortedLocks(resources);
return toResourceLock(locks);
return toResourceLock(toDistinctSortedLocks(resources));
}

ResourceLock getLockForResource(ExclusiveResource resource) {
return new SingleLock(toLock(resource));
return toResourceLock(toLock(resource));
}

private List<Lock> getDistinctSortedLocks(Collection<ExclusiveResource> resources) {
private List<Lock> toDistinctSortedLocks(Collection<ExclusiveResource> resources) {
if (resources.isEmpty()) {
return emptyList();
}
if (resources.size() == 1) {
return singletonList(toLock(getOnlyElement(resources)));
}
// @formatter:off
Map<String, List<ExclusiveResource>> resourcesByKey = resources.stream()
.sorted(COMPARATOR)
Expand All @@ -79,10 +95,20 @@ private ResourceLock toResourceLock(List<Lock> locks) {
case 0:
return NopLock.INSTANCE;
case 1:
return new SingleLock(locks.get(0));
return toResourceLock(locks.get(0));
default:
return new CompositeLock(locks);
}
}

private ResourceLock toResourceLock(Lock lock) {
if (lock == toLock(GLOBAL_READ)) {
return globalReadLock;
}
if (lock == toLock(GLOBAL_READ_WRITE)) {
return globalReadWriteLock;
}
return new SingleLock(lock);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public ExecutionMode getExecutionMode() {
return taskContext.getExecutionAdvisor().getForcedExecutionMode(testDescriptor).orElse(node.getExecutionMode());
}

@Override
public String toString() {
return "NodeTestTask [" + testDescriptor + "]";
}

void setParentContext(C parentContext) {
this.parentContext = parentContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,12 @@ default void close() {
release();
}

/**
* {@return whether the given lock is compatible with this lock}
* @param other the other lock to check for compatibility
*/
default boolean isCompatible(ResourceLock other) {
return this instanceof NopLock || other instanceof NopLock;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,20 @@ public boolean isReleasable() {

}

static class GlobalReadLock extends SingleLock {
GlobalReadLock(Lock lock) {
super(lock);
}

@Override
public boolean isCompatible(ResourceLock other) {
return !(other instanceof GlobalReadWriteLock);
}
}

static class GlobalReadWriteLock extends SingleLock {
GlobalReadWriteLock(Lock lock) {
super(lock);
}
}
}
Loading

0 comments on commit 88f8859

Please sign in to comment.