Skip to content

Commit

Permalink
Store and delete tracked connections in a table
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Feb 3, 2016
1 parent 4e849cf commit 655e351
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 52 deletions.
69 changes: 65 additions & 4 deletions src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
%% * rabbit_reader
%% * rabbit_event

-export([register_connection/1, unregister_connection/1]).
-export([register_connection/1, unregister_connection/1,
tracked_connection_from_connection_created/1]).

-ifdef(use_specs).

Expand All @@ -44,8 +45,68 @@
%%

register_connection(Conn) ->
mnesia:write(?TABLE, Conn, write).
rabbit_misc:execute_mnesia_transaction(fun() ->
mnesia:write(?TABLE, Conn, write)
end).

unregister_connection(ConnName) ->
mnesia:delete({?TABLE, ConnName}).
unregister_connection(ConnId = {_Node, _Name}) ->
rabbit_misc:execute_mnesia_transaction(fun() ->
mnesia:delete({?TABLE, ConnId})
end).

%% Returns a #tracked_connection from connection_created
%% event details.
%%
%% @see rabbit_connection_tracking_handler.
tracked_connection_from_connection_created(EventDetails) ->
%% Example event:
%%
%% [{type,network},
%% {pid,<0.329.0>},
%% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>},
%% {port,5672},
%% {peer_port,60998},
%% {host,{0,0,0,0,0,65535,32512,1}},
%% {peer_host,{0,0,0,0,0,65535,32512,1}},
%% {ssl,false},
%% {peer_cert_subject,''},
%% {peer_cert_issuer,''},
%% {peer_cert_validity,''},
%% {auth_mechanism,<<"PLAIN">>},
%% {ssl_protocol,''},
%% {ssl_key_exchange,''},
%% {ssl_cipher,''},
%% {ssl_hash,''},
%% {protocol,{0,9,1}},
%% {user,<<"guest">>},
%% {vhost,<<"/">>},
%% {timeout,14},
%% {frame_max,131072},
%% {channel_max,65535},
%% {client_properties,
%% [{<<"capabilities">>,table,
%% [{<<"publisher_confirms">>,bool,true},
%% {<<"consumer_cancel_notify">>,bool,true},
%% {<<"exchange_exchange_bindings">>,bool,true},
%% {<<"basic.nack">>,bool,true},
%% {<<"connection.blocked">>,bool,true},
%% {<<"authentication_failure_close">>,bool,true}]},
%% {<<"product">>,longstr,<<"Bunny">>},
%% {<<"platform">>,longstr,
%% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>},
%% {<<"version">>,longstr,<<"2.3.0.pre">>},
%% {<<"information">>,longstr,
%% <<"http://rubybunny.info">>}]},
%% {connected_at,1453214290847}]
Name = proplists:get_value(name, EventDetails),
Node = proplists:get_value(node, EventDetails),
io:format("{Node, Name}: ~p~n", [{Node, Name}]),
#tracked_connection{id = {Node, Name},
name = Name,
node = Node,
vhost = proplists:get_value(vhost, EventDetails),
username = proplists:get_value(user, EventDetails),
connected_at = proplists:get_value(connected_at, EventDetails),
pid = proplists:get_value(pid, EventDetails),
peer_host = proplists:get_value(peer_host, EventDetails),
peer_port = proplists:get_value(peer_port, EventDetails)}.
50 changes: 11 additions & 39 deletions src/rabbit_connection_tracking_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,46 +55,18 @@
init([]) ->
{ok, []}.

handle_event(#event{type = connection_created, props = _Details}, State) ->
%% [{type,network},
%% {pid,<0.329.0>},
%% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>},
%% {port,5672},
%% {peer_port,60998},
%% {host,{0,0,0,0,0,65535,32512,1}},
%% {peer_host,{0,0,0,0,0,65535,32512,1}},
%% {ssl,false},
%% {peer_cert_subject,''},
%% {peer_cert_issuer,''},
%% {peer_cert_validity,''},
%% {auth_mechanism,<<"PLAIN">>},
%% {ssl_protocol,''},
%% {ssl_key_exchange,''},
%% {ssl_cipher,''},
%% {ssl_hash,''},
%% {protocol,{0,9,1}},
%% {user,<<"guest">>},
%% {vhost,<<"/">>},
%% {timeout,14},
%% {frame_max,131072},
%% {channel_max,65535},
%% {client_properties,
%% [{<<"capabilities">>,table,
%% [{<<"publisher_confirms">>,bool,true},
%% {<<"consumer_cancel_notify">>,bool,true},
%% {<<"exchange_exchange_bindings">>,bool,true},
%% {<<"basic.nack">>,bool,true},
%% {<<"connection.blocked">>,bool,true},
%% {<<"authentication_failure_close">>,bool,true}]},
%% {<<"product">>,longstr,<<"Bunny">>},
%% {<<"platform">>,longstr,
%% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>},
%% {<<"version">>,longstr,<<"2.3.0.pre">>},
%% {<<"information">>,longstr,
%% <<"http://rubybunny.info">>}]},
%% {connected_at,1453214290847}]
handle_event(#event{type = connection_created, props = Details}, State) ->
rabbit_connection_tracking:register_connection(
rabbit_connection_tracking:tracked_connection_from_connection_created(Details)
),
{ok, State};
handle_event(#event{type = connection_closed, props = _Details}, State) ->
handle_event(#event{type = connection_closed, props = Details}, State) ->
%% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
%% {pid,<0.1774.0>},
%% {node, rabbit@hostname}]
rabbit_connection_tracking:unregister_connection(
{proplists:get_value(node, Details),
proplists:get_value(name, Details)}),
{ok, State};
handle_event(_Event, State) ->
{ok, State}.
Expand Down
21 changes: 14 additions & 7 deletions src/rabbit_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,22 @@

%% used e.g. by rabbit_connection_tracking
-type(tracked_connection() ::
#tracked_connection{vhost :: vhost(),
name :: connection_name(),
pid :: pid(),
protocol :: protocol(),
peer_host :: rabbit_networking:hostname(),
peer_port :: rabbit_networking:ip_port()}).

#tracked_connection{id :: {node(), connection_name()},
node :: node(),
vhost :: vhost(),
name :: connection_name(),
pid :: pid(),
protocol :: protocol_name(),
peer_host :: rabbit_networking:hostname(),
peer_port :: rabbit_networking:ip_port(),
username :: username(),
connected_at :: integer()}).

%% old AMQP 0-9-1-centric type, avoid when possible
-type(protocol() :: rabbit_framing:protocol()).

-type(protocol_name() :: 'amqp0_8' | 'amqp0_9_1' | 'amqp1_0' | 'mqtt' | 'stomp' | any()).

-type(auth_user() ::
#auth_user{username :: username(),
tags :: [atom()],
Expand Down
5 changes: 3 additions & 2 deletions src/rabbit_upgrade_functions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@

tracked_connection() ->
create(rabbit_tracked_connection, [{record_name, tracked_connection},
{attributes, [vhost, name, pid, protocol,
{attributes, [id, node, vhost, name,
pid, protocol,
peer_host, peer_port,
connected_at]}]).
username, connected_at]}]).

%% replaces vhost.dummy (used to avoid having a single-field record
%% which Mnesia doesn't like) with vhost.limits (which is actually
Expand Down

0 comments on commit 655e351

Please sign in to comment.