Skip to content

Commit

Permalink
fix tx_buffer inconsistency if there is key value overwrite.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Jan 3, 2024
1 parent c1433f1 commit 3ff51d8
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
68 changes: 68 additions & 0 deletions server/storage/backend/batch_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,74 @@ func TestRangeAfterDeleteMatch(t *testing.T) {
checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
}

func TestRangeAfterOverwriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
tx.Commit()

checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1"))
tx.Unlock()

checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo1"), nil, 0)

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar3"))
tx.Unlock()

checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), []byte("foo3"), 1)
}

func TestRangeAfterDeleteAndOverwriteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
tx.Commit()

checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)

tx.Lock()
tx.UnsafeDelete(schema.Test, []byte("foo"))
tx.Unlock()
checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar0"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar2"))
tx.UnsafePut(schema.Test, []byte("foo1"), []byte("bar10"))
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar1"))
tx.Unlock()

checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo1"), nil, 0)

tx.Lock()
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar3"))
tx.Unlock()

checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkTxnResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo1"), nil, 0)
}

func checkTxnResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) {
tx.Lock()
ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit)
Expand Down
8 changes: 6 additions & 2 deletions server/storage/backend/tx_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
if !ok {
delete(txw.buckets, k)
txr.buckets[k] = wb
wb.dedupe()
continue
}
if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
Expand Down Expand Up @@ -203,9 +204,12 @@ func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
return
}
sort.Stable(bb)
bb.dedupe()
}

// remove duplicates, using only newest update
// dedupe removes duplicates, using only newest update
func (bb *bucketBuffer) dedupe() {
sort.Stable(bb)
widx := 0
for ridx := 1; ridx < bb.used; ridx++ {
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
Expand Down

0 comments on commit 3ff51d8

Please sign in to comment.