Skip to content

Commit

Permalink
Some fixes for conversion to stree Match() for mb.fss.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 21, 2024
1 parent 001ea30 commit 1a8c6c9
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2316,6 +2316,10 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
// Mark fss activity.
mb.lsts = time.Now().UnixNano()

if filter == _EMPTY_ {
filter = fwcs
}

// If we only have 1 subject currently and it matches our filter we can also set isAll.
if !isAll && mb.fss.Size() == 1 {
_, isAll = mb.fss.Find(stringToBytes(filter))
Expand Down Expand Up @@ -2492,16 +2496,14 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (

var havePartial bool
mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
if subj := bytesToString(bsubj); isAll || isMatch(subj) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
if sseq <= ss.First {
update(ss)
} else if sseq <= ss.Last {
// We matched but its a partial.
havePartial = true
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
}
if sseq <= ss.First {
update(ss)
} else if sseq <= ss.Last {
// We matched but its a partial.
havePartial = true
}
})

Expand All @@ -2527,12 +2529,14 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
if sm == nil {
continue
}
total++
if first == 0 || seq < first {
first = seq
}
if seq > last {
last = seq
if isAll || isMatch(sm.subj) {
total++
if first == 0 || seq < first {
first = seq
}
if seq > last {
last = seq
}
}
}
// If we loaded this block for this operation go ahead and expire it here.
Expand Down Expand Up @@ -3149,11 +3153,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
// Mark fss activity.
mb.lsts = time.Now().UnixNano()

mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
if subj := bytesToString(bsubj); isMatch(subj) {
adjust += ss.Msgs
}
return true
mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) {
adjust += ss.Msgs
})
}
} else {
Expand Down

0 comments on commit 1a8c6c9

Please sign in to comment.