Skip to content
This repository has been archived by the owner on Nov 18, 2020. It is now read-only.

Commit

Permalink
Replace infinite timeouts with sensible defaults.
Browse files Browse the repository at this point in the history
Change gen_server:call timeouts to use a configurable
default that is cached inside each process' process dictionary.
Also make supervisor shutdown timeouts use
the SUPERVISOR_WAIT value.

[#147178169]
  • Loading branch information
kjnilsson committed Jun 15, 2017
1 parent b4c7018 commit 2e75861
Show file tree
Hide file tree
Showing 14 changed files with 54 additions and 32 deletions.
2 changes: 1 addition & 1 deletion include/amqp_client.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
channel_max = 0,
frame_max = 0,
heartbeat = 10,
connection_timeout = infinity,
connection_timeout = 30000,
ssl_options = none,
auth_mechanisms =
[fun amqp_auth_mechanisms:plain/3,
Expand Down
4 changes: 4 additions & 0 deletions include/amqp_client_internal.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@
{<<"consumer_cancel_notify">>, bool, true},
{<<"connection.blocked">>, bool, true},
{<<"authentication_failure_close">>, bool, true}]).

-define(CALL_TIMEOUT, rabbit_misc:get_env(amqp_client, gen_server_call_timeout,
30000)).

21 changes: 11 additions & 10 deletions src/amqp_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
%% @spec (Channel, Method) -> Result
%% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
call(Channel, Method) ->
gen_server:call(Channel, {call, Method, none, self()}, infinity).
gen_server:call(Channel, {call, Method, none, self()}, amqp_util:call_timeout()).

%% @spec (Channel, Method, Content) -> Result
%% where
Expand All @@ -163,7 +163,7 @@ call(Channel, Method) ->
%% the broker. It does not necessarily imply that the broker has
%% accepted responsibility for the message.
call(Channel, Method, Content) ->
gen_server:call(Channel, {call, Method, Content, self()}, infinity).
gen_server:call(Channel, {call, Method, Content, self()}, amqp_util:call_timeout()).

%% @spec (Channel, Method) -> ok
%% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
Expand Down Expand Up @@ -208,15 +208,15 @@ close(Channel) ->
%% @doc Closes the channel, allowing the caller to supply a reply code and
%% text. If the channel is already closing, the atom 'closing' is returned.
close(Channel, Code, Text) ->
gen_server:call(Channel, {close, Code, Text}, infinity).
gen_server:call(Channel, {close, Code, Text}, amqp_util:call_timeout()).

%% @spec (Channel) -> integer()
%% where
%% Channel = pid()
%% @doc When in confirm mode, returns the sequence number of the next
%% message to be published.
next_publish_seqno(Channel) ->
gen_server:call(Channel, next_publish_seqno, infinity).
gen_server:call(Channel, next_publish_seqno, amqp_util:call_timeout()).

%% @spec (Channel) -> boolean() | 'timeout'
%% where
Expand All @@ -225,7 +225,7 @@ next_publish_seqno(Channel) ->
%% been either ack'd or nack'd by the broker. Note, when called on a
%% non-Confirm channel, waitForConfirms returns an error.
wait_for_confirms(Channel) ->
wait_for_confirms(Channel, infinity).
wait_for_confirms(Channel, amqp_util:call_timeout()).

%% @spec (Channel, Timeout) -> boolean() | 'timeout'
%% where
Expand All @@ -236,7 +236,7 @@ wait_for_confirms(Channel) ->
%% Note, when called on a non-Confirm channel, waitForConfirms throws
%% an exception.
wait_for_confirms(Channel, Timeout) ->
case gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity) of
case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of
{error, Reason} -> throw(Reason);
Other -> Other
end.
Expand All @@ -248,7 +248,7 @@ wait_for_confirms(Channel, Timeout) ->
%% received, the calling process is immediately sent an
%% exit(nack_received).
wait_for_confirms_or_die(Channel) ->
wait_for_confirms_or_die(Channel, infinity).
wait_for_confirms_or_die(Channel, amqp_util:call_timeout()).

%% @spec (Channel, Timeout) -> true
%% where
Expand Down Expand Up @@ -328,7 +328,7 @@ unregister_flow_handler(Channel) ->
%% where Consumer is the amqp_gen_consumer implementation registered with
%% the channel.
call_consumer(Channel, Msg) ->
gen_server:call(Channel, {call_consumer, Msg}, infinity).
gen_server:call(Channel, {call_consumer, Msg}, amqp_util:call_timeout()).

%% @spec (Channel, BasicConsume, Subscriber) -> ok
%% where
Expand All @@ -338,7 +338,7 @@ call_consumer(Channel, Msg) ->
%% @doc Subscribe the given pid to a queue using the specified
%% basic.consume method.
subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) ->
gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, infinity).
gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, amqp_util:call_timeout()).

%%---------------------------------------------------------------------------
%% Internal interface
Expand All @@ -364,7 +364,7 @@ connection_closing(Pid, ChannelCloseType, Reason) ->

%% @private
open(Pid) ->
gen_server:call(Pid, open, infinity).
gen_server:call(Pid, open, amqp_util:call_timeout()).

%%---------------------------------------------------------------------------
%% gen_server callbacks
Expand Down Expand Up @@ -993,3 +993,4 @@ call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) ->

safe_cancel_timer(undefined) -> ok;
safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef).

6 changes: 3 additions & 3 deletions src/amqp_channels_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ start_link(Connection, ConnName, ChSupSup) ->

open_channel(ChMgr, ProposedNumber, Consumer, InfraArgs) ->
gen_server:call(ChMgr, {open_channel, ProposedNumber, Consumer, InfraArgs},
infinity).
amqp_util:call_timeout()).

set_channel_max(ChMgr, ChannelMax) ->
gen_server:cast(ChMgr, {set_channel_max, ChannelMax}).

is_empty(ChMgr) ->
gen_server:call(ChMgr, is_empty, infinity).
gen_server:call(ChMgr, is_empty, amqp_util:call_timeout()).

num_channels(ChMgr) ->
gen_server:call(ChMgr, num_channels, infinity).
gen_server:call(ChMgr, num_channels, amqp_util:call_timeout()).

pass_frame(ChMgr, ChNumber, Frame) ->
gen_server:cast(ChMgr, {pass_frame, ChNumber, Frame}).
Expand Down
4 changes: 2 additions & 2 deletions src/amqp_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
%% defaults to 0 (turned off) (network only)</li>
%% <li>connection_timeout :: non_neg_integer() | 'infinity'
%% - The connection timeout in milliseconds,
%% defaults to 'infinity' (network only)</li>
%% defaults to 30000 (network only)</li>
%% <li>ssl_options :: term() - The second parameter to be used with the
%% ssl:connect/2 function, defaults to 'none' (network only)</li>
%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
Expand Down Expand Up @@ -278,7 +278,7 @@ close(ConnectionPid, Timeout) ->
%% @doc Closes the AMQP connection, allowing the caller to set the reply
%% code and text.
close(ConnectionPid, Code, Text) ->
close(ConnectionPid, Code, Text, infinity).
close(ConnectionPid, Code, Text, amqp_util:call_timeout()).

%% @spec (ConnectionPid, Code, Text, Timeout) -> ok | closing
%% where
Expand Down
2 changes: 1 addition & 1 deletion src/amqp_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ start_link(AMQPParams) ->
{ok, TypeSup} = supervisor2:start_child(
Sup, {connection_type_sup,
{amqp_connection_type_sup, start_link, []},
transient, infinity, supervisor,
transient, ?SUPERVISOR_WAIT, supervisor,
[amqp_connection_type_sup]}),
{ok, Connection} = supervisor2:start_child(
Sup, {connection, {amqp_gen_connection, start_link,
Expand Down
2 changes: 1 addition & 1 deletion src/amqp_connection_type_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ start_channels_manager(Sup, Conn, ConnName, Type) ->
Sup,
{channel_sup_sup, {amqp_channel_sup_sup, start_link,
[Type, Conn, ConnName]},
intrinsic, infinity, supervisor,
intrinsic, ?SUPERVISOR_WAIT, supervisor,
[amqp_channel_sup_sup]}),
{ok, _} = supervisor2:start_child(
Sup,
Expand Down
10 changes: 5 additions & 5 deletions src/amqp_gen_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ start_link(TypeSup, AMQPParams) ->
gen_server:start_link(?MODULE, {TypeSup, AMQPParams}, []).

connect(Pid) ->
gen_server:call(Pid, connect, infinity).
gen_server:call(Pid, connect, amqp_util:call_timeout()).

open_channel(Pid, ProposedNumber, Consumer) ->
case gen_server:call(Pid,
{command, {open_channel, ProposedNumber, Consumer}},
infinity) of
amqp_util:call_timeout()) of
{ok, ChannelPid} -> ok = amqp_channel:open(ChannelPid),
{ok, ChannelPid};
Error -> Error
Expand All @@ -79,19 +79,19 @@ channels_terminated(Pid) ->
gen_server:cast(Pid, channels_terminated).

close(Pid, Close, Timeout) ->
gen_server:call(Pid, {command, {close, Close, Timeout}}, infinity).
gen_server:call(Pid, {command, {close, Close, Timeout}}, amqp_util:call_timeout()).

server_close(Pid, Close) ->
gen_server:cast(Pid, {server_close, Close}).

info(Pid, Items) ->
gen_server:call(Pid, {info, Items}, infinity).
gen_server:call(Pid, {info, Items}, amqp_util:call_timeout()).

info_keys() ->
?INFO_KEYS.

info_keys(Pid) ->
gen_server:call(Pid, info_keys, infinity).
gen_server:call(Pid, info_keys, amqp_util:call_timeout()).

%%---------------------------------------------------------------------------
%% Behaviour
Expand Down
6 changes: 3 additions & 3 deletions src/amqp_gen_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ start_link(ConsumerModule, ExtraParams, Identity) ->
%% @doc This function is used to perform arbitrary calls into the
%% consumer module.
call_consumer(Pid, Msg) ->
gen_server2:call(Pid, {consumer_call, Msg}, infinity).
gen_server2:call(Pid, {consumer_call, Msg}, amqp_util:call_timeout()).

%% @spec (Consumer, Method, Args) -> ok
%% where
Expand All @@ -69,10 +69,10 @@ call_consumer(Pid, Msg) ->
%% @doc This function is used by amqp_channel to forward received
%% methods and deliveries to the consumer module.
call_consumer(Pid, Method, Args) ->
gen_server2:call(Pid, {consumer_call, Method, Args}, infinity).
gen_server2:call(Pid, {consumer_call, Method, Args}, amqp_util:call_timeout()).

call_consumer(Pid, Method, Args, DeliveryCtx) ->
gen_server2:call(Pid, {consumer_call, Method, Args, DeliveryCtx}, infinity).
gen_server2:call(Pid, {consumer_call, Method, Args, DeliveryCtx}, amqp_util:call_timeout()).

%%---------------------------------------------------------------------------
%% Behaviour
Expand Down
4 changes: 2 additions & 2 deletions src/amqp_main_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ init([Sock, Connection, ConnName, ChMgr, AState]) ->
channels_manager = ChMgr,
astate = AState,
message = none},
case rabbit_net:async_recv(Sock, 0, infinity) of
case rabbit_net:async_recv(Sock, 0, amqp_util:call_timeout()) of
{ok, _} -> {ok, State};
{error, Reason} -> {stop, Reason, _} = handle_error(Reason, State),
{stop, Reason}
Expand All @@ -72,7 +72,7 @@ handle_cast(Cast, State) ->
handle_info({inet_async, Sock, _, {ok, Data}},
State = #state {sock = Sock}) ->
%% Latency hiding: Request next packet first, then process data
case rabbit_net:async_recv(Sock, 0, infinity) of
case rabbit_net:async_recv(Sock, 0, amqp_util:call_timeout()) of
{ok, _} -> handle_data(Data, State);
{error, Reason} -> handle_error(Reason, State)
end;
Expand Down
4 changes: 2 additions & 2 deletions src/amqp_rpc_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ start_link(Connection, Queue) ->
%% RpcClient = pid()
%% @doc Stops an exisiting RPC client.
stop(Pid) ->
gen_server:call(Pid, stop, infinity).
gen_server:call(Pid, stop, amqp_util:call_timeout()).

%% @spec (RpcClient, Payload) -> ok
%% where
Expand All @@ -79,7 +79,7 @@ stop(Pid) ->
%% @doc Invokes an RPC. Note the caller of this function is responsible for
%% encoding the request and decoding the response.
call(RpcClient, Payload) ->
gen_server:call(RpcClient, {call, Payload}, infinity).
gen_server:call(RpcClient, {call, Payload}, amqp_util:call_timeout()).

%%--------------------------------------------------------------------------
%% Plumbing
Expand Down
2 changes: 1 addition & 1 deletion src/amqp_rpc_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ start_link(Connection, Queue, Fun) ->
%% RpcServer = pid()
%% @doc Stops an exisiting RPC server.
stop(Pid) ->
gen_server:call(Pid, stop, infinity).
gen_server:call(Pid, stop, amqp_util:call_timeout()).

%%--------------------------------------------------------------------------
%% gen_server callbacks
Expand Down
2 changes: 1 addition & 1 deletion src/amqp_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ start_connection_sup(AmqpParams) ->
init([]) ->
{ok, {{simple_one_for_one, 0, 1},
[{connection_sup, {amqp_connection_sup, start_link, []},
temporary, infinity, supervisor, [amqp_connection_sup]}]}}.
temporary, ?SUPERVISOR_WAIT, supervisor, [amqp_connection_sup]}]}}.
17 changes: 17 additions & 0 deletions src/amqp_util.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-module(amqp_util).

-include("amqp_client_internal.hrl").

-export([call_timeout/0]).

call_timeout() ->
case get(gen_server_call_timeout) of
undefined ->
Timeout = rabbit_misc:get_env(amqp_client,
gen_server_call_timeout,
?CALL_TIMEOUT),
put(gen_server_call_timeout, Timeout),
Timeout;
Timeout ->
Timeout
end.

0 comments on commit 2e75861

Please sign in to comment.