Skip to content

Commit

Permalink
Towards covering node termination/unavailability in connection tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Jul 3, 2016
1 parent ab99361 commit 078a78a
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 22 deletions.
51 changes: 47 additions & 4 deletions src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
%% * rabbit_event

-export([register_connection/1, unregister_connection/1,
list/0, list/1, list_on_node/1,
tracked_connection_from_connection_created/1,
is_over_connection_limit/1, count_connections_in/1]).
is_over_connection_limit/1, count_connections_in/1,
on_node_down/1]).

-include_lib("rabbit.hrl").

-define(TABLE, rabbit_tracked_connection).
-define(TABLE, rabbit_tracked_connection).
-define(PER_VHOST_COUNTER_TABLE, rabbit_tracked_connection_per_vhost).
-define(SERVER, ?MODULE).

%%
Expand All @@ -58,12 +61,49 @@ unregister_connection(ConnId = {_Node, _Name}) ->
[] -> ok;
[Row] ->
mnesia:dirty_update_counter(
rabbit_tracked_connection_per_vhost,
?PER_VHOST_COUNTER_TABLE,
Row#tracked_connection.vhost, -1),
mnesia:delete({?TABLE, ConnId})
end
end).


-spec list() -> [rabbit_types:tracked_connection()].

list() ->
mnesia:dirty_match_object(?TABLE, #tracked_connection{_ = '_'}).


-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].

list(VHost) ->
mnesia:dirty_match_object(?TABLE, #tracked_connection{vhost = VHost, _ = '_'}).


-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].

list_on_node(Node) ->
mnesia:dirty_match_object(?TABLE, #tracked_connection{node = Node, _ = '_'}).


-spec on_node_down(node()) -> ok.

on_node_down(Node) ->
case lists:member(Node, nodes()) of
false ->
Cs = list_on_node(Node),
rabbit_log:info(
"Node ~p is down, unregistering ~p connections to it~n",
[Node, length(Cs)]),
[unregister_connection(Id) || #tracked_connection{id = Id} <- Cs],
ok;
true -> rabbit_log:info(
"Keep ~s connections: the node is already back~n", [Node])
end.


-spec is_over_connection_limit(rabbit_types:vhost()) -> boolean().

is_over_connection_limit(VirtualHost) ->
ConnectionCount = count_connections_in(VirtualHost),
case rabbit_vhost_limit:connection_limit(VirtualHost) of
Expand All @@ -77,12 +117,15 @@ is_over_connection_limit(VirtualHost) ->
end
end.


-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer().

count_connections_in(VirtualHost) ->
try
case mnesia:transaction(
fun() ->
case mnesia:dirty_read(
{rabbit_tracked_connection_per_vhost,
{?PER_VHOST_COUNTER_TABLE,
VirtualHost}) of
[] -> 0;
[Val] ->
Expand Down
2 changes: 1 addition & 1 deletion src/rabbit_connection_tracking_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
[rabbit_event, ?MODULE, []]}},
{cleanup, {gen_event, delete_handler,
[rabbit_event, ?MODULE, []]}},
{requires, rabbit_event},
{requires, [rabbit_event, rabbit_node_monitor]},
{enables, recovery}]}).


Expand Down
1 change: 1 addition & 0 deletions src/rabbit_node_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
ok = rabbit_mnesia:on_node_down(Node),
ok = rabbit_connection_tracking:on_node_down(Node),
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
Expand Down
191 changes: 174 additions & 17 deletions test/per_vhost_connection_limit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
-import(rabbit_ct_client_helpers, [open_unmanaged_connection/2,
open_unmanaged_connection/3]).


all() ->
[
{group, cluster_size_1},
Expand All @@ -34,25 +35,35 @@ all() ->
groups() ->
[
{cluster_size_1, [], [
most_basic_single_node_connection_tracking_test,
single_node_single_vhost_connection_tracking_test,
single_node_multiple_vhost_connection_tracking_test
most_basic_single_node_test,
single_node_single_vhost_test,
single_node_multiple_vhost_test,
single_node_list_in_vhost_test
]},
{cluster_size_2, [], [
most_basic_cluster_connection_tracking_test,
cluster_single_vhost_connection_tracking_test,
cluster_multiple_vhost_connection_tracking_test,
cluster_node_shutdown_connection_tracking_test
]}
most_basic_cluster_test,
cluster_single_vhost_test,
cluster_multiple_vhost_test,
cluster_node_restart_test,
cluster_node_list_on_node_test
]},
{partition_handling, [], [
cluster_full_partition_test
]}
].

%% see partitions_SUITE
-define(DELAY, 9000).

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
rabbit_ct_helpers:run_setup_steps(Config, [
fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1
]).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
Expand All @@ -67,9 +78,15 @@ init_per_group(cluster_size_1, Config) ->
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());
init_per_group(cluster_size_2, Config) ->
init_per_multinode_group(cluster_size_2, Config, 2);
init_per_group(partition_handling, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, [{net_ticktime, 1}]),
init_per_multinode_group(partition_handling, Config1, 3).

init_per_multinode_group(_GroupName, Config, NodeCount) ->
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodes_count, 3},
{rmq_nodes_count, NodeCount},
{rmq_nodename_suffix, Suffix}
]),
rabbit_ct_helpers:run_steps(Config1,
Expand All @@ -93,7 +110,7 @@ end_per_testcase(Testcase, Config) ->
%% Test cases.
%% -------------------------------------------------------------------

most_basic_single_node_connection_tracking_test(Config) ->
most_basic_single_node_test(Config) ->
VHost = <<"/">>,
?assertEqual(0, count_connections_in(Config, VHost)),
Conn = open_unmanaged_connection(Config, 0),
Expand All @@ -103,7 +120,7 @@ most_basic_single_node_connection_tracking_test(Config) ->

passed.

single_node_single_vhost_connection_tracking_test(Config) ->
single_node_single_vhost_test(Config) ->
VHost = <<"/">>,
?assertEqual(0, count_connections_in(Config, VHost)),

Expand Down Expand Up @@ -135,7 +152,7 @@ single_node_single_vhost_connection_tracking_test(Config) ->

passed.

single_node_multiple_vhost_connection_tracking_test(Config) ->
single_node_multiple_vhost_test(Config) ->
VHost1 = <<"vhost1">>,
VHost2 = <<"vhost2">>,

Expand Down Expand Up @@ -184,7 +201,52 @@ single_node_multiple_vhost_connection_tracking_test(Config) ->

passed.

most_basic_cluster_connection_tracking_test(Config) ->
single_node_list_in_vhost_test(Config) ->
VHost1 = <<"vhost1">>,
VHost2 = <<"vhost2">>,

rabbit_ct_broker_helpers:add_vhost(Config, VHost1),
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost1),

rabbit_ct_broker_helpers:add_vhost(Config, VHost2),
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost2),

?assertEqual(0, length(connections_in(Config, VHost1))),
?assertEqual(0, length(connections_in(Config, VHost2))),

Conn1 = open_unmanaged_connection(Config, 0, VHost1),
[#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1),
amqp_connection:close(Conn1),
?assertEqual(0, length(connections_in(Config, VHost1))),

Conn2 = open_unmanaged_connection(Config, 0, VHost2),
[#tracked_connection{vhost = VHost2}] = connections_in(Config, VHost2),

Conn3 = open_unmanaged_connection(Config, 0, VHost1),
[#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1),

Conn4 = open_unmanaged_connection(Config, 0, VHost1),
(catch exit(Conn4, please_terminate)),
[#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1),

Conn5 = open_unmanaged_connection(Config, 0, VHost2),
Conn6 = open_unmanaged_connection(Config, 0, VHost2),
[<<"vhost1">>, <<"vhost2">>] =
lists:usort(lists:map(fun (#tracked_connection{vhost = V}) -> V end,
all_connections(Config))),

lists:foreach(fun (C) ->
amqp_connection:close(C)
end, [Conn2, Conn3, Conn5, Conn6]),

?assertEqual(0, length(all_connections(Config))),

rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),

passed.

most_basic_cluster_test(Config) ->
VHost = <<"/">>,
?assertEqual(0, count_connections_in(Config, VHost)),
Conn1 = open_unmanaged_connection(Config, 0),
Expand All @@ -204,7 +266,7 @@ most_basic_cluster_connection_tracking_test(Config) ->

passed.

cluster_single_vhost_connection_tracking_test(Config) ->
cluster_single_vhost_test(Config) ->
VHost = <<"/">>,
?assertEqual(0, count_connections_in(Config, VHost)),

Expand Down Expand Up @@ -236,7 +298,7 @@ cluster_single_vhost_connection_tracking_test(Config) ->

passed.

cluster_multiple_vhost_connection_tracking_test(Config) ->
cluster_multiple_vhost_test(Config) ->
VHost1 = <<"vhost1">>,
VHost2 = <<"vhost2">>,

Expand Down Expand Up @@ -285,7 +347,7 @@ cluster_multiple_vhost_connection_tracking_test(Config) ->

passed.

cluster_node_shutdown_connection_tracking_test(Config) ->
cluster_node_restart_test(Config) ->
VHost = <<"/">>,
?assertEqual(0, count_connections_in(Config, VHost)),

Expand Down Expand Up @@ -317,6 +379,75 @@ cluster_node_shutdown_connection_tracking_test(Config) ->

passed.

cluster_node_list_on_node_test(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

?assertEqual(0, length(all_connections(Config))),
?assertEqual(0, length(connections_on_node(Config, 0))),

Conn1 = open_unmanaged_connection(Config, 0),
[#tracked_connection{node = A}] = connections_on_node(Config, 0),
amqp_connection:close(Conn1),
?assertEqual(0, length(connections_on_node(Config, 0))),

_Conn2 = open_unmanaged_connection(Config, 1),
[#tracked_connection{node = B}] = connections_on_node(Config, 1),

Conn3 = open_unmanaged_connection(Config, 0),
?assertEqual(1, length(connections_on_node(Config, 0))),

Conn4 = open_unmanaged_connection(Config, 1),
?assertEqual(2, length(connections_on_node(Config, 1))),

(catch exit(Conn4, please_terminate)),
?assertEqual(1, length(connections_on_node(Config, 1))),

Conn5 = open_unmanaged_connection(Config, 0),
?assertEqual(2, length(connections_on_node(Config, 0))),

rabbit_ct_broker_helpers:stop_broker(Config, 1),
?assertEqual(2, length(all_connections(Config))),
?assertEqual(0, length(connections_on_node(Config, 0, B))),

lists:foreach(fun (C) ->
amqp_connection:close(C)
end, [Conn3, Conn5]),

timer:sleep(100),
?assertEqual(0, length(all_connections(Config, 0))),

passed.

cluster_full_partition_test(Config) ->
VHost = <<"/">>,
rabbit_ct_broker_helpers:set_partition_handling_mode_globally(Config, autoheal),

?assertEqual(0, count_connections_in(Config, VHost)),
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

%% 3 connections, 1 per node
Conn1 = open_unmanaged_connection(Config, 0),
Conn2 = open_unmanaged_connection(Config, 1),
Conn3 = open_unmanaged_connection(Config, 2),
?assertEqual(3, count_connections_in(Config, VHost)),

%% B drops off the network, non-reachable by either A or C
rabbit_ct_broker_helpers:block_traffic_between(A, B),
rabbit_ct_broker_helpers:block_traffic_between(B, C),
timer:sleep(?DELAY),

?assertEqual(3, count_connections_in(Config, VHost)),

rabbit_ct_broker_helpers:allow_traffic_between(A, B),
rabbit_ct_broker_helpers:allow_traffic_between(B, C),
?assertEqual(3, count_connections_in(Config, VHost)),

lists:foreach(fun (Conn) ->
(catch amqp_connection:close(Conn))
end, [Conn1, Conn2, Conn3]),

passed.


%% -------------------------------------------------------------------
%% Helpers
Expand All @@ -328,3 +459,29 @@ count_connections_in(Config, VHost, NodeIndex) ->
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
count_connections_in, [VHost]).

connections_in(Config, VHost) ->
connections_in(Config, 0, VHost).
connections_in(Config, NodeIndex, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
list, [VHost]).

connections_on_node(Config) ->
connections_on_node(Config, 0).
connections_on_node(Config, NodeIndex) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename),
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
list_on_node, [Node]).
connections_on_node(Config, NodeIndex, NodeForListing) ->
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
list_on_node, [NodeForListing]).

all_connections(Config) ->
all_connections(Config, 0).
all_connections(Config, NodeIndex) ->
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
list, []).

0 comments on commit 078a78a

Please sign in to comment.