From 3da80b6823c3165f6eeb89e4e1653233bf3513cd Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 1 Dec 2020 06:16:03 +0300 Subject: [PATCH] Merge branch 'Ayanda-D-call-timeout-safety' into master (cherry picked from commit e6c97547e498d4b5361a626dde92872c582ae107) --- deps/amqp_client/Makefile | 2 +- .../include/amqp_client_internal.hrl | 3 + deps/amqp_client/src/amqp_channel_sup.erl | 13 +- deps/amqp_client/src/amqp_connection.erl | 28 ++++ .../src/amqp_direct_connection.erl | 6 +- deps/amqp_client/src/amqp_gen_connection.erl | 4 +- deps/amqp_client/src/amqp_util.erl | 10 +- deps/amqp_client/test/system_SUITE.erl | 120 +++++++++++++++++- 8 files changed, 175 insertions(+), 11 deletions(-) diff --git a/deps/amqp_client/Makefile b/deps/amqp_client/Makefile index b15fd918f0b5..186441ce9a9d 100644 --- a/deps/amqp_client/Makefile +++ b/deps/amqp_client/Makefile @@ -30,7 +30,7 @@ PACKAGES_DIR ?= $(abspath PACKAGES) LOCAL_DEPS = xmerl DEPS = rabbit_common -TEST_DEPS = rabbitmq_ct_helpers rabbit +TEST_DEPS = rabbitmq_ct_helpers rabbit meck DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-test.mk DEP_PLUGINS = rabbit_common/mk/rabbitmq-build.mk \ diff --git a/deps/amqp_client/include/amqp_client_internal.hrl b/deps/amqp_client/include/amqp_client_internal.hrl index 01e099097e82..075f5f4a1259 100644 --- a/deps/amqp_client/include/amqp_client_internal.hrl +++ b/deps/amqp_client/include/amqp_client_internal.hrl @@ -28,3 +28,6 @@ {<<"authentication_failure_close">>, bool, true}]). -define(WAIT_FOR_CONFIRMS_TIMEOUT, {60000, millisecond}). + +-define(DIRECT_OPERATION_TIMEOUT, 120000). +-define(CALL_TIMEOUT_DEVIATION, 10000). diff --git a/deps/amqp_client/src/amqp_channel_sup.erl b/deps/amqp_client/src/amqp_channel_sup.erl index 9bd85ce9462c..bc346c958481 100644 --- a/deps/amqp_client/src/amqp_channel_sup.erl +++ b/deps/amqp_client/src/amqp_channel_sup.erl @@ -47,9 +47,16 @@ start_link(Type, Connection, ConnName, InfraArgs, ChNumber, start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams], ConnName, ChNumber, ChPid) -> - rpc:call(Node, rabbit_direct, start_channel, - [ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User, - VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams]); + case rpc:call(Node, rabbit_direct, start_channel, + [ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User, + VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams], ?DIRECT_OPERATION_TIMEOUT) of + {ok, _Writer} = Reply -> + Reply; + {badrpc, Reason} -> + {error, {Reason, Node}}; + Error -> + Error + end; start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) -> GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD), supervisor2:start_child( diff --git a/deps/amqp_client/src/amqp_connection.erl b/deps/amqp_client/src/amqp_connection.erl index 6800a44a3e4e..d69355453dc1 100644 --- a/deps/amqp_client/src/amqp_connection.erl +++ b/deps/amqp_client/src/amqp_connection.erl @@ -170,6 +170,7 @@ start(AmqpParams, ConnName) when ConnName == undefined; is_binary(ConnName) -> end, AmqpParams2 = set_connection_name(ConnName, AmqpParams1), AmqpParams3 = amqp_ssl:maybe_enhance_ssl_options(AmqpParams2), + ok = ensure_safe_call_timeout(AmqpParams3, amqp_util:call_timeout()), {ok, _Sup, Connection} = amqp_sup:start_connection_sup(AmqpParams3), amqp_gen_connection:connect(Connection). @@ -393,3 +394,30 @@ connection_name(ConnectionPid) -> {<<"connection_name">>, _, ConnName} -> ConnName; false -> undefined end. + +ensure_safe_call_timeout(#amqp_params_network{connection_timeout = ConnTimeout}, CallTimeout) -> + maybe_update_call_timeout(ConnTimeout, CallTimeout); +ensure_safe_call_timeout(#amqp_params_direct{}, CallTimeout) -> + case net_kernel:get_net_ticktime() of + NetTicktime when is_integer(NetTicktime) -> + maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000), + CallTimeout); + {ongoing_change_to, NetTicktime} -> + maybe_update_call_timeout(tick_or_direct_timeout(NetTicktime * 1000), + CallTimeout); + ignore -> + maybe_update_call_timeout(?DIRECT_OPERATION_TIMEOUT, CallTimeout) + end. + +maybe_update_call_timeout(BaseTimeout, CallTimeout) + when is_integer(BaseTimeout), CallTimeout > BaseTimeout -> + ok; +maybe_update_call_timeout(BaseTimeout, CallTimeout) -> + EffectiveSafeCallTimeout = amqp_util:safe_call_timeout(BaseTimeout), + ?LOG_WARN("AMQP 0-9-1 client call timeout was ~p ms, is updated to a safe effective " + "value of ~p ms", [CallTimeout, EffectiveSafeCallTimeout]), + amqp_util:update_call_timeout(EffectiveSafeCallTimeout), + ok. + +tick_or_direct_timeout(Timeout) when Timeout >= ?DIRECT_OPERATION_TIMEOUT -> Timeout; +tick_or_direct_timeout(_Timeout) -> ?DIRECT_OPERATION_TIMEOUT. diff --git a/deps/amqp_client/src/amqp_direct_connection.erl b/deps/amqp_client/src/amqp_direct_connection.erl index a07c67074ee2..ea486aacd143 100644 --- a/deps/amqp_client/src/amqp_direct_connection.erl +++ b/deps/amqp_client/src/amqp_direct_connection.erl @@ -143,7 +143,7 @@ connect(Params = #amqp_params_direct{username = Username, DecryptedPassword = credentials_obfuscation:decrypt(Password), case rpc:call(Node, rabbit_direct, connect, [{Username, DecryptedPassword}, VHost, ?PROTOCOL, self(), - connection_info(State1)]) of + connection_info(State1)], ?DIRECT_OPERATION_TIMEOUT) of {ok, {User, ServerProperties}} -> {ok, ChMgr, Collector} = SIF(i(name, State1)), State2 = State1#state{user = User, @@ -158,8 +158,8 @@ connect(Params = #amqp_params_direct{username = Username, {ok, {ServerProperties, 0, ChMgr, State2}}; {error, _} = E -> E; - {badrpc, nodedown} -> - {error, {nodedown, Node}} + {badrpc, Reason} -> + {error, {Reason, Node}} end. ensure_adapter_info(none) -> diff --git a/deps/amqp_client/src/amqp_gen_connection.erl b/deps/amqp_client/src/amqp_gen_connection.erl index 5c826a5b5f5b..90a262ae9b4b 100644 --- a/deps/amqp_client/src/amqp_gen_connection.erl +++ b/deps/amqp_client/src/amqp_gen_connection.erl @@ -49,12 +49,14 @@ connect(Pid) -> gen_server:call(Pid, connect, amqp_util:call_timeout()). open_channel(Pid, ProposedNumber, Consumer) -> - case gen_server:call(Pid, + try gen_server:call(Pid, {command, {open_channel, ProposedNumber, Consumer}}, amqp_util:call_timeout()) of {ok, ChannelPid} -> ok = amqp_channel:open(ChannelPid), {ok, ChannelPid}; Error -> Error + catch + _:Reason -> {error, Reason} end. hard_error_in_channel(Pid, ChannelPid, Reason) -> diff --git a/deps/amqp_client/src/amqp_util.erl b/deps/amqp_client/src/amqp_util.erl index df7ce3066289..0324d4a17143 100644 --- a/deps/amqp_client/src/amqp_util.erl +++ b/deps/amqp_client/src/amqp_util.erl @@ -2,7 +2,7 @@ -include("amqp_client_internal.hrl"). --export([call_timeout/0]). +-export([call_timeout/0, update_call_timeout/1, safe_call_timeout/1]). call_timeout() -> case get(gen_server_call_timeout) of @@ -15,3 +15,11 @@ call_timeout() -> Timeout -> Timeout end. + +update_call_timeout(Timeout) -> + application:set_env(amqp_client, gen_server_call_timeout, Timeout), + put(gen_server_call_timeout, Timeout), + ok. + +safe_call_timeout(Threshold) -> + Threshold + ?CALL_TIMEOUT_DEVIATION. diff --git a/deps/amqp_client/test/system_SUITE.erl b/deps/amqp_client/test/system_SUITE.erl index 9e39e468b7d7..45ada5cf1969 100644 --- a/deps/amqp_client/test/system_SUITE.erl +++ b/deps/amqp_client/test/system_SUITE.erl @@ -77,9 +77,10 @@ all() -> {hard_error_loop, [{repeat, 100}, parallel], [hard_error]} ]). -define(COMMON_NON_PARALLEL_TEST_CASES, [ - basic_qos, %% Not parallel because it's time-based. + basic_qos, %% Not parallel because it's time-based, or has mocks connection_failure, - channel_death + channel_death, + safe_call_timeouts ]). groups() -> @@ -294,6 +295,111 @@ named_connection(Config) -> %% ------------------------------------------------------------------- +safe_call_timeouts(Config) -> + Params = ?config(amqp_client_conn_params, Config), + safe_call_timeouts_test(Params). + +safe_call_timeouts_test(Params = #amqp_params_network{}) -> + TestConnTimeout = 2000, + TestCallTimeout = 1000, + + Params1 = Params#amqp_params_network{connection_timeout = TestConnTimeout}, + + %% Normal connection + amqp_util:update_call_timeout(TestCallTimeout), + + {ok, Connection1} = amqp_connection:start(Params1), + ?assertEqual(TestConnTimeout + ?CALL_TIMEOUT_DEVIATION, amqp_util:call_timeout()), + + ?assertEqual(ok, amqp_connection:close(Connection1)), + wait_for_death(Connection1), + + %% Failing connection + amqp_util:update_call_timeout(TestCallTimeout), + + ok = meck:new(amqp_network_connection, [passthrough]), + ok = meck:expect(amqp_network_connection, connect, + fun(_AmqpParams, _SIF, _TypeSup, _State) -> + timer:sleep(TestConnTimeout), + {error, test_connection_timeout} + end), + + ?assertEqual({error, test_connection_timeout}, amqp_connection:start(Params1)), + + ?assertEqual(TestConnTimeout + ?CALL_TIMEOUT_DEVIATION, amqp_util:call_timeout()), + + meck:unload(amqp_network_connection); + +safe_call_timeouts_test(Params = #amqp_params_direct{}) -> + TestCallTimeout = 30000, + NetTicktime0 = net_kernel:get_net_ticktime(), + amqp_util:update_call_timeout(TestCallTimeout), + + %% 1. NetTicktime >= DIRECT_OPERATION_TIMEOUT (120s) + NetTicktime1 = 140, + net_kernel:set_net_ticktime(NetTicktime1, 1), + wait_until_net_ticktime(NetTicktime1), + + {ok, Connection1} = amqp_connection:start(Params), + ?assertEqual((NetTicktime1 * 1000) + ?CALL_TIMEOUT_DEVIATION, + amqp_util:call_timeout()), + + ?assertEqual(ok, amqp_connection:close(Connection1)), + wait_for_death(Connection1), + + %% Reset call timeout + amqp_util:update_call_timeout(TestCallTimeout), + + %% 2. Transitioning NetTicktime >= DIRECT_OPERATION_TIMEOUT (120s) + NetTicktime2 = 120, + net_kernel:set_net_ticktime(NetTicktime2, 1), + ?assertEqual({ongoing_change_to, NetTicktime2}, net_kernel:get_net_ticktime()), + + {ok, Connection2} = amqp_connection:start(Params), + ?assertEqual((NetTicktime2 * 1000) + ?CALL_TIMEOUT_DEVIATION, + amqp_util:call_timeout()), + + wait_until_net_ticktime(NetTicktime2), + + ?assertEqual(ok, amqp_connection:close(Connection2)), + wait_for_death(Connection2), + + %% Reset call timeout + amqp_util:update_call_timeout(TestCallTimeout), + + %% 3. NetTicktime < DIRECT_OPERATION_TIMEOUT (120s) + NetTicktime3 = 60, + net_kernel:set_net_ticktime(NetTicktime3, 1), + wait_until_net_ticktime(NetTicktime3), + + {ok, Connection3} = amqp_connection:start(Params), + ?assertEqual((?DIRECT_OPERATION_TIMEOUT + ?CALL_TIMEOUT_DEVIATION), + amqp_util:call_timeout()), + + net_kernel:set_net_ticktime(NetTicktime0, 1), + wait_until_net_ticktime(NetTicktime0), + ?assertEqual(ok, amqp_connection:close(Connection3)), + wait_for_death(Connection3), + + %% Failing direct connection + amqp_util:update_call_timeout(_LowCallTimeout = 1000), + + ok = meck:new(amqp_direct_connection, [passthrough]), + ok = meck:expect(amqp_direct_connection, connect, + fun(_AmqpParams, _SIF, _TypeSup, _State) -> + timer:sleep(2000), + {error, test_connection_timeout} + end), + + ?assertEqual({error, test_connection_timeout}, amqp_connection:start(Params)), + + ?assertEqual((?DIRECT_OPERATION_TIMEOUT + ?CALL_TIMEOUT_DEVIATION), + amqp_util:call_timeout()), + + meck:unload(amqp_direct_connection). + +%% ------------------------------------------------------------------- + simultaneous_close(Config) -> {ok, Connection} = new_connection(Config), %% We pick a high channel number, to avoid any conflict with other @@ -1456,6 +1562,16 @@ assert_down_with_error(MonitorRef, CodeAtom) -> exit(did_not_die) end. +wait_until_net_ticktime(NetTicktime) -> + case net_kernel:get_net_ticktime() of + NetTicktime -> ok; + {ongoing_change_to, NetTicktime} -> + timer:sleep(1000), + wait_until_net_ticktime(NetTicktime); + _ -> + throw({error, {net_ticktime_not_set, NetTicktime}}) + end. + set_resource_alarm(memory, Config) -> SrcDir = ?config(amqp_client_srcdir, Config), Nodename = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),