Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Merge pull request #119 from binarin/rabbitmq-server-818-master
Browse files Browse the repository at this point in the history
Merge 118 into master
  • Loading branch information
michaelklishin committed Jul 14, 2016
2 parents 137d5a3 + e1d1151 commit c4218d9
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 67 deletions.
5 changes: 4 additions & 1 deletion src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1]).
emit_info_all/5, list_local/1, info_local/1]).
-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
Expand Down Expand Up @@ -637,6 +637,9 @@ emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
list_down(VHostPath)).

info_local(VHostPath) ->
map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).

list_local(VHostPath) ->
[ Q || #amqqueue{state = State, pid=QPid} = Q <- list(VHostPath),
State =/= crashed,
Expand Down
5 changes: 4 additions & 1 deletion src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
-export([send_command/2, deliver/4, deliver_reply/2,
send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
emit_info_all/4]).
emit_info_all/4, info_local/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
Expand Down Expand Up @@ -328,6 +328,9 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).

info_local(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list_local()).

emit_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [ spawn_link(Node, rabbit_channel, emit_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
rabbit_control_misc:await_emitters_termination(Pids).
Expand Down
115 changes: 50 additions & 65 deletions src/rabbit_health_check.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,82 +15,67 @@
%%
-module(rabbit_health_check).

-export([node/1]).
%% External API
-export([node/2]).

-define(NODE_HEALTH_CHECK_TIMEOUT, 70000).
%% Internal API
-export([local/0]).

-spec node(node()) -> 'true' | no_return().
-spec node(node(), timeout()) -> ok | {badrpc, term()} | {error_string, string()}.
-spec local() -> ok | {error_string, string()}.

%%----------------------------------------------------------------------------
%% External functions
%%----------------------------------------------------------------------------
node(Node) ->
node_health_check(Node, is_running),
node_health_check(Node, list_channels),
node_health_check(Node, list_queues),
node_health_check(Node, alarms).
node(Node, Timeout) ->
rabbit_misc:rpc_call(Node, rabbit_health_check, local, [], Timeout).

local() ->
run_checks([list_channels, list_queues, alarms]).

%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
node_health_check(Node, is_running) ->
node_health_check(
Node, {rabbit, is_running, []},
fun(true) ->
true;
(false) ->
throw({node_is_ko, "rabbit application is not running", 70})
end);
node_health_check(Node, list_channels) ->
node_health_check(
Node, {rabbit_channel, info_all, [[pid]]},
fun(L) when is_list(L) ->
true;
(Other) ->
ErrorMsg = io_lib:format("list_channels unexpected output: ~p",
[Other]),
throw({node_is_ko, ErrorMsg, 70})
end);
node_health_check(Node, list_queues) ->
node_health_check(
Node, {rabbit_amqqueue, info_all, [[pid]]},
fun(L) when is_list(L) ->
true;
(Other) ->
ErrorMsg = io_lib:format("list_queues unexpected output: ~p",
[Other]),
throw({node_is_ko, ErrorMsg, 70})
end);
node_health_check(Node, alarms) ->
node_health_check(
Node, {rabbit, status, []},
fun(Props) ->
case proplists:get_value(alarms, Props) of
[] ->
true;
Alarms ->
ErrorMsg = io_lib:format("resource alarm(s) in effect:~p", [Alarms]),
throw({node_is_ko, ErrorMsg, 70})
end
end).
run_checks([]) ->
ok;
run_checks([C|Cs]) ->
case node_health_check(C) of
ok ->
run_checks(Cs);
Error ->
Error
end.

node_health_check(Node, {M, F, A}, Fun) ->
case rabbit_misc:rpc_call(Node, M, F, A, ?NODE_HEALTH_CHECK_TIMEOUT) of
{badrpc, timeout} ->
ErrorMsg = io_lib:format(
"health check of node ~p fails: timed out (~p ms)",
[Node, ?NODE_HEALTH_CHECK_TIMEOUT]),
throw({node_is_ko, ErrorMsg, 70});
{badrpc, nodedown} ->
ErrorMsg = io_lib:format(
"health check of node ~p fails: nodedown", [Node]),
throw({node_is_ko, ErrorMsg, 68});
{badrpc, Reason} ->
ErrorMsg = io_lib:format(
"health check of node ~p fails: ~p", [Node, Reason]),
throw({node_is_ko, ErrorMsg, 70});
node_health_check(list_channels) ->
case rabbit_channel:info_local([pid]) of
L when is_list(L) ->
ok;
Other ->
Fun(Other)
ErrorMsg = io_lib:format("list_channels unexpected output: ~p",
[Other]),
{error_string, ErrorMsg}
end;

node_health_check(list_queues) ->
health_check_queues(rabbit_vhost:list());

node_health_check(alarms) ->
case proplists:get_value(alarms, rabbit:status()) of
[] ->
ok;
Alarms ->
ErrorMsg = io_lib:format("resource alarm(s) in effect:~p", [Alarms]),
{error_string, ErrorMsg}
end.


health_check_queues([]) ->
ok;
health_check_queues([VHost|RestVHosts]) ->
case rabbit_amqqueue:info_local(VHost) of
L when is_list(L) ->
health_check_queues(RestVHosts);
Other ->
ErrorMsg = io_lib:format("list_queues unexpected output for vhost ~s: ~p",
[VHost, Other]),
{error_string, ErrorMsg}
end.

0 comments on commit c4218d9

Please sign in to comment.