Make candidate validation bounded again #2125

Merged
19 commits merged into master from s0me0ne/bounded-candidate-validation on Jan 21, 2024

Conversation

s0me0ne-unkn0wn
Contributor

@s0me0ne-unkn0wn s0me0ne-unkn0wn commented Nov 1, 2023

This PR aims to channel the backpressure of the PVF host's preparation and execution queues to the candidate validation subsystem consumers.

Related: #708

@s0me0ne-unkn0wn s0me0ne-unkn0wn changed the title [DNM] yet Make candidate validation bounded again Nov 2, 2023
@s0me0ne-unkn0wn s0me0ne-unkn0wn marked this pull request as ready for review November 2, 2023 08:17
@s0me0ne-unkn0wn s0me0ne-unkn0wn added the T0-node This PR/Issue is related to the topic “node”. label Nov 2, 2023
@alindima
Contributor

alindima commented Nov 2, 2023

Maybe I don't have the full picture, but how does this PR introduce boundedness to the number of spawned tasks? AFAICT, the number is still not bounded. The tasks are just added to a FuturesUnordered collection. Maybe we want to check the size before pushing another one?

@s0me0ne-unkn0wn
Contributor Author

s0me0ne-unkn0wn commented Nov 2, 2023

It's totally possible that I'm doing something wrong, as I'm not a Rust async code expert yet (but I'm trying to become one 🙂).

My idea was like this: when validate_from_exhaustive() (or another function communicating with the PVF host) is ready to submit a job to the PVF host, it sends a ToHost message to the PVF host through an mpsc channel, which is bounded. If the PVF host's message queue is full, that send() will block, creating (or channeling) the backpressure, and the await on the validate_from_exhaustive() future won't resolve until there's free buffer space in the PVF host's queue, channeling that backpressure further up. Does it work like that, or am I getting the concept totally wrong?
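The mechanism being described, as a minimal stand-alone sketch (the ToHost variant, the delays, and the channel bound are made up for illustration and are not the PR's real types): a bounded futures mpsc channel whose send().await only resolves once the receiver frees buffer space, so a full queue stalls the sender and the backpressure propagates upward.

use futures::{channel::mpsc, SinkExt, StreamExt};
use std::time::Duration;

#[derive(Debug)]
enum ToHost {
    // Hypothetical stand-in for the real PVF host message type.
    ExecutePvf { candidate: u32 },
}

#[tokio::main]
async fn main() {
    // Small bound so the stalling is easy to observe.
    let (mut to_host_tx, mut to_host_rx) = mpsc::channel::<ToHost>(2);

    // A fake PVF host that drains its queue slowly.
    let host = tokio::spawn(async move {
        while let Some(msg) = to_host_rx.next().await {
            tokio::time::sleep(Duration::from_millis(100)).await;
            println!("host handled {msg:?}");
        }
    });

    for candidate in 0..10 {
        // When the channel buffer is full, this await does not resolve until
        // the host has taken an item, i.e. the caller is backpressured.
        to_host_tx
            .send(ToHost::ExecutePvf { candidate })
            .await
            .expect("host receiver alive");
        println!("submitted candidate {candidate}");
    }
    drop(to_host_tx);
    host.await.expect("host task finished");
}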

@s0me0ne-unkn0wn
Contributor Author

Ah, you probably mean that the number of added tasks is not bounded by itself, so no real backpressure is channeled up from the candidate validation subsys... Yes, looks like a problem I've overlooked. I'll think about how to fix it (and ideas are appreciated).

@alindima
Contributor

alindima commented Nov 2, 2023

Ah, you probably mean that the number of added tasks is not bounded by itself, so no real backpressure is channeled up from the candidate validation subsys... Yes, looks like a problem I've overlooked. I'll think about how to fix it (and ideas are appreciated).

Yes, that's what I was referring to. I guess we need to check the size of the FuturesUnordered collection before pushing another element there.
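A rough sketch of that suggestion (not the code the PR ended up with; MAX_IN_FLIGHT and validate() are placeholders): cap the number of in-flight validation futures by awaiting one completion from the FuturesUnordered set before pushing more work once the limit is reached.

use futures::{stream::FuturesUnordered, StreamExt};

const MAX_IN_FLIGHT: usize = 10; // illustrative limit, not a real constant

// Stand-in for validate_from_exhaustive() and friends.
async fn validate(job: u32) -> u32 {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    job
}

#[tokio::main]
async fn main() {
    let mut in_flight = FuturesUnordered::new();

    for job in 0..100u32 {
        if in_flight.len() >= MAX_IN_FLIGHT {
            // Wait for one validation to finish before accepting more work;
            // this is where the backpressure would come from.
            let _ = in_flight.next().await;
        }
        in_flight.push(validate(job));
    }

    // Drain the remaining validations.
    while in_flight.next().await.is_some() {}
}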

@s0me0ne-unkn0wn
Contributor Author

Well, it looked easy at first, but after a lot of research, reading, and chatting with people, I gave birth to this monster. I'd definitely appreciate any help in improving it.

@alexggh alexggh (Contributor) left a comment

Looks good to me, left you some other ideas if you want to experiment with them.

.zip(stream::repeat(sender))
// FIXME: The backlog size here is the same as in PVF host queues. It should either use a
// common constant, or another appropriate value should be chosen
.for_each_concurrent(30, |message_and_sender| {
Contributor

Can't we add the common constant now rather than the TODO?

Contributor Author

We can, but I'm not totally sure it makes sense. If the candidate validation messages were just retransmitted to the PVF host's queue, it would be fine. But some variants (like ValidateFromChainState) require the candidate validation subsys to do additional work involving runtime requests on which it awaits. So, if 30 ValidateFromChainState messages arrive, they are processed concurrently, and the PVF host queue is empty. If a ValidateFromExhaustive arrives at that moment, it has to wait until a free slot appears, even though it could fall through into the queue as it doesn't require any additional processing. That's why I think the limit here could be higher than the queue limit.
On the other hand, 30 concurrent ValidateFromChainStates is an unlikely situation. The real message stream is mixed Validate* requests with a handful of pre-checking requests. So I don't know, I just cannot decide which value is appropriate here.

.zip(stream::repeat(sender))
// FIXME: The backlog size here is the same as in PVF host queues. It should either use a
// common constant, or another appropriate value should be chosen
.for_each_concurrent(30, |message_and_sender| {
Contributor

Another thing is that for_each_concurrent is not equivalent to calling ctx.spawn several times, because

the closures are run concurrently (but not in parallel – this combinator does not introduce any threads).

ctx.spawn gives us the opportunity for this to possibly run in parallel; not sure if it really matters, it might not.
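To make the distinction concrete, a small illustrative example (the jobs, limit, and delays are arbitrary, not the PR's values): for_each_concurrent polls up to N closures concurrently on the same task, so any long synchronous work inside one closure stalls the others, whereas spawning would hand each job to the executor as its own task.

use futures::{stream, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    stream::iter(0..8u32)
        .for_each_concurrent(4, |job| async move {
            // Purely async waiting interleaves fine on a single task...
            tokio::time::sleep(Duration::from_millis(50)).await;
            // ...but a long synchronous section here (e.g. decompression)
            // would stall all four concurrency slots, because no extra
            // threads or tasks are involved.
            println!("finished job {job}");
        })
        .await;
}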

Contributor Author

Speculatively, it should be okay. What it actually does under the hood is send a payload to the PVF host and then wait for the answer. All the actual processing is done by the PVF host within its own threading model. But I'm not 100% sure I'm not missing something non-obvious here.

@alindima alindima (Contributor) Nov 6, 2023

hmm, it also does code decompression, which can take a couple of ms: #599.
So that means that one future will block all the others from making progress, since they are not being spawned as new tasks.

Contributor Author

Hmm, fair enough. And now we have a dilemma. Go back to the FuturesUnordered and add a semaphore, or move the decompression to the validation host (something we want to do anyway) and leave this PR as-is.

Contributor

Your call :D.

Contributor Author

I benchmarked the decompression with the Moonbeam runtime (a large one); it takes 4.33 ms, which is not the end of the world, but still significant.

As far as I understand the problem, moving the decompression to the PVF host side (which requires some effort by itself) is only meaningful if we run the PVF host on the blocking pool? And if we ever switch to non-blocking (which we want to do), we'll still have the same problem with some executor threads being blocked in decompression?

If that's true, it's better not to move it but spawn it as a blocking task on the subsystem side. I benchmarked that just in case; it doesn't introduce any overhead and is a future-proof solution.
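A sketch of that idea under stated assumptions (decompress_code is a placeholder for the real zstd-based PVF code decompression, and tokio's blocking pool stands in for the subsystem's spawn_blocking): the CPU-bound decompression runs on the blocking pool so it doesn't stall the async executor threads driving the rest of the subsystem.

use tokio::task;

// Stand-in for the real PVF code decompression.
fn decompress_code(compressed: Vec<u8>) -> Vec<u8> {
    compressed
}

// Run the CPU-bound decompression off the async worker threads.
async fn decompress_off_thread(compressed: Vec<u8>) -> Vec<u8> {
    task::spawn_blocking(move || decompress_code(compressed))
        .await
        .expect("decompression task panicked")
}

#[tokio::main]
async fn main() {
    let code = decompress_off_thread(vec![0u8; 1024]).await;
    println!("decompressed {} bytes", code.len());
}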

Contributor

sounds good to me. or we can just use regular spawn like before and see if #599 is worth taking a look at separately.

maybe it's too much of a micro-optimisation that doesn't make any difference

Contributor

And if we ever switch to non-blocking (which we want to do), we'll still have the same problem with some executor threads being blocked in decompression?

If that's true, it's better not to move it but spawn it as a blocking task on the subsystem side.

Sounds good to me too. I do think the PVF host should be non-blocking as it does no blocking work itself. It waits on the child processes but that shouldn't block.

let mut res = Ok(());
let sender = ctx.sender().to_owned();

let read_stream = stream::poll_fn(|c| loop {
Contributor

Another possible way of implementing this would be to use an async_semaphore.

let s = Semaphore::new(MAX_CONCURRENT_REQUESTS);

Call s.acquire() before each spawn and move the resulting guard into the future.
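A minimal sketch of that pattern (using tokio's Semaphore and acquire_owned for illustration; async_semaphore's acquire_arc in the diff further down plays the same role, and the bound and sleep here are placeholders): a permit is acquired before spawning and the guard moves into the background future, so at most MAX_CONCURRENT_REQUESTS validations run at once and the loop stalls once all permits are taken.

use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

const MAX_CONCURRENT_REQUESTS: usize = 10; // illustrative bound

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
    let mut handles = Vec::new();

    for candidate in 0..50u32 {
        // This await stalls once all permits are taken, which is exactly
        // where the message loop would be backpressured.
        let permit = semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is never closed");

        handles.push(tokio::spawn(async move {
            // Keep the permit alive for the whole validation.
            let _permit = permit;
            // Stand-in for validate_from_chain_state / validate_candidate_exhaustive.
            tokio::time::sleep(Duration::from_millis(20)).await;
            println!("validated candidate {candidate}");
        }));
    }

    for handle in handles {
        handle.await.expect("validation task panicked");
    }
}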

@s0me0ne-unkn0wn
Contributor Author

Okay, so here's take three: semaphores.

I didn't do anything with decompression yet (as it's a micro-optimization anyway and may be left for a follow-up). The problem is, IIUC, we cannot spawn tasks from other tasks. The subsystem context exposing spawn() and spawn_blocking() is not cloneable: it cannot be moved into a task because the subsystem main loop needs it, and it cannot be borrowed in a task because spawn() takes &mut self. In C, I'd just pass a handler to .spawn_blocking() to the task and yolo ftw. Here, I don't see a way.

I could've spawned my own thread pool from a task to do parallel decompressions, but that sounds like firing a cannon at sparrows. I could also spawn a separate task on the blocking pool that does decompressions, but in that case it would do them FIFO; that is, tasks requesting decompression would not be blocked but would wait until all the decompression requests submitted by other tasks are processed.

All in all, it seems to be a more complex problem than it sounds, or I'm just not enlightened enough yet to invent a simple solution.

.await;

let _ = response_sender.send(precheck_result);
futures::select! {
Contributor

Why did we give up on spawn? I imagined we could do something like this; what am I missing?

--- a/polkadot/node/core/candidate-validation/src/lib.rs
+++ b/polkadot/node/core/candidate-validation/src/lib.rs
@@ -152,6 +152,8 @@ async fn run<Context>(
        )
        .await;
        ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
+       use async_semaphore::Semaphore;
+       let semaphore = Arc::new(Semaphore::new(10));
 
        loop {
                match ctx.recv().await? {
@@ -171,9 +173,12 @@ async fn run<Context>(
                                                let mut sender = ctx.sender().clone();
                                                let metrics = metrics.clone();
                                                let validation_host = validation_host.clone();
+                                               let guard = semaphore.acquire_arc().await;
 
                                                async move {
                                                        let _timer = metrics.time_validate_from_chain_state();
+                                                       let _guard = guard;
+
                                                        let res = validate_from_chain_state(
                                                                &mut sender,
                                                                validation_host,
@@ -205,9 +210,12 @@ async fn run<Context>(
                                        let bg = {
                                                let metrics = metrics.clone();
                                                let validation_host = validation_host.clone();
+                                               let guard = semaphore.acquire_arc().await;
 
                                                async move {
                                                        let _timer = metrics.time_validate_from_exhaustive();
+                                                       let _guard = guard;
+
                                                        let res = validate_candidate_exhaustive(
                                                                validation_host,
                                                                validation_data,
@@ -236,8 +244,11 @@ async fn run<Context>(
                                        let bg = {
                                                let mut sender = ctx.sender().clone();
                                                let validation_host = validation_host.clone();
+                                               let guard = semaphore.acquire_arc().await;
 
                                                async move {
+                                                       let _guard = guard;
+

Contributor

Never mind :D I was dumb; we need the select if we don't want to block signal processing while the queue is full.

Contributor

I was dumb; we need the select if we don't want to block signal processing while the queue is full.

AFAICT, in its current form, this PR will still not process signals while the queue is full. The select only helps with the other tasks being processed concurrently with the overseer messages (if we were using spawn, we wouldn't need the select).

Contributor Author

True, and as far as I understand, that's exactly what we want. When the queue is full, signal processing stops, thus channeling the backpressure upstream.

Contributor

If we'd like to not block signal processing when the queue is full (which is really only for Conclude) we should add a separate task that handles CandidateValidationMessages (and the main task sends work to it via a channel).

Not having this would prevent the subsystem from shutting down if the queue is full, I guess. Not sure if it's a real problem with a queue size of 10... idk.

Contributor

Maybe a dumb question, but how exactly do signals and messages differ? Signals only notify of some change in state (e.g. shutdown, leaf update, etc.)? Do they only originate from the overseer or from somewhere else as well?

Member

Exactly, signals originate from the overseer and are prioritized.

The suggestion is to have an additional call recv_signal(), which only receives signals, but no messages ever. Indeed, we could make it three functions in total:

  • recv ... the existing call, receiving everything.
  • recv_signal ... only receiving signals.
  • recv_msg .... only receiving messages.

Then you would either use recv or the recv_signal/recv_msg combination, giving you more control.

Prioritizing signals even more than what is done by default should not do any harm. The overseer already ensures that you are never receiving messages that have been sent after a leaf activation, before you have seen that signal as well.
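A conceptual sketch only, with plain futures channels standing in for the overseer plumbing (the Signal and Message enums and the channel setup are made up): keeping a dedicated signal stream and polling it with priority is the kind of thing a recv_signal/recv_msg split would let a subsystem do.

use futures::{channel::mpsc, select_biased, SinkExt, StreamExt};

#[derive(Debug)]
enum Signal {
    ActiveLeaves,
    Conclude,
}

#[derive(Debug)]
enum Message {
    ValidateFromExhaustive,
}

// The subsystem's loop: signals are polled first thanks to select_biased,
// so they keep flowing even if message handling is slow.
async fn subsystem_loop(mut signals: mpsc::Receiver<Signal>, mut messages: mpsc::Receiver<Message>) {
    loop {
        select_biased! {
            sig = signals.next() => match sig {
                Some(Signal::Conclude) | None => break,
                Some(sig) => println!("signal: {sig:?}"),
            },
            msg = messages.next() => match msg {
                Some(msg) => println!("message: {msg:?}"),
                None => break,
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let (mut sig_tx, sig_rx) = mpsc::channel(4);
    let (mut msg_tx, msg_rx) = mpsc::channel(4);

    msg_tx.send(Message::ValidateFromExhaustive).await.unwrap();
    sig_tx.send(Signal::ActiveLeaves).await.unwrap();
    sig_tx.send(Signal::Conclude).await.unwrap();

    subsystem_loop(sig_rx, msg_rx).await;
}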

Contributor

Exactly, signals originate from the overseer and are prioritized.

Is it possible to document this @s0me0ne-unkn0wn? Also, the docs for OverseerSignal are not very helpful: "Signals sent by an overseer to a subsystem".


Signals are sent as a form of broadcast to all subsystems by integration code, here it's (from memory) from the substrate block import pipeline (with a few intermediate steps).

Contributor

I think the better solution is to change orchestra to allow implementation of subsystems that don't really care about signals, except maybe Conclude. What I mean is that we either have a per-subsystem configurable SIGNAL_TIMEOUT, or annotate these subsystems such that broadcast_signal only forwards them Conclude. Then we'd have to be careful to ensure nodes don't get stuck at shutdown because of this.

@paritytech-cicd-pr

The CI pipeline was cancelled due to the failure of one of the required jobs.
Job name: cargo-clippy
Logs: https://gitlab.parity.io/parity/mirrors/polkadot-sdk/-/jobs/4835830

@sandreim sandreim (Contributor) left a comment

Currently message_channel_capacity is 2048; if we really want to backpressure here, we'd have to make the channel size much smaller.

@s0me0ne-unkn0wn
Contributor Author

@sandreim thanks for the review! I'm not going to merge before the new orchestra crate is released anyway. I've temporarily switched everything to the orchestra's master branch in this PR to see how CI reacts.

@s0me0ne-unkn0wn
Contributor Author

And I wonder if we should push further to achieve real backpressure.

@rphmeier what's the point of spawning background tasks for the validation in the backing subsystem? If we wanted real backpressure, we'd need to make them foreground so that backing could be blocked if the candidate validation queue overflows. Leaf activations could still be processed separately in that case thanks to the new separate signal processing mechanism, but other messages like CanSecond would also be blocked, thus backpressuring the upstream subsystems.

That all sounds like a correct backpressure chain to me, but I'm not sure if some time-critical work the backing subsystem does should be prioritized over the backpressure correctness.

@s0me0ne-unkn0wn
Contributor Author

I gave it a local zombienet burn-in, 10 hours, 20 validators, 6 parachains. Seems to be stable.

@s0me0ne-unkn0wn s0me0ne-unkn0wn added this pull request to the merge queue Jan 21, 2024
Merged via the queue into master with commit d37a456 Jan 21, 2024
124 checks passed
@s0me0ne-unkn0wn s0me0ne-unkn0wn deleted the s0me0ne/bounded-candidate-validation branch January 21, 2024 14:39
alexggh added a commit that referenced this pull request Jan 29, 2024
Labels
T0-node This PR/Issue is related to the topic “node”.
Projects
Status: Completed
9 participants