From bf531fd017cbec756ee979299723adce76828c96 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Tue, 12 Sep 2017 13:34:16 +0100 Subject: [PATCH 1/5] Add configurable queue overflow strategy If a queue is to be overflowed by a delivery it can reject the delivery or drop messages from the head. To reject delivery overflow can be configured to `reject_publish`, to drop head it's `drop_head` (default setting). Messages which will be rejected should still confirm being routed, so mandatory expectations are not accumulated on the channel side. Slave nodes will only confirm if a message was published or discarded. To drop confirms from slaves, all confirms for a message are cleared when the message is rejected. When promoting a new master, left-behind deliveries should be rejected if the queue is full, just like normal deliveries. Fixes #995 [#151294447] --- src/dtree.erl | 23 +++- src/rabbit_amqqueue.erl | 9 ++ src/rabbit_amqqueue_process.erl | 69 +++++++++- src/rabbit_channel.erl | 43 ++++-- src/rabbit_mirror_queue_slave.erl | 1 + src/rabbit_policies.erl | 9 +- test/clustering_management_SUITE.erl | 2 +- test/priority_queue_SUITE.erl | 15 +++ test/rabbit_ha_test_producer.erl | 53 +++++--- test/simple_ha_SUITE.erl | 39 +++++- test/unit_inbroker_parallel_SUITE.erl | 185 +++++++++++++++++++++++++- 11 files changed, 410 insertions(+), 38 deletions(-) diff --git a/src/dtree.erl b/src/dtree.erl index 466ec88f3348..e8b3481b36ff 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -32,7 +32,7 @@ -module(dtree). --export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2, +-export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2, is_defined/2, is_empty/1, smallest/1, size/1]). %%---------------------------------------------------------------------------- @@ -50,6 +50,7 @@ -spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE(). -spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}. -spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}. +-spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}. -spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}. -spec drop(pk(), ?MODULE()) -> ?MODULE(). -spec is_defined(sk(), ?MODULE()) -> boolean(). @@ -107,6 +108,26 @@ take(SK, {P, S}) -> {KVs, {P1, gb_trees:delete(SK, S)}} end. +%% Drop an entry with the primary key and clears secondary keys for this key, +%% returning a list with a key-value pair as a result. +%% If the primary key does not exist, returns an empty list. +take_one(PK, {P, S}) -> + case gb_trees:lookup(PK, P) of + {value, {SKS, Value}} -> + P1 = gb_trees:delete(PK, P), + S1 = gb_sets:fold( + fun(SK, Acc) -> + {value, PKS} = gb_trees:lookup(SK, Acc), + PKS1 = gb_sets:delete(PK, PKS), + case gb_sets:is_empty(PKS1) of + true -> gb_trees:delete(SK, Acc); + false -> gb_trees:update(SK, PKS1, Acc) + end + end, S, SKS), + {[{PK, Value}], {P1, S1}}; + none -> {[], {P, S}} + end. + %% Drop all entries which contain the given secondary key, returning %% the primary-key/value pairs of these entries. It is ok for the %% given secondary key to not exist. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 378f3cbb7614..592de77c9757 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -576,6 +576,7 @@ declare_args() -> {<<"x-max-length">>, fun check_non_neg_int_arg/2}, {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, {<<"x-max-priority">>, fun check_non_neg_int_arg/2}, + {<<"x-overflow">>, fun check_overflow/2}, {<<"x-queue-mode">>, fun check_queue_mode/2}]. 
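The new dtree:take_one/2 above is what lets the channel clear every pending per-queue confirm for a rejected publish (see the reject_publish handling in rabbit_channel.erl further down). A minimal illustration of its semantics, not part of the patch, using stand-in atoms for the queue pids and the exchange name:

```erlang
%% take_one/2 removes the primary key (the publish seqno) and clears all of
%% its secondary keys (the queue pids), so a later per-queue confirm for the
%% same seqno finds nothing to acknowledge.
UC0 = dtree:insert(7, [master_q, slave_q], exchange_x, dtree:empty()),
{[{7, exchange_x}], UC1} = dtree:take_one(7, UC0),  %% reject clears seqno 7 entirely
{[], UC1}                = dtree:take_one(7, UC1),  %% unknown key is a no-op
{Confirmed, _UC2}        = dtree:take([7], slave_q, UC1),
[] = Confirmed.                                     %% late confirm cannot become an ack
```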
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, @@ -623,6 +624,14 @@ check_dlxrk_arg({longstr, _}, Args) -> check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. +check_overflow({longstr, Val}, _Args) -> + case lists:member(Val, [<<"drop_head">>, <<"reject_publish">>]) of + true -> ok; + false -> {error, invalid_overflow} + end; +check_overflow({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + check_queue_mode({longstr, Val}, _Args) -> case lists:member(Val, [<<"default">>, <<"lazy">>]) of true -> ok; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9b7a569e94d1..3cd9f4195d16 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -79,6 +79,9 @@ max_length, %% max length in bytes, if configured max_bytes, + %% an action to perform if queue is to be over a limit, + %% can be either drop_head (default) or reject_publish + overflow, %% when policies change, this version helps queue %% determine what previously scheduled/set up state to ignore, %% e.g. message expiration messages from previously set up timers @@ -158,7 +161,8 @@ init_state(Q) -> senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running, - args_policy_version = 0}, + args_policy_version = 0, + overflow = drop_head}, rabbit_event:init_stats_timer(State, #q.stats_timer). init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -259,7 +263,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, msg_id_to_channel = MTC}, State2 = process_args_policy(State1), State3 = lists:foldl(fun (Delivery, StateN) -> - deliver_or_enqueue(Delivery, true, StateN) + maybe_deliver_or_enqueue(Delivery, true, StateN) end, State2, Deliveries), notify_decorators(startup, State3), State3. @@ -377,6 +381,7 @@ process_args_policy(State = #q{q = Q, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, {<<"max-length">>, fun res_min/2, fun init_max_length/2}, {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}, + {<<"overflow">>, fun res_arg/2, fun init_overflow/2}, {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}], drop_expired_msgs( lists:foldl(fun({Name, Resolve, Fun}, StateN) -> @@ -420,6 +425,12 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. +init_overflow(undefined, State) -> + State; +init_overflow(Overflow, State) -> + %% TODO maybe drop head + State#q{overflow = binary_to_existing_atom(Overflow, utf8)}. + init_queue_mode(undefined, State) -> State; init_queue_mode(Mode, State = #q {backing_queue = BQ, @@ -620,12 +631,22 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, State#q{consumers = Consumers})} end. +maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) -> + send_mandatory(Delivery), %% must do this before confirms + case {will_overflow(Delivery, State), Overflow} of + {true, reject_publish} -> + %% Drop publish and nack to publisher + nack_publish_no_space(Delivery, Delivered, State); + _ -> + %% Enqueue and maybe drop head later + deliver_or_enqueue(Delivery, Delivered, State) + end. 
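The x-overflow argument validated by check_overflow/2 above is set at declare time (or through an `overflow` policy, validated in rabbit_policies.erl below). A client-side sketch mirroring the declarations used by the test suites in this patch; `Ch` is assumed to be an open channel and the queue name is a placeholder:

```erlang
%% Declare a length-limited queue that rejects publishes instead of dropping
%% its head. Values use the PATCH 1/5 spelling; PATCH 2/5 renames them to
%% <<"drop-head">> / <<"reject-publish">>.
#'queue.declare_ok'{} =
    amqp_channel:call(Ch, #'queue.declare'{
        queue     = <<"limited-queue">>,
        durable   = true,
        arguments = [{<<"x-max-length">>, long,    10},
                     {<<"x-overflow">>,   longstr, <<"reject_publish">>}]}).
```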
+ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid, flow = Flow}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - send_mandatory(Delivery), %% must do this before confirms {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State1), {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), @@ -643,6 +664,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; {undelivered, State3 = #q{backing_queue_state = BQS2}} -> + BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2), {Dropped, State4 = #q{backing_queue_state = BQS4}} = maybe_drop_head(State3#q{backing_queue_state = BQS3}), @@ -664,7 +686,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_drop_head(State = #q{max_length = undefined, max_bytes = undefined}) -> {false, State}; -maybe_drop_head(State) -> +maybe_drop_head(State = #q{overflow = reject_publish}) -> + {false, State}; +maybe_drop_head(State = #q{overflow = drop_head}) -> maybe_drop_head(false, State). maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, @@ -683,6 +707,35 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, {AlreadyDropped, State} end. +nack_publish_no_space(#delivery{confirm = true, + sender = SenderPid, + msg_seq_no = MsgSeqNo} = Delivery, + _Delivered, + State = #q{ backing_queue = BQ, + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> + {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC), + gen_server2:cast(SenderPid, {reject_publish, MsgSeqNo, self()}), + State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 }; +nack_publish_no_space(#delivery{confirm = false}, + _Delivered, State) -> + State. + +will_overflow(_, #q{max_length = undefined, + max_bytes = undefined}) -> false; +will_overflow(#delivery{message = Message}, + #q{max_length = MaxLen, + max_bytes = MaxBytes, + backing_queue = BQ, + backing_queue_state = BQS}) -> + ExpectedQueueLength = BQ:len(BQS) + 1, + + #basic_message{content = #content{payload_fragments_rev = PFR}} = Message, + MessageSize = iolist_size(PFR), + ExpectedQueueSizeBytes = BQ:info(message_bytes_ready, BQS) + MessageSize, + + ExpectedQueueLength > MaxLen orelse ExpectedQueueSizeBytes > MaxBytes. + over_max_length(#q{max_length = MaxLen, max_bytes = MaxBytes, backing_queue = BQ, @@ -1242,8 +1295,10 @@ handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); -handle_cast({deliver, Delivery = #delivery{sender = Sender, - flow = Flow}, SlaveWhenPublished}, +handle_cast({deliver, + Delivery = #delivery{sender = Sender, + flow = Flow}, + SlaveWhenPublished}, State = #q{senders = Senders}) -> Senders1 = case Flow of %% In both credit_flow:ack/1 we are acking messages to the channel @@ -1258,7 +1313,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, noflow -> Senders end, State1 = State#q{senders = Senders1}, - noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1)); + noreply(maybe_deliver_or_enqueue(Delivery, SlaveWhenPublished, State1)); %% [0] The second ack is since the channel thought we were a slave at %% the time it published this message, so it used two credits (see %% rabbit_amqqueue:deliver/2). 
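will_overflow/2 above decides whether accepting one more message would breach either limit; when only one limit is configured it leans on Erlang term ordering (a number never compares greater than the atom `undefined`). A hypothetical standalone restatement with the undefined checks made explicit:

```erlang
%% Equivalent check with explicit handling of unset limits. MsgSize is the
%% payload size in bytes (iolist_size of the payload fragments).
would_overflow(Len, Bytes, MsgSize, MaxLen, MaxBytes) ->
    (MaxLen =/= undefined andalso Len + 1 > MaxLen)
        orelse
    (MaxBytes =/= undefined andalso Bytes + MsgSize > MaxBytes).
%% would_overflow(3, 240, 80, 3, undefined)   -> true   (length limit hit)
%% would_overflow(1, 80,  80, undefined, 100) -> true   (160 bytes > 100)
%% would_overflow(0, 0,   80, 3, 100)         -> false
```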
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c69a27d57c77..345f90d5e7f0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -136,6 +136,9 @@ %% a list of tags for published messages that were %% delivered but are yet to be confirmed to the client confirmed, + %% a list of tags for published messages that were + %% rejected but are yet to be sent to the client + rejected, %% a dtree used to track oustanding notifications %% for messages published as mandatory mandatory, @@ -399,6 +402,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, confirm_enabled = false, publish_seqno = 1, unconfirmed = dtree:empty(), + rejected = [], confirmed = [], mandatory = dtree:empty(), capabilities = Capabilities, @@ -429,6 +433,7 @@ prioritise_call(Msg, _From, _Len, _State) -> prioritise_cast(Msg, _Len, _State) -> case Msg of {confirm, _MsgSeqNos, _QPid} -> 5; + {reject_publish, _MsgSeqNos, _QPid} -> 5; {mandatory_received, _MsgSeqNo, _QPid} -> 5; _ -> 0 end. @@ -578,6 +583,13 @@ handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> %% NB: don't call noreply/1 since we don't want to send confirms. noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)}); +handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) -> + %% It does not matter which queue rejected the message, + %% if any queue rejected it - it should not be confirmed. + {MXs, UC1} = dtree:take_one(MsgSeqNo, UC), + %% NB: don't call noreply/1 since we don't want to send confirms. + noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1})); + handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) -> {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), %% NB: don't call noreply/1 since we don't want to send confirms. @@ -601,7 +613,7 @@ handle_info(emit_stats, State) -> State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer), %% NB: don't call noreply/1 since we don't want to kick off the %% stats timer. - {noreply, send_confirms(State1), hibernate}; + {noreply, send_confirms_and_nacks(State1), hibernate}; handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), @@ -661,10 +673,10 @@ reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. noreply(NewState) -> {noreply, next_state(NewState), hibernate}. -next_state(State) -> ensure_stats_timer(send_confirms(State)). +next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)). -noreply_coalesce(State = #ch{confirmed = C}) -> - Timeout = case C of [] -> hibernate; _ -> 0 end, +noreply_coalesce(State = #ch{confirmed = C, rejected = R}) -> + Timeout = case {C, R} of {[], []} -> hibernate; _ -> 0 end, {noreply, ensure_stats_timer(State), Timeout}. ensure_stats_timer(State) -> @@ -798,7 +810,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost RoutingKey, Permission) -> Resource = Name#resource{kind = topic}, - Timeout = get_operation_timeout(), + Timeout = get_operation_timeout(), AmqpParams = case ConnPid of none -> %% Called from outside the channel by mgmt API @@ -942,6 +954,15 @@ maybe_set_fast_reply_to( maybe_set_fast_reply_to(C, _State) -> C. +record_rejects([], State) -> + State; +record_rejects(MXs, State = #ch{rejected = R, tx = Tx}) -> + Tx1 = case Tx of + none -> none; + _ -> failed + end, + State#ch{rejected = [MXs | R], tx = Tx1}. 
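record_rejects/2 above stores the rejected seqnos so that send_confirms_and_nacks (below) can turn them into basic.nack frames. From the publisher's side this looks roughly like the following, modelled on the check_max_length_rejects/5 helper later in this patch; `Ch` and the queue name are placeholders:

```erlang
%% With publisher confirms on, a publish the queue rejects under
%% reject_publish comes back as basic.nack rather than basic.ack.
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"limited-queue">>},
                  #amqp_msg{payload = <<"one too many">>}),
receive
    #'basic.ack'{}  -> ok;          %% the queue had room
    #'basic.nack'{} -> rejected     %% the queue was full
after 5000 ->
    timeout
end.
```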
+ record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> @@ -1846,21 +1867,23 @@ send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation send_nacks(_, State) -> maybe_complete_tx(State#ch{tx = failed}). -send_confirms(State = #ch{tx = none, confirmed = []}) -> +send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) -> State; -send_confirms(State = #ch{tx = none, confirmed = C}) -> +send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) -> case rabbit_node_monitor:pause_partition_guard() of - ok -> MsgSeqNos = + ok -> ConfirmMsgSeqNos = lists:foldl( fun ({MsgSeqNo, XName}, MSNs) -> ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State), [MsgSeqNo | MSNs] end, [], lists:append(C)), - send_confirms(MsgSeqNos, State#ch{confirmed = []}); + State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}), + %% TODO: msg seq nos, same as for confirms. + send_nacks(lists:append(R), State1#ch{rejected = []}); pausing -> State end; -send_confirms(State) -> +send_confirms_and_nacks(State) -> case rabbit_node_monitor:pause_partition_guard() of ok -> maybe_complete_tx(State); pausing -> State diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ee697be5011b..4a6e077f2470 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -302,6 +302,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, %% We are acking messages to the channel process that sent us %% the message delivery. See %% rabbit_amqqueue_process:handle_ch_down for more info. + %% TODO: reject publishes maybe_flow_ack(Sender, Flow), noreply(maybe_enqueue_message(Delivery, State)); diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index be7d3dcd76be..3fadfc82e1bb 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -42,6 +42,7 @@ register() -> {policy_validator, <<"max-length">>}, {policy_validator, <<"max-length-bytes">>}, {policy_validator, <<"queue-mode">>}, + {policy_validator, <<"overflow">>}, {operator_policy_validator, <<"expires">>}, {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, @@ -104,7 +105,13 @@ validate_policy0(<<"queue-mode">>, <<"default">>) -> validate_policy0(<<"queue-mode">>, <<"lazy">>) -> ok; validate_policy0(<<"queue-mode">>, Value) -> - {error, "~p is not a valid queue-mode value", [Value]}. + {error, "~p is not a valid queue-mode value", [Value]}; +validate_policy0(<<"overflow">>, <<"drop_head">>) -> + ok; +validate_policy0(<<"overflow">>, <<"reject_publish">>) -> + ok; +validate_policy0(<<"overflow">>, Value) -> + {error, "~p is not a valid overflow value", [Value]}. merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal); diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 2a23c4997e08..8bf8a9a8b8d6 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -704,7 +704,7 @@ assert_failure(Fun) -> {error_string, Reason} -> Reason; {badrpc, {'EXIT', Reason}} -> Reason; {badrpc_multi, Reason, _Nodes} -> Reason; - Other -> exit({expected_failure, Other}) + Other -> error({expected_failure, Other}) end. 
stop_app(Node) -> diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index eecd59b87904..eb781ffedf5f 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -32,6 +32,7 @@ groups() -> {cluster_size_2, [], [ ackfold, drop, + reject, dropwhile_fetchwhile, info_head_message_timestamp, matching, @@ -306,6 +307,20 @@ drop(Config) -> rabbit_ct_client_helpers:close_connection(Conn), passed. +reject(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Q = <<"reject-queue">>, + declare(Ch, Q, [{<<"x-max-length">>, long, 4}, + {<<"x-overflow">>, longstr, <<"reject_publish">>} + | arguments(3)]), + publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]), + %% First 4 messages are published, all others are discarded. + get_all(Ch, Q, do_ack, [3, 2, 1, 1]), + delete(Ch, Q), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + purge(Config) -> {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"purge-queue">>, diff --git a/test/rabbit_ha_test_producer.erl b/test/rabbit_ha_test_producer.erl index fe2d15ed9ab9..b6c301cd0cce 100644 --- a/test/rabbit_ha_test_producer.erl +++ b/test/rabbit_ha_test_producer.erl @@ -15,7 +15,7 @@ %% -module(rabbit_ha_test_producer). --export([await_response/1, start/5, create/5]). +-export([await_response/1, start/6, create/5, create/6]). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -28,13 +28,20 @@ await_response(ProducerPid) -> end. create(Channel, Queue, TestPid, Confirm, MsgsToSend) -> + create(Channel, Queue, TestPid, Confirm, MsgsToSend, acks). + +create(Channel, Queue, TestPid, Confirm, MsgsToSend, Mode) -> + AckNackMsgs = case Mode of + acks -> {ok, {error, received_nacks}}; + nacks -> {{error, received_acks}, ok} + end, ProducerPid = spawn_link(?MODULE, start, [Channel, Queue, TestPid, - Confirm, MsgsToSend]), + Confirm, MsgsToSend, AckNackMsgs]), receive {ProducerPid, started} -> ProducerPid end. -start(Channel, Queue, TestPid, Confirm, MsgsToSend) -> +start(Channel, Queue, TestPid, Confirm, MsgsToSend, AckNackMsgs) -> ConfirmState = case Confirm of true -> amqp_channel:register_confirm_handler(Channel, self()), @@ -45,25 +52,27 @@ start(Channel, Queue, TestPid, Confirm, MsgsToSend) -> end, TestPid ! {self(), started}, error_logger:info_msg("publishing ~w msgs on ~p~n", [MsgsToSend, Channel]), - producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend). + producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend, AckNackMsgs). %% %% Private API %% -producer(_Channel, _Queue, TestPid, none, 0) -> +producer(_Channel, _Queue, TestPid, none, 0, _AckNackMsgs) -> TestPid ! {self(), ok}; -producer(Channel, _Queue, TestPid, ConfirmState, 0) -> +producer(Channel, _Queue, TestPid, ConfirmState, 0, {AckMsg, NackMsg}) -> error_logger:info_msg("awaiting confirms on channel ~p~n", [Channel]), - Msg = case drain_confirms(no_nacks, ConfirmState) of - no_nacks -> ok; - nacks -> {error, received_nacks}; + Msg = case drain_confirms(none, ConfirmState) of + %% No acks or nacks + acks -> AckMsg; + nacks -> NackMsg; + mix -> {error, received_both_acks_and_nacks}; {Nacks, CS} -> {error, {missing_confirms, Nacks, lists:sort(gb_trees:keys(CS))}} end, TestPid ! 
{self(), Msg}; -producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend) -> +producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend, AckNackMsgs) -> Method = #'basic.publish'{exchange = <<"">>, routing_key = Queue, mandatory = false, @@ -76,7 +85,7 @@ producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend) -> payload = list_to_binary( integer_to_list(MsgsToSend))}), - producer(Channel, Queue, TestPid, ConfirmState1, MsgsToSend - 1). + producer(Channel, Queue, TestPid, ConfirmState1, MsgsToSend - 1, AckNackMsgs). maybe_record_confirm(none, _, _) -> none; @@ -84,22 +93,34 @@ maybe_record_confirm(ConfirmState, Channel, MsgsToSend) -> SeqNo = amqp_channel:next_publish_seqno(Channel), gb_trees:insert(SeqNo, MsgsToSend, ConfirmState). -drain_confirms(Nacks, ConfirmState) -> +drain_confirms(Collected, ConfirmState) -> case gb_trees:is_empty(ConfirmState) of - true -> Nacks; + true -> Collected; false -> receive #'basic.ack'{delivery_tag = DeliveryTag, multiple = IsMulti} -> - drain_confirms(Nacks, + Collected1 = case Collected of + none -> acks; + acks -> acks; + nacks -> mix; + mix -> mix + end, + drain_confirms(Collected1, delete_confirms(DeliveryTag, IsMulti, ConfirmState)); #'basic.nack'{delivery_tag = DeliveryTag, multiple = IsMulti} -> - drain_confirms(nacks, + Collected1 = case Collected of + none -> nacks; + nacks -> nacks; + acks -> mix; + mix -> mix + end, + drain_confirms(Collected1, delete_confirms(DeliveryTag, IsMulti, ConfirmState)) after - 60000 -> {Nacks, ConfirmState} + 60000 -> {Collected, ConfirmState} end end. diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index a0499b9d594a..297b53b713b4 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -44,7 +44,8 @@ groups() -> auto_resume_no_ccn_client, confirms_survive_stop, confirms_survive_sigkill, - confirms_survive_policy + confirms_survive_policy, + rejects_survive_stop ]} ]. @@ -156,6 +157,10 @@ confirms_survive_stop(Cf) -> confirms_survive(Cf, fun stop/2). confirms_survive_sigkill(Cf) -> confirms_survive(Cf, fun sigkill/2). confirms_survive_policy(Cf) -> confirms_survive(Cf, fun policy/2). +rejects_survive_stop(Cf) -> rejecets_survive(Cf, fun stop/2). +rejects_survive_sigkill(Cf) -> rejecets_survive(Cf, fun sigkill/2). +rejects_survive_policy(Cf) -> rejecets_survive(Cf, fun policy/2). + %%---------------------------------------------------------------------------- consume_survives(Config, DeathFun, CancelOnFailover) -> @@ -213,6 +218,38 @@ confirms_survive(Config, DeathFun) -> rabbit_ha_test_producer:await_response(ProducerPid), ok. +rejecets_survive(Config, DeathFun) -> + [A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Msgs = rabbit_ct_helpers:cover_work_factor(Config, 20000), + Node1Channel = rabbit_ct_client_helpers:open_channel(Config, A), + Node2Channel = rabbit_ct_client_helpers:open_channel(Config, B), + + %% declare the queue on the master, mirrored to the two slaves + Queue = <<"test_rejects">>, + amqp_channel:call(Node1Channel,#'queue.declare'{queue = Queue, + auto_delete = false, + durable = true, + arguments = [{<<"x-max-length">>, long, 1}, + {<<"x-overflow">>, longstr, <<"reject_publish">>}]}), + Payload = <<"there can be only one">>, + amqp_channel:call(Node1Channel, + #'basic.publish'{routing_key = Queue}, + #amqp_msg{payload = Payload}), + + %% send a bunch of messages from the producer. Tolerating nacks. 
+ ProducerPid = rabbit_ha_test_producer:create(Node2Channel, Queue, + self(), true, Msgs, nacks), + DeathFun(Config, A), + rabbit_ha_test_producer:await_response(ProducerPid), + + {#'basic.get_ok'{}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}), + %% There is only one message. + #'basic.get_empty'{} = amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}), + ok. + + + stop(Config, Node) -> rabbit_ct_broker_helpers:stop_node_after(Config, Node, 50). diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index dd8cd48b5af5..951cf6f21382 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -33,6 +33,9 @@ all() -> ]. groups() -> + MaxLengthTests = [max_length_drop_head, + max_length_reject_confirm, + max_length_drop_publish], [ {parallel_tests, [parallel], [ amqp_connection_refusal, @@ -48,7 +51,10 @@ groups() -> ]}, set_disk_free_limit_command, set_vm_memory_high_watermark_command, - topic_matching + topic_matching, + {queue_max_length, [], [ + {max_length_simple, [], MaxLengthTests}, + {max_length_mirrored, [], MaxLengthTests}]} ]} ]. @@ -63,6 +69,11 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(max_length_mirrored, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]), + rabbit_ct_helpers:run_steps(Config1, []); init_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> @@ -91,6 +102,18 @@ setup_file_handle_cache1() -> ok = file_handle_cache:set_limit(10), ok. +end_per_group(max_length_mirrored, Config) -> + rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"^max_length.*queue">>), + Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, false}]), + Config1; +end_per_group(queue_max_length, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}), + rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), + Config; end_per_group(Group, Config) -> case lists:member({group, Group}, all()) of true -> @@ -1018,6 +1041,161 @@ set_vm_memory_high_watermark_command1(_Config) -> ) end. 
+max_length_drop_head(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = <<"max_length_drop_head_queue">>, + QNameDefault = <<"max_length_default_drop_head_queue">>, + QNameBytes = <<"max_length_bytes_drop_head_queue">>, + QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>, + + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop_head">>}], + amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefault}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefaultBytes}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}), + + check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>), + + %% 80 bytes payload + Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, + Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>, + Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>, + check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3), + check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3). + +max_length_reject_confirm(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = <<"max_length_reject_queue">>, + QNameBytes = <<"max_length_bytes_reject_queue">>, + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}], + amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + + %% 80 bytes payload + Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, + Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>, + Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>, + + check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3), + check_max_length_rejects(Config, QNameBytes, Ch, Payload1, Payload2, Payload3). 
+ +max_length_drop_publish(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = <<"max_length_drop_publish_queue">>, + QNameBytes = <<"max_length_bytes_drop_publish_queue">>, + MaxLengthArgs = [{<<"x-max-length">>, long, 1}], + MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}], + amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), + %% If confirms are not enable, publishes will still be dropped in reject_publish mode. + check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), + + %% 80 bytes payload + Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>, + Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>, + Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>, + + check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3). + +check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) -> + sync_mirrors(QName, Config), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + %% A single message is published and consumed + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Message 2 is dropped, message 1 stays + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Messages 2 and 3 are dropped, message 1 stays + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). 
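The byte-limited variants in these tests rely on the 80-byte payloads tripping the 100-byte x-max-length-bytes limit on the second publish. A quick check of the arithmetic the broker-side will_overflow/2 performs, with the values taken from the tests above:

```erlang
%% First publish: 0 + 80 bytes ready, under the 100-byte limit.
%% A second publish would make 160 bytes ready, which is over the limit, so
%% it is discarded under reject_publish (or evicts the head under drop_head).
MaxBytes = 100, PayloadSize = 80,
false = (0           + PayloadSize > MaxBytes),
true  = (PayloadSize + PayloadSize > MaxBytes).
```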
+ +check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) -> + sync_mirrors(QName, Config), + amqp_channel:register_confirm_handler(Ch, self()), + flush(), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + %% First message can be enqueued and acks + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + receive #'basic.ack'{} -> ok + after 1000 -> error(expected_ack) + end, + + %% The message cannot be enqueued and nacks + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + receive #'basic.nack'{} -> ok + after 1000 -> error(expected_nack) + end, + + %% The message cannot be enqueued and nacks + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + receive #'basic.nack'{} -> ok + after 1000 -> error(expected_nack) + end, + + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Now we can publish message 2. + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + receive #'basic.ack'{} -> ok + after 1000 -> error(expected_ack) + end, + + {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> + sync_mirrors(QName, Config), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + %% A single message is published and consumed + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Message 1 is replaced by message 2 + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + + %% Messages 1 and 2 are replaced + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +sync_mirrors(QName, Config) -> + case ?config(is_mirrored, Config) of + true -> + rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]); + _ -> ok + end. + %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- @@ -1031,3 +1209,8 @@ expand_options(As, Bs) -> false -> [A | R] end end, Bs, As). + +flush() -> + receive _ -> flush() + after 10 -> ok + end. 
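To summarise what the PATCH 1/5 tests assert before the value rename in PATCH 2/5: with a limit of one message, drop_head keeps the newest publish while reject_publish keeps the oldest. A toy model of the two strategies (not broker code; the real drop_head evicts from the head until both limits are satisfied):

```erlang
Enqueue = fun(Msg, Q, MaxLen, Overflow) ->
               case {length(Q) + 1 > MaxLen, Overflow} of
                   {false, _}              -> Q ++ [Msg];      %% fits
                   {true,  drop_head}      -> tl(Q) ++ [Msg];  %% evict the oldest
                   {true,  reject_publish} -> Q                %% discard the publish
               end
          end,
[3] = lists:foldl(fun(M, Q) -> Enqueue(M, Q, 1, drop_head) end, [], [1, 2, 3]),
[1] = lists:foldl(fun(M, Q) -> Enqueue(M, Q, 1, reject_publish) end, [], [1, 2, 3]).
```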
\ No newline at end of file From 79a00a6f74383543f4ade05b2210bc93ce7d7ecd Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Wed, 27 Sep 2017 10:22:16 +0100 Subject: [PATCH 2/5] Rename overflow values to use dashes instead of underscores --- src/rabbit_amqqueue.erl | 2 +- src/rabbit_amqqueue_process.erl | 16 ++++++++-------- src/rabbit_policies.erl | 4 ++-- test/priority_queue_SUITE.erl | 2 +- test/simple_ha_SUITE.erl | 2 +- test/unit_inbroker_parallel_SUITE.erl | 8 ++++---- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 592de77c9757..ce7346060358 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -625,7 +625,7 @@ check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. check_overflow({longstr, Val}, _Args) -> - case lists:member(Val, [<<"drop_head">>, <<"reject_publish">>]) of + case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of true -> ok; false -> {error, invalid_overflow} end; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3cd9f4195d16..7721fc70fc05 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -80,7 +80,7 @@ %% max length in bytes, if configured max_bytes, %% an action to perform if queue is to be over a limit, - %% can be either drop_head (default) or reject_publish + %% can be either drop-head (default) or reject-publish overflow, %% when policies change, this version helps queue %% determine what previously scheduled/set up state to ignore, @@ -162,7 +162,7 @@ init_state(Q) -> msg_id_to_channel = gb_trees:empty(), status = running, args_policy_version = 0, - overflow = drop_head}, + overflow = 'drop-head'}, rabbit_event:init_stats_timer(State, #q.stats_timer). init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -634,9 +634,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) -> send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of - {true, reject_publish} -> + {true, 'reject-publish'} -> %% Drop publish and nack to publisher - nack_publish_no_space(Delivery, Delivered, State); + send_reject_publish(Delivery, Delivered, State); _ -> %% Enqueue and maybe drop head later deliver_or_enqueue(Delivery, Delivered, State) @@ -686,9 +686,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_drop_head(State = #q{max_length = undefined, max_bytes = undefined}) -> {false, State}; -maybe_drop_head(State = #q{overflow = reject_publish}) -> +maybe_drop_head(State = #q{overflow = 'reject-publish'}) -> {false, State}; -maybe_drop_head(State = #q{overflow = drop_head}) -> +maybe_drop_head(State = #q{overflow = 'drop-head'}) -> maybe_drop_head(false, State). maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, @@ -707,7 +707,7 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, {AlreadyDropped, State} end. 
-nack_publish_no_space(#delivery{confirm = true, +send_reject_publish(#delivery{confirm = true, sender = SenderPid, msg_seq_no = MsgSeqNo} = Delivery, _Delivered, @@ -717,7 +717,7 @@ nack_publish_no_space(#delivery{confirm = true, {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC), gen_server2:cast(SenderPid, {reject_publish, MsgSeqNo, self()}), State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 }; -nack_publish_no_space(#delivery{confirm = false}, +send_reject_publish(#delivery{confirm = false}, _Delivered, State) -> State. diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 3fadfc82e1bb..f48189b210cf 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -106,9 +106,9 @@ validate_policy0(<<"queue-mode">>, <<"lazy">>) -> ok; validate_policy0(<<"queue-mode">>, Value) -> {error, "~p is not a valid queue-mode value", [Value]}; -validate_policy0(<<"overflow">>, <<"drop_head">>) -> +validate_policy0(<<"overflow">>, <<"drop-head">>) -> ok; -validate_policy0(<<"overflow">>, <<"reject_publish">>) -> +validate_policy0(<<"overflow">>, <<"reject-publish">>) -> ok; validate_policy0(<<"overflow">>, Value) -> {error, "~p is not a valid overflow value", [Value]}. diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index eb781ffedf5f..a1ae66dbbb2d 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -311,7 +311,7 @@ reject(Config) -> {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"reject-queue">>, declare(Ch, Q, [{<<"x-max-length">>, long, 4}, - {<<"x-overflow">>, longstr, <<"reject_publish">>} + {<<"x-overflow">>, longstr, <<"reject-publish">>} | arguments(3)]), publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]), %% First 4 messages are published, all others are discarded. 
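A short walk-through of the reject/1 priority-queue test above: with x-max-length 4 and reject-publish, only the first four publishes (priorities 1, 2, 3, 1) are enqueued, and the priority queue hands them back highest first, which is the [3, 2, 1, 1] order the test expects:

```erlang
Kept = lists:sublist([1, 2, 3, 1, 2, 3, 1, 2, 3], 4),   %% [1,2,3,1] survive
[3, 2, 1, 1] = lists:reverse(lists:sort(Kept)).          %% order seen by basic.get
```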
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index 297b53b713b4..f3af62d43a30 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -230,7 +230,7 @@ rejecets_survive(Config, DeathFun) -> auto_delete = false, durable = true, arguments = [{<<"x-max-length">>, long, 1}, - {<<"x-overflow">>, longstr, <<"reject_publish">>}]}), + {<<"x-overflow">>, longstr, <<"reject-publish">>}]}), Payload = <<"there can be only one">>, amqp_channel:call(Node1Channel, #'basic.publish'{routing_key = Queue}, diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 951cf6f21382..d9ea349f1b8e 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -1050,7 +1050,7 @@ max_length_drop_head(Config) -> MaxLengthArgs = [{<<"x-max-length">>, long, 1}], MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], - OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop_head">>}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}], amqp_channel:call(Ch, #'queue.delete'{queue = QName}), amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefault}), amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), @@ -1076,7 +1076,7 @@ max_length_reject_confirm(Config) -> QNameBytes = <<"max_length_bytes_reject_queue">>, MaxLengthArgs = [{<<"x-max-length">>, long, 1}], MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], - OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], amqp_channel:call(Ch, #'queue.delete'{queue = QName}), amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), @@ -1099,12 +1099,12 @@ max_length_drop_publish(Config) -> QNameBytes = <<"max_length_bytes_drop_publish_queue">>, MaxLengthArgs = [{<<"x-max-length">>, long, 1}], MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], - OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}], + OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], amqp_channel:call(Ch, #'queue.delete'{queue = QName}), amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), - %% If confirms are not enable, publishes will still be dropped in reject_publish mode. + %% If confirms are not enable, publishes will still be dropped in reject-publish mode. check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>), %% 80 bytes payload From 4e87deff41daee0f79f7b3f92f27539292f45a65 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 28 Sep 2017 14:25:02 -0700 Subject: [PATCH 3/5] Ensure the new tests are run as part of the cluster_size_3 group --- test/simple_ha_SUITE.erl | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index f3af62d43a30..41b6ec7a9444 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -45,7 +45,9 @@ groups() -> confirms_survive_stop, confirms_survive_sigkill, confirms_survive_policy, - rejects_survive_stop + rejects_survive_stop, + rejects_survive_sigkill, + rejects_survive_policy ]} ]. 
@@ -157,9 +159,9 @@ confirms_survive_stop(Cf) -> confirms_survive(Cf, fun stop/2). confirms_survive_sigkill(Cf) -> confirms_survive(Cf, fun sigkill/2). confirms_survive_policy(Cf) -> confirms_survive(Cf, fun policy/2). -rejects_survive_stop(Cf) -> rejecets_survive(Cf, fun stop/2). -rejects_survive_sigkill(Cf) -> rejecets_survive(Cf, fun sigkill/2). -rejects_survive_policy(Cf) -> rejecets_survive(Cf, fun policy/2). +rejects_survive_stop(Cf) -> rejects_survive(Cf, fun stop/2). +rejects_survive_sigkill(Cf) -> rejects_survive(Cf, fun sigkill/2). +rejects_survive_policy(Cf) -> rejects_survive(Cf, fun policy/2). %%---------------------------------------------------------------------------- @@ -218,7 +220,7 @@ confirms_survive(Config, DeathFun) -> rabbit_ha_test_producer:await_response(ProducerPid), ok. -rejecets_survive(Config, DeathFun) -> +rejects_survive(Config, DeathFun) -> [A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Msgs = rabbit_ct_helpers:cover_work_factor(Config, 20000), Node1Channel = rabbit_ct_client_helpers:open_channel(Config, A), From e65e740b2a4d800bafd6d2f7e35ec5a9622a86b4 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Mon, 2 Oct 2017 14:34:39 +0100 Subject: [PATCH 4/5] Drop head after changing a policy to drop-head. Clarify TODOs --- src/rabbit_amqqueue_process.erl | 10 ++++++++-- src/rabbit_channel.erl | 3 ++- src/rabbit_mirror_queue_slave.erl | 3 ++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7721fc70fc05..ed88e6937844 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -428,8 +428,14 @@ init_max_bytes(MaxBytes, State) -> init_overflow(undefined, State) -> State; init_overflow(Overflow, State) -> - %% TODO maybe drop head - State#q{overflow = binary_to_existing_atom(Overflow, utf8)}. + OverflowVal = binary_to_existing_atom(Overflow, utf8), + case OverflowVal of + 'drop-head' -> + {_Dropped, State1} = maybe_drop_head(State#q{overflow = OverflowVal}), + State1; + _ -> + State#q{overflow = OverflowVal} + end. init_queue_mode(undefined, State) -> State; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 345f90d5e7f0..1167ac66f943 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1879,7 +1879,8 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) -> [MsgSeqNo | MSNs] end, [], lists:append(C)), State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}), - %% TODO: msg seq nos, same as for confirms. + %% TODO: msg seq nos, same as for confirms. Need to implement + %% nack rates first. send_nacks(lists:append(R), State1#ch{rejected = []}); pausing -> State end; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 4a6e077f2470..6139099ed1c4 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -302,7 +302,8 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, %% We are acking messages to the channel process that sent us %% the message delivery. See %% rabbit_amqqueue_process:handle_ch_down for more info. - %% TODO: reject publishes + %% If message is rejected by the master, the publish will be nacked + %% even if slaves confirm it. No need to check for length here. 
maybe_flow_ack(Sender, Flow), noreply(maybe_enqueue_message(Delivery, State)); From da2b0ccda34e85048b35d2633836d2ec708dd488 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Thu, 2 Nov 2017 10:43:13 +0000 Subject: [PATCH 5/5] Move cleanup to end_per_testcase --- test/unit_inbroker_parallel_SUITE.erl | 35 ++++++++++++++++----------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index d9ea349f1b8e..89fd8fc6599c 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -107,12 +107,6 @@ end_per_group(max_length_mirrored, Config) -> Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, false}]), Config1; end_per_group(queue_max_length, Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}), - rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), Config; end_per_group(Group, Config) -> case lists:member({group, Group}, all()) of @@ -127,6 +121,27 @@ end_per_group(Group, Config) -> init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). +end_per_testcase(max_length_drop_head = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_head_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_default_drop_head_queue">>}), + rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), + rabbit_ct_helpers:testcase_finished(Config, Testcase); + +end_per_testcase(max_length_reject_confirm = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_reject_queue">>}), + rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), + rabbit_ct_helpers:testcase_finished(Config, Testcase); +end_per_testcase(max_length_drop_publish = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_bytes_drop_publish_queue">>}), + rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), + rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). 
@@ -1051,10 +1066,6 @@ max_length_drop_head(Config) -> MaxLengthArgs = [{<<"x-max-length">>, long, 1}], MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop-head">>}], - amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefault}), - amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), - amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefaultBytes}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), @@ -1077,8 +1088,6 @@ max_length_reject_confirm(Config) -> MaxLengthArgs = [{<<"x-max-length">>, long, 1}], MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], - amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), @@ -1100,8 +1109,6 @@ max_length_drop_publish(Config) -> MaxLengthArgs = [{<<"x-max-length">>, long, 1}], MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}], OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject-publish">>}], - amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}), %% If confirms are not enable, publishes will still be dropped in reject-publish mode.