diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 4d610020521a..c82ba0baa08c 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -44,6 +44,7 @@ dep_rabbitmq_event_exchange = git_rmq rabbitmq-event-exchange $(curren dep_rabbitmq_federation = git_rmq rabbitmq-federation $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_federation_management = git_rmq rabbitmq-federation-management $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_java_client = git_rmq rabbitmq-java-client $(current_rmq_ref) $(base_rmq_ref) master +dep_rabbitmq_jms_client = git_rmq rabbitmq-jms-client $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_jms_topic_exchange = git_rmq rabbitmq-jms-topic-exchange $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_lvc = git_rmq rabbitmq-lvc-plugin $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_management = git_rmq rabbitmq-management $(current_rmq_ref) $(base_rmq_ref) master diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat index 71752c667f9a..4c5691bedb1b 100644 --- a/scripts/rabbitmq-env.bat +++ b/scripts/rabbitmq-env.bat @@ -397,8 +397,8 @@ goto :filter_paths_done set paths=%1 set paths=%paths:"=% for /f "tokens=1* delims=;" %%a in ("%paths%") do ( - if not "%%a" == "" call :filter_path %%a - if not "%%b" == "" call :filter_paths %%b + if not "%%a" == "" call :filter_path "%%a" + if not "%%b" == "" call :filter_paths "%%b" ) set paths= exit /b diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 10fcfe749328..710ec6ab2aa6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -937,8 +937,7 @@ i(recoverable_slaves, #q{q = #amqqueue{name = Name, i(state, #q{status = running}) -> credit_flow:state(); i(state, #q{status = State}) -> State; i(garbage_collection, _State) -> - {garbage_collection, GC} = erlang:process_info(self(), garbage_collection), - GC; + rabbit_misc:get_gc_info(self()); i(reductions, _State) -> {reductions, Reductions} = erlang:process_info(self(), reductions), Reductions; diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 1eeff96c0e57..f60cf6c0c2ff 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -74,13 +74,11 @@ log(Category, Level, Fmt, Args) when is_list(Args) -> end, lager:log(Sink, Level, self(), Fmt, Args). -make_internal_sink_name(Category) when Category == channel; - Category == connection; - Category == mirroring; - Category == queue; - Category == federation -> - lager_util:make_internal_sink_name(list_to_atom("rabbit_" ++ - atom_to_list(Category))); +make_internal_sink_name(rabbit_log_connection) -> rabbit_log_connection_lager_event; +make_internal_sink_name(rabbit_log_channel) -> rabbit_log_channel_lager_event; +make_internal_sink_name(rabbit_log_mirroring) -> rabbit_log_mirroring_lager_event; +make_internal_sink_name(rabbit_log_queue) -> rabbit_log_queue_lager_event; +make_internal_sink_name(rabbit_log_federation) -> rabbit_log_federation_lager_event; make_internal_sink_name(Category) -> lager_util:make_internal_sink_name(Category). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 56e72daa9f17..43d268c37472 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -200,8 +200,17 @@ join_cluster(DiscoveryNode, NodeType) -> {error, Reason} end; true -> - rabbit_log:info("Already member of cluster: ~p~n", [ClusterNodes]), - {ok, already_member} + %% DiscoveryNode thinks that we are part of a cluster, but + %% do we think so ourselves? + case are_we_clustered_with(DiscoveryNode) of + true -> + rabbit_log:info("Asked to join a cluster but already a member of it: ~p~n", [ClusterNodes]), + {ok, already_member}; + false -> + Msg = format_inconsistent_cluster_message(DiscoveryNode, node()), + rabbit_log:error(Msg), + {error, {inconsistent_cluster, Msg}} + end end. %% return node to its virgin state, where it is not member of any @@ -790,9 +799,7 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> {ok, RemoteStatus}; false -> {error, {inconsistent_cluster, - rabbit_misc:format("Node ~p thinks it's clustered " - "with node ~p, but ~p disagrees", - [node(), Node, Node])}} + format_inconsistent_cluster_message(node(), Node)}} end. check_mnesia_or_otp_consistency(_Node, unsupported, OTP) -> @@ -898,6 +905,9 @@ is_only_clustered_disc_node() -> node_type() =:= disc andalso is_clustered() andalso cluster_nodes(disc) =:= [node()]. +are_we_clustered_with(Node) -> + lists:member(Node, mnesia_lib:all_nodes()). + me_in_nodes(Nodes) -> lists:member(node(), Nodes). nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]). @@ -948,3 +958,8 @@ error_description(removing_node_from_offline_node) -> "from must be a disc node and all the other nodes must be offline."; error_description(no_running_cluster_nodes) -> "You cannot leave a cluster if no online nodes are present.". + +format_inconsistent_cluster_message(Thinker, Dissident) -> + rabbit_misc:format("Node ~p thinks it's clustered " + "with node ~p, but ~p disagrees", + [Thinker, Dissident, Dissident]). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2a6e2cd1037e..8e2b1c0d4907 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -91,6 +91,8 @@ flying_ets, %% set of dying clients dying_clients, + %% index of file positions for client death messages + dying_client_index, %% map of references of all registered clients %% to callbacks clients, @@ -131,6 +133,12 @@ msg_store }). +-record(dying_client, + { client_ref, + file, + offset + }). + %%---------------------------------------------------------------------------- -export_type([gc_state/0, file_num/0]). @@ -416,6 +424,10 @@ %% performance with many healthy clients and few, if any, dying %% clients, which is the typical case. %% +%% Client termination messages are stored in a separate ets index to +%% avoid filling primary message store index and message files with +%% client termination messages. +%% %% When the msg_store has a backlog (i.e. it has unprocessed messages %% in its mailbox / gen_server priority queue), a further optimisation %% opportunity arises: we can eliminate pairs of 'write' and 'remove' @@ -687,7 +699,9 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, end. clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, - dying_clients = DyingClients }) -> + dying_clients = DyingClients, + dying_client_index = DyingIndex }) -> + ets:delete(DyingIndex, CRef), State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), dying_clients = sets:del_element(CRef, DyingClients) }. @@ -741,6 +755,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]), + DyingIndex = ets:new(rabbit_msg_store_dying_client_index, + [set, public, {keypos, #dying_client.client_ref}]), {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), @@ -772,6 +788,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> cur_file_cache_ets = CurFileCacheEts, flying_ets = FlyingEts, dying_clients = sets:new(), + dying_client_index = DyingIndex, clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, @@ -848,15 +865,21 @@ handle_call({contains, MsgId}, From, State) -> noreply(State1). handle_cast({client_dying, CRef}, - State = #msstate { dying_clients = DyingClients }) -> + State = #msstate { dying_clients = DyingClients, + dying_client_index = DyingIndex, + current_file_handle = CurHdl, + current_file = CurFile }) -> DyingClients1 = sets:add_element(CRef, DyingClients), - noreply(write_message(CRef, <<>>, - State #msstate { dying_clients = DyingClients1 })); + {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), + true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef, + file = CurFile, + offset = CurOffset}), + noreply(State #msstate { dying_clients = DyingClients1 }); handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, - noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); + noreply(clear_client(CRef, State1)); handle_cast({write, CRef, MsgId, Flow}, State = #msstate { cur_file_cache_ets = CurFileCacheEts, @@ -1334,7 +1357,8 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) -> %% msg and thus should be ignored. Note that this (correctly) returns %% false when testing to remove the death msg itself. should_mask_action(CRef, MsgId, - State = #msstate { dying_clients = DyingClients }) -> + State = #msstate { dying_clients = DyingClients, + dying_client_index = DyingIndex }) -> case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of {false, Location} -> {false, Location}; @@ -1342,8 +1366,8 @@ should_mask_action(CRef, MsgId, {true, not_found}; {true, #msg_location { file = File, offset = Offset, ref_count = RefCount } = Location} -> - #msg_location { file = DeathFile, offset = DeathOffset } = - index_lookup(CRef, State), + [#dying_client { file = DeathFile, offset = DeathOffset }] = + ets:lookup(DyingIndex, CRef), {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of {true, _} -> true; {false, 0} -> false_if_increment;