
Is there a standard way to solve blocking that must happen. #1756

Closed
ZhangSanFengByGit opened this issue Jun 20, 2019 · 11 comments
Labels
for/stackoverflow Questions are best asked on SO or Gitter

Comments

@ZhangSanFengByGit

ZhangSanFengByGit commented Jun 20, 2019

Expected behavior

In the reference guide, it mentions a way to wrap the blocking part:

Mono blockingWrapper = Mono.fromCallable(() -> { 
    return /* make a remote synchronous call */ 
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic());

At a glance, this wrapper hands the blocking code off to an elastic thread.

However, one situation confuses me:
Imagine one of the flatMap operators in a Flux operator/subscriber chain uses this blocking wrapper. What happens when that flatMap's FlatMapInners subscribe to the blockingWrapper?
It seems those inners still have to wait for the result produced on the elastic thread before onNext can reach the downstream operators in the main chain (so the blocking is not actually solved?).

So, how does this solve real blocking scenarios in practice?

Actual behavior

After being confused for days, it occurred to me that one approach might handle the must-block scenarios:

sourceFlux.map(/* normal logic */)
          .publishOn(Schedulers.elastic())
          .flatMap(/* mapper including blocking calls */)
          .publishOn(/* main-thread scheduler */)
          .subscribe(/* ... */)

In short, I sandwich the blocking part between two publishOn operators. This seems a little problematic, but is there a better way to handle unavoidable blocking? Thanks a lot!

@bsideup bsideup added the for/stackoverflow Questions are best asked on SO or Gitter label Jun 20, 2019
@bsideup
Contributor

bsideup commented Jun 20, 2019

A few things to note here:

.publishOn(Main Thread)

Why would you want to move it back to the main thread? Is there any specific requirement?
Reactor's work processing is implemented as a "work stealing" algorithm and will avoid thread context switches (a very expensive operation) as much as possible.
Also, publishOn is a heavy operator and should only be used when you actually need to move processing to another scheduler (thread pool).

what happens when this flatMap's FlatMapInners subscribe to the blockingWrapper

It will subscribe on the elastic scheduler's thread (as requested by subscribeOn) and continue processing there.

It seems to be that those inners also have to wait for the results provided by the process in the elastic thread to move on to onNext in the afterward operator in the main chain. (which actually does not solve the blocking in fact?)

It solves the blocking problem by running the blocking call in a separate thread pool that does not suffer from thread starvation. Everything emitted from your inner will be emitted on the same thread (elastic, in your case).
Since every other operation is either non-blocking or should be explicitly moved to a blocking-friendly scheduler, further processing will not inherit the blocking issues.
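A plain-JDK analogue (not Reactor itself) may make this concrete: the value is produced and observed on the pool thread, so the caller's thread is never parked waiting for the blocking work. The executor, thread name, and pool shape here are illustrative stand-ins for an elastic scheduler.

```java
import java.util.concurrent.*;

public class ThreadHopDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for an "elastic" scheduler: one named worker thread.
        ExecutorService elastic = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "elastic-1"));
        // The blocking call runs (and its result is emitted) on the pool
        // thread, analogous to fromCallable(...).subscribeOn(elastic).
        Future<String> emittedOn = elastic.submit(
                () -> Thread.currentThread().getName());
        System.out.println("main thread: " + Thread.currentThread().getName());
        System.out.println("emitted on: " + emittedOn.get());
        elastic.shutdown();
    }
}
```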

@bsideup
Contributor

bsideup commented Jun 20, 2019

Based on your code:

Mono<User> getUser(String id) {
    return Mono
              .fromCallable(() -> userRepository.findOne(id))
              .subscribeOn(Schedulers.elastic());
}

// ...
sourceFlux
          .map(Request::getUserId)
          .flatMap(this::getUser)
          .map(User::getName)
          .subscribe(......)

A reactive type (Mono/Flux/etc.) returned from any method must be non-blocking, and consumers are safe to assume that. This is why the subscribeOn is inside getUser and not in the pipeline inside flatMap.
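The same contract can be sketched with only java.util.concurrent (Reactor-free, for illustration): getUser's stdlib analogue hands the blocking lookup to a pool internally and returns a handle immediately, so the caller never needs to know a blocking call is involved. The findOne method here is a hypothetical stand-in for userRepository.findOne.

```java
import java.util.concurrent.*;

public class GetUserDemo {
    // Hypothetical blocking repository call (simulated with sleep).
    static String findOne(String id) {
        try { Thread.sleep(50); } catch (InterruptedException ignored) {}
        return "user-" + id;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService elastic = Executors.newCachedThreadPool();
        // Analogous to Mono.fromCallable(...).subscribeOn(elastic):
        // the blocking work is deferred to the pool; submit() returns at once.
        Future<String> user = elastic.submit(() -> findOne("42"));
        System.out.println("caller is free while the lookup runs...");
        System.out.println("result: " + user.get());
        elastic.shutdown();
    }
}
```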

@crankydillo

My team is having the exact same discussion. The example above is what I keep coming back to (i.e. flatMapping over a blocking call). People expressed concern about thread explosion; however, that is constrained by flatMap's concurrency factor (256 by default). Anyhow, if any other light can be shed here, it would be greatly appreciated.

@bsideup
Contributor

bsideup commented Jun 20, 2019

@crankydillo instead of using Schedulers.elastic() you can just as well define a custom scheduler from an executor with a fixed pool.
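The point of a fixed pool is that it puts a hard cap on how many blocking calls run at once, no matter how many are submitted. A minimal stdlib sketch of that bounding behavior (the pool size of 4 and task counts are illustrative; in Reactor such a pool can be wrapped via Schedulers.fromExecutorService):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(32);
        // Submit far more blocking tasks than the pool has threads.
        for (int i = 0; i < 32; i++) {
            pool.submit(() -> {
                int now = active.incrementAndGet();
                maxActive.accumulateAndGet(now, Math::max); // track peak concurrency
                try { Thread.sleep(20); } catch (InterruptedException ignored) {}
                active.decrementAndGet();
                done.countDown();
            });
        }
        done.await();
        // Peak concurrency never exceeds the pool size.
        System.out.println("max concurrent blocking calls: " + maxActive.get());
        pool.shutdown();
    }
}
```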

@crankydillo

Thanks @bsideup. In our case, we are running a JAX-RS service that creates Mono/Fluxes to process work. Some of this processing will involve blocking calls. It ultimately looks something like this:

Mono.just(httpRequestData).map(doStuff).flatMap(blockingCalls).toFuture()

Since the requests come in concurrently, the best idea we have so far is to use a shared, fixed-size scheduler. Does this seem like the best option to you?

@ZhangSanFengByGit apologies if you feel I've hijacked your question. I can start another question if you aren't getting the answers you need. Just let me know.

@bsideup
Contributor

bsideup commented Jun 21, 2019

@crankydillo I would definitely suggest moving this conversation to Gitter/StackOverflow or at least a separate issue :)

@crankydillo

@bsideup I created this stackoverflow post. Any comments are appreciated!

@bsideup
Contributor

bsideup commented Jul 12, 2019

I'm closing the issue since the question is answered.
@ZhangSanFengByGit @crankydillo feel free to ask on SO if you have more questions

@bsideup bsideup closed this as completed Jul 12, 2019
@bsideup
Contributor

bsideup commented Jul 12, 2019

@crankydillo FYI we're also considering either limiting the number of elastic threads or creating a new blocking-friendly but pool-limited scheduler:
#1804

@crankydillo

@bsideup Thanks. Please consider updating the SO post :) I think we will mark it accepted if you propose a pool-limited 'elastic' scheduler.

@bsideup
Contributor

bsideup commented Dec 2, 2019

@crankydillo updated with Schedulers.boundedElastic() (the number of threads is capped, and it queues tasks when the limit is reached) :)
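The "capped threads, queue the rest" behavior boundedElastic describes can be shown with a plain ThreadPoolExecutor (a stdlib analogue, not boundedElastic itself; the pool size of 2 and five tasks are illustrative): once both workers are busy, extra submissions wait in the queue instead of spawning threads.

```java
import java.util.concurrent.*;

public class BoundedQueueDemo {
    public static void main(String[] args) throws Exception {
        // Cap of 2 worker threads; overflow tasks wait in an unbounded queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch block = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.submit(() -> {
                try { block.await(); } catch (InterruptedException ignored) {}
            });
        }
        Thread.sleep(100); // let the two workers pick up their tasks
        // Two tasks are running; the other three are queued, not on threads.
        System.out.println("queued: " + pool.getQueue().size());
        block.countDown();
        pool.shutdown();
    }
}
```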
