Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] MQTT s.clear() do not wait for JS responses when disconnecting the session #5575

Merged
merged 6 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ func (s *Server) mqttHandleClosedClient(c *client) {

// This needs to be done outside of any lock.
if doClean {
if err := sess.clear(); err != nil {
if err := sess.clear(true); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be invoked for "clean" sessions, which means sessions whose state should be discarded and not reused in any subsequent session. Could there be a situation where a create/close/create causes an issue since some assets may not have been completely removed from the JS server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do anything with the error other than logging it anyway, so the logic doesn't change. Also, if the session is re-created as clean, we'd clean out its state on CONNECT. If it is re-created as persistent... 0/5 we could wipe out and re-create the consumers if there was a session message; but if we failed to delete the session message on the "clean" DISCONNECT... well, it still would not change the existing logic, would it?

c.Errorf(err.Error())
}
}
Expand Down Expand Up @@ -1475,7 +1475,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
// Opportunistically delete the old (legacy) consumer, from v2.10.10 and
// before. Ignore any errors that might arise.
rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName)
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName, true)

// Create a new, uniquely named consumer for retained messages for this
// server. The prior one will expire eventually.
Expand Down Expand Up @@ -1701,8 +1701,14 @@ func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiCon
return ccr, ccr.ToError()
}

func (jsa *mqttJSA) deleteConsumer(streamName, consName string) (*JSApiConsumerDeleteResponse, error) {
// If noWait is true, the delete request is sent without waiting for the JS response and (nil, nil) is returned.
func (jsa *mqttJSA) deleteConsumer(streamName, consName string, noWait bool) (*JSApiConsumerDeleteResponse, error) {
subj := fmt.Sprintf(JSApiConsumerDeleteT, streamName, consName)
if noWait {
jsa.sendMsg(subj, nil)
return nil, nil
}

cdri, err := jsa.newRequest(mqttJSAConsumerDel, subj, 0, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3178,7 +3184,7 @@ func (sess *mqttSession) save() error {
//
// Runs from the client's readLoop.
// Lock not held on entry, but session is in the locked map.
func (sess *mqttSession) clear() error {
func (sess *mqttSession) clear(noWait bool) error {
var durs []string
var pubRelDur string

Expand Down Expand Up @@ -3206,19 +3212,19 @@ func (sess *mqttSession) clear() error {
sess.mu.Unlock()

for _, dur := range durs {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur); isErrorOtherThan(err, JSConsumerNotFoundErr) {
if _, err := sess.jsa.deleteConsumer(mqttStreamName, dur, noWait); isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", dur, sess.id, err)
}
}
if pubRelDur != _EMPTY_ {
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur)
_, err := sess.jsa.deleteConsumer(mqttOutStreamName, pubRelDur, noWait)
if isErrorOtherThan(err, JSConsumerNotFoundErr) {
return fmt.Errorf("unable to delete consumer %q for session %q: %v", pubRelDur, sess.id, err)
}
}

if seq > 0 {
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, true)
err := sess.jsa.deleteMsg(mqttSessStreamName, seq, !noWait)
// Ignore the various errors indicating that the message (or sequence)
// is already deleted, can happen in a cluster.
if isErrorOtherThan(err, JSSequenceNotFoundErrF) {
Expand Down Expand Up @@ -3484,7 +3490,7 @@ func (sess *mqttSession) untrackPubRel(pi uint16) (jsAckSubject string) {
func (sess *mqttSession) deleteConsumer(cc *ConsumerConfig) {
sess.mu.Lock()
sess.tmaxack -= cc.MaxAckPending
sess.jsa.sendq.push(&mqttJSPubMsg{subj: sess.jsa.prefixDomain(fmt.Sprintf(JSApiConsumerDeleteT, mqttStreamName, cc.Durable))})
sess.jsa.deleteConsumer(mqttStreamName, cc.Durable, true)
levb marked this conversation as resolved.
Show resolved Hide resolved
sess.mu.Unlock()
}

Expand Down Expand Up @@ -3823,7 +3829,7 @@ CHECK:
// This Session lasts as long as the Network Connection. State data
// associated with this Session MUST NOT be reused in any subsequent
// Session.
if err := es.clear(); err != nil {
if err := es.clear(false); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least here we make sure that the delete is complete, which I think is the right thing to do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I understand why checking the success is important here, thus preserved as before.

asm.removeSession(es, true)
return err
}
Expand Down
59 changes: 0 additions & 59 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6789,65 +6789,6 @@ func TestMQTTConsumerMemStorageReload(t *testing.T) {
}
}

// unableToDeleteConsLogger is a test logger that embeds DummyLogger and
// carries a channel on which selected error log entries are delivered.
type unableToDeleteConsLogger struct {
DummyLogger
// errCh receives the formatted text of Errorf entries that contain
// "unable to delete consumer".
errCh chan string
}

// Errorf formats the log entry and, when the resulting text reports a
// consumer deletion failure, forwards it to errCh. Any other entry is
// discarded. The send blocks until errCh has room for the message.
func (l *unableToDeleteConsLogger) Errorf(format string, args ...any) {
	formatted := fmt.Sprintf(format, args...)
	if !strings.Contains(formatted, "unable to delete consumer") {
		// Not the error this logger is watching for.
		return
	}
	l.errCh <- formatted
}

// TestMQTTSessionNotDeletedOnDeleteConsumerError checks that when deleting a
// clean session's consumer fails on disconnect, the server logs an
// "unable to delete consumer" error and the session record is still present
// in the sessions stream once the cluster is back up.
func TestMQTTSessionNotDeletedOnDeleteConsumerError(t *testing.T) {
	// Use a one second JS API timeout for this test and restore the
	// previous value on exit.
	prevTimeout := mqttJSAPITimeout
	mqttJSAPITimeout = time.Second
	defer func() { mqttJSAPITimeout = prevTimeout }()

	// Two-node cluster.
	c := createJetStreamClusterWithTemplate(t, testMQTTGetClusterTemplaceNoLeaf(), "MQTT", 2)
	defer c.shutdown()

	opts := c.opts[0]
	srv := c.servers[0]

	// Install the capturing logger on the first server.
	logger := &unableToDeleteConsLogger{errCh: make(chan string, 10)}
	srv.SetLogger(logger, false, false)

	nc, js := jsClientConnect(t, srv)
	defer nc.Close()

	// Connect an MQTT client with a clean session and create a QoS 1
	// subscription on "foo".
	connInfo := &mqttConnInfo{cleanSess: true}
	conn, reader := testMQTTConnectRetry(t, connInfo, opts.MQTT.Host, opts.MQTT.Port, 5)
	defer conn.Close()
	testMQTTCheckConnAck(t, reader, mqttConnAckRCConnectionAccepted, false)

	filters := []*mqttFilter{{filter: "foo", qos: 1}}
	testMQTTSub(t, 1, conn, reader, filters, []byte{1})
	testMQTTFlush(t, conn, nil, reader)

	// Shut down the second server; quorum should be lost.
	c.servers[1].Shutdown()

	// Disconnect the MQTT client.
	testMQTTDisconnect(t, conn, nil)

	// The consumer deletion error must be reported within one second.
	select {
	case <-time.After(time.Second):
		t.Fatal("Server did not report any error")
	case <-logger.errCh:
		// Error was logged as expected.
	}

	// Bring the servers back and verify the session is still persisted.
	c.restartAllSamePorts()
	c.waitOnStreamLeader(globalAccountName, mqttSessStreamName)

	info, err := js.StreamInfo(mqttSessStreamName)
	require_NoError(t, err)
	require_True(t, info.State.Msgs == 1)
}

// Test for auto-cleanup of consumers.
func TestMQTTConsumerInactiveThreshold(t *testing.T) {
tdir := t.TempDir()
Expand Down