Skip to content
This repository has been archived by the owner on Nov 16, 2020. It is now read-only.

Lock support #1

Merged
merged 2 commits into from
Jun 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 140 additions & 70 deletions src/rabbit_peer_discovery_etcd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,52 +20,14 @@

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl").
-include("rabbit_peer_discovery_etcd.hrl").

-export([list_nodes/0, supports_registration/0, register/0, unregister/0,
post_registration/0]).

-export([start_node_key_updater/0, update_node_key/0]).

%% Modules this backend delegates to. CONFIG_MODULE is used for
%% config_map/1 and get/3 lookups, HTTPC_MODULE for building HTTP queries.
-define(CONFIG_MODULE, rabbit_peer_discovery_config).
-define(UTIL_MODULE, rabbit_peer_discovery_util).
-define(HTTPC_MODULE, rabbit_peer_discovery_httpc).

%% Key passed to ?CONFIG_MODULE:config_map/1 to fetch this backend's settings.
-define(BACKEND_CONFIG_KEY, peer_discovery_etcd).


%% Map from setting name to its metadata record: the value type, the
%% environment variable associated with the setting and the default value.
-define(CONFIG_MAPPING,
#{
%% URI scheme of the etcd endpoint
etcd_scheme => #peer_discovery_config_entry_meta{
type = string,
env_variable = "ETCD_SCHEME",
default_value = "http"
},
%% Host name of the etcd endpoint
etcd_host => #peer_discovery_config_entry_meta{
type = string,
env_variable = "ETCD_HOST",
default_value = "localhost"
},
%% TCP port of the etcd endpoint
etcd_port => #peer_discovery_config_entry_meta{
type = integer,
env_variable = "ETCD_PORT",
default_value = 2379
},
%% Path segment placed after [v2, keys] when building etcd key paths
etcd_prefix => #peer_discovery_config_entry_meta{
type = string,
env_variable = "ETCD_PREFIX",
default_value = "rabbitmq"
},
%% Key TTL, in seconds; refresh timers fire at half of this value
etcd_node_ttl => #peer_discovery_config_entry_meta{
type = integer,
env_variable = "ETCD_NODE_TTL",
default_value = 30
},
%% Path segment placed after the prefix when building etcd key paths
cluster_name => #peer_discovery_config_entry_meta{
type = string,
env_variable = "CLUSTER_NAME",
default_value = "default"
}
}).
post_registration/0, lock/1, unlock/1]).

-export([update_node_key/0]).

-export([lock_ttl_update_callback/1]).


%%
Expand Down Expand Up @@ -131,9 +93,28 @@ unregister() ->
%% Calls start_node_key_updater/0 to begin the periodic node key refresh
%% and then returns ok.
-spec post_registration() -> ok | {error, Reason :: string()}.

post_registration() ->
start_node_key_updater(),
ok.

%% @doc Tries to acquire the startup lock on behalf of `Node'.
%% The lock value is the node name followed by a random string. Attempts
%% are bounded by the `lock_wait_time' setting, which is added to the
%% current system time in seconds to form the deadline.
%% @end
-spec lock(Node :: atom()) -> {ok, Data :: term()} | {error, Reason :: string()}.

lock(Node) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    StartedAt = erlang:system_time(seconds),
    Deadline = StartedAt + get_config_key(lock_wait_time, Config),
    LockId = lists:append([atom_to_list(Node), " - ", generate_unique_string()]),
    lock(LockId, StartedAt, Deadline).


%% @doc Releases a lock previously returned by lock/1.
%% `Data' is the `{UniqueId, TRef}' pair handed out on acquisition. The TTL
%% refresh timer is stopped first, then the lock key is deleted from etcd.
%% Returns `ok' on success, or the `{error, _}' tuple reported by
%% delete_etcd_lock_key/1 when the deletion fails.
%% @end
-spec unlock(Data :: term()) -> ok | {error, Reason :: string()}.

unlock({UniqueId, TRef}) ->
    %% Stop refreshing before deleting so the timer cannot touch the key
    %% after it has been released.
    stop_lock_ttl_updater(TRef),
    case delete_etcd_lock_key(UniqueId) of
        {ok, _} ->
            ok;
        {error, _} = Err ->
            Err
    end.

%%
%% Implementation
%%
Expand Down Expand Up @@ -260,32 +241,121 @@ etcd_put(Path, Query, Body, Map) ->
Path, Query, ?HTTPC_MODULE:build_query(Body))).


%% Starts the periodic node key refresh, but only when this module is the
%% active peer discovery backend and `etcd_node_ttl' is configured.
start_node_key_updater() ->
    start_node_key_updater(rabbit_peer_discovery:backend()).

%% Dispatches on the active backend; any backend other than this module
%% is a no-op.
start_node_key_updater(?MODULE) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    case get_config_key(etcd_node_ttl, Config) of
        undefined ->
            ok;
        %% TTL is expressed in seconds
        TtlSeconds ->
            %% timer:apply_interval/4 cannot be called from here: this
            %% function runs in a short-lived process, and the timer module
            %% cancels the timer automatically once that process exits.
            %%
            %% The work is therefore handed over to the locally registered
            %% gen_server `rabbitmq_peer_discovery_etcd_health_check_helper`.
            %%
            %% Half of the configured TTL is used (seconds * 500 =
            %% milliseconds / 2) to avoid a race between check TTL
            %% expiration and in-flight notifications.
            rabbitmq_peer_discovery_etcd_health_check_helper:start_timer(TtlSeconds * 500),
            ok
    end;
start_node_key_updater(_OtherBackend) ->
    ok.

%% Reads the backend configuration map and passes it to set_etcd_node_key/1.
%% Scheduled through timer:apply_interval by the health check helper.
%% NOTE(review): the spec promises ok, but the result is whatever
%% set_etcd_node_key/1 returns, which is not visible here — confirm.
-spec update_node_key() -> ok.
update_node_key() ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
set_etcd_node_key(M).

%% @doc
%% Tries to acquire the lock. Will retry until the lock is finally
%% granted or time is up.
%%
%% `Now' and `EndTime' are system timestamps in seconds. Once `EndTime'
%% is earlier than `Now' the attempt is abandoned with an error. On
%% success the TTL refresher is started and `{ok, {UniqueId, TRef}}' is
%% returned; that pair is what unlock/1 expects. An I/O failure while
%% talking to etcd aborts immediately with a formatted error string.
%% @end
-spec lock(string(), pos_integer(), pos_integer()) ->
          {ok, {string(), timer:tref()}} | {error, string()}.
lock(_, Now, EndTime) when EndTime < Now ->
    {error, "Acquiring the lock taking too long, bailing out"};
lock(UniqueId, _, EndTime) ->
    case try_insert_lock_key(UniqueId) of
        true ->
            TRef = start_lock_ttl_updater(UniqueId),
            {ok, {UniqueId, TRef}};
        false ->
            %% Somebody else holds the lock: wait, then retry with a
            %% fresh timestamp so the deadline check stays accurate.
            wait_for_lock_release(),
            lock(UniqueId, erlang:system_time(seconds), EndTime);
        {error, Reason} ->
            %% Flattened because the spec promises a plain string.
            {error, lists:flatten(io_lib:format("Error while acquiring the lock, reason: ~p", [Reason]))}
    end.

%% @doc Makes a single attempt to take the lock in etcd. The outcome is
%% `true' when the key was created, `false' when etcd answered with
%% 412 (precondition failed, i.e. somebody else is holding the lock), or
%% an `{error, _}' tuple when the request failed completely, e.g. due to
%% some I/O error.
%% @end
-spec try_insert_lock_key(string()) -> boolean() | {error, term()}.
try_insert_lock_key(UniqueId) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    NodeTtl = get_config_key(etcd_node_ttl, Config),
    case set_etcd_lock_key(UniqueId, NodeTtl) of
        %% Precondition failed: the key already exists
        {error, "412"} ->
            false;
        {error, _} = Failure ->
            Failure;
        {ok, _} ->
            true
    end.

%% @doc Asks etcd to create the startup lock key with the given TTL,
%% on the condition that the key does not exist yet (`prevExist=false').
%% The key's value is `UniqueId'.
%% @end
-spec set_etcd_lock_key(string(), non_neg_integer()) -> {ok, term()} | {error, string()}.
set_etcd_lock_key(UniqueId, Ttl) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    QueryArgs = [{ttl, Ttl}, {'prevExist', "false"}],
    BodyArgs = [{value, UniqueId}],
    etcd_put(startup_lock_path(), QueryArgs, BodyArgs, Config).

%% @doc Path of the startup lock key: the cluster base path followed by
%% the "startup_lock" segment.
%% @end
-spec startup_lock_path() -> [rabbit_peer_discovery_httpc:path_component()].
startup_lock_path() ->
    lists:append(base_path(), ["startup_lock"]).

%% @doc Builds the list of path segments shared by every etcd key that
%% belongs to the current cluster: `v2', `keys', the configured prefix
%% and the configured cluster name.
%% @end
-spec base_path() -> [rabbit_peer_discovery_httpc:path_component()].
base_path() ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Prefix = get_config_key(etcd_prefix, Config),
    ClusterName = get_config_key(cluster_name, Config),
    [v2, keys, Prefix, ClusterName].

%% @doc Produces a random string of 32 lowercase ASCII letters. It serves
%% as the value for compare-and-change operations in etcd.
%% @end
-spec generate_unique_string() -> string().
generate_unique_string() ->
    %% rand:uniform(26) yields 1..26, which maps onto $a..$z.
    RandomLetter = fun(_) -> $a + rand:uniform(26) - 1 end,
    lists:map(RandomLetter, lists:seq(1, 32)).

%% @doc Starts a repeating timer that refreshes the TTL of the startup
%% lock identified by `UniqueId' and returns the timer reference, which
%% must later be passed to stop_lock_ttl_updater/1.
%%
%% The timer fires every `etcd_node_ttl * 500' milliseconds, i.e. at half
%% of the TTL configured in seconds, and invokes
%% lock_ttl_update_callback/1 on this module.
%% NOTE(review): the timer is created in the calling process; presumably
%% that process outlives the lock — confirm against the caller.
%% @end
-spec start_lock_ttl_updater(string()) -> timer:tref().
start_lock_ttl_updater(UniqueId) ->
    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Interval = get_config_key(etcd_node_ttl, M),
    rabbit_log:debug("Starting startup lock refresher"),
    {ok, TRef} = timer:apply_interval(Interval * 500, ?MODULE,
                                      lock_ttl_update_callback, [UniqueId]),
    TRef.

%% @doc Cancels the lock TTL refresh timer created by
%% start_lock_ttl_updater/1. Always returns `ok'; the result of the
%% cancellation itself is not inspected.
%% @end
-spec stop_lock_ttl_updater(timer:tref()) -> ok.
stop_lock_ttl_updater(TRef) ->
    %% Best effort: the outcome of the cancellation is discarded.
    _ = timer:cancel(TRef),
    rabbit_log:debug("Stopped startup lock refresher"),
    ok.

%% Pauses the calling process for one second between lock attempts.
-spec wait_for_lock_release() -> ok.
wait_for_lock_release() ->
%% XXX Try to use etcd wait feature, but we somehow need to know
%% the index from the last lock attempt operation.
%% Until then, a fixed 1000 ms sleep is used.
timer:sleep(1000).

%% @doc Removes the startup lock key from etcd. The request carries
%% `prevExist=true' and `prevValue=UniqueId', so the deletion only applies
%% when we are the holder of that lock.
%% @end
-spec delete_etcd_lock_key(string()) -> {ok, term()} | {error, string()}.
delete_etcd_lock_key(UniqueId) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    Preconditions = [{'prevExist', "true"}, {'prevValue', UniqueId}],
    etcd_delete(startup_lock_path(), Preconditions, Config).

%% @doc Timer callback: refreshes the TTL of the startup lock owned by
%% `UniqueId' using the configured `etcd_node_ttl'. The outcome of the
%% refresh is discarded and `UniqueId' is returned unchanged.
%% @end
-spec lock_ttl_update_callback(string()) -> string().
lock_ttl_update_callback(UniqueId) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    NodeTtl = get_config_key(etcd_node_ttl, Config),
    _ = refresh_etcd_lock_ttl(UniqueId, NodeTtl),
    UniqueId.

%% @doc Renews the TTL of the startup lock key in etcd. The request is
%% conditional on the key existing with value `UniqueId', so it only takes
%% effect when we are the holder of that lock. No query arguments are
%% sent; all parameters travel in the request body.
%% @end
-spec refresh_etcd_lock_ttl(string(), non_neg_integer()) -> {ok, term()} | {error, string()}.
refresh_etcd_lock_ttl(UniqueId, Ttl) ->
    Config = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
    NoQueryArgs = [],
    BodyArgs = [{ttl, Ttl},
                {'prevExist', true},
                {'prevValue', UniqueId},
                {refresh, true}],
    etcd_put(startup_lock_path(), NoQueryArgs, BodyArgs, Config).
46 changes: 46 additions & 0 deletions src/rabbit_peer_discovery_etcd.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
%% Modules the etcd backend delegates to. CONFIG_MODULE is used for
%% config_map/1 and get/3 lookups, HTTPC_MODULE for building HTTP queries.
-define(CONFIG_MODULE, rabbit_peer_discovery_config).
-define(UTIL_MODULE, rabbit_peer_discovery_util).
-define(HTTPC_MODULE, rabbit_peer_discovery_httpc).

%% Key passed to ?CONFIG_MODULE:config_map/1 to fetch this backend's settings.
-define(BACKEND_CONFIG_KEY, peer_discovery_etcd).


%% Map from setting name to its metadata record: the value type, the
%% environment variable associated with the setting and the default value.
-define(CONFIG_MAPPING,
#{
%% URI scheme of the etcd endpoint
etcd_scheme => #peer_discovery_config_entry_meta{
type = string,
env_variable = "ETCD_SCHEME",
default_value = "http"
},
%% Host name of the etcd endpoint
etcd_host => #peer_discovery_config_entry_meta{
type = string,
env_variable = "ETCD_HOST",
default_value = "localhost"
},
%% TCP port of the etcd endpoint
etcd_port => #peer_discovery_config_entry_meta{
type = integer,
env_variable = "ETCD_PORT",
default_value = 2379
},
%% Path segment placed after [v2, keys] when building etcd key paths
etcd_prefix => #peer_discovery_config_entry_meta{
type = string,
env_variable = "ETCD_PREFIX",
default_value = "rabbitmq"
},
%% Key TTL, in seconds; refresh timers fire at half of this value
etcd_node_ttl => #peer_discovery_config_entry_meta{
type = integer,
env_variable = "ETCD_NODE_TTL",
default_value = 30
},
%% Path segment placed after the prefix when building etcd key paths
cluster_name => #peer_discovery_config_entry_meta{
type = string,
env_variable = "CLUSTER_NAME",
default_value = "default"
},
%% Seconds to keep retrying lock acquisition before giving up
lock_wait_time => #peer_discovery_config_entry_meta{
type = integer,
env_variable = "LOCK_WAIT_TIME",
default_value = 300
}
}).

45 changes: 29 additions & 16 deletions src/rabbitmq_peer_discovery_etcd_health_check_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@

-behaviour(gen_server).

-export([start_link/0, start_timer/1]).
-include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl").
-include("rabbit_peer_discovery_etcd.hrl").

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).

Expand All @@ -38,22 +41,32 @@
%% Starts this module as a gen_server registered locally under the
%% module's own name, with an empty argument list passed to init/1.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Synchronously asks the locally registered helper process to start its
%% periodic timer. Interval is in milliseconds. The call uses an
%% infinite timeout, so it blocks until the helper replies.
start_timer(Interval) ->
gen_server:call(?MODULE, {start_timer, Interval}, infinity).



init([]) ->
{ok, #state{timer_ref = undefined}}.

handle_call({start_timer, Interval}, _From, #state{timer_ref = undefined} = State) ->
rabbit_log:info("Starting etcd health check notifier (effective interval: ~p milliseconds)", [Interval]),
{ok, TRef} = timer:apply_interval(Interval, rabbit_peer_discovery_etcd,
update_node_key, []),
{reply, ok, State#state{timer_ref = TRef}};

handle_call({start_timer, _Interval}, _From, State) ->
{reply, ok, State};
Map = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
case ?CONFIG_MODULE:get(etcd_node_ttl, ?CONFIG_MAPPING, Map) of
    undefined ->
        %% No TTL configured: run without a refresh timer.
        {ok, #state{}};
    %% in seconds
    TtlSeconds ->
        %% The timer is created here, inside this locally registered
        %% gen_server, rather than in the short-lived process that
        %% performs registration: the timer module automatically
        %% cancels a timer when the process that created it exits.
        %%
        %% The value is 1/2 of what's configured (seconds * 500 =
        %% milliseconds / 2) to avoid a race condition between check
        %% TTL expiration and in flight notifications.
        IntervalMs = TtlSeconds * 500,
        %% Log the interval the timer actually uses, in milliseconds,
        %% not the raw TTL in seconds.
        rabbit_log:info("Starting etcd health check notifier "
                        "(effective interval: ~p milliseconds)",
                        [IntervalMs]),
        {ok, TRef} = timer:apply_interval(IntervalMs,
                                          rabbit_peer_discovery_etcd,
                                          update_node_key, []),
        {ok, #state{timer_ref = TRef}}
end.

handle_call(_Msg, _From, State) ->
{reply, not_understood, State}.
Expand Down