-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
An option to discard messages if queue is full. #1374
Conversation
src/rabbit_policies.erl
Outdated
@@ -104,7 +105,13 @@ validate_policy0(<<"queue-mode">>, <<"default">>) -> | |||
validate_policy0(<<"queue-mode">>, <<"lazy">>) -> | |||
ok; | |||
validate_policy0(<<"queue-mode">>, Value) -> | |||
{error, "~p is not a valid queue-mode value", [Value]}. | |||
{error, "~p is not a valid queue-mode value", [Value]}; | |||
validate_policy0(<<"overflow">>, <<"drop_head">>) -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other policy names use dash characters instead of underscores. Should these be drop-head
and reject-publish
to be consistent? They could still be used as atoms elsewhere if single-quoted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good find. Will change.
src/rabbit_mirror_queue_slave.erl
Outdated
@@ -302,6 +302,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, | |||
%% We are acking messages to the channel process that sent us | |||
%% the message delivery. See | |||
%% rabbit_amqqueue_process:handle_ch_down for more info. | |||
%% TODO: reject publishes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to confirm ... this is for future work and not necessary here?
src/rabbit_amqqueue_process.erl
Outdated
init_overflow(undefined, State) -> | ||
State; | ||
init_overflow(Overflow, State) -> | ||
%% TODO maybe drop head |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for future work and not necessary in this PR?
If a queue is to be overflowed by a delivery it can reject the delivery or drop messages from the head. To reject delivery overflow can be configured to `reject_publish`, to drop head it's `drop_head` (default setting). Messages which will be rejected should still confirm being routed, so mandatory expectations are not accumulated on the channel side. Slave nodes will only confirm if a message was published or discarded. To drop confirms from slaves, all confirms for a message are cleared when the message is rejected. When promoting a new master, left-behind deliveries should be rejected if the queue is full, just like normal deliveries. Fixes #995 [#151294447]
660c37d
to
e65e740
Compare
lists:foldl( | ||
fun ({MsgSeqNo, XName}, MSNs) -> | ||
?INCR_STATS(exchange_stats, XName, 1, | ||
confirm, State), | ||
[MsgSeqNo | MSNs] | ||
end, [], lists:append(C)), | ||
send_confirms(MsgSeqNos, State#ch{confirmed = []}); | ||
State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}), | ||
%% TODO: msg seq nos, same as for confirms. Need to implement |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a future change or pending for this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have no nack
rates. So it's a future change. I'll create a new issue.
MaxLengthArgs = [{<<"x-max-length">>, long, 1}], | ||
MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], | ||
OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}], | ||
amqp_channel:call(Ch, #'queue.delete'{queue = QName}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If cleanup is properly implemented, these queue.delete
shouldn't be needed at the beginning of the test. Or am I missing something? I think instead end_per_group
these changes require end_per_testcase
.
MaxLengthArgs = [{<<"x-max-length">>, long, 1}], | ||
MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], | ||
OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], | ||
amqp_channel:call(Ch, #'queue.delete'{queue = QName}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
Measuring throughput impact of this (in both overflow modes) is what's left to QA. |
@michaelklishin This PR doesn't introduce any (negative) impact on the throughput, in fact the speed up is over 3x! See below, first rate line belongs to a queue with |
Initial set of local tests do not demonstrate a meaningful deviation from master. We are going to set up a long running environment to see if this is true for runs that span hours/days in a more realistic deployment environment. |
Having run this PR against 3.7.0-rc.1 for ndq-1kb-autoack, we can see an 18% improvement (50660 msg/s vs 42930 msg/s) when looking at the average throughput over 4 hours (purple is this PR, blue is 3.7.0-rc.1): |
Thank you so much for getting this done. Is this available for testing? |
@uvzubovs will be in 3.7.0-rc.2 later this week. |
If a queue is to be overflowed by a delivery it can reject
the delivery or drop messages from the head.
To reject delivery overflow can be configured to
reject_publish
,to drop head it's
drop_head
(default setting).Messages which will be rejected should still confirm being routed,
so mandatory expectations are not accumulated on the channel side.
Slave nodes will only confirm if a message was published or discarded.
To drop confirms from slaves, all confirms for a message are cleared
when the message is rejected.
When promoting a new master, left-behind deliveries should
be rejected if the queue is full, just like normal deliveries.
Fixes #995
[#151294447]