diff --git a/src/rabbit_peer_discovery_etcd.erl b/src/rabbit_peer_discovery_etcd.erl
index e7aed8f..5f38cad 100644
--- a/src/rabbit_peer_discovery_etcd.erl
+++ b/src/rabbit_peer_discovery_etcd.erl
@@ -20,52 +20,14 @@
 
 -include_lib("rabbit_common/include/rabbit.hrl").
 -include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl").
+-include("rabbit_peer_discovery_etcd.hrl").
 
 -export([list_nodes/0, supports_registration/0, register/0, unregister/0,
-         post_registration/0]).
-
--export([start_node_key_updater/0, update_node_key/0]).
-
--define(CONFIG_MODULE, rabbit_peer_discovery_config).
--define(UTIL_MODULE, rabbit_peer_discovery_util).
--define(HTTPC_MODULE, rabbit_peer_discovery_httpc).
-
--define(BACKEND_CONFIG_KEY, peer_discovery_etcd).
-
-
--define(CONFIG_MAPPING,
-         #{
-          etcd_scheme => #peer_discovery_config_entry_meta{
-                            type = string,
-                            env_variable = "ETCD_SCHEME",
-                            default_value = "http"
-                           },
-          etcd_host => #peer_discovery_config_entry_meta{
-                            type = string,
-                            env_variable = "ETCD_HOST",
-                            default_value = "localhost"
-                           },
-          etcd_port => #peer_discovery_config_entry_meta{
-                            type = integer,
-                            env_variable = "ETCD_PORT",
-                            default_value = 2379
-                           },
-          etcd_prefix => #peer_discovery_config_entry_meta{
-                            type = string,
-                            env_variable = "ETCD_PREFIX",
-                            default_value = "rabbitmq"
-                           },
-          etcd_node_ttl => #peer_discovery_config_entry_meta{
-                            type = integer,
-                            env_variable = "ETCD_NODE_TTL",
-                            default_value = 30
-                           },
-          cluster_name => #peer_discovery_config_entry_meta{
-                            type = string,
-                            env_variable = "CLUSTER_NAME",
-                            default_value = "default"
-                           }
-         }).
+         post_registration/0, lock/1, unlock/1]).
+
+-export([update_node_key/0]).
+
+-export([lock_ttl_update_callback/1]).
 
 
 %%
@@ -131,9 +93,35 @@ unregister() ->
 -spec post_registration() -> ok | {error, Reason :: string()}.
 
 post_registration() ->
-    start_node_key_updater(),
     ok.
 
+%% @doc Acquires the startup lock in etcd on behalf of Node, retrying for
+%% up to `lock_wait_time' seconds. On success the returned data must be
+%% passed to unlock/1 to release the lock.
+%% @end
+-spec lock(Node :: atom()) -> {ok, Data :: term()} | {error, Reason :: string()}.
+
+lock(Node) ->
+    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+    %% Monotonic time: the deadline must not move if the wall clock is adjusted.
+    Now = erlang:monotonic_time(seconds),
+    EndTime = Now + get_config_key(lock_wait_time, M),
+    lock(atom_to_list(Node) ++ " - " ++ generate_unique_string(), Now, EndTime).
+
+
+%% @doc Stops the lock TTL refresher and deletes the lock key from etcd.
+%% @end
+-spec unlock(Data :: term()) -> ok | {error, Reason :: string()}.
+
+unlock({UniqueId, TRef}) ->
+    stop_lock_ttl_updater(TRef),
+    case delete_etcd_lock_key(UniqueId) of
+        {ok, _} ->
+            ok;
+        {error, _} = Err ->
+            Err
+    end.
+
 %%
 %% Implementation
 %%
@@ -260,32 +248,133 @@ etcd_put(Path, Query, Body, Map) ->
                          Path, Query, ?HTTPC_MODULE:build_query(Body))).
 
 
-start_node_key_updater() ->
-    case rabbit_peer_discovery:backend() of
-        ?MODULE ->
-            M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
-            case get_config_key(etcd_node_ttl, M) of
-                undefined -> ok;
-                %% in seconds
-                Interval ->
-                    %% We cannot use timer:apply_interval/4 here because this
-                    %% function is executed in a short live process and when it
-                    %% exits, the timer module will automatically cancel the
-                    %% timer.
-                    %%
-                    %% Instead we delegate to a locally registered gen_server,
-                    %% `rabbitmq_peer_discovery_etcd_health_check_helper`.
-                    %%
-                    %% The value is 1/2 of what's configured to avoid a race
-                    %% condition between check TTL expiration and in flight
-                    %% notifications
-                    rabbitmq_peer_discovery_etcd_health_check_helper:start_timer(Interval * 500),
-                    ok
-            end;
-        _ -> ok
-    end.
-
 -spec update_node_key() -> ok.
 update_node_key() ->
     M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
     set_etcd_node_key(M).
+
+%% @doc
+%% Tries to acquire the lock. Will retry until the lock is finally
+%% granted or time is up.
+%% @end
+-spec lock(string(), integer(), integer()) -> {ok, {string(), timer:tref()}} | {error, string()}.
+lock(_, Now, EndTime) when EndTime < Now ->
+    {error, "Acquiring the lock taking too long, bailing out"};
+lock(UniqueId, _, EndTime) ->
+    case try_insert_lock_key(UniqueId) of
+        true ->
+            TRef = start_lock_ttl_updater(UniqueId),
+            {ok, {UniqueId, TRef}};
+        false ->
+            wait_for_lock_release(),
+            lock(UniqueId, erlang:monotonic_time(seconds), EndTime);
+        {error, Reason} ->
+            {error, lists:flatten(io_lib:format("Error while acquiring the lock, reason: ~p", [Reason]))}
+    end.
+
+%% @doc Tries to acquire a lock in etcd. This can either succeed, fail
+%% because somebody else is holding the lock, or completely fail due
+%% to some I/O error.
+%% @end
+-spec try_insert_lock_key(string()) -> boolean() | {error, term()}.
+try_insert_lock_key(UniqueId) ->
+    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+    Ttl = get_config_key(etcd_node_ttl, M),
+    case set_etcd_lock_key(UniqueId, Ttl) of
+        {ok, _} ->
+            true;
+        %% Precondition failed
+        {error, "412"} ->
+            false;
+        {error, _} = Err ->
+            Err
+    end.
+
+%% @doc Orders etcd to create startup lock key if it doesn't exist already.
+%% @end
+-spec set_etcd_lock_key(string(), non_neg_integer()) -> {ok, term()} | {error, string()}.
+set_etcd_lock_key(UniqueId, Ttl) ->
+    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+    etcd_put(startup_lock_path(),
+             [{ttl, Ttl}, {'prevExist', "false"}],
+             [{value, UniqueId}],
+             M).
+
+%% @doc Returns etcd path for startup lock
+%% @end
+-spec startup_lock_path() -> [rabbit_peer_discovery_httpc:path_component()].
+startup_lock_path() ->
+    base_path() ++ ["startup_lock"].
+
+%% @doc Return a list of path segments that are the base path for all
+%% etcd keys related to current cluster.
+%% @end
+-spec base_path() -> [rabbit_peer_discovery_httpc:path_component()].
+base_path() ->
+    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+    [v2, keys, get_config_key(etcd_prefix, M), get_config_key(cluster_name, M)].
+
+%% @doc Generate random string. We are using it for compare-and-change
+%% operations in etcd.
+%% @end
+-spec generate_unique_string() -> string().
+generate_unique_string() ->
+    [ $a - 1 + rand:uniform(26) || _ <- lists:seq(1, 32) ].
+
+%% @doc Starts a timer refreshing the lock TTL every half of `etcd_node_ttl'
+%% (seconds * 500 = ms). The timer is cancelled if the calling process exits.
+%% @end
+-spec start_lock_ttl_updater(string()) -> timer:tref().
+start_lock_ttl_updater(UniqueId) ->
+    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+    Interval = get_config_key(etcd_node_ttl, M),
+    rabbit_log:debug("Starting startup lock refresher"),
+    {ok, TRef} = timer:apply_interval(Interval * 500, ?MODULE,
+                                      lock_ttl_update_callback, [UniqueId]),
+    TRef.
+
+%% @doc Cancels the timer created by start_lock_ttl_updater/1.
+%% @end
+-spec stop_lock_ttl_updater(timer:tref()) -> ok.
+stop_lock_ttl_updater(TRef) ->
+    timer:cancel(TRef),
+    rabbit_log:debug("Stopped startup lock refresher"),
+    ok.
+
+-spec wait_for_lock_release() -> ok.
+wait_for_lock_release() ->
+    %% XXX Try to use etcd wait feature, but we somehow need to know
+    %% the index from the last lock attempt operation.
+    timer:sleep(1000).
+
+%% @doc Delete startup lock in etcd, but only if we are the holder of that lock.
+%% @end
+-spec delete_etcd_lock_key(string()) -> {ok, term()} | {error, string()}.
+delete_etcd_lock_key(UniqueId) ->
+    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+    etcd_delete(startup_lock_path(),
+                [{'prevExist', "true"}, {'prevValue', UniqueId}],
+                M).
+
+%% @doc Timer callback that refreshes the lock TTL. A failed refresh is
+%% only logged; the next timer tick tries again.
+%% @end
+-spec lock_ttl_update_callback(string()) -> string().
+lock_ttl_update_callback(UniqueId) ->
+    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+    case refresh_etcd_lock_ttl(UniqueId, get_config_key(etcd_node_ttl, M)) of
+        {ok, _} -> ok;
+        {error, Reason} ->
+            rabbit_log:warning("Failed to refresh startup lock TTL: ~p", [Reason])
+    end,
+    UniqueId.
+
+%% @doc Refresh startup lock TTL in etcd, but only if we are the holder of that lock.
+%% @end
+-spec refresh_etcd_lock_ttl(string(), non_neg_integer()) -> {ok, term()} | {error, string()}.
+refresh_etcd_lock_ttl(UniqueId, Ttl) ->
+    M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+    etcd_put(startup_lock_path(),
+             [],
+             [{ttl, Ttl}, {'prevExist', true}, {'prevValue', UniqueId}, {refresh, true}],
+             M).
diff --git a/src/rabbit_peer_discovery_etcd.hrl b/src/rabbit_peer_discovery_etcd.hrl
new file mode 100644
index 0000000..12880c4
--- /dev/null
+++ b/src/rabbit_peer_discovery_etcd.hrl
@@ -0,0 +1,46 @@
+-define(CONFIG_MODULE, rabbit_peer_discovery_config).
+-define(UTIL_MODULE, rabbit_peer_discovery_util).
+-define(HTTPC_MODULE, rabbit_peer_discovery_httpc).
+
+-define(BACKEND_CONFIG_KEY, peer_discovery_etcd).
+
+
+-define(CONFIG_MAPPING,
+         #{
+          etcd_scheme => #peer_discovery_config_entry_meta{
+                            type = string,
+                            env_variable = "ETCD_SCHEME",
+                            default_value = "http"
+                           },
+          etcd_host => #peer_discovery_config_entry_meta{
+                            type = string,
+                            env_variable = "ETCD_HOST",
+                            default_value = "localhost"
+                           },
+          etcd_port => #peer_discovery_config_entry_meta{
+                            type = integer,
+                            env_variable = "ETCD_PORT",
+                            default_value = 2379
+                           },
+          etcd_prefix => #peer_discovery_config_entry_meta{
+                            type = string,
+                            env_variable = "ETCD_PREFIX",
+                            default_value = "rabbitmq"
+                           },
+          etcd_node_ttl => #peer_discovery_config_entry_meta{
+                            type = integer,
+                            env_variable = "ETCD_NODE_TTL",
+                            default_value = 30
+                           },
+          cluster_name => #peer_discovery_config_entry_meta{
+                            type = string,
+                            env_variable = "CLUSTER_NAME",
+                            default_value = "default"
+                           },
+          lock_wait_time => #peer_discovery_config_entry_meta{
+                            type = integer,
+                            env_variable = "LOCK_WAIT_TIME",
+                            default_value = 300
+                           }
+         }).
+
diff --git a/src/rabbitmq_peer_discovery_etcd_health_check_helper.erl b/src/rabbitmq_peer_discovery_etcd_health_check_helper.erl
index 44f968e..723742c 100644
--- a/src/rabbitmq_peer_discovery_etcd_health_check_helper.erl
+++ b/src/rabbitmq_peer_discovery_etcd_health_check_helper.erl
@@ -24,7 +24,10 @@
 
 -behaviour(gen_server).
 
--export([start_link/0, start_timer/1]).
+-include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl").
+-include("rabbit_peer_discovery_etcd.hrl").
+
+-export([start_link/0]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
@@ -38,22 +41,32 @@ start_link() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 
-start_timer(Interval) ->
-    gen_server:call(?MODULE, {start_timer, Interval}, infinity).
-
-
-
 init([]) ->
-    {ok, #state{timer_ref = undefined}}.
-
-handle_call({start_timer, Interval}, _From, #state{timer_ref = undefined} = State) ->
-    rabbit_log:info("Starting etcd health check notifier (effective interval: ~p milliseconds)", [Interval]),
-    {ok, TRef} = timer:apply_interval(Interval, rabbit_peer_discovery_etcd,
-                                      update_node_key, []),
-    {reply, ok, State#state{timer_ref = TRef}};
-
-handle_call({start_timer, _Interval}, _From, State) ->
-    {reply, ok, State};
+    Map = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
+    case ?CONFIG_MODULE:get(etcd_node_ttl, ?CONFIG_MAPPING, Map) of
+        undefined ->
+            {ok, #state{}};
+        %% in seconds
+        Interval ->
+            %% timer:apply_interval/4 ties the timer to the calling
+            %% process: when that process exits, the timer module
+            %% automatically cancels the timer. The peer discovery
+            %% callbacks run in short-lived processes, so the timer
+            %% is created here instead, in the init/1 of this
+            %% locally registered gen_server.
+            %%
+            %% The interval is 1/2 of the configured TTL (seconds
+            %% converted to milliseconds, hence * 500) to avoid a
+            %% race condition between check TTL expiration and in
+            %% flight notifications
+            rabbit_log:info("Starting etcd health check notifier "
+                            "(effective interval: ~p milliseconds)",
+                            [Interval * 500]),
+            {ok, TRef} = timer:apply_interval(Interval * 500,
+                                              rabbit_peer_discovery_etcd,
+                                              update_node_key, []),
+            {ok, #state{timer_ref = TRef}}
+    end.
 
 handle_call(_Msg, _From, State) ->
     {reply, not_understood, State}.