Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Merge branch 'stable'
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Jul 16, 2016
2 parents c4218d9 + 76f9c36 commit 3aa3e93
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 20 deletions.
11 changes: 10 additions & 1 deletion src/rabbit_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
-export([interval_operation/5]).
-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([store_proc_name/1, store_proc_name/2, get_proc_name/0]).
-export([moving_average/4]).
-export([get_env/3]).
-export([get_channel_operation_timeout/0]).
Expand Down Expand Up @@ -256,6 +256,7 @@
-spec get_parent() -> pid().
-spec store_proc_name(atom(), rabbit_types:proc_name()) -> ok.
-spec store_proc_name(rabbit_types:proc_type_and_name()) -> ok.
-spec get_proc_name() -> rabbit_types:proc_name().
-spec moving_average(float(), float(), float(), float() | 'undefined') ->
float().
-spec get_env(atom(), atom(), term()) -> term().
Expand Down Expand Up @@ -1122,6 +1123,14 @@ cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).

get_proc_name() ->
case get(process_name) of
undefined ->
undefined;
{_Type, Name} ->
{ok, Name}
end.

%% application:get_env/3 is only available in R16B01 or later.
get_env(Application, Key, Def) ->
case application:get_env(Application, Key) of
Expand Down
63 changes: 44 additions & 19 deletions src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@
-record(connection, {
%% e.g. <<"127.0.0.1:55054 -> 127.0.0.1:5672">>
name,
%% used for logging: same as `name`, but optionally
%% augmented with user-supplied name
log_name,
%% server host
host,
%% client host
Expand Down Expand Up @@ -338,7 +341,7 @@ socket_op(Sock, Fun) ->
start_connection(Parent, HelperSup, Deb, Sock) ->
process_flag(trap_exit, true),
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Str} -> Str;
{ok, Str} -> list_to_binary(Str);
{error, enotconn} -> rabbit_net:fast_close(Sock),
exit(normal);
{error, Reason} -> socket_error(Reason),
Expand All @@ -350,11 +353,12 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
?store_proc_name(list_to_binary(Name)),
?store_proc_name(Name),
State = #v1{parent = Parent,
sock = Sock,
connection = #connection{
name = list_to_binary(Name),
name = Name,
log_name = Name,
host = Host,
peer_host = PeerHost,
port = Port,
Expand Down Expand Up @@ -391,10 +395,10 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
State, #v1.stats_timer),
handshake, 8)]}),
rabbit_log_connection:info("closing AMQP connection ~p (~s)~n",
[self(), Name])
[self(), dynamic_connection_name(Name)])
catch
Ex ->
log_connection_exception(Name, Ex)
log_connection_exception(dynamic_connection_name(Name), Ex)
after
%% We don't call gen_tcp:close/1 here since it waits for
%% pending output to be sent, which results in unnecessary
Expand Down Expand Up @@ -704,9 +708,9 @@ wait_for_channel_termination(0, TimerRef, State) ->
wait_for_channel_termination(N, TimerRef,
State = #v1{connection_state = CS,
connection = #connection{
name = ConnName,
user = User,
vhost = VHost},
log_name = ConnName,
user = User,
vhost = VHost},
sock = Sock}) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
Expand Down Expand Up @@ -755,7 +759,7 @@ format_hard_error(Reason) ->

log_hard_error(#v1{connection_state = CS,
connection = #connection{
name = ConnName,
log_name = ConnName,
user = User,
vhost = VHost}}, Channel, Reason) ->
rabbit_log_connection:error(
Expand All @@ -773,7 +777,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
respond_and_close(State, Channel, Protocol, Reason, Reason);
%% authentication failure
handle_exception(State = #v1{connection = #connection{protocol = Protocol,
name = ConnName,
log_name = ConnName,
capabilities = Capabilities},
connection_state = starting},
Channel, Reason = #amqp_error{name = access_refused,
Expand All @@ -792,7 +796,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol,
%% when loopback-only user tries to connect from a non-local host
%% when user tries to access a vhost it has no permissions for
handle_exception(State = #v1{connection = #connection{protocol = Protocol,
name = ConnName,
log_name = ConnName,
user = User},
connection_state = opening},
Channel, Reason = #amqp_error{name = not_allowed,
Expand All @@ -809,7 +813,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
%% when negotiation fails, e.g. due to channel_max being higher than the
%% maxiumum allowed limit
handle_exception(State = #v1{connection = #connection{protocol = Protocol,
name = ConnName,
log_name = ConnName,
user = User},
connection_state = tuning},
Channel, Reason = #amqp_error{name = not_allowed,
Expand Down Expand Up @@ -1094,21 +1098,22 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
State0 = #v1{connection_state = starting,
connection = Connection,
connection = Connection0,
sock = Sock}) ->
AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
Capabilities =
case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
{table, Capabilities1} -> Capabilities1;
_ -> []
end,
Connection1 = Connection0#connection{
client_properties = ClientProperties,
capabilities = Capabilities,
auth_mechanism = {Mechanism, AuthMechanism},
auth_state = AuthMechanism:init(Sock)},
Connection2 = augment_connection_log_name(Connection1),
State = State0#v1{connection_state = securing,
connection =
Connection#connection{
client_properties = ClientProperties,
capabilities = Capabilities,
auth_mechanism = {Mechanism, AuthMechanism},
auth_state = AuthMechanism:init(Sock)}},
connection = Connection2},
auth_phase(Response, State);

handle_method0(#'connection.secure_ok'{response = Response},
Expand Down Expand Up @@ -1597,3 +1602,23 @@ control_throttle(State = #v1{connection_state = CS,
blocked -> maybe_block(maybe_unblock(State1));
_ -> State1
end.

augment_connection_log_name(#connection{client_properties = ClientProperties,
name = Name} = Connection) ->
case rabbit_misc:table_lookup(ClientProperties, <<"connection_name">>) of
{longstr, UserSpecifiedName} ->
LogName = <<Name/binary, " - ", UserSpecifiedName/binary>>,
rabbit_log_connection:info("Connection ~p (~s) has a client-provided name: ~s~n", [self(), Name, UserSpecifiedName]),
?store_proc_name(LogName),
Connection#connection{log_name = LogName};
_ ->
Connection
end.

dynamic_connection_name(Default) ->
case rabbit_misc:get_proc_name() of
{ok, Name} ->
Name;
_ ->
Default
end.

0 comments on commit 3aa3e93

Please sign in to comment.