Skip to content

Commit

Permalink
Merge branch 'Ayanda-D-call-timeout-safety' into master
Browse files Browse the repository at this point in the history
(cherry picked from commit e6c9754)
  • Loading branch information
michaelklishin committed Dec 1, 2020
1 parent 0d05403 commit 3da80b6
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 11 deletions.
2 changes: 1 addition & 1 deletion deps/amqp_client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ PACKAGES_DIR ?= $(abspath PACKAGES)

LOCAL_DEPS = xmerl
DEPS = rabbit_common
TEST_DEPS = rabbitmq_ct_helpers rabbit
TEST_DEPS = rabbitmq_ct_helpers rabbit meck

DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-test.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-build.mk \
Expand Down
3 changes: 3 additions & 0 deletions deps/amqp_client/include/amqp_client_internal.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@
{<<"authentication_failure_close">>, bool, true}]).

-define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}).

%% Timeout, in milliseconds, passed to the rpc:call/5 requests the direct
%% client makes to the broker node (rabbit_direct:start_channel and
%% rabbit_direct:connect). Also used as the lower bound for the base timeout
%% of direct connections.
-define(DIRECT_OPERATION_TIMEOUT, 120000).
%% Margin, in milliseconds, that amqp_util:safe_call_timeout/1 adds on top of
%% a base timeout when computing a safe client call timeout.
-define(CALL_TIMEOUT_DEVIATION, 10000).
13 changes: 10 additions & 3 deletions deps/amqp_client/src/amqp_channel_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,16 @@ start_link(Type, Connection, ConnName, InfraArgs, ChNumber,

start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams],
ConnName, ChNumber, ChPid) ->
rpc:call(Node, rabbit_direct, start_channel,
[ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams]);
case rpc:call(Node, rabbit_direct, start_channel,
[ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams], ?DIRECT_OPERATION_TIMEOUT) of
{ok, _Writer} = Reply ->
Reply;
{badrpc, Reason} ->
{error, {Reason, Node}};
Error ->
Error
end;
start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD),
supervisor2:start_child(
Expand Down
28 changes: 28 additions & 0 deletions deps/amqp_client/src/amqp_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ start(AmqpParams, ConnName) when ConnName == undefined; is_binary(ConnName) ->
end,
AmqpParams2 = set_connection_name(ConnName, AmqpParams1),
AmqpParams3 = amqp_ssl:maybe_enhance_ssl_options(AmqpParams2),
ok = ensure_safe_call_timeout(AmqpParams3, amqp_util:call_timeout()),
{ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams3),
amqp_gen_connection:connect(Connection).

Expand Down Expand Up @@ -393,3 +394,30 @@ connection_name(ConnectionPid) ->
{<<"connection_name">>, _, ConnName} -> ConnName;
false -> undefined
end.

%% Makes sure the client call timeout is larger than the timeout of the
%% operation it has to outlive, raising it when it is not.
%%
%% Network connections: the base timeout is the record's connection_timeout.
%% Direct connections: the base timeout is the distribution net tick time
%% (converted from seconds to milliseconds), but never less than
%% ?DIRECT_OPERATION_TIMEOUT.
%%
%% Always returns ok (see maybe_update_call_timeout/2).
ensure_safe_call_timeout(#amqp_params_network{connection_timeout = ConnTimeout}, CallTimeout) ->
    maybe_update_call_timeout(ConnTimeout, CallTimeout);
ensure_safe_call_timeout(#amqp_params_direct{}, CallTimeout) ->
    BaseTimeout =
        case net_kernel:get_net_ticktime() of
            NetTicktime when is_integer(NetTicktime) ->
                tick_or_direct_timeout(NetTicktime * 1000);
            {ongoing_change_to, NetTicktime} ->
                %% A tick time change is in progress: use the target value.
                tick_or_direct_timeout(NetTicktime * 1000);
            ignored ->
                %% net_kernel answers `ignored' (not `ignore') when the local
                %% node is not alive; matching the wrong atom made this
                %% branch unreachable and crashed with case_clause instead.
                ?DIRECT_OPERATION_TIMEOUT
        end,
    maybe_update_call_timeout(BaseTimeout, CallTimeout).

%% Leaves the call timeout alone when it already exceeds an integer base
%% timeout. Otherwise computes a safe value from the base timeout, logs a
%% warning with the old and new values, and stores the new value through
%% amqp_util:update_call_timeout/1. Returns ok in both cases.
maybe_update_call_timeout(BaseTimeout, CallTimeout) ->
    AlreadySafe = is_integer(BaseTimeout) andalso CallTimeout > BaseTimeout,
    case AlreadySafe of
        true ->
            ok;
        false ->
            SafeTimeout = amqp_util:safe_call_timeout(BaseTimeout),
            ?LOG_WARN("AMQP 0-9-1 client call timeout was ~p ms, is updated to a safe effective "
                      "value of ~p ms", [CallTimeout, SafeTimeout]),
            amqp_util:update_call_timeout(SafeTimeout),
            ok
    end.

%% Returns the larger of the given timeout and ?DIRECT_OPERATION_TIMEOUT, so
%% the result is never below the direct operation timeout.
tick_or_direct_timeout(Timeout) ->
    erlang:max(Timeout, ?DIRECT_OPERATION_TIMEOUT).
6 changes: 3 additions & 3 deletions deps/amqp_client/src/amqp_direct_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ connect(Params = #amqp_params_direct{username = Username,
DecryptedPassword = credentials_obfuscation:decrypt(Password),
case rpc:call(Node, rabbit_direct, connect,
[{Username, DecryptedPassword}, VHost, ?PROTOCOL, self(),
connection_info(State1)]) of
connection_info(State1)], ?DIRECT_OPERATION_TIMEOUT) of
{ok, {User, ServerProperties}} ->
{ok, ChMgr, Collector} = SIF(i(name, State1)),
State2 = State1#state{user = User,
Expand All @@ -158,8 +158,8 @@ connect(Params = #amqp_params_direct{username = Username,
{ok, {ServerProperties, 0, ChMgr, State2}};
{error, _} = E ->
E;
{badrpc, nodedown} ->
{error, {nodedown, Node}}
{badrpc, Reason} ->
{error, {Reason, Node}}
end.

ensure_adapter_info(none) ->
Expand Down
4 changes: 3 additions & 1 deletion deps/amqp_client/src/amqp_gen_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ connect(Pid) ->
gen_server:call(Pid, connect, amqp_util:call_timeout()).

open_channel(Pid, ProposedNumber, Consumer) ->
case gen_server:call(Pid,
try gen_server:call(Pid,
{command, {open_channel, ProposedNumber, Consumer}},
amqp_util:call_timeout()) of
{ok, ChannelPid} -> ok = amqp_channel:open(ChannelPid),
{ok, ChannelPid};
Error -> Error
catch
_:Reason -> {error, Reason}
end.

hard_error_in_channel(Pid, ChannelPid, Reason) ->
Expand Down
10 changes: 9 additions & 1 deletion deps/amqp_client/src/amqp_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

-include("amqp_client_internal.hrl").

-export([call_timeout/0]).
-export([call_timeout/0, update_call_timeout/1, safe_call_timeout/1]).

call_timeout() ->
case get(gen_server_call_timeout) of
Expand All @@ -15,3 +15,11 @@ call_timeout() ->
Timeout ->
Timeout
end.

%% Records Timeout as the gen_server call timeout in two places: the
%% amqp_client application environment and the calling process' dictionary.
%% Always returns ok.
update_call_timeout(Timeout) ->
    Key = gen_server_call_timeout,
    _ = application:set_env(amqp_client, Key, Timeout),
    _Previous = erlang:put(Key, Timeout),
    ok.

%% Returns a call timeout that safely exceeds Threshold.
%%
%% For a numeric threshold (milliseconds) this is the threshold plus
%% ?CALL_TIMEOUT_DEVIATION. An `infinity' threshold has no larger value, so it
%% is returned unchanged; previously it reached the addition and raised
%% badarith. NOTE(review): callers forward any non-integer base timeout here,
%% e.g. a connection_timeout of `infinity' - confirm no other atoms are used.
safe_call_timeout(infinity) ->
    infinity;
safe_call_timeout(Threshold) ->
    Threshold + ?CALL_TIMEOUT_DEVIATION.
120 changes: 118 additions & 2 deletions deps/amqp_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ all() ->
{hard_error_loop, [{repeat, 100}, parallel], [hard_error]}
]).
-define(COMMON_NON_PARALLEL_TEST_CASES, [
basic_qos, %% Not parallel because it's time-based.
basic_qos, %% Not parallel because it's time-based, or has mocks
connection_failure,
channel_death
channel_death,
safe_call_timeouts
]).

groups() ->
Expand Down Expand Up @@ -294,6 +295,111 @@ named_connection(Config) ->

%% -------------------------------------------------------------------

%% Test case entry point: reads the connection parameters stored in the
%% Common Test config and runs the variant of the test that matches their
%% record type.
safe_call_timeouts(Config) ->
    safe_call_timeouts_test(?config(amqp_client_conn_params, Config)).

%% Checks that amqp_connection:start/1 raises a too-low client call timeout
%% to a safe value, both when the connection succeeds and when it fails.
%%
%% Network parameters: the expected value is the connection timeout plus
%% ?CALL_TIMEOUT_DEVIATION.
%% Direct parameters: the expected value is derived from the net tick time,
%% with ?DIRECT_OPERATION_TIMEOUT as the lower bound.
%%
%% The mocked sections run inside try ... after so the meck mock is always
%% unloaded, even when an assertion fails; otherwise the connection module
%% would stay mocked for every test case that runs afterwards.
safe_call_timeouts_test(Params = #amqp_params_network{}) ->
    TestConnTimeout = 2000,
    TestCallTimeout = 1000,

    Params1 = Params#amqp_params_network{connection_timeout = TestConnTimeout},

    %% Normal connection
    amqp_util:update_call_timeout(TestCallTimeout),

    {ok, Connection1} = amqp_connection:start(Params1),
    ?assertEqual(TestConnTimeout + ?CALL_TIMEOUT_DEVIATION, amqp_util:call_timeout()),

    ?assertEqual(ok, amqp_connection:close(Connection1)),
    wait_for_death(Connection1),

    %% Failing connection
    amqp_util:update_call_timeout(TestCallTimeout),

    ok = meck:new(amqp_network_connection, [passthrough]),
    try
        ok = meck:expect(amqp_network_connection, connect,
                         fun(_AmqpParams, _SIF, _TypeSup, _State) ->
                                 %% Outlast the low call timeout set above.
                                 timer:sleep(TestConnTimeout),
                                 {error, test_connection_timeout}
                         end),

        ?assertEqual({error, test_connection_timeout}, amqp_connection:start(Params1)),

        ?assertEqual(TestConnTimeout + ?CALL_TIMEOUT_DEVIATION, amqp_util:call_timeout())
    after
        meck:unload(amqp_network_connection)
    end;

safe_call_timeouts_test(Params = #amqp_params_direct{}) ->
    TestCallTimeout = 30000,
    NetTicktime0 = net_kernel:get_net_ticktime(),
    amqp_util:update_call_timeout(TestCallTimeout),

    %% 1. NetTicktime >= DIRECT_OPERATION_TIMEOUT (120s)
    NetTicktime1 = 140,
    net_kernel:set_net_ticktime(NetTicktime1, 1),
    wait_until_net_ticktime(NetTicktime1),

    {ok, Connection1} = amqp_connection:start(Params),
    ?assertEqual((NetTicktime1 * 1000) + ?CALL_TIMEOUT_DEVIATION,
                 amqp_util:call_timeout()),

    ?assertEqual(ok, amqp_connection:close(Connection1)),
    wait_for_death(Connection1),

    %% Reset call timeout
    amqp_util:update_call_timeout(TestCallTimeout),

    %% 2. Transitioning NetTicktime >= DIRECT_OPERATION_TIMEOUT (120s)
    NetTicktime2 = 120,
    net_kernel:set_net_ticktime(NetTicktime2, 1),
    ?assertEqual({ongoing_change_to, NetTicktime2}, net_kernel:get_net_ticktime()),

    {ok, Connection2} = amqp_connection:start(Params),
    ?assertEqual((NetTicktime2 * 1000) + ?CALL_TIMEOUT_DEVIATION,
                 amqp_util:call_timeout()),

    wait_until_net_ticktime(NetTicktime2),

    ?assertEqual(ok, amqp_connection:close(Connection2)),
    wait_for_death(Connection2),

    %% Reset call timeout
    amqp_util:update_call_timeout(TestCallTimeout),

    %% 3. NetTicktime < DIRECT_OPERATION_TIMEOUT (120s)
    NetTicktime3 = 60,
    net_kernel:set_net_ticktime(NetTicktime3, 1),
    wait_until_net_ticktime(NetTicktime3),

    {ok, Connection3} = amqp_connection:start(Params),
    ?assertEqual((?DIRECT_OPERATION_TIMEOUT + ?CALL_TIMEOUT_DEVIATION),
                 amqp_util:call_timeout()),

    %% Put the tick time back to what it was when the test started.
    net_kernel:set_net_ticktime(NetTicktime0, 1),
    wait_until_net_ticktime(NetTicktime0),
    ?assertEqual(ok, amqp_connection:close(Connection3)),
    wait_for_death(Connection3),

    %% Failing direct connection
    amqp_util:update_call_timeout(_LowCallTimeout = 1000),

    ok = meck:new(amqp_direct_connection, [passthrough]),
    try
        ok = meck:expect(amqp_direct_connection, connect,
                         fun(_AmqpParams, _SIF, _TypeSup, _State) ->
                                 %% Outlast the low call timeout set above.
                                 timer:sleep(2000),
                                 {error, test_connection_timeout}
                         end),

        ?assertEqual({error, test_connection_timeout}, amqp_connection:start(Params)),

        ?assertEqual((?DIRECT_OPERATION_TIMEOUT + ?CALL_TIMEOUT_DEVIATION),
                     amqp_util:call_timeout())
    after
        meck:unload(amqp_direct_connection)
    end.

%% -------------------------------------------------------------------

simultaneous_close(Config) ->
{ok, Connection} = new_connection(Config),
%% We pick a high channel number, to avoid any conflict with other
Expand Down Expand Up @@ -1456,6 +1562,16 @@ assert_down_with_error(MonitorRef, CodeAtom) ->
exit(did_not_die)
end.

%% Blocks until net_kernel reports NetTicktime as the current net tick time.
%% While a change to that value is still in progress, sleeps one second and
%% checks again. Any other answer throws
%% {error, {net_ticktime_not_set, NetTicktime}}.
wait_until_net_ticktime(NetTicktime) ->
    Current = net_kernel:get_net_ticktime(),
    if
        Current =:= NetTicktime ->
            ok;
        Current =:= {ongoing_change_to, NetTicktime} ->
            timer:sleep(1000),
            wait_until_net_ticktime(NetTicktime);
        true ->
            throw({error, {net_ticktime_not_set, NetTicktime}})
    end.

set_resource_alarm(memory, Config) ->
SrcDir = ?config(amqp_client_srcdir, Config),
Nodename = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Expand Down

0 comments on commit 3da80b6

Please sign in to comment.