Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve idempotency latency by introducing rm_stm pipelining #5157

Merged
merged 11 commits into from
Jun 24, 2022

Conversation

rystsov
Copy link
Contributor

@rystsov rystsov commented Jun 17, 2022

Cover letter

Kafka protocol is asynchronous: a client may send the next write request without waiting for the previous request to finish. Since Redpanda's Raft implementation is also asynchronous, it's possible to process Kafka requests with minimal overhead and/or delays.

The challenge is to process causally related requests without pipeline stalling. Imagine that we have two requests A and B. When we want to process them as fast as possible we should start replicating the latter request without waiting until the replication of the first request is over.

But in case the requests are the RSM commands and are causally related e.g. request A is "set x=2" and B is "set y=3 if x=2" then to guarantee that B's precondition holds Redpanda can't start replication of B request without knowing that

  1. The request A is successfully executed
  2. No other command sneaks in between A and B

In case of idempotency the condition we need to preserve is the monotonicity of the seq numbers without gaps.

In order to preserve causality and to avoid pipeline stalls we use optimistic replication. A leader knows about its ongoing operations so it may optimistically predict the outcome of operation A and start executing operation B assuming its prediction is true.

But what happens if Redpanda is wrong with its prediction, how does it ensure safety? Let's break down all the failure scenarios.

What enforces the order between A and B?

A leader uses a mutex and consesnsus::replicate_in_stages order all the incoming requests. Before the first stage is resolves the mutex controls the order and after the first stage is resolved Redpanda's raft implementation guarantees order:

auto u = co_await mutex.lock();
auto stages = raft.replicate_in_stages(request);
co_await stages.enqueued;
u.return_all();
// after this point the order between the requests is certain
// the order enforced by mutex is preserved in the log

What prevents a command C getting in between A and B?

The mutex / replicate_in_stages combination may enforce partial order but can't prevent out-of-nowhere writes. Imagine a node loses the leadership then the new leader inserts a command C and transfers the leadership back to the original node.

To fight this Redpanda uses conditional replication:

auto term = persisted_stm::sync();
auto u = co_await mutex.lock();
auto stages = raft.replicate_in_stages(term, request);
co_await stages.enqueued;
u.return_all();

It uses a sync method to make sure that the RSM's state reflects all the commands replicated by previous term (during this phase it may learn about the "A" command) and then it uses the raft's term to issue the replication command (conditional replicate). By design the replication call can't succeed if the term is wrong so instead of leading the ACB state the replication of B is doomed to fail.

What if replication of A fails after the A-B order is already defined?

Redpanda's Raft implementation guarantees that if A was enqueued before B within the same term then the replication of B can't be successful without replication of A so we may not worry that A may be dropped.

What if replication of A times out (fails) before Redpanda scheduled an execution of B?

But we still have uncertainty. Should Redpanda process B assuming that A was successful or should it assume it failed? In order to resolve the uncertainty the leader steps down. Since sync() guarantees that the RSM's state reflects all the commands replicated by previous term - the next leader will resolve the uncertainty.

The combination of those methods lead to preserving causal relationships between the requests (idempotency) without introducing pipelining stalls.


The idempotency logic is straightforward:

  • Take a lock identified by producer id (concurrent producers don't affect each other)
    • If the seq number and its offset is already known then return the offset
    • If the seq number is known and in flight then "park" the current request (once the original resolved it pings all the parked requests with the written offsets or an error)
    • If the seq number is seen for the first time then:
      • Reject it if there is a gap between it and the last known seq
      • Start processing if there is no gap

Release notes

  • Reduce latency of the idempotent producers by 50%

Fixes #5054

@mmaslankaprv
Copy link
Member

I am wondering would it be possible to return raft::replicate_stages all the way from rm_stm to cluster::partition this way we would have exactly the same processing of idepotent and classic replications ?

@rystsov rystsov force-pushed the idempotency-latency branch 4 times, most recently from 0d0091f to b949b1b Compare June 18, 2022 02:29
@rystsov
Copy link
Contributor Author

rystsov commented Jun 18, 2022

I am wondering would it be possible to return raft::replicate_stages all the way from rm_stm to cluster::partition this way we would have exactly the same processing of idepotent and classic replications ?

done, I've wrapped promise api by raft::replicate_stages

@rystsov rystsov changed the title add rm_stm pipeline Improve idempotency latency by introducing rm_stm pipelining Jun 18, 2022
@rystsov rystsov marked this pull request as ready for review June 18, 2022 05:13
@rystsov rystsov requested a review from ajfabbri June 19, 2022 05:32
Copy link
Contributor

@bharathv bharathv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still trying to digest some parts of sequence tracking replicate_seq, publishing remaining comments (mostly questions) meanwhile.

src/v/cluster/rm_stm.cc Show resolved Hide resolved
src/v/cluster/rm_stm.cc Outdated Show resolved Hide resolved
tests/rptest/services/rpk_producer.py Outdated Show resolved Hide resolved
src/v/redpanda/admin_server.cc Show resolved Hide resolved
src/v/cluster/rm_stm.h Outdated Show resolved Hide resolved
src/v/cluster/rm_stm.h Show resolved Hide resolved
src/v/cluster/rm_stm.cc Show resolved Hide resolved
src/v/cluster/rm_stm.cc Show resolved Hide resolved
src/v/cluster/rm_stm.cc Show resolved Hide resolved
src/v/cluster/rm_stm.cc Show resolved Hide resolved
@rystsov rystsov force-pushed the idempotency-latency branch 3 times, most recently from 2d53cc6 to c545419 Compare June 20, 2022 00:39
@@ -766,6 +788,11 @@ ss::future<result<raft::replicate_result>> rm_stm::replicate(
return _c->replicate(std::move(b), opts);
})
.finally([u = std::move(unit)] {});
})
.finally([enqueued] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part is a little bit confusing to me, we pass in enqueued to replicate_seq so we want the code in replicate_seq to set enqueued value, here we actually check if the promise was set, can we do in inside of replicate_seq ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add comment on it. It serves two purposes: to increase safety and to reduce visual noise.

Safety. We want to avoid a case when replicate_finished is set but enqueued isn't. This is the reason why we need the finally clause.

Visual noise. Without using finally we need to set enqueued for every return (a error, a cached value). It doesn't help that c++ doesn't have finally block so using seastart primitive. do_replicate seems like a logical place because it also covers replicate_tx.

Imagine a reader seeing a promise creation in replicate_in_stages and wondering if it's ever set (is it safe). If the finally block is in do_replicate it's a single hop away from increasing their certainty.

Copy link
Contributor Author

@rystsov rystsov Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved the finally block to replicate_in_stages so it is just where we declare the promise, added a comment explaining why we need the finally block and another comment in replicate_seq with info on why it's ok not to set it

@@ -151,7 +151,11 @@ class partition {

ss::future<std::error_code>
transfer_leadership(std::optional<model::node_id> target) {
return _raft->do_transfer_leadership(target);
if (_rm_stm) {
return _rm_stm->transfer_leadership(target);
Copy link
Member

@mmaslankaprv mmaslankaprv Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a general question, can the livelock happen when leader is elected because heartbeats were lost ?

Copy link
Contributor Author

@rystsov rystsov Jun 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about this situation?

  1. something happens
  2. since consensus::replication doesn't have a timeout and it doesn't return control flow in time
  3. leadership balancer tries to do its job but it gets block by the unfinished replication

As I understand the leadership balancer is an optimization and if it doesn't work in some cases it isn't a problem. Imaging a node is being isolated from the rest of the cluster, even if the leadership balancer is fully functional it can do anything. Just like in this case eventually a new node becomes a leader and carries on.

@rystsov rystsov force-pushed the idempotency-latency branch 3 times, most recently from f0e98f1 to 5757747 Compare June 22, 2022 02:46
bharathv
bharathv previously approved these changes Jun 22, 2022
Copy link
Contributor

@bharathv bharathv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't fully understand the implications of coordination between step_down and transfer leadership as I don't have a good idea of failure conditions in that part of the code but rest (around rm_stm and sequence number invariants) lgtm.

src/v/cluster/rm_stm.cc Outdated Show resolved Hide resolved
src/v/cluster/rm_stm.cc Show resolved Hide resolved
src/v/cluster/rm_stm.h Outdated Show resolved Hide resolved
@rystsov rystsov force-pushed the idempotency-latency branch 2 times, most recently from f98785a to 1185ce7 Compare June 23, 2022 23:45
Return invalid_request instead of processing a wrong if branch
Kafka protocol is asynchronous: a client may send the next write
request without waiting for the previous request to finish. Since
Redpanda's Raft implementation is also asynchronous, it's possible
to process Kafka requests with minimal overhead and/or delays.

The challenge is to process causally related requests without
pipeline stalling. Imagine that we have two requests A and B. When
we want to process them as fast as possible we should start repli-
cating the latter request without waiting until the replication of
the first request is over.

But in case the requests are the RSM commands and are causally
related e.g. request A is "set x=2" and B is "set y=3 if x=2" then
to guarantee that B's precondition holds Redpanda can't start
replication of B request without knowing that

  - The request A is successfully executed
  - No other command sneaks in between A and B

In case of idempotency the condition we need to preserve is the
monotonicity of the seq numbers without gaps.

In order to preserve causality and to avoid pipeline stalls we use
optimistic replication. A leader knows about its ongoing operations
so it may optimistically predict the outcome of operation A and
start executing operation B assuming its prediction is true.

But what happens if Redpanda is wrong with its prediction, how does
it ensure safety? Let's break down all the failure scenarios.

A leader uses a mutex and consesnsus::replicate_in_stages order all
the incoming requests. Before the first stage is resolves the mutex
controls the order and after the first stage is resolved Redpanda's
raft implementation guarantees order:

  auto u = co_await mutex.lock();
  auto stages = raft.replicate_in_stages(request);
  co_await stages.enqueued;
  u.return_all();
  // after this point the order between the requests is certain
  // the order enforced by mutex is preserved in the log

The mutex / replicate_in_stages combination may enforce partial
order but can't prevent out-of-nowhere writes. Imagine a node loses
the leadership then new leader inserts a command C and transfers
the leadership back to the original node.

To fight this Redpanda uses conditional replication:

  auto term = persisted_stm::sync();
  auto u = co_await mutex.lock();
  auto stages = raft.replicate_in_stages(term, request);
  co_await stages.enqueued;
  u.return_all();

It uses a sync method to make sure that the RSM's state reflects
all the commands replicated by previous term (during this phase it
may learn about the "A" command) and then it uses the raft's term
to issue the replication command (conditional replicate). By design
the replication call can't succeed if the term is wrong so instead
of leading the ACB state the replication of B is doomed to fail.

Redpanda's Raft implementation guarantees that if A was enqueued
before B within the same term then the replication of B can't be
successful without replication of A so we may not worry that A may
be dropped.

But we still have uncertainty. Should Redpanda process B assuming
that A was successful or should it assume it failed? In order to
resolve the uncertainty the leader steps down.

Since sync() guarantees that the RSM's state reflects all the
commands replicated by previous term - the next leader will resolve
the uncertainty.

The combination of those methods lead to preserving causal
relationships between the requests (idempotency) without
introducing pipelining stalls.

-------------------------------------------------------------------

The idempotency logic is straightforward:

 - Take a lock identified by producer id (concurrent producers
   don't affect each other)
    - If the seq number and its offset is already known then
      return the offset
    - If the seq number is known and in flight then "park" the
      current request (once the original resolved it pings all
      the parked requests with the written offsets or an error)
    - If the seq number is seen for the first time then:
        - Reject it if there is a gap between it and the last
          known seq
        - Start processing if there is no gap

fixes redpanda-data#5054
Leadership transfer and rm_stm should be coordinated to avoid a
possibility of the live lock:

  - leadership transfer sets _transferring_leadership
  - it causes a write to fail
  - failing write initiate leader step down
  - the step down causes the transfer to fail
making transfer_leadership_to more stable by checking that all
nodes has updated metadata. previously we could choose a stale
server, get 503 and retry the requests
temporary adding a faster version of wait_until before the
ducktape repo is updated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Turning on Idempotent Producer doubles the latency
5 participants