Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Merge pull request #107 from rabbitmq/mk-handle-internal-exchange-check-timeouts
Browse files Browse the repository at this point in the history

Handle operation timeouts during internal exchange checks

(cherry picked from commit 65476b6)
  • Loading branch information
michaelklishin committed Jun 17, 2020
1 parent bc9aa11 commit 795d66e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ PROJECT_MOD = rabbit_federation_app
define PROJECT_ENV
[
{pgroup_name_cluster_id, false},
{internal_exchange_check_interval, 30000}
{internal_exchange_check_interval, 90000}
]
endef

Expand Down
8 changes: 6 additions & 2 deletions src/rabbit_federation_exchange_link.erl
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,14 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
{Upstream, UParams, XName}, State);

handle_info(check_internal_exchange, State = #state{internal_exchange = IntXNameBin,
internal_exchange_interval = Int}) ->
internal_exchange_interval = Interval}) ->
case check_internal_exchange(IntXNameBin, State) of
upstream_not_found ->
rabbit_log_federation:warning("Federation link could not find upstream exchange '~s' and will restart",
[IntXNameBin]),
{stop, {shutdown, restart}, State};
_ ->
TRef = erlang:send_after(Int, self(), check_internal_exchange),
TRef = erlang:send_after(Interval, self(), check_internal_exchange),
{noreply, State#state{internal_exchange_timer = TRef}}
end;

Expand Down Expand Up @@ -476,6 +478,8 @@ go(S0 = {not_started, {Upstream, UParams, DownXName}}) ->
unacked = Unacked,
internal_exchange_interval = Interval}),
Bindings),
rabbit_log_federation:info("Federation link for ~s (upstream: ~s) will perform internal exchange checks "
"every ~b seconds", [rabbit_misc:rs(DownXName), UName, round(Interval / 1000)]),
TRef = erlang:send_after(Interval, self(), check_internal_exchange),
{noreply, State#state{internal_exchange_timer = TRef}}
end, Upstream, UParams, DownXName, S0).
Expand Down
65 changes: 43 additions & 22 deletions src/rabbit_federation_link_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ get_connection_name(_, _) ->

%% Builds the display name for a federation link connection.
%% When both the upstream name and the policy name are binaries they are
%% embedded in the name; for any other arguments a generic name is returned.
connection_name(Upstream, Policy) when is_binary(Upstream) andalso is_binary(Policy) ->
    <<"Federation link (upstream: ", Upstream/binary,
      ", policy: ", Policy/binary, ")">>;
connection_name(_Upstream, _Policy) ->
    <<"Federation link">>.

Expand All @@ -133,38 +132,49 @@ ensure_channel_closed(Ch) -> catch amqp_channel:close(Ch).
%% Closes Conn via amqp_connection:close/2, passing
%% ?MAX_CONNECTION_CLOSE_TIMEOUT as the second argument.
%% The call is wrapped in an old-style `catch`, so an exception raised by the
%% close is turned into this function's return value instead of propagating
%% to the caller.
ensure_connection_closed(Conn) ->
catch amqp_connection:close(Conn, ?MAX_CONNECTION_CLOSE_TIMEOUT).

%% Handles a failed or lost link connection.
%%
%% The first argument identifies where the failure happened:
%%   remote_start    - connecting to the upstream failed; a dedicated clause
%%                     matches {{shutdown, {server_initiated_close, Code,
%%                     Message}}, _} and includes the code and message in the
%%                     log entry
%%   remote          - an established upstream connection was lost
%%   command_channel - opening a command channel for the upstream failed
%%   local           - only the basic_cancel error is matched here
%%   local_start     - the local connection could not be established
%%
%% Every clause records the failure with rabbit_federation_status:report/4
%% (the error is passed through clean_reason/1, except for basic_cancel which
%% is reported as {error, basic_cancel}), writes a log entry via
%% log_warning/3 or log_info/3, and returns
%% {stop, {shutdown, restart}, State}.
connection_error(remote_start, {{shutdown, {server_initiated_close, Code, Message}}, _} = E,
Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
log_warning(XorQName,
"did not connect to ~s. Server has closed the connection due to an error, code: ~p, "
"message: ~s",
[rabbit_federation_upstream:params_to_string(UParams),
Code, Message]),
{stop, {shutdown, restart}, State};

connection_error(remote_start, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
log_warning(XorQName, "did not connect to ~s. Reason: ~p~n",
log_warning(XorQName, "did not connect to ~s. Reason: ~p",
[rabbit_federation_upstream:params_to_string(UParams),
E]),
{stop, {shutdown, restart}, State};

connection_error(remote, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
log_info(XorQName, "disconnected from ~s~n~p~n",
log_info(XorQName, "disconnected from ~s~n~p",
[rabbit_federation_upstream:params_to_string(UParams), E]),
{stop, {shutdown, restart}, State};

connection_error(command_channel, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
log_info(XorQName, "failed to open a command channel for upstream ~s~n~p~n",
log_info(XorQName, "failed to open a command channel for upstream ~s~n~p",
[rabbit_federation_upstream:params_to_string(UParams), E]),
{stop, {shutdown, restart}, State};

connection_error(local, basic_cancel, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, {error, basic_cancel}),
log_info(XorQName, "received 'basic.cancel'~n", []),
log_info(XorQName, "received a 'basic.cancel'", []),
{stop, {shutdown, restart}, State};

connection_error(local_start, E, Upstream, UParams, XorQName, State) ->
rabbit_federation_status:report(
Upstream, UParams, XorQName, clean_reason(E)),
log_warning(XorQName, "did not connect locally~n~p~n", [E]),
log_warning(XorQName, "did not connect locally~n~p", [E]),
{stop, {shutdown, restart}, State}.

%% If we terminate due to a gen_server call exploding (almost
Expand Down Expand Up @@ -313,8 +323,8 @@ disposable_channel_call(Conn, Method, ErrFun) ->
{ok, Ch} = amqp_connection:open_channel(Conn),
try
amqp_channel:call(Ch, Method)
catch exit:{{shutdown, {server_initiated_close, Code, Text}}, _} ->
ErrFun(Code, Text)
catch exit:{{shutdown, {server_initiated_close, Code, Message}}, _} ->
ErrFun(Code, Message)
after
ensure_channel_closed(Ch)
end
Expand All @@ -324,18 +334,29 @@ disposable_channel_call(Conn, Method, ErrFun) ->
end.

%% Runs a single AMQP method on a short-lived ("disposable") connection.
%%
%% Opens a connection and channel with open/2 using Params, issues
%% amqp_channel:call(Ch, Method) and closes the connection in the `after`
%% section, whether or not the call succeeded.
%%
%% If the call exits with a server_initiated_close shutdown reason (either
%% directly or wrapped in connection_closing), the result is
%% ErrFun(Code, Message) with the code and text taken from that reason.
%% Otherwise the result of amqp_channel:call/2 is returned.
disposable_connection_call(Params, Method, ErrFun) ->
case open(Params, undefined) of
{ok, Conn, Ch} ->
try
amqp_channel:call(Ch, Method)
catch exit:{{shutdown, {connection_closing,
{server_initiated_close, Code, Txt}}}, _} ->
ErrFun(Code, Txt);
exit:{{shutdown, {server_initiated_close, Code, Txt}}, _} ->
ErrFun(Code, Txt)
after
ensure_connection_closed(Conn)
end;
E ->
E
%% Outer try: exceptions not handled by the inner try are logged by the
%% catch section at the bottom.
try
rabbit_log_federation:debug("Disposable connection parameters: ~p", [Params]),
case open(Params, <<"Disposable exchange federation link connection">>) of
{ok, Conn, Ch} ->
try
amqp_channel:call(Ch, Method)
catch exit:{{shutdown, {connection_closing, {server_initiated_close, Code, Message}}}, _} ->
ErrFun(Code, Message);
exit:{{shutdown, {server_initiated_close, Code, Message}}, _} ->
ErrFun(Code, Message)
after
ensure_connection_closed(Conn)
end;
%% Note: this branch evaluates to the value of the log call, not to the
%% {error, {auth_failure, _}} tuple.
{error, {auth_failure, Message}} ->
rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection "
"due to an authentication failure: ~s~n", [Message]);
%% Any other result of open/2 is logged and then returned unchanged.
Error ->
rabbit_log_federation:error("Federation link could not open a disposable (one-off) connection, "
"reason: ~p~n", [Error]),
Error
end
catch
%% The exception is logged and not re-raised; the function then evaluates to
%% the value of the log call.
Exception:Reason ->
rabbit_log_federation:error("Federation link could not create a disposable (one-off) connection "
"due to an error ~p: ~p~n", [Exception, Reason])
end.

0 comments on commit 795d66e

Please sign in to comment.