Skip to content

Commit

Permalink
Optimize Futures.allAsList handling of already completed Futures. Ski…
Browse files Browse the repository at this point in the history
…ps adding a listener on already completed futures and directly collects the completed futures value.

RELNOTES=Optimize Futures.allAsList handling of already completed Futures. Skips adding a listener on already completed futures and directly collects the completed futures value.
PiperOrigin-RevId: 646265045
  • Loading branch information
java-team-github-bot authored and Google Java Core Libraries committed Jun 24, 2024
1 parent 8c31d52 commit dc80e7e
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.SEVERE;

Expand Down Expand Up @@ -141,27 +141,12 @@ final void init() {
int i = 0;
for (ListenableFuture<? extends InputT> future : futures) {
int index = i++;
future.addListener(
() -> {
try {
if (future.isCancelled()) {
// Clear futures prior to cancelling children. This sets our own state but lets
// the input futures keep running, as some of them may be used elsewhere.
futures = null;
cancel(false);
} else {
collectValueFromNonCancelledFuture(index, future);
}
} finally {
/*
* "null" means: There is no need to access `futures` again during
* `processCompleted` because we're reading each value during a call to
* handleOneInputDone.
*/
decrementCountAndMaybeComplete(null);
}
},
directExecutor());
if (future.isDone()) {
processAllMustSucceedDoneFuture(index, future);
} else {
future.addListener(
() -> processAllMustSucceedDoneFuture(index, future), directExecutor());
}
}
} else {
/*
Expand All @@ -184,11 +169,36 @@ final void init() {
collectsValues ? futures : null;
Runnable listener = () -> decrementCountAndMaybeComplete(localFutures);
for (ListenableFuture<? extends InputT> future : futures) {
future.addListener(listener, directExecutor());
if (future.isDone()) {
decrementCountAndMaybeComplete(localFutures);
} else {
future.addListener(listener, directExecutor());
}
}
}
}

private void processAllMustSucceedDoneFuture(
int index, ListenableFuture<? extends InputT> future) {
try {
if (future.isCancelled()) {
// Clear futures prior to cancelling children. This sets our own state but lets
// the input futures keep running, as some of them may be used elsewhere.
futures = null;
cancel(false);
} else {
collectValueFromNonCancelledFuture(index, future);
}
} finally {
/*
* "null" means: There is no need to access `futures` again during
* `processCompleted` because we're reading each value during a call to
* handleOneInputDone.
*/
decrementCountAndMaybeComplete(null);
}
}

/**
* Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the
* throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the
Expand Down Expand Up @@ -269,7 +279,8 @@ final void addInitialException(Set<Throwable> seen) {
private void collectValueFromNonCancelledFuture(int index, Future<? extends InputT> future) {
try {
// We get the result, even if collectOneValue is a no-op, so that we can fail fast.
collectOneValue(index, getDone(future));
// We use getUninterruptibly over getDone as a micro-optimization, we know the future is done.
collectOneValue(index, getUninterruptibly(future));
} catch (ExecutionException e) {
handleException(e.getCause());
} catch (Throwable t) { // sneaky checked exception
Expand Down
59 changes: 35 additions & 24 deletions guava/src/com/google/common/util/concurrent/AggregateFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.ALL_INPUT_FUTURES_PROCESSED;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.SEVERE;

Expand Down Expand Up @@ -141,27 +141,12 @@ final void init() {
int i = 0;
for (ListenableFuture<? extends InputT> future : futures) {
int index = i++;
future.addListener(
() -> {
try {
if (future.isCancelled()) {
// Clear futures prior to cancelling children. This sets our own state but lets
// the input futures keep running, as some of them may be used elsewhere.
futures = null;
cancel(false);
} else {
collectValueFromNonCancelledFuture(index, future);
}
} finally {
/*
* "null" means: There is no need to access `futures` again during
* `processCompleted` because we're reading each value during a call to
* handleOneInputDone.
*/
decrementCountAndMaybeComplete(null);
}
},
directExecutor());
if (future.isDone()) {
processAllMustSucceedDoneFuture(index, future);
} else {
future.addListener(
() -> processAllMustSucceedDoneFuture(index, future), directExecutor());
}
}
} else {
/*
Expand All @@ -184,11 +169,36 @@ final void init() {
collectsValues ? futures : null;
Runnable listener = () -> decrementCountAndMaybeComplete(localFutures);
for (ListenableFuture<? extends InputT> future : futures) {
future.addListener(listener, directExecutor());
if (future.isDone()) {
decrementCountAndMaybeComplete(localFutures);
} else {
future.addListener(listener, directExecutor());
}
}
}
}

private void processAllMustSucceedDoneFuture(
int index, ListenableFuture<? extends InputT> future) {
try {
if (future.isCancelled()) {
// Clear futures prior to cancelling children. This sets our own state but lets
// the input futures keep running, as some of them may be used elsewhere.
futures = null;
cancel(false);
} else {
collectValueFromNonCancelledFuture(index, future);
}
} finally {
/*
* "null" means: There is no need to access `futures` again during
* `processCompleted` because we're reading each value during a call to
* handleOneInputDone.
*/
decrementCountAndMaybeComplete(null);
}
}

/**
* Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the
* throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the
Expand Down Expand Up @@ -269,7 +279,8 @@ final void addInitialException(Set<Throwable> seen) {
private void collectValueFromNonCancelledFuture(int index, Future<? extends InputT> future) {
try {
// We get the result, even if collectOneValue is a no-op, so that we can fail fast.
collectOneValue(index, getDone(future));
// We use getUninterruptibly over getDone as a micro-optimization, we know the future is done.
collectOneValue(index, getUninterruptibly(future));
} catch (ExecutionException e) {
handleException(e.getCause());
} catch (Throwable t) { // sneaky checked exception
Expand Down

0 comments on commit dc80e7e

Please sign in to comment.