diff --git a/Makefile b/Makefile index b5c86a60..22369ff0 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,6 @@ BUILD_DEPS = rabbitmq_codegen DEPS = lager TEST_DEPS = mochiweb -dep_lager = git https://github.com/rabbitmq/lager.git master - .DEFAULT_GOAL = all EXTRA_SOURCES += include/rabbit_framing.hrl \ diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index c50a429a..fdd850a1 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -16,6 +16,10 @@ -module(rabbit_auth_mechanism). +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). + -ifdef(use_specs). %% A description. @@ -54,3 +58,6 @@ behaviour_info(_Other) -> undefined. -endif. + +added_to_rabbit_registry(_Type, _ModuleName) -> ok. +removed_from_rabbit_registry(_Type) -> ok. \ No newline at end of file diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index adde84dc..7465195d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -58,6 +58,7 @@ -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, info_all/3]). -export([refresh_config_local/0, ready_for_close/1]). +-export([refresh_interceptors/0]). -export([force_event_refresh/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -149,6 +150,8 @@ -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(REFRESH_TIMEOUT, 15000). + -define(STATISTICS_KEYS, [pid, transactional, @@ -337,6 +340,12 @@ refresh_config_local() -> list_local()), ok. +refresh_interceptors() -> + rabbit_misc:upmap( + fun (C) -> gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT) end, + list_local()), + ok. + ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). @@ -430,6 +439,10 @@ handle_call({info, Items}, _From, State) -> handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)}); +handle_call(refresh_interceptors, _From, State) -> + IState = rabbit_channel_interceptor:init(State), + reply(ok, State#ch{interceptor_state = IState}); + handle_call({declare_fast_reply_to, Key}, _From, State = #ch{reply_consumer = Consumer}) -> reply(case Consumer of @@ -1977,6 +1990,8 @@ i(state, #ch{state = State}) -> State; i(prefetch_count, #ch{consumer_prefetch = C}) -> C; i(global_prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); +i(interceptors, #ch{interceptor_state = IState}) -> + IState; i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 9793459c..448e571f 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -21,6 +21,10 @@ -export([init/1, intercept_in/3]). +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). + -ifdef(use_specs). -type(method_name() :: rabbit_framing:amqp_method_name()). @@ -51,6 +55,11 @@ behaviour_info(_Other) -> -endif. +added_to_rabbit_registry(_Type, _ModuleName) -> + rabbit_channel:refresh_interceptors(). +removed_from_rabbit_registry(_Type) -> + rabbit_channel:refresh_interceptors(). + init(Ch) -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)], check_no_overlap(Mods), diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index bf57b2aa..97271b34 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -18,7 +18,11 @@ -include("rabbit.hrl"). --export([select/2, set/1, register/2, unregister/1]). +-export([select/2, set/1]). + +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). %% This is like an exchange type except that: %% @@ -84,6 +88,13 @@ behaviour_info(_Other) -> %%---------------------------------------------------------------------------- +added_to_rabbit_registry(_Type, _ModuleName) -> + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. +removed_from_rabbit_registry(_Type) -> + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + %% select a subset of active decorators select(all, {Route, NoRoute}) -> filter(Route ++ NoRoute); select(route, {Route, _NoRoute}) -> filter(Route); @@ -105,16 +116,6 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. cons_if_eq(Select, Select, Item, List) -> [Item | List]; cons_if_eq(_Select, _Other, _Item, List) -> List. -register(TypeName, ModuleName) -> - rabbit_registry:register(exchange_decorator, TypeName, ModuleName), - [maybe_recover(X) || X <- rabbit_exchange:list()], - ok. - -unregister(TypeName) -> - rabbit_registry:unregister(exchange_decorator, TypeName), - [maybe_recover(X) || X <- rabbit_exchange:list()], - ok. - maybe_recover(X = #exchange{name = Name, decorators = Decs}) -> #exchange{decorators = Decs1} = set(X), diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 0b7fda61..379938bf 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -16,6 +16,10 @@ -module(rabbit_exchange_type). +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). + -ifdef(use_specs). -type(tx() :: 'transaction' | 'none'). @@ -79,3 +83,8 @@ behaviour_info(_Other) -> undefined. -endif. + +added_to_rabbit_registry(_Type, _ModuleName) -> ok. +removed_from_rabbit_registry(_Type) -> ok. + + diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl index bd890608..8604390a 100644 --- a/src/rabbit_policy_validator.erl +++ b/src/rabbit_policy_validator.erl @@ -16,6 +16,10 @@ -module(rabbit_policy_validator). +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). + -ifdef(use_specs). -export_type([validate_results/0]). @@ -37,3 +41,6 @@ behaviour_info(_Other) -> undefined. -endif. + +added_to_rabbit_registry(_Type, _ModuleName) -> ok. +removed_from_rabbit_registry(_Type) -> ok. diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index aab2812a..f73043b7 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -20,6 +20,10 @@ -export([select/1, set/1, register/2, unregister/1]). +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -51,6 +55,9 @@ behaviour_info(_Other) -> %%---------------------------------------------------------------------------- +added_to_rabbit_registry(_Type, _ModuleName) -> ok. +removed_from_rabbit_registry(_Type) -> ok. + select(Modules) -> [M || M <- Modules, code:which(M) =/= non_existing]. diff --git a/src/rabbit_queue_master_locator.erl b/src/rabbit_queue_master_locator.erl index a73a307c..b819f949 100644 --- a/src/rabbit_queue_master_locator.erl +++ b/src/rabbit_queue_master_locator.erl @@ -16,6 +16,10 @@ -module(rabbit_queue_master_locator). +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). + -ifdef(use_specs). -callback description() -> [proplists:property()]. @@ -31,3 +35,6 @@ behaviour_info(_Other) -> undefined. -endif. + +added_to_rabbit_registry(_Type, _ModuleName) -> ok. +removed_from_rabbit_registry(_Type) -> ok. \ No newline at end of file diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl new file mode 100644 index 00000000..252a57d2 --- /dev/null +++ b/src/rabbit_registry.erl @@ -0,0 +1,177 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_registry). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([register/3, unregister/2, + binary_to_type/1, lookup_module/2, lookup_all/1]). + +-define(SERVER, ?MODULE). +-define(ETS_NAME, ?MODULE). + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(register/3 :: (atom(), binary(), atom()) -> 'ok'). +-spec(unregister/2 :: (atom(), binary()) -> 'ok'). +-spec(binary_to_type/1 :: + (binary()) -> atom() | rabbit_types:error('not_found')). +-spec(lookup_module/2 :: + (atom(), atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). +-spec(lookup_all/1 :: (atom()) -> [{atom(), atom()}]). + +-endif. + +%%--------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%--------------------------------------------------------------------------- + +register(Class, TypeName, ModuleName) -> + gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity). + +unregister(Class, TypeName) -> + gen_server:call(?SERVER, {unregister, Class, TypeName}, infinity). + +%% This is used with user-supplied arguments (e.g., on exchange +%% declare), so we restrict it to existing atoms only. This means it +%% can throw a badarg, indicating that the type cannot have been +%% registered. +binary_to_type(TypeBin) when is_binary(TypeBin) -> + case catch list_to_existing_atom(binary_to_list(TypeBin)) of + {'EXIT', {badarg, _}} -> {error, not_found}; + TypeAtom -> TypeAtom + end. + +lookup_module(Class, T) when is_atom(T) -> + case ets:lookup(?ETS_NAME, {Class, T}) of + [{_, Module}] -> + {ok, Module}; + [] -> + {error, not_found} + end. + +lookup_all(Class) -> + [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})]. + +%%--------------------------------------------------------------------------- + +internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> + list_to_atom(binary_to_list(TypeBin)). + +internal_register(Class, TypeName, ModuleName) + when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) -> + ClassModule = class_module(Class), + Type = internal_binary_to_type(TypeName), + RegArg = {{Class, Type}, ModuleName}, + ok = sanity_check_module(ClassModule, ModuleName), + true = ets:insert(?ETS_NAME, RegArg), + conditional_register(RegArg), + ok = ClassModule:added_to_rabbit_registry(Type, ModuleName), + ok. + +internal_unregister(Class, TypeName) -> + ClassModule = class_module(Class), + Type = internal_binary_to_type(TypeName), + UnregArg = {Class, Type}, + conditional_unregister(UnregArg), + true = ets:delete(?ETS_NAME, UnregArg), + ok = ClassModule:removed_from_rabbit_registry(Type), + ok. + +%% register exchange decorator route callback only when implemented, +%% in order to avoid unnecessary decorator calls on the fast +%% publishing path +conditional_register({{exchange_decorator, Type}, ModuleName}) -> + case erlang:function_exported(ModuleName, route, 2) of + true -> true = ets:insert(?ETS_NAME, + {{exchange_decorator_route, Type}, + ModuleName}); + false -> ok + end; +conditional_register(_) -> + ok. + +conditional_unregister({exchange_decorator, Type}) -> + true = ets:delete(?ETS_NAME, {exchange_decorator_route, Type}), + ok; +conditional_unregister(_) -> + ok. + +sanity_check_module(ClassModule, Module) -> + case catch lists:member(ClassModule, + lists:flatten( + [Bs || {Attr, Bs} <- + Module:module_info(attributes), + Attr =:= behavior orelse + Attr =:= behaviour])) of + {'EXIT', {undef, _}} -> {error, not_module}; + false -> {error, {not_type, ClassModule}}; + true -> ok + end. + + +% Registry class modules. There should exist module for each registry class. +% Class module should be behaviour (export behaviour_info/1) and implement +% rabbit_registry_class behaviour itself: export added_to_rabbit_registry/2 +% and removed_from_rabbit_registry/1 functions. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(queue_decorator) -> rabbit_queue_decorator; +class_module(policy_validator) -> rabbit_policy_validator; +class_module(ha_mode) -> rabbit_mirror_queue_mode; +class_module(channel_interceptor) -> rabbit_channel_interceptor; +class_module(queue_master_locator)-> rabbit_queue_master_locator. + +%%--------------------------------------------------------------------------- + +init([]) -> + ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), + {ok, none}. + +handle_call({register, Class, TypeName, ModuleName}, _From, State) -> + ok = internal_register(Class, TypeName, ModuleName), + {reply, ok, State}; + +handle_call({unregister, Class, TypeName}, _From, State) -> + ok = internal_unregister(Class, TypeName), + {reply, ok, State}; + +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_registry_class.erl b/src/rabbit_registry_class.erl new file mode 100644 index 00000000..366850c0 --- /dev/null +++ b/src/rabbit_registry_class.erl @@ -0,0 +1,18 @@ +-module(rabbit_registry_class). + +-ifdef(use_specs). + +-callback added_to_rabbit_registry(atom(), atom()) -> ok. + +-callback removed_from_rabbit_registry(atom()) -> ok. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{added_to_rabbit_registry, 2}, {removed_from_rabbit_registry, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl index 4e360687..fe285747 100644 --- a/src/rabbit_runtime_parameter.erl +++ b/src/rabbit_runtime_parameter.erl @@ -16,6 +16,10 @@ -module(rabbit_runtime_parameter). +-behaviour(rabbit_registry_class). + +-export([added_to_rabbit_registry/2, removed_from_rabbit_registry/1]). + -ifdef(use_specs). -type(validate_results() :: @@ -40,3 +44,6 @@ behaviour_info(_Other) -> undefined. -endif. + +added_to_rabbit_registry(_Type, _ModuleName) -> ok. +removed_from_rabbit_registry(_Type) -> ok.