Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

An option to discard messages if queue is full. #1374

Merged
merged 10 commits into from
Nov 15, 2017
23 changes: 22 additions & 1 deletion src/dtree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

-module(dtree).

-export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2,
-export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2,
is_defined/2, is_empty/1, smallest/1, size/1]).

%%----------------------------------------------------------------------------
Expand All @@ -50,6 +50,7 @@
-spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
-spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
-spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-spec drop(pk(), ?MODULE()) -> ?MODULE().
-spec is_defined(sk(), ?MODULE()) -> boolean().
Expand Down Expand Up @@ -107,6 +108,26 @@ take(SK, {P, S}) ->
{KVs, {P1, gb_trees:delete(SK, S)}}
end.

%% Remove the entry stored under the given primary key and detach that
%% primary key from every secondary key it was registered with.
%% Returns a one-element list holding the primary-key/value pair
%% together with the updated dtree. If the primary key is not present,
%% returns an empty list and the dtree unchanged.
take_one(PK, {P, S}) ->
    case gb_trees:lookup(PK, P) of
        none ->
            {[], {P, S}};
        {value, {SKS, Value}} ->
            PWithoutPK = gb_trees:delete(PK, P),
            %% For each secondary key of this entry, remove PK from the
            %% set of primary keys it points at; a secondary key whose
            %% set becomes empty is removed from the tree altogether.
            Detach =
                fun (SK, SAcc) ->
                        {value, PKS} = gb_trees:lookup(SK, SAcc),
                        Remaining = gb_sets:delete(PK, PKS),
                        case gb_sets:is_empty(Remaining) of
                            false -> gb_trees:update(SK, Remaining, SAcc);
                            true  -> gb_trees:delete(SK, SAcc)
                        end
                end,
            SWithoutPK = gb_sets:fold(Detach, S, SKS),
            {[{PK, Value}], {PWithoutPK, SWithoutPK}}
    end.

%% Drop all entries which contain the given secondary key, returning
%% the primary-key/value pairs of these entries. It is ok for the
%% given secondary key to not exist.
Expand Down
9 changes: 9 additions & 0 deletions src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ declare_args() ->
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-priority">>, fun check_non_neg_int_arg/2},
{<<"x-overflow">>, fun check_overflow/2},
{<<"x-queue-mode">>, fun check_queue_mode/2}].

consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
Expand Down Expand Up @@ -623,6 +624,14 @@ check_dlxrk_arg({longstr, _}, Args) ->
check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.

%% Validator for the <<"x-overflow">> queue argument.
%% Accepts only the long-string values <<"drop-head">> and
%% <<"reject-publish">>. Any other long string yields
%% {error, invalid_overflow}; any other argument type yields
%% {error, {unacceptable_type, Type}}.
check_overflow({longstr, <<"drop-head">>}, _Args) ->
    ok;
check_overflow({longstr, <<"reject-publish">>}, _Args) ->
    ok;
check_overflow({longstr, _Unsupported}, _Args) ->
    {error, invalid_overflow};
check_overflow({Type, _}, _Args) ->
    {error, {unacceptable_type, Type}}.

check_queue_mode({longstr, Val}, _Args) ->
case lists:member(Val, [<<"default">>, <<"lazy">>]) of
true -> ok;
Expand Down
75 changes: 68 additions & 7 deletions src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@
max_length,
%% max length in bytes, if configured
max_bytes,
%% the action to take when the queue would exceed a configured limit:
%% either drop-head (default) or reject-publish
overflow,
%% when policies change, this version helps queue
%% determine what previously scheduled/set up state to ignore,
%% e.g. message expiration messages from previously set up timers
Expand Down Expand Up @@ -159,7 +162,8 @@ init_state(Q) ->
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
args_policy_version = 0},
args_policy_version = 0,
overflow = 'drop-head'},
rabbit_event:init_stats_timer(State, #q.stats_timer).

init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
Expand Down Expand Up @@ -260,7 +264,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
msg_id_to_channel = MTC},
State2 = process_args_policy(State1),
State3 = lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
maybe_deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
notify_decorators(startup, State3),
State3.
Expand Down Expand Up @@ -378,6 +382,7 @@ process_args_policy(State = #q{q = Q,
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
{<<"overflow">>, fun res_arg/2, fun init_overflow/2},
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
Expand Down Expand Up @@ -421,6 +426,18 @@ init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.

%% Apply the resolved "overflow" setting to the queue state.
%% 'undefined' leaves the state (including its current overflow
%% setting) untouched. Otherwise the binary is converted to an
%% already-existing atom and stored in the state; when the new
%% behaviour is 'drop-head', maybe_drop_head/1 is run right away and
%% the state it returns is used.
init_overflow(undefined, State) ->
    State;
init_overflow(Overflow, State) ->
    Updated = State#q{overflow = binary_to_existing_atom(Overflow, utf8)},
    case Updated#q.overflow of
        'drop-head' ->
            {_Dropped, Trimmed} = maybe_drop_head(Updated),
            Trimmed;
        _ ->
            Updated
    end.

init_queue_mode(undefined, State) ->
State;
init_queue_mode(Mode, State = #q {backing_queue = BQ,
Expand Down Expand Up @@ -621,12 +638,22 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
State#q{consumers = Consumers})}
end.

%% Entry point for an incoming delivery. The publish is rejected when
%% accepting it would overflow the queue and the overflow behaviour is
%% 'reject-publish'; in every other case it is handed to
%% deliver_or_enqueue/3.
maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
    send_mandatory(Delivery), %% must do this before confirms
    WouldOverflow = will_overflow(Delivery, State),
    case WouldOverflow andalso Overflow =:= 'reject-publish' of
        true ->
            %% Drop publish and nack to publisher
            send_reject_publish(Delivery, Delivered, State);
        false ->
            %% Enqueue and maybe drop head later
            deliver_or_enqueue(Delivery, Delivered, State)
    end.

deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
Expand All @@ -644,6 +671,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->

BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
Expand All @@ -665,7 +693,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_drop_head(State = #q{max_length = undefined,
max_bytes = undefined}) ->
{false, State};
maybe_drop_head(State) ->
maybe_drop_head(State = #q{overflow = 'reject-publish'}) ->
{false, State};
maybe_drop_head(State = #q{overflow = 'drop-head'}) ->
maybe_drop_head(false, State).

maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
Expand All @@ -684,6 +714,35 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
{AlreadyDropped, State}
end.

%% Reject a publish that the queue will not accept.
%% Without publisher confirms nothing is done and the state is returned
%% as is. With confirms enabled, the delivery is passed to discard/4 and
%% a {reject_publish, MsgSeqNo, QueuePid} cast is sent to the sender;
%% the returned state carries the backing queue state and
%% msg_id_to_channel map produced by discard/4.
send_reject_publish(#delivery{confirm = false}, _Delivered, State) ->
    State;
send_reject_publish(Delivery = #delivery{confirm    = true,
                                         sender     = Publisher,
                                         msg_seq_no = SeqNo},
                    _Delivered,
                    State = #q{backing_queue       = BQ,
                               backing_queue_state = BQS0,
                               msg_id_to_channel   = MTC0}) ->
    {BQS, MTC} = discard(Delivery, BQ, BQS0, MTC0),
    gen_server2:cast(Publisher, {reject_publish, SeqNo, self()}),
    State#q{backing_queue_state = BQS, msg_id_to_channel = MTC}.

%% Tell whether accepting the given delivery would push the queue over
%% its configured limits. With neither max_length nor max_bytes set the
%% answer is always false. Otherwise the queue length plus one and the
%% ready message bytes plus the payload size of this message are
%% compared against the limits.
will_overflow(_Delivery, #q{max_length = undefined,
                            max_bytes  = undefined}) ->
    false;
will_overflow(#delivery{message = Message},
              #q{max_length          = MaxLen,
                 max_bytes           = MaxBytes,
                 backing_queue       = BQ,
                 backing_queue_state = BQS}) ->
    LengthAfter = 1 + BQ:len(BQS),
    #basic_message{content = #content{payload_fragments_rev = Fragments}} =
        Message,
    PayloadBytes = iolist_size(Fragments),
    BytesAfter = PayloadBytes + BQ:info(message_bytes_ready, BQS),
    %% A limit that is not configured holds the atom 'undefined'; in
    %% Erlang term order every number sorts before every atom, so the
    %% comparison against an unset limit always counts as fitting.
    FitsLength = LengthAfter =< MaxLen,
    FitsBytes  = BytesAfter =< MaxBytes,
    not (FitsLength andalso FitsBytes).

over_max_length(#q{max_length = MaxLen,
max_bytes = MaxBytes,
backing_queue = BQ,
Expand Down Expand Up @@ -1254,8 +1313,10 @@ handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});

handle_cast({deliver, Delivery = #delivery{sender = Sender,
flow = Flow}, SlaveWhenPublished},
handle_cast({deliver,
Delivery = #delivery{sender = Sender,
flow = Flow},
SlaveWhenPublished},
State = #q{senders = Senders}) ->
Senders1 = case Flow of
%% In both credit_flow:ack/1 we are acking messages to the channel
Expand All @@ -1270,7 +1331,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
noreply(maybe_deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
%% [0] The second ack is since the channel thought we were a slave at
%% the time it published this message, so it used two credits (see
%% rabbit_amqqueue:deliver/2).
Expand Down
44 changes: 34 additions & 10 deletions src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@
%% a list of tags for published messages that were
%% delivered but are yet to be confirmed to the client
confirmed,
%% a list of tags for published messages that were
%% rejected but are yet to be sent to the client
rejected,
%% a dtree used to track outstanding notifications
%% for messages published as mandatory
mandatory,
Expand Down Expand Up @@ -419,6 +422,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed = dtree:empty(),
rejected = [],
confirmed = [],
mandatory = dtree:empty(),
capabilities = Capabilities,
Expand Down Expand Up @@ -449,6 +453,7 @@ prioritise_call(Msg, _From, _Len, _State) ->
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _MsgSeqNos, _QPid} -> 5;
{reject_publish, _MsgSeqNos, _QPid} -> 5;
{mandatory_received, _MsgSeqNo, _QPid} -> 5;
_ -> 0
end.
Expand Down Expand Up @@ -598,6 +603,13 @@ handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});

handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
%% It does not matter which queue rejected the message,
%% if any queue rejected it - it should not be confirmed.
{MXs, UC1} = dtree:take_one(MsgSeqNo, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1}));

handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
Expand All @@ -621,7 +633,7 @@ handle_info(emit_stats, State) ->
State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
%% NB: don't call noreply/1 since we don't want to kick off the
%% stats timer.
{noreply, send_confirms(State1), hibernate};
{noreply, send_confirms_and_nacks(State1), hibernate};

handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
Expand Down Expand Up @@ -681,10 +693,10 @@ reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.

noreply(NewState) -> {noreply, next_state(NewState), hibernate}.

next_state(State) -> ensure_stats_timer(send_confirms(State)).
next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)).

noreply_coalesce(State = #ch{confirmed = C}) ->
Timeout = case C of [] -> hibernate; _ -> 0 end,
noreply_coalesce(State = #ch{confirmed = C, rejected = R}) ->
Timeout = case {C, R} of {[], []} -> hibernate; _ -> 0 end,
{noreply, ensure_stats_timer(State), Timeout}.

ensure_stats_timer(State) ->
Expand Down Expand Up @@ -818,7 +830,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
RoutingKey,
Permission) ->
Resource = Name#resource{kind = topic},
Timeout = get_operation_timeout(),
Timeout = get_operation_timeout(),
AmqpParams = case ConnPid of
none ->
%% Called from outside the channel by mgmt API
Expand Down Expand Up @@ -962,6 +974,15 @@ maybe_set_fast_reply_to(
maybe_set_fast_reply_to(C, _State) ->
C.

%% Remember rejected publishes so they can be reported to the client
%% later. An empty list changes nothing. Otherwise the batch is
%% prepended to the 'rejected' list, and a transaction state other than
%% 'none' is replaced with 'failed'.
record_rejects([], State) ->
    State;
record_rejects(MXs, State = #ch{rejected = Pending, tx = none}) ->
    State#ch{rejected = [MXs | Pending], tx = none};
record_rejects(MXs, State = #ch{rejected = Pending}) ->
    State#ch{rejected = [MXs | Pending], tx = failed}.

record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
Expand Down Expand Up @@ -1874,21 +1895,24 @@ send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx = failed}).

send_confirms(State = #ch{tx = none, confirmed = []}) ->
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
State;
send_confirms(State = #ch{tx = none, confirmed = C}) ->
send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
case rabbit_node_monitor:pause_partition_guard() of
ok -> MsgSeqNos =
ok -> ConfirmMsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
?INCR_STATS(exchange_stats, XName, 1,
confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
send_confirms(MsgSeqNos, State#ch{confirmed = []});
State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}),
%% TODO: msg seq nos, same as for confirms. Need to implement
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a future change or pending for this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have no nack rates. So it's a future change. I'll create a new issue.

%% nack rates first.
send_nacks(lists:append(R), State1#ch{rejected = []});
pausing -> State
end;
send_confirms(State) ->
send_confirms_and_nacks(State) ->
case rabbit_node_monitor:pause_partition_guard() of
ok -> maybe_complete_tx(State);
pausing -> State
Expand Down
2 changes: 2 additions & 0 deletions src/rabbit_mirror_queue_slave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
%% We are acking messages to the channel process that sent us
%% the message delivery. See
%% rabbit_amqqueue_process:handle_ch_down for more info.
%% If message is rejected by the master, the publish will be nacked
%% even if slaves confirm it. No need to check for length here.
maybe_flow_ack(Sender, Flow),
noreply(maybe_enqueue_message(Delivery, State));

Expand Down
9 changes: 8 additions & 1 deletion src/rabbit_policies.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ register() ->
{policy_validator, <<"max-length">>},
{policy_validator, <<"max-length-bytes">>},
{policy_validator, <<"queue-mode">>},
{policy_validator, <<"overflow">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
Expand Down Expand Up @@ -104,7 +105,13 @@ validate_policy0(<<"queue-mode">>, <<"default">>) ->
validate_policy0(<<"queue-mode">>, <<"lazy">>) ->
ok;
validate_policy0(<<"queue-mode">>, Value) ->
{error, "~p is not a valid queue-mode value", [Value]}.
{error, "~p is not a valid queue-mode value", [Value]};
validate_policy0(<<"overflow">>, <<"drop-head">>) ->
ok;
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
ok;
validate_policy0(<<"overflow">>, Value) ->
{error, "~p is not a valid overflow value", [Value]}.

merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
Expand Down
2 changes: 1 addition & 1 deletion test/clustering_management_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ assert_failure(Fun) ->
%% Failure to start an app result in node shutdown
{badrpc, nodedown} -> nodedown;
{badrpc_multi, Reason, _Nodes} -> Reason;
Other -> exit({expected_failure, Other})
Other -> error({expected_failure, Other})
end.

stop_app(Node) ->
Expand Down
Loading