[FIXED] Improvements to dealing with old or non-existent index.db (#5893)
We had a condition where an old index.db could not properly restore a stream when max msgs per subject was set and certain blocks had been compacted away, removing the subject info for those sequences. In addition, we fixed recovery after Truncate and PurgeEx by subject when the index.db was corrupt or not available.
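
For context, here is a minimal sketch of the gap handling this fix relies on when a usable index.db is missing and block state has to be rebuilt: while scanning a block's records in sequence order, any jump in sequence numbers (left behind by compaction, so no explicit delete record exists) is treated as deleted. The rebuildDeletes helper and its map-based delete set are illustrative stand-ins, not the filestore's actual types.

package main

import "fmt"

// rebuildDeletes walks the message sequences recovered from a block and
// returns the sequences that must be treated as deleted because they were
// compacted away (no delete/tombstone record remains for them).
func rebuildDeletes(seqs []uint64) map[uint64]struct{} {
	dmap := make(map[uint64]struct{})
	var last uint64
	for _, seq := range seqs {
		// A gap between consecutive recovered sequences means the
		// intervening messages were removed by compaction.
		if last > 0 && seq != last+1 {
			for dseq := last + 1; dseq < seq; dseq++ {
				dmap[dseq] = struct{}{}
			}
		}
		last = seq
	}
	return dmap
}

func main() {
	// Sequences 3 and 4 were compacted away from this block.
	fmt.Println(rebuildDeletes([]uint64{1, 2, 5, 6})) // map[3:{} 4:{}]
}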

This change also moves generating the index.db file to after the message blocks are written during a snapshot, and we do a force call to make sure it is written even when complex.
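
Below is a minimal sketch of that snapshot ordering, using hypothetical archive/block types rather than the real filestore ones: stream every message block first, then force-generate the full-state file and add it to the archive last, so the captured index.db can never be older than the blocks it summarizes.

package main

import "fmt"

// Hypothetical stand-ins for the real snapshot machinery; illustrative only.
type block struct {
	name string
	data []byte
}

type archive struct{ entries []string }

func (a *archive) add(name string, data []byte) error {
	a.entries = append(a.entries, fmt.Sprintf("%s (%d bytes)", name, len(data)))
	return nil
}

// snapshot writes all message blocks first, then forces a fresh full-state
// file and records it last in the archive.
func snapshot(a *archive, blocks []block, forceWriteFullState func() ([]byte, error)) error {
	for _, b := range blocks {
		if err := a.add(b.name, b.data); err != nil {
			return err
		}
	}
	state, err := forceWriteFullState() // force-write even if nothing looks dirty
	if err != nil {
		return err
	}
	return a.add("index.db", state)
}

func main() {
	a := &archive{}
	blks := []block{{"1.blk", []byte("abc")}, {"2.blk", []byte("defg")}}
	_ = snapshot(a, blks, func() ([]byte, error) { return []byte("full stream state"), nil })
	fmt.Println(a.entries) // [1.blk (3 bytes) 2.blk (4 bytes) index.db (17 bytes)]
}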

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Sep 17, 2024
2 parents 6fc51c7 + e5d2e77 commit 83c77b4
Showing 3 changed files with 300 additions and 38 deletions.
177 changes: 145 additions & 32 deletions server/filestore.go
@@ -436,6 +436,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Attempt to recover our state.
err = fs.recoverFullState()
if err != nil {
if !os.IsNotExist(err) {
fs.warn("Recovering stream state from index errored: %v", err)
}
// Hold onto state
prior := fs.state
// Reset anything that could have been set from above.
@@ -469,7 +472,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
go fs.cleanupOldMeta()
}()

// Lock while do enforcements and removals.
// Lock while we do enforcements and removals.
fs.mu.Lock()

// Check if we have any left over tombstones to process.
@@ -975,7 +978,6 @@ func (mb *msgBlock) ensureLastChecksumLoaded() {
// Lock held on entry
func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
mb := fs.initMsgBlock(index)

// Open up the message file, but we will try to recover from the index file.
// We will check that the last checksums match.
file, err := mb.openBlock()
@@ -1357,6 +1359,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
minTombstoneTs int64
)

// To detect gaps from compaction.
var last uint64

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
truncate(index)
@@ -1444,8 +1449,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
mb.bytes += uint64(rl)
}

// Check for any gaps from compaction, meaning no ebit entry.
if last > 0 && seq != last+1 {
for dseq := last + 1; dseq < seq; dseq++ {
addToDmap(dseq)
}
}

// Always set last
atomic.StoreUint64(&mb.last.seq, seq)
last = seq
atomic.StoreUint64(&mb.last.seq, last)
mb.last.ts = ts

// Advance to next record.
@@ -1665,7 +1678,8 @@ func (fs *fileStore) recoverFullState() (rerr error) {
for i := 0; i < int(numBlocks); i++ {
index, nbytes, fseq, fts, lseq, lts, numDeleted := uint32(readU64()), readU64(), readU64(), readI64(), readU64(), readI64(), readU64()
if bi < 0 {
break
os.Remove(fn)
return errCorruptState
}
mb := fs.initMsgBlock(index)
atomic.StoreUint64(&mb.first.seq, fseq)
@@ -1734,6 +1748,12 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// If we are tracking max msgs per subject and we are not up to date we should rebuild.
if fs.cfg.MaxMsgsPer > 0 {
fs.warn("Stream state block state outdated, will rebuild")
return errPriorState
}

// Remove the last message block since recover will add in the new one.
fs.removeMsgBlockFromList(mb)
// Reverse update of tracking state for this mb, will add new state in below.
@@ -1776,11 +1796,19 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// Update top level accounting
if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq {
fs.state.FirstSeq = fseq
fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC()
if nmb.first.ts == 0 {
fs.state.FirstTime = time.Time{}
} else {
fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC()
}
}
if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq {
fs.state.LastSeq = lseq
fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC()
if mb.last.ts == 0 {
fs.state.LastTime = time.Time{}
} else {
fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC()
}
}
fs.state.Msgs += nmb.msgs
fs.state.Bytes += nmb.bytes
@@ -5415,7 +5443,8 @@ func (mb *msgBlock) ensureRawBytesLoaded() error {
// Sync msg and index files as needed. This is called from a timer.
func (fs *fileStore) syncBlocks() {
fs.mu.RLock()
if fs.closed {
// If closed or a snapshot is in progress bail.
if fs.closed || fs.sips > 0 {
fs.mu.RUnlock()
return
}
@@ -6899,6 +6928,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}

var smv StoreMsg
var tombs []msgId

fs.mu.Lock()
// We may remove blocks as we purge, so don't range directly on fs.blks
@@ -6952,9 +6982,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
purged++
bytes += rl
}
// FSS updates.
// PSIM and FSS updates.
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
// Track tombstones we need to write.
tombs = append(tombs, msgId{sm.seq, sm.ts})

// Check for first message.
if seq == atomic.LoadUint64(&mb.first.seq) {
@@ -6993,7 +7025,16 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
if firstSeqNeedsUpdate {
fs.selectNextFirst()
}
fseq := fs.state.FirstSeq

// Write any tombstones as needed.
for _, tomb := range tombs {
if tomb.seq > fseq {
fs.lmb.writeTombstone(tomb.seq, tomb.ts)
}
}

os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++
cb := fs.scb
fs.mu.Unlock()
@@ -7036,7 +7077,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
fs.bim = make(map[uint32]*msgBlock)
// Clear any per subject tracking.
fs.psim, fs.tsl = fs.psim.Empty(), 0
// Mark dirty
// Mark dirty.
fs.dirty++

// Move the msgs directory out of the way, will delete out of band.
@@ -7092,6 +7133,11 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
cb := fs.scb
fs.mu.Unlock()

// Force a new index.db to be written.
if purged > 0 {
fs.forceWriteFullState()
}

if cb != nil {
cb(-int64(purged), -rbytes, 0, _EMPTY_)
}
@@ -7286,11 +7332,19 @@ SKIP:
}
fs.state.Bytes -= bytes

// Any existing state file no longer applicable. We will force write a new one
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++

cb := fs.scb
fs.mu.Unlock()

// Force a new index.db to be written.
if purged > 0 {
fs.forceWriteFullState()
}

if cb != nil && purged > 0 {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
@@ -7351,6 +7405,40 @@ func (fs *fileStore) reset() error {
return nil
}

// Return all active tombstones in this msgBlock.
// Write lock should be held.
func (mb *msgBlock) tombs() []msgId {
var tombs []msgId

if !mb.cacheAlreadyLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return nil
}
}

var le = binary.LittleEndian
buf := mb.cache.buf

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize > lbuf {
return tombs
}
hdr := buf[index : index+msgHdrSize]
rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:])
// Clear any headers bit that could be set.
rl &^= hbit
// Check for tombstones.
if seq&tbit != 0 {
ts := int64(le.Uint64(hdr[12:]))
tombs = append(tombs, msgId{seq &^ tbit, ts})
}
// Advance to next record.
index += rl
}

return tombs
}

// Truncate will truncate a stream store up to seq. Sequence needs to be valid.
func (fs *fileStore) Truncate(seq uint64) error {
// Check for request to reset.
@@ -7386,6 +7474,10 @@ func (fs *fileStore) Truncate(seq uint64) error {
fs.mu.Unlock()
return err
}
// Collect all tombstones, we want to put these back so we can survive
// a restore without index.db properly.
var tombs []msgId
tombs = append(tombs, nlmb.tombs()...)

var purged, bytes uint64

@@ -7403,6 +7495,8 @@ func (fs *fileStore) Truncate(seq uint64) error {
getLastMsgBlock := func() *msgBlock { return fs.blks[len(fs.blks)-1] }
for mb := getLastMsgBlock(); mb != nlmb; mb = getLastMsgBlock() {
mb.mu.Lock()
// We do this to load tombs.
tombs = append(tombs, mb.tombs()...)
purged += mb.msgs
bytes += mb.bytes
fs.removeMsgBlock(mb)
@@ -7425,11 +7519,29 @@ func (fs *fileStore) Truncate(seq uint64) error {
// Reset our subject lookup info.
fs.resetGlobalPerSubjectInfo()

// Always create new write block.
fs.newMsgBlockForWrite()

// Write any tombstones as needed.
for _, tomb := range tombs {
if tomb.seq <= lsm.seq {
fs.lmb.writeTombstone(tomb.seq, tomb.ts)
}
}

// Any existing state file no longer applicable. We will force write a new one
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++

cb := fs.scb
fs.mu.Unlock()

// Force a new index.db to be written.
if purged > 0 {
fs.forceWriteFullState()
}

if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
@@ -8251,26 +8363,6 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) {
msgPre := msgDir + "/"
var bbuf []byte

const minLen = 32
sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen {
if fs.aek != nil {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil)
if err == nil {
// Redo hash checksum at end on plaintext.
fs.mu.Lock()
hh.Reset()
hh.Write(buf)
buf = fs.hh.Sum(buf)
fs.mu.Unlock()
}
}
if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil {
return
}
}

// Now do messages themselves.
for _, mb := range blks {
if mb.pendingWriteSize() > 0 {
@@ -8309,6 +8401,30 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) {
}
}

// Do index.db last. We will force a write as well.
// Write out full state as well before proceeding.
if err := fs.forceWriteFullState(); err == nil {
const minLen = 32
sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)
if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen {
if fs.aek != nil {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil)
if err == nil {
// Redo hash checksum at end on plaintext.
fs.mu.Lock()
hh.Reset()
hh.Write(buf)
buf = fs.hh.Sum(buf)
fs.mu.Unlock()
}
}
if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil {
return
}
}
}

// Bail if no consumers requested.
if !includeConsumers {
return
@@ -8381,9 +8497,6 @@ func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumer
}
}

// Write out full state as well before proceeding.
fs.writeFullState()

pr, pw := net.Pipe()

// Set a write deadline here to protect ourselves.