Skip to content

Commit

Permalink
Merge branch 'master' into rabbitmq-server-500
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Jul 14, 2016
2 parents f2831e1 + 492406e commit b656f9e
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 24 deletions.
1 change: 1 addition & 0 deletions rabbitmq-components.mk
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dep_rabbitmq_event_exchange = git_rmq rabbitmq-event-exchange $(curren
dep_rabbitmq_federation = git_rmq rabbitmq-federation $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_federation_management = git_rmq rabbitmq-federation-management $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_java_client = git_rmq rabbitmq-java-client $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_jms_client = git_rmq rabbitmq-jms-client $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_jms_topic_exchange = git_rmq rabbitmq-jms-topic-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_lvc = git_rmq rabbitmq-lvc-plugin $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management = git_rmq rabbitmq-management $(current_rmq_ref) $(base_rmq_ref) master
Expand Down
4 changes: 2 additions & 2 deletions scripts/rabbitmq-env.bat
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ goto :filter_paths_done
set paths=%1
set paths=%paths:"=%
for /f "tokens=1* delims=;" %%a in ("%paths%") do (
if not "%%a" == "" call :filter_path %%a
if not "%%b" == "" call :filter_paths %%b
if not "%%a" == "" call :filter_path "%%a"
if not "%%b" == "" call :filter_paths "%%b"
)
set paths=
exit /b
Expand Down
3 changes: 1 addition & 2 deletions src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -937,8 +937,7 @@ i(recoverable_slaves, #q{q = #amqqueue{name = Name,
i(state, #q{status = running}) -> credit_flow:state();
i(state, #q{status = State}) -> State;
i(garbage_collection, _State) ->
{garbage_collection, GC} = erlang:process_info(self(), garbage_collection),
GC;
rabbit_misc:get_gc_info(self());
i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
Reductions;
Expand Down
12 changes: 5 additions & 7 deletions src/rabbit_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,11 @@ log(Category, Level, Fmt, Args) when is_list(Args) ->
end,
lager:log(Sink, Level, self(), Fmt, Args).

make_internal_sink_name(Category) when Category == channel;
Category == connection;
Category == mirroring;
Category == queue;
Category == federation ->
lager_util:make_internal_sink_name(list_to_atom("rabbit_" ++
atom_to_list(Category)));
make_internal_sink_name(rabbit_log_connection) -> rabbit_log_connection_lager_event;
make_internal_sink_name(rabbit_log_channel) -> rabbit_log_channel_lager_event;
make_internal_sink_name(rabbit_log_mirroring) -> rabbit_log_mirroring_lager_event;
make_internal_sink_name(rabbit_log_queue) -> rabbit_log_queue_lager_event;
make_internal_sink_name(rabbit_log_federation) -> rabbit_log_federation_lager_event;
make_internal_sink_name(Category) ->
lager_util:make_internal_sink_name(Category).

Expand Down
25 changes: 20 additions & 5 deletions src/rabbit_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,17 @@ join_cluster(DiscoveryNode, NodeType) ->
{error, Reason}
end;
true ->
rabbit_log:info("Already member of cluster: ~p~n", [ClusterNodes]),
{ok, already_member}
%% DiscoveryNode thinks that we are part of a cluster, but
%% do we think so ourselves?
case are_we_clustered_with(DiscoveryNode) of
true ->
rabbit_log:info("Asked to join a cluster but already a member of it: ~p~n", [ClusterNodes]),
{ok, already_member};
false ->
Msg = format_inconsistent_cluster_message(DiscoveryNode, node()),
rabbit_log:error(Msg),
{error, {inconsistent_cluster, Msg}}
end
end.

%% return node to its virgin state, where it is not member of any
Expand Down Expand Up @@ -790,9 +799,7 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
{ok, RemoteStatus};
false ->
{error, {inconsistent_cluster,
rabbit_misc:format("Node ~p thinks it's clustered "
"with node ~p, but ~p disagrees",
[node(), Node, Node])}}
format_inconsistent_cluster_message(node(), Node)}}
end.

check_mnesia_or_otp_consistency(_Node, unsupported, OTP) ->
Expand Down Expand Up @@ -898,6 +905,9 @@ is_only_clustered_disc_node() ->
node_type() =:= disc andalso is_clustered() andalso
cluster_nodes(disc) =:= [node()].

are_we_clustered_with(Node) ->
lists:member(Node, mnesia_lib:all_nodes()).

me_in_nodes(Nodes) -> lists:member(node(), Nodes).

nodes_incl_me(Nodes) -> lists:usort([node()|Nodes]).
Expand Down Expand Up @@ -948,3 +958,8 @@ error_description(removing_node_from_offline_node) ->
"from must be a disc node and all the other nodes must be offline.";
error_description(no_running_cluster_nodes) ->
"You cannot leave a cluster if no online nodes are present.".

format_inconsistent_cluster_message(Thinker, Dissident) ->
rabbit_misc:format("Node ~p thinks it's clustered "
"with node ~p, but ~p disagrees",
[Thinker, Dissident, Dissident]).
40 changes: 32 additions & 8 deletions src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
flying_ets,
%% set of dying clients
dying_clients,
%% index of file positions for client death messages
dying_client_index,
%% map of references of all registered clients
%% to callbacks
clients,
Expand Down Expand Up @@ -131,6 +133,12 @@
msg_store
}).

-record(dying_client,
{ client_ref,
file,
offset
}).

%%----------------------------------------------------------------------------

-export_type([gc_state/0, file_num/0]).
Expand Down Expand Up @@ -416,6 +424,10 @@
%% performance with many healthy clients and few, if any, dying
%% clients, which is the typical case.
%%
%% Client termination messages are stored in a separate ets index to
%% avoid filling primary message store index and message files with
%% client termination messages.
%%
%% When the msg_store has a backlog (i.e. it has unprocessed messages
%% in its mailbox / gen_server priority queue), a further optimisation
%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
Expand Down Expand Up @@ -687,7 +699,9 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
end.

clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
dying_clients = DyingClients,
dying_client_index = DyingIndex }) ->
ets:delete(DyingIndex, CRef),
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
dying_clients = sets:del_element(CRef, DyingClients) }.

Expand Down Expand Up @@ -741,6 +755,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
[set, public, {keypos, #dying_client.client_ref}]),

{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),

Expand Down Expand Up @@ -772,6 +788,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
dying_clients = sets:new(),
dying_client_index = DyingIndex,
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
Expand Down Expand Up @@ -848,15 +865,21 @@ handle_call({contains, MsgId}, From, State) ->
noreply(State1).

handle_cast({client_dying, CRef},
State = #msstate { dying_clients = DyingClients }) ->
State = #msstate { dying_clients = DyingClients,
dying_client_index = DyingIndex,
current_file_handle = CurHdl,
current_file = CurFile }) ->
DyingClients1 = sets:add_element(CRef, DyingClients),
noreply(write_message(CRef, <<>>,
State #msstate { dying_clients = DyingClients1 }));
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef,
file = CurFile,
offset = CurOffset}),
noreply(State #msstate { dying_clients = DyingClients1 });

handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
noreply(clear_client(CRef, State1));

handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
Expand Down Expand Up @@ -1334,16 +1357,17 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) ->
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
should_mask_action(CRef, MsgId,
State = #msstate { dying_clients = DyingClients }) ->
State = #msstate { dying_clients = DyingClients,
dying_client_index = DyingIndex }) ->
case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
{false, Location} ->
{false, Location};
{true, not_found} ->
{true, not_found};
{true, #msg_location { file = File, offset = Offset,
ref_count = RefCount } = Location} ->
#msg_location { file = DeathFile, offset = DeathOffset } =
index_lookup(CRef, State),
[#dying_client { file = DeathFile, offset = DeathOffset }] =
ets:lookup(DyingIndex, CRef),
{case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
{true, _} -> true;
{false, 0} -> false_if_increment;
Expand Down

0 comments on commit b656f9e

Please sign in to comment.