
Turning on Idempotent Producer doubles the latency #5054

Closed
rkruze opened this issue Jun 7, 2022 · 1 comment · Fixed by #5157
Labels: kind/bug, performance

Comments

rkruze (Contributor) commented Jun 7, 2022

Version & Environment

Redpanda version (rpk version): v22.1.3 compared with v21.11.16

What went wrong?

By default, v22.1 shows double the latency of v21.11. This appears to be due to idempotent producers: when they are turned off, the latency of v22.1 drops back to what was seen in v21.11.

(screenshot: latency comparison between v22.1.3 and v21.11.16)

What should have happened instead?

There should be no additional latency when using an idempotent producer.
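
For reference, here is how idempotence could be toggled for an A/B
comparison, assuming the cluster-level enable_idempotence property
(the exact property name and commands are an assumption; the issue
doesn't state how it was disabled):

  # check the current value, then disable idempotent producers
  rpk cluster config get enable_idempotence
  rpk cluster config set enable_idempotence false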

rkruze added the kind/bug label Jun 7, 2022
rystsov added a commit to rystsov/redpanda that referenced this issue Jun 18, 2022
rystsov added a commit to rystsov/redpanda that referenced this issue Jun 19, 2022
rystsov added a commit to rystsov/redpanda that referenced this issue Jun 20, 2022
rystsov added a commit to rystsov/redpanda that referenced this issue Jun 22, 2022
rystsov added a commit to rystsov/redpanda that referenced this issue Jun 22, 2022
rystsov added a commit to rystsov/redpanda that referenced this issue Jun 22, 2022
The Kafka protocol is asynchronous: a client may send the next write
request without waiting for the previous request to finish. Since
Redpanda's Raft implementation is also asynchronous, it's possible
to process Kafka requests with minimal overhead and/or delays.

The challenge is to process causally related requests without
pipeline stalling. Imagine that we have two requests, A and B. To
process them as fast as possible we should start replicating the
latter request without waiting for the replication of the first to
finish.

But if the requests are RSM commands and are causally related,
e.g. request A is "set x=2" and B is "set y=3 if x=2", then to
guarantee that B's precondition holds Redpanda can't start
replicating B without knowing that

  - The request A is successfully executed
  - No other command sneaks in between A and B

In the case of idempotency the condition we need to preserve is
the monotonicity of the seq numbers, without gaps.

In order to preserve causality and to avoid pipeline stalls we use
optimistic replication. A leader knows about its ongoing operations
so it may optimistically predict the outcome of operation A and
start executing operation B assuming its prediction is true.

But what happens if Redpanda's prediction is wrong? How does it
ensure safety? Let's break down the failure scenarios.

A leader uses a mutex and consensus::replicate_in_stages to order
all the incoming requests. Before the first stage resolves the
mutex controls the order, and after the first stage is resolved
Redpanda's Raft implementation guarantees the order:

  auto u = co_await mutex.lock();
  auto stages = raft.replicate_in_stages(request);
  co_await stages.enqueued;
  u.return_all();
  // after this point the order between the requests is certain
  // the order enforced by mutex is preserved in the log

The mutex / replicate_in_stages combination may enforce partial
order but can't prevent out-of-nowhere writes. Imagine a node
loses the leadership, then the new leader inserts a command C and
transfers the leadership back to the original node.

To fight this Redpanda uses conditional replication:

  auto term = persisted_stm::sync();
  auto u = co_await mutex.lock();
  auto stages = raft.replicate_in_stages(term, request);
  co_await stages.enqueued;
  u.return_all();

It uses a sync method to make sure that the RSM's state reflects
all the commands replicated in the previous term (during this
phase it may learn about the "A" command) and then uses the Raft
term to issue the replication command (conditional replicate). By
design the replication call can't succeed if the term is wrong, so
instead of leading to the A-C-B order the replication of B is
doomed to fail.
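
A minimal sketch of what that conditional check amounts to
(illustrative only; expected_term and _term are assumed names,
not the actual Redpanda code):

  // inside the replicate path, before appending the batch
  if (expected_term != _term) {
      // entries were appended in a newer term (e.g. by the interim
      // leader that wrote C), so reject B instead of risking the
      // A-C-B order
      co_return errc::not_leader;
  }
  // the term matches: append B; Raft guarantees it lands after A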

Redpanda's Raft implementation guarantees that if A was enqueued
before B within the same term then the replication of B can't be
successful without replication of A so we may not worry that A may
be dropped.

But we still have uncertainty. Should Redpanda process B assuming
that A was successful or should it assume it failed? In order to
resolve the uncertainty the leader steps down.

Since sync() guarantees that the RSM's state reflects all the
commands replicated in previous terms, the next leader will
resolve the uncertainty.
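
As a hedged illustration of that step-down (shape only; the names
below are assumptions, not the actual implementation):

  // replication of A ended with an indeterminate outcome: we don't
  // know whether A made it into the log, so give up leadership and
  // let the next leader's sync() establish the truth
  if (is_outcome_unknown(a_result)) {
      co_await _raft->step_down();
      co_return errc::not_leader; // the client will retry
  }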

The combination of these methods preserves the causal
relationships between the requests (idempotency) without
introducing pipeline stalls.

-------------------------------------------------------------------

The idempotency logic is straightforward (see the sketch after
this list):

 - Take a lock identified by producer id (concurrent producers
   don't affect each other)
    - If the seq number and its offset are already known then
      return the offset
    - If the seq number is known and in flight then "park" the
      current request (once the original request resolves it
      pings all the parked requests with the written offsets or
      an error)
    - If the seq number is seen for the first time then:
        - Reject it if there is a gap between it and the last
          known seq
        - Start processing if there is no gap
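
A C++-style sketch of that flow (the names _producer_locks,
_seq_to_offset, _inflight and _last_seq are illustrative
assumptions, not the actual member names):

  // one lock per producer id, so different producers don't block
  // each other
  auto units = co_await _producer_locks[pid].lock();
  if (auto it = _seq_to_offset.find(seq); it != _seq_to_offset.end()) {
      co_return it->second;          // duplicate: return cached offset
  }
  if (auto it = _inflight.find(seq); it != _inflight.end()) {
      co_return co_await it->second; // park behind the original request
  }
  if (seq != _last_seq + 1) {
      co_return errc::out_of_order_sequence; // gap detected: reject
  }
  // first time this seq is seen and there is no gap: replicate it
  auto offset = co_await replicate(std::move(batch));
  _seq_to_offset[seq] = offset;
  _last_seq = seq;
  co_return offset;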

fixes redpanda-data#5054
rystsov added a commit to rystsov/redpanda that referenced this issue Jun 22, 2022
rystsov added a commit to rystsov/redpanda that referenced this issue Jun 23, 2022
rystsov added a commit to rystsov/redpanda that referenced this issue Jun 24, 2022
bharathv added this to the v22.1.5 milestone Jun 27, 2022
rystsov (Contributor) commented Jul 12, 2022

/backport v22.1.x

rystsov added a commit to rystsov/redpanda that referenced this issue Jul 13, 2022

(cherry picked from commit 88423d7)
(cherry picked from commit 00e4118ad3f711d2a527faf2d28d4c580421b447)
rystsov removed this from the v22.1.5 milestone Jul 13, 2022
rystsov added a commit to rystsov/redpanda that referenced this issue Jul 13, 2022