Skip to content

Commit

Permalink
Use stree for message block fss
Browse files Browse the repository at this point in the history
Should provide some deduplication of long subjects in memory.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jun 21, 2024
1 parent 9c3b5d4 commit 1f57304
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 55 deletions.
118 changes: 67 additions & 51 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ type msgBlock struct {
bytes uint64 // User visible bytes count.
rbytes uint64 // Total bytes (raw) including deleted. Used for rolling to new blk.
msgs uint64 // User visible message count.
fss map[string]*SimpleState
fss *stree.SubjectTree[SimpleState]
kfn string
lwts int64
llts int64
Expand Down Expand Up @@ -2063,11 +2063,13 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
for subj, ss := range mb.fss {
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
subj := bytesToString(bsubj)
for i := uint64(0); i < ss.Msgs; i++ {
fs.removePerSubject(subj)
}
}
return true
})
mb.dirtyCloseWithRemove(true)
deleted++
}
Expand Down Expand Up @@ -2315,8 +2317,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
mb.lsts = time.Now().UnixNano()

// If we only have 1 subject currently and it matches our filter we can also set isAll.
if !isAll && len(mb.fss) == 1 {
_, isAll = mb.fss[filter]
if !isAll && mb.fss.Size() == 1 {
_, isAll = mb.fss.Find(stringToBytes(filter))
}
// Make sure to start at mb.first.seq if fseq < mb.first.seq
if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq {
Expand Down Expand Up @@ -2345,26 +2347,27 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
// 25th quantile of a match in a linear walk. Filter should be a wildcard.
// We should consult fss if our cache is not loaded and we only have fss loaded.
if !doLinearScan && wc && mb.cacheAlreadyLoaded() {
doLinearScan = len(mb.fss)*4 > int(lseq-fseq)
doLinearScan = mb.fss.Size()*4 > int(lseq-fseq)
}

if !doLinearScan {
// If we have a wildcard match against all tracked subjects we know about.
if wc {
subs = subs[:0]
for subj := range mb.fss {
if isMatch(subj) {
subs = append(subs, subj)
mb.fss.Iter(func(bsubj []byte, _ *SimpleState) bool {
if subj := bytesToString(bsubj); isMatch(subj) {
subs = append(subs, string(bsubj))
}
}
return true
})
// Check if we matched anything
if len(subs) == 0 {
return nil, didLoad, ErrStoreMsgNotFound
}
}
fseq = lseq + 1
for _, subj := range subs {
ss := mb.fss[subj]
ss, _ := mb.fss.Find(stringToBytes(subj))
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
Expand Down Expand Up @@ -2485,8 +2488,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
}

var havePartial bool
for subj, ss := range mb.fss {
if isAll || isMatch(subj) {
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
if subj := bytesToString(bsubj); isAll || isMatch(subj) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
Expand All @@ -2495,10 +2498,11 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
} else if sseq <= ss.Last {
// We matched but its a partial.
havePartial = true
break
return false
}
}
}
return true
})

// If we did not encounter any partials we can return here.
if !havePartial {
Expand Down Expand Up @@ -2711,7 +2715,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
}
// Mark fss activity.
mb.lsts = time.Now().UnixNano()
for subj, ss := range mb.fss {
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
subj := string(bsubj)
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
Expand All @@ -2725,7 +2730,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
fss[subj] = oss
}
}
}
return true
})
if shouldExpire {
// Expire this cache before moving on.
mb.tryForceExpireCacheLocked()
Expand Down Expand Up @@ -2810,7 +2816,7 @@ func (fs *fileStore) MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed i
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
for subj, psi := range subs {
if ss := mb.fss[subj]; ss != nil {
if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
if ss.Last <= maxSeq {
seqs = append(seqs, ss.Last)
delete(subs, subj)
Expand Down Expand Up @@ -3008,7 +3014,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
mb.lsts = time.Now().UnixNano()

var havePartial bool
for subj, ss := range mb.fss {
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
subj := bytesToString(bsubj)
if isMatch(subj) {
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
Expand All @@ -3018,10 +3025,11 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
} else if sseq <= ss.Last {
// We matched but its a partial.
havePartial = true
break
return false
}
}
}
return true
})

// See if we need to scan msgs here.
if havePartial {
Expand Down Expand Up @@ -3099,11 +3107,12 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
// Mark fss activity.
mb.lsts = time.Now().UnixNano()

for subj, ss := range mb.fss {
if isMatch(subj) {
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
if subj := bytesToString(bsubj); isMatch(subj) {
adjust += ss.Msgs
}
}
return true
})
}
} else {
// This is the last block. We need to scan per message here.
Expand Down Expand Up @@ -3224,7 +3233,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
// Lock should be held to quiet race detector.
mb.mu.Lock()
mb.setupWriteCache(rbuf)
mb.fss = make(map[string]*SimpleState)
mb.fss = stree.NewSubjectTree[SimpleState]()

// Set cache time to creation time to start.
ts := time.Now().UnixNano()
Expand Down Expand Up @@ -3676,10 +3685,11 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
// Mark fss activity.
mb.lsts = time.Now().UnixNano()

if ss := mb.fss[subj]; ss != nil {
bsubj := stringToBytes(subj)
if ss, ok := mb.fss.Find(bsubj); ok && ss != nil {
// Adjust first if it was not where we thought it should be.
if i != start {
if info, ok := fs.psim.Find(stringToBytes(subj)); ok {
if info, ok := fs.psim.Find(bsubj); ok {
info.fblk = i
}
}
Expand Down Expand Up @@ -3812,8 +3822,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
// Grab the ss entry for this subject in case sparse.
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss := mb.fss[subj]
if ss != nil && ss.firstNeedsUpdate {
ss, ok := mb.fss.Find(stringToBytes(subj))
if ok && ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
}
mb.mu.Unlock()
Expand Down Expand Up @@ -4908,11 +4918,11 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
}
// Mark fss activity.
mb.lsts = time.Now().UnixNano()
if ss := mb.fss[subj]; ss != nil {
if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
ss.Msgs++
ss.Last = seq
} else {
mb.fss[subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
mb.fss.Insert(stringToBytes(subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
}

Expand Down Expand Up @@ -5513,7 +5523,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// Create FSS if we should track.
var popFss bool
if mb.fssNotLoaded() {
mb.fss = make(map[string]*SimpleState)
mb.fss = stree.NewSubjectTree[SimpleState]()
popFss = true
}
// Mark fss activity.
Expand Down Expand Up @@ -5580,15 +5590,15 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
// Handle FSS inline here.
if popFss && slen > 0 && !mb.noTrack && !erased && !mb.dmap.Exists(seq) {
bsubj := buf[index+msgHdrSize : index+msgHdrSize+uint32(slen)]
if ss := mb.fss[string(bsubj)]; ss != nil {
if ss, ok := mb.fss.Find(bsubj); ok && ss != nil {
ss.Msgs++
ss.Last = seq
} else {
mb.fss[string(bsubj)] = &SimpleState{
mb.fss.Insert(bsubj, SimpleState{
Msgs: 1,
First: seq,
Last: seq,
}
})
}
}
}
Expand Down Expand Up @@ -6302,7 +6312,7 @@ func (fs *fileStore) loadLast(subj string, sm *StoreMsg) (lsm *StoreMsg, err err
var l uint64
// Optimize if subject is not a wildcard.
if !wc {
if ss := mb.fss[subj]; ss != nil {
if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil {
l = ss.Last
}
}
Expand Down Expand Up @@ -7018,11 +7028,13 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
bytes += mb.bytes
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
for subj, ss := range mb.fss {
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
subj := bytesToString(bsubj)
for i := uint64(0); i < ss.Msgs; i++ {
fs.removePerSubject(subj)
}
}
return true
})
// Now close.
mb.dirtyCloseWithRemove(true)
mb.mu.Unlock()
Expand Down Expand Up @@ -7423,13 +7435,17 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
// Lock should be held.
func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
mb.ensurePerSubjectInfoLoaded()
ss := mb.fss[subj]
if ss == nil {
if mb.fss == nil {
return
}
bsubj := stringToBytes(subj)
ss, ok := mb.fss.Find(bsubj)
if !ok || ss == nil {
return
}

if ss.Msgs == 1 {
delete(mb.fss, subj)
mb.fss.Delete(bsubj)
return
}

Expand Down Expand Up @@ -7531,7 +7547,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
}

// Create new one regardless.
mb.fss = make(map[string]*SimpleState)
mb.fss = stree.NewSubjectTree[SimpleState]()

var smv StoreMsg
fseq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq)
Expand All @@ -7548,16 +7564,16 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
return err
}
if sm != nil && len(sm.subj) > 0 {
if ss := mb.fss[sm.subj]; ss != nil {
if ss, ok := mb.fss.Find(stringToBytes(sm.subj)); ok && ss != nil {
ss.Msgs++
ss.Last = seq
} else {
mb.fss[sm.subj] = &SimpleState{Msgs: 1, First: seq, Last: seq}
mb.fss.Insert(stringToBytes(sm.subj), SimpleState{Msgs: 1, First: seq, Last: seq})
}
}
}

if len(mb.fss) > 0 {
if mb.fss.Size() > 0 {
// Make sure we run the cache expire timer.
mb.llts = time.Now().UnixNano()
// Mark fss activity.
Expand All @@ -7578,7 +7594,7 @@ func (mb *msgBlock) ensurePerSubjectInfoLoaded() error {
return nil
}
if mb.msgs == 0 {
mb.fss = make(map[string]*SimpleState)
mb.fss = stree.NewSubjectTree[SimpleState]()
return nil
}
return mb.generatePerSubjectInfo()
Expand All @@ -7595,20 +7611,20 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) {
}

// Now populate psim.
for subj, ss := range mb.fss {
if len(subj) > 0 {
bsubj := stringToBytes(subj)
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
if len(bsubj) > 0 {
if info, ok := fs.psim.Find(bsubj); ok {
info.total += ss.Msgs
if mb.index > info.lblk {
info.lblk = mb.index
}
} else {
fs.psim.Insert(bsubj, psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index})
fs.tsl += len(subj)
fs.tsl += len(bsubj)
}
}
}
return true
})
}

// Close the message block.
Expand Down
8 changes: 4 additions & 4 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4098,10 +4098,10 @@ func TestFileStoreNoFSSBugAfterRemoveFirst(t *testing.T) {
mb := fs.blks[0]
fs.mu.Unlock()
mb.mu.RLock()
ss := mb.fss["foo.bar.0"]
ss, ok := mb.fss.Find([]byte("foo.bar.0"))
mb.mu.RUnlock()

if ss != nil {
if ok && ss != nil {
t.Fatalf("Expected no state for %q, but got %+v\n", "foo.bar.0", ss)
}
})
Expand Down Expand Up @@ -6883,15 +6883,15 @@ func TestFileStoreFSSExpireNumPending(t *testing.T) {
require_True(t, elapsed > time.Since(start))

// Sleep enough so that all mb.fss should expire, which is 2s above.
time.Sleep(3 * time.Second)
time.Sleep(4 * time.Second)
fs.mu.RLock()
for i, mb := range fs.blks {
mb.mu.RLock()
fss := mb.fss
mb.mu.RUnlock()
if fss != nil {
fs.mu.RUnlock()
t.Fatalf("Detected loaded fss for mb %d", i)
t.Fatalf("Detected loaded fss for mb %d (size %d)", i, fss.Size())
}
}
fs.mu.RUnlock()
Expand Down

0 comments on commit 1f57304

Please sign in to comment.