see #1804 Enforce 100K task cap on Schedulers.boundedElastic()
simonbasle committed Sep 20, 2019
1 parent 07e0be7 commit 262ea1d
Showing 4 changed files with 17 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docs/asciidoc/coreFeatures.adoc
@@ -204,7 +204,7 @@ problems and lead to too many threads (see below).
* A bounded elastic thread pool (`Schedulers.boundedElastic()`). Like its predecessor `elastic()`, it
creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are
also disposed. Unlike its `elastic()` predecessor, it has a cap on the number of backing threads it can create (default is number of CPU cores x 10).
- Tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available
+ Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available
(when scheduling with a delay, the delay starts when the thread becomes available). This is a better choice for I/O blocking work.
`Schedulers.boundedElastic()` is a handy way to give a blocking process its own thread so that
it does not tie up other resources (see <<faq.wrap-blocking>>), without pressuring the system too much with new threads.
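
To make the behaviour described above concrete, here is a minimal sketch (not part of the commit) of offloading blocking calls onto `Schedulers.boundedElastic()` from a `Flux` pipeline; the `blockingLookup` helper is a hypothetical stand-in for real blocking I/O:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    public class BoundedElasticExample {
        public static void main(String[] args) {
            Flux.range(1, 1_000)
                // Each blocking call is wrapped in its own Mono and subscribed on
                // boundedElastic: at most the thread cap (10 x CPU cores by default)
                // run at once; further submissions are enqueued, up to the 100 000 task cap.
                .flatMap(i -> Mono.fromCallable(() -> blockingLookup(i))
                                  .subscribeOn(Schedulers.boundedElastic()))
                .blockLast();
        }

        // Hypothetical blocking helper standing in for real I/O.
        static String blockingLookup(int id) throws InterruptedException {
            Thread.sleep(10);
            return "result-" + id;
        }
    }

When more of these wrapped calls are in flight than there are backing threads, the extra submissions are the ones that now fall under the 100 000 task cap introduced by this commit.
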
3 changes: 2 additions & 1 deletion docs/asciidoc/faq.adoc
@@ -35,7 +35,8 @@ from `Schedulers.boundedElastic()`.
You should use a `Mono`, because the source returns one value. You should use
`Schedulers.boundedElastic`, because it creates a dedicated thread to wait for the
blocking resource without impacting other non-blocking processing, while also ensuring
- that there is a limit to the amount of threads that can be created.
+ that there is a limit to the amount of threads that can be created, and blocking tasks
+ that can be enqueued and deferred during a spike.

Note that `subscribeOn` does not subscribe to the `Mono`. It specifies what
kind of `Scheduler` to use when a subscribe call happens.
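
For reference, a minimal sketch (again not part of the commit) of the pattern this FAQ entry describes; `blockingHttpCall` is a hypothetical stand-in for the blocking resource:

    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    public class WrapBlockingExample {
        public static void main(String[] args) {
            // subscribeOn only records which Scheduler to use; nothing runs yet.
            Mono<String> response = Mono
                    .fromCallable(WrapBlockingExample::blockingHttpCall)
                    .subscribeOn(Schedulers.boundedElastic());

            // The actual subscription (here via block()) triggers the blocking call,
            // which then executes on a boundedElastic thread.
            System.out.println(response.block());
        }

        // Hypothetical stand-in for a blocking call, e.g. a synchronous HTTP client.
        static String blockingHttpCall() {
            return "200 OK";
        }
    }
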
21 changes: 13 additions & 8 deletions reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
@@ -93,14 +93,14 @@ public abstract class Schedulers {
/**
* Default maximum number of enqueued tasks for the global {@link #boundedElastic()} {@link Scheduler}, initialized
* by system property {@code reactor.schedulers.defaultBoundedElasticQueueSize} and falls back to
- * unbounded by default ({@link Integer#MAX_VALUE}).
+ * a bound of 100 000 tasks.
*
* @see #boundedElastic()
*/
public static final int DEFAULT_BOUNDED_ELASTIC_QUEUESIZE =
Optional.ofNullable(System.getProperty("reactor.schedulers.defaultBoundedElasticQueueSize"))
.map(Integer::parseInt)
- .orElseGet(() -> Integer.MAX_VALUE);
+ .orElse(100000);

static volatile BiConsumer<Thread, ? super Throwable> onHandleErrorHook;
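
Since `DEFAULT_BOUNDED_ELASTIC_QUEUESIZE` is a static field read from the system property when the `Schedulers` class is initialized, an override has to be in place before Reactor's schedulers are first used. A rough sketch of doing so (the value 250 000 is just an illustrative choice):

    public class ConfigureQueueSize {
        public static void main(String[] args) {
            // Equivalent to launching with
            //   -Dreactor.schedulers.defaultBoundedElasticQueueSize=250000
            System.setProperty("reactor.schedulers.defaultBoundedElasticQueueSize", "250000");

            // Only now touch Reactor, so the static field picks up the override.
            System.out.println(reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE);
        }
    }
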

@@ -193,17 +193,22 @@ public static Scheduler elastic() {
* Workers, reusing them once the Workers have been shut down. The underlying daemon
* threads can be evicted if idle for more than {@link BoundedElasticScheduler#DEFAULT_TTL_SECONDS 60} seconds.
* <p>
- * The maximum number of created thread pools is bounded by a {@code cap} (by default,
+ * The maximum number of created thread pools is bounded by a {@code cap} (by default
* ten times the number of available CPU cores, see {@link #DEFAULT_BOUNDED_ELASTIC_SIZE}).
- * If a worker is requested while the cap is reached, a facade {@link reactor.core.scheduler.Scheduler.Worker}
+ * The maximum number of task submissions that can be enqueued and deferred after this thread cap
+ * has been reached is bounded (by default 100K additional tasks, see {@link #DEFAULT_BOUNDED_ELASTIC_QUEUESIZE}).
+ * <p>
+ * If a worker is requested while the thread cap is reached, a facade {@link reactor.core.scheduler.Scheduler.Worker}
* is provided which will enqueue the tasks submitted to it, deferring the actual submission
* of tasks until a thread-backed worker becomes available. This can thus affect initial delays of tasks.
- * If a task is directly submitted to the {@link Scheduler} while the cap has been reached,
- * it will be similarly enqueue and deferred (unless property {@link #DEFAULT_BOUNDED_ELASTIC_QUEUESIZE} is
- * tuned on startup).
+ * If a task is directly submitted to the {@link Scheduler} while the thread cap has been reached,
+ * it will be similarly enqueued and deferred.
+ * In both cases, once the task cap has also been reached, further submissions are rejected with a
+ * {@link RejectedExecutionException}.
*
* @return a new {@link Scheduler} that dynamically create workers with an upper bound to
- * the number of backing threads, reuses threads and evict idle ones
+ * the number of backing threads and, after that, on the number of enqueued tasks,
+ * that reuses threads and evicts idle ones
*/
public static Scheduler boundedElastic() {
return cache(CACHED_BOUNDED_ELASTIC, BOUNDED_ELASTIC, BOUNDED_ELASTIC_SUPPLIER);
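
As a rough illustration of the behaviour documented above, the sketch below builds a deliberately tiny bounded elastic scheduler and shows the rejection once both caps are exhausted. It assumes the `Schedulers.newBoundedElastic(threadCap, queuedTaskCap, name)` factory variant and is not part of this commit; exact timing of when tasks release their thread may vary:

    import java.util.concurrent.RejectedExecutionException;
    import reactor.core.scheduler.Scheduler;
    import reactor.core.scheduler.Schedulers;

    public class TaskCapExample {
        public static void main(String[] args) {
            // 1 backing thread, and at most 1 extra task may be enqueued and deferred.
            Scheduler tiny = Schedulers.newBoundedElastic(1, 1, "tiny");
            try {
                tiny.schedule(TaskCapExample::sleepAWhile); // occupies the single thread
                tiny.schedule(TaskCapExample::sleepAWhile); // thread cap reached: enqueued and deferred
                try {
                    tiny.schedule(TaskCapExample::sleepAWhile); // task cap also reached
                }
                catch (RejectedExecutionException e) {
                    System.out.println("third task rejected: " + e);
                }
            }
            finally {
                tiny.dispose();
            }
        }

        static void sleepAWhile() {
            try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
    }
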
@@ -615,7 +615,7 @@ public void defaultBoundedElasticConfigurationIsConsistentWithJavadoc() {
//unbounded task queueing
assertThat(boundedElastic.deferredTaskCap)
.as("default unbounded task queueing")
- .isEqualTo(Integer.MAX_VALUE)
+ .isEqualTo(100_000)
.isEqualTo(Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE);
}

