Skip to content
This repository has been archived by the owner on Nov 16, 2020. It is now read-only.

Commit

Permalink
Merge pull request #65 from rabbitmq/rabbitmq-web-mqtt-64
Browse files Browse the repository at this point in the history
Stop on errors

(cherry picked from commit 04d3402)
  • Loading branch information
michaelklishin committed Jun 17, 2020
1 parent 93d3711 commit e997e95
Showing 1 changed file with 40 additions and 22 deletions.
62 changes: 40 additions & 22 deletions src/rabbit_web_mqtt_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -113,28 +113,28 @@ websocket_info({#'basic.deliver'{}, #amqp_msg{}, _DeliveryCtx} = Delivery,
{ok, ProcState} ->
{ok, State #state { proc_state = ProcState }, hibernate};
{error, _, _} ->
{stop, State}
stop(State)
end;
websocket_info(#'basic.ack'{} = Ack, State = #state{ proc_state = ProcState0 }) ->
case rabbit_mqtt_processor:amqp_callback(Ack, ProcState0) of
{ok, ProcState} ->
{ok, State #state { proc_state = ProcState }, hibernate};
{error, _, _} ->
{stop, State}
stop(State)
end;
websocket_info(#'basic.consume_ok'{}, State) ->
{ok, State, hibernate};
websocket_info(#'basic.cancel'{}, State) ->
{stop, State};
stop(State);
websocket_info({reply, Data}, State) ->
{reply, {binary, Data}, State, hibernate};
websocket_info({'EXIT', _, _}, State) ->
{stop, State};
stop(State);
websocket_info({'$gen_cast', duplicate_id}, State = #state{ proc_state = ProcState,
conn_name = ConnName }) ->
rabbit_log_connection:warning("Web MQTT disconnecting duplicate client id ~p (~p)~n",
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName]),
{stop, State};
stop(State);
websocket_info({start_keepalives, Keepalive},
State = #state{ socket = Sock, keepalive_sup = KeepaliveSup }) ->
%% Only the client has the responsibility for sending keepalives
Expand All @@ -146,30 +146,30 @@ websocket_info({start_keepalives, Keepalive},
{ok, State #state { keepalive = Heartbeater }, hibernate};
websocket_info(keepalive_timeout, State = #state{conn_name = ConnStr}) ->
rabbit_log_connection:error("closing Web MQTT connection ~p (keepalive timeout)~n", [ConnStr]),
{stop, State};
stop(State);
websocket_info(emit_stats, State) ->
{ok, emit_stats(State), hibernate};
websocket_info({ra_event, _, _}, State) ->
{ok, State, hibernate};
websocket_info(Msg, State) ->
rabbit_log_connection:info("Web MQTT: unexpected message ~p~n",
[Msg]),
{ok, State, hibernate}.

terminate(_, _, #state{ proc_state = undefined }) ->
ok;
terminate(_, _, State = #state{ proc_state = ProcState,
conn_name = ConnName }) ->
maybe_emit_stats(State),
rabbit_log_connection:info("closing Web MQTT connection ~p (~s)~n", [self(), ConnName]),
rabbit_mqtt_processor:send_will(ProcState),
rabbit_mqtt_processor:close_connection(ProcState),
terminate(_, _, State) ->
stop_rabbit_mqtt_processor(State),
ok.

%% Internal.

handle_data(Data, State0) ->
handle_data(Data, State0 = #state{conn_name = ConnStr}) ->
case handle_data1(Data, State0) of
{ok, State1 = #state{state = blocked}, hibernate} ->
{[{active, false}], State1, hibernate};
{error, Error} ->
stop_with_framing_error(State0, Error, ConnStr);
Other ->
Other
end.
Expand All @@ -195,20 +195,38 @@ handle_data1(Data, State = #state{ parse_state = ParseState,
{error, Reason, _} ->
rabbit_log_connection:info("MQTT protocol error ~p for connection ~p~n",
[Reason, ConnStr]),
{stop, State};
stop(State, 1002, Reason);
{error, Error} ->
rabbit_log_connection:error("MQTT detected framing error '~p' for connection ~p~n",
[Error, ConnStr]),
{stop, State};
stop_with_framing_error(State, Error, ConnStr);
{stop, _} ->
{stop, State}
stop(State)
end;
{error, Error} ->
rabbit_log_connection:error("MQTT detected framing error '~p' for connection ~p~n",
[ConnStr, Error]),
{stop, State}
Other ->
Other
end.

stop(State) ->
stop(State, 1000, "MQTT died").

stop(State, CloseCode, Error0) ->
stop_rabbit_mqtt_processor(State),
Error1 = rabbit_data_coercion:to_binary(Error0),
{[{close, CloseCode, Error1}], State}.

stop_with_framing_error(State, Error0, ConnStr) ->
Error1 = rabbit_misc:format("~p", [Error0]),
rabbit_log_connection:error("MQTT detected framing error '~s' for connection ~p~n",
[Error1, ConnStr]),
stop(State, 1007, Error1).

stop_rabbit_mqtt_processor(State = #state{state = running,
proc_state = ProcState,
conn_name = ConnName}) ->
maybe_emit_stats(State),
rabbit_log_connection:info("closing Web MQTT connection ~p (~s)~n", [self(), ConnName]),
rabbit_mqtt_processor:send_will(ProcState),
rabbit_mqtt_processor:close_connection(ProcState).

handle_credits(State0) ->
case control_throttle(State0) of
State = #state{state = running} ->
Expand Down

0 comments on commit e997e95

Please sign in to comment.