Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Merge branch 'master' into rabbitmq-server-500
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Jul 20, 2016
2 parents 6998e6a + 3aa3e93 commit f284cf9
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 87 deletions.
1 change: 1 addition & 0 deletions mk/rabbitmq-components.mk
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ RABBITMQ_COMPONENTS = amqp_client \
rabbitmq_federation \
rabbitmq_federation_management \
rabbitmq_java_client \
rabbitmq_jms_client \
rabbitmq_jms_topic_exchange \
rabbitmq_lvc \
rabbitmq_management \
Expand Down
5 changes: 4 additions & 1 deletion src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1]).
emit_info_all/5, list_local/1, info_local/1]).
-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
Expand Down Expand Up @@ -637,6 +637,9 @@ emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
list_down(VHostPath)).

info_local(VHostPath) ->
map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).

list_local(VHostPath) ->
[ Q || #amqqueue{state = State, pid=QPid} = Q <- list(VHostPath),
State =/= crashed,
Expand Down
5 changes: 4 additions & 1 deletion src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
-export([send_command/2, deliver/4, deliver_reply/2,
send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
emit_info_all/4]).
emit_info_all/4, info_local/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
Expand Down Expand Up @@ -328,6 +328,9 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).

info_local(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list_local()).

emit_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [ spawn_link(Node, rabbit_channel, emit_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
rabbit_control_misc:await_emitters_termination(Pids).
Expand Down
115 changes: 50 additions & 65 deletions src/rabbit_health_check.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,82 +15,67 @@
%%
-module(rabbit_health_check).

-export([node/1]).
%% External API
-export([node/2]).

-define(NODE_HEALTH_CHECK_TIMEOUT, 70000).
%% Internal API
-export([local/0]).

-spec node(node()) -> 'true' | no_return().
-spec node(node(), timeout()) -> ok | {badrpc, term()} | {error_string, string()}.
-spec local() -> ok | {error_string, string()}.

%%----------------------------------------------------------------------------
%% External functions
%%----------------------------------------------------------------------------
node(Node) ->
node_health_check(Node, is_running),
node_health_check(Node, list_channels),
node_health_check(Node, list_queues),
node_health_check(Node, alarms).
node(Node, Timeout) ->
rabbit_misc:rpc_call(Node, rabbit_health_check, local, [], Timeout).

local() ->
run_checks([list_channels, list_queues, alarms]).

%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
node_health_check(Node, is_running) ->
node_health_check(
Node, {rabbit, is_running, []},
fun(true) ->
true;
(false) ->
throw({node_is_ko, "rabbit application is not running", 70})
end);
node_health_check(Node, list_channels) ->
node_health_check(
Node, {rabbit_channel, info_all, [[pid]]},
fun(L) when is_list(L) ->
true;
(Other) ->
ErrorMsg = io_lib:format("list_channels unexpected output: ~p",
[Other]),
throw({node_is_ko, ErrorMsg, 70})
end);
node_health_check(Node, list_queues) ->
node_health_check(
Node, {rabbit_amqqueue, info_all, [[pid]]},
fun(L) when is_list(L) ->
true;
(Other) ->
ErrorMsg = io_lib:format("list_queues unexpected output: ~p",
[Other]),
throw({node_is_ko, ErrorMsg, 70})
end);
node_health_check(Node, alarms) ->
node_health_check(
Node, {rabbit, status, []},
fun(Props) ->
case proplists:get_value(alarms, Props) of
[] ->
true;
Alarms ->
ErrorMsg = io_lib:format("resource alarm(s) in effect:~p", [Alarms]),
throw({node_is_ko, ErrorMsg, 70})
end
end).
run_checks([]) ->
ok;
run_checks([C|Cs]) ->
case node_health_check(C) of
ok ->
run_checks(Cs);
Error ->
Error
end.

node_health_check(Node, {M, F, A}, Fun) ->
case rabbit_misc:rpc_call(Node, M, F, A, ?NODE_HEALTH_CHECK_TIMEOUT) of
{badrpc, timeout} ->
ErrorMsg = io_lib:format(
"health check of node ~p fails: timed out (~p ms)",
[Node, ?NODE_HEALTH_CHECK_TIMEOUT]),
throw({node_is_ko, ErrorMsg, 70});
{badrpc, nodedown} ->
ErrorMsg = io_lib:format(
"health check of node ~p fails: nodedown", [Node]),
throw({node_is_ko, ErrorMsg, 68});
{badrpc, Reason} ->
ErrorMsg = io_lib:format(
"health check of node ~p fails: ~p", [Node, Reason]),
throw({node_is_ko, ErrorMsg, 70});
node_health_check(list_channels) ->
case rabbit_channel:info_local([pid]) of
L when is_list(L) ->
ok;
Other ->
Fun(Other)
ErrorMsg = io_lib:format("list_channels unexpected output: ~p",
[Other]),
{error_string, ErrorMsg}
end;

node_health_check(list_queues) ->
health_check_queues(rabbit_vhost:list());

node_health_check(alarms) ->
case proplists:get_value(alarms, rabbit:status()) of
[] ->
ok;
Alarms ->
ErrorMsg = io_lib:format("resource alarm(s) in effect:~p", [Alarms]),
{error_string, ErrorMsg}
end.


health_check_queues([]) ->
ok;
health_check_queues([VHost|RestVHosts]) ->
case rabbit_amqqueue:info_local(VHost) of
L when is_list(L) ->
health_check_queues(RestVHosts);
Other ->
ErrorMsg = io_lib:format("list_queues unexpected output for vhost ~s: ~p",
[VHost, Other]),
{error_string, ErrorMsg}
end.
11 changes: 10 additions & 1 deletion src/rabbit_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
-export([interval_operation/5]).
-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([store_proc_name/1, store_proc_name/2, get_proc_name/0]).
-export([moving_average/4]).
-export([get_env/3]).
-export([get_channel_operation_timeout/0]).
Expand Down Expand Up @@ -256,6 +256,7 @@
-spec get_parent() -> pid().
-spec store_proc_name(atom(), rabbit_types:proc_name()) -> ok.
-spec store_proc_name(rabbit_types:proc_type_and_name()) -> ok.
-spec get_proc_name() -> rabbit_types:proc_name().
-spec moving_average(float(), float(), float(), float() | 'undefined') ->
float().
-spec get_env(atom(), atom(), term()) -> term().
Expand Down Expand Up @@ -1122,6 +1123,14 @@ cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).

get_proc_name() ->
case get(process_name) of
undefined ->
undefined;
{_Type, Name} ->
{ok, Name}
end.

%% application:get_env/3 is only available in R16B01 or later.
get_env(Application, Key, Def) ->
case application:get_env(Application, Key) of
Expand Down
60 changes: 41 additions & 19 deletions src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ socket_op(Sock, Fun) ->
start_connection(Parent, HelperSup, Deb, Sock) ->
process_flag(trap_exit, true),
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Str} -> Str;
{ok, Str} -> list_to_binary(Str);
{error, enotconn} -> rabbit_net:fast_close(Sock),
exit(normal);
{error, Reason} -> socket_error(Reason),
Expand All @@ -311,11 +311,12 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
?store_proc_name(list_to_binary(Name)),
?store_proc_name(Name),
State = #v1{parent = Parent,
sock = Sock,
connection = #connection{
name = list_to_binary(Name),
name = Name,
log_name = Name,
host = Host,
peer_host = PeerHost,
port = Port,
Expand Down Expand Up @@ -352,10 +353,10 @@ start_connection(Parent, HelperSup, Deb, Sock) ->
State, #v1.stats_timer),
handshake, 8)]}),
rabbit_log_connection:info("closing AMQP connection ~p (~s)~n",
[self(), Name])
[self(), dynamic_connection_name(Name)])
catch
Ex ->
log_connection_exception(Name, Ex)
log_connection_exception(dynamic_connection_name(Name), Ex)
after
%% We don't call gen_tcp:close/1 here since it waits for
%% pending output to be sent, which results in unnecessary
Expand Down Expand Up @@ -672,9 +673,9 @@ wait_for_channel_termination(0, TimerRef, State) ->
wait_for_channel_termination(N, TimerRef,
State = #v1{connection_state = CS,
connection = #connection{
name = ConnName,
user = User,
vhost = VHost},
log_name = ConnName,
user = User,
vhost = VHost},
sock = Sock}) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
Expand Down Expand Up @@ -723,7 +724,7 @@ format_hard_error(Reason) ->

log_hard_error(#v1{connection_state = CS,
connection = #connection{
name = ConnName,
log_name = ConnName,
user = User,
vhost = VHost}}, Channel, Reason) ->
rabbit_log_connection:error(
Expand All @@ -741,7 +742,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
respond_and_close(State, Channel, Protocol, Reason, Reason);
%% authentication failure
handle_exception(State = #v1{connection = #connection{protocol = Protocol,
name = ConnName,
log_name = ConnName,
capabilities = Capabilities},
connection_state = starting},
Channel, Reason = #amqp_error{name = access_refused,
Expand All @@ -760,7 +761,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol,
%% when loopback-only user tries to connect from a non-local host
%% when user tries to access a vhost it has no permissions for
handle_exception(State = #v1{connection = #connection{protocol = Protocol,
name = ConnName,
log_name = ConnName,
user = User},
connection_state = opening},
Channel, Reason = #amqp_error{name = not_allowed,
Expand All @@ -777,7 +778,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
%% when negotiation fails, e.g. due to channel_max being higher than the
%% maxiumum allowed limit
handle_exception(State = #v1{connection = #connection{protocol = Protocol,
name = ConnName,
log_name = ConnName,
user = User},
connection_state = tuning},
Channel, Reason = #amqp_error{name = not_allowed,
Expand Down Expand Up @@ -1062,21 +1063,22 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
State0 = #v1{connection_state = starting,
connection = Connection,
connection = Connection0,
sock = Sock}) ->
AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
Capabilities =
case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
{table, Capabilities1} -> Capabilities1;
_ -> []
end,
Connection1 = Connection0#connection{
client_properties = ClientProperties,
capabilities = Capabilities,
auth_mechanism = {Mechanism, AuthMechanism},
auth_state = AuthMechanism:init(Sock)},
Connection2 = augment_connection_log_name(Connection1),
State = State0#v1{connection_state = securing,
connection =
Connection#connection{
client_properties = ClientProperties,
capabilities = Capabilities,
auth_mechanism = {Mechanism, AuthMechanism},
auth_state = AuthMechanism:init(Sock)}},
connection = Connection2},
auth_phase(Response, State);

handle_method0(#'connection.secure_ok'{response = Response},
Expand Down Expand Up @@ -1573,3 +1575,23 @@ control_throttle(State = #v1{connection_state = CS,
blocked -> maybe_block(maybe_unblock(State1));
_ -> State1
end.

augment_connection_log_name(#connection{client_properties = ClientProperties,
name = Name} = Connection) ->
case rabbit_misc:table_lookup(ClientProperties, <<"connection_name">>) of
{longstr, UserSpecifiedName} ->
LogName = <<Name/binary, " - ", UserSpecifiedName/binary>>,
rabbit_log_connection:info("Connection ~p (~s) has a client-provided name: ~s~n", [self(), Name, UserSpecifiedName]),
?store_proc_name(LogName),
Connection#connection{log_name = LogName};
_ ->
Connection
end.

dynamic_connection_name(Default) ->
case rabbit_misc:get_proc_name() of
{ok, Name} ->
Name;
_ ->
Default
end.

0 comments on commit f284cf9

Please sign in to comment.