Skip to content

Commit

Permalink
commit bbolt transaction if there is any pending deleting operations
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Jan 9, 2024
1 parent 5b8f46e commit 906e962
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ func (t *batchTx) commit(stop bool) {

// batchTxBuffered embeds batchTx and adds a write buffer (buf) together with
// a count of delete operations issued since the last commit.
type batchTxBuffered struct {
batchTx
buf txWriteBuffer
buf txWriteBuffer
// pendingDeleteOperations is incremented by UnsafeDelete and
// UnsafeDeleteBucket and reset to 0 in unsafeCommit. Unlock commits the
// transaction when it is greater than zero.
pendingDeleteOperations int
}

func newBatchTxBuffered(backend *backend) *batchTxBuffered {
Expand All @@ -310,7 +311,27 @@ func (t *batchTxBuffered) Unlock() {
t.buf.writeback(&t.backend.readTx.buf)
// gofail: var afterWritebackBuf struct{}
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
// We commit the transaction when the number of pending operations
// reaches the configured limit(batchLimit) to prevent it from
// becoming excessively large.
//
// But we also need to commit the transaction immediately if there
// is any pending delete operation. Otherwise etcd might not have
// finished committing the data into backend storage (note: etcd
// periodically commits the bbolt transactions instead of on each
// request) when it applies the next request. Accordingly, etcd may
// still read stale data from bbolt when processing the next request,
// which breaks linearizability.
//
// Note we don't need to commit the transaction for put requests if
// it doesn't exceed the batch limit, because there is a buffer on top
// of the bbolt. Each time etcd reads data from backend storage,
// it reads data from both bbolt and the buffer. But there is no
// such buffer for delete requests.
//
// Please also refer to
// https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false)
}
}
Expand Down Expand Up @@ -356,6 +377,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
}

t.batchTx.commit(stop)
t.pendingDeleteOperations = 0

if !stop {
t.backend.readTx.tx = t.backend.begin(false)
Expand All @@ -371,3 +393,13 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
t.batchTx.UnsafeSeqPut(bucket, key, value)
t.buf.putSeq(bucket, key, value)
}

// UnsafeDelete forwards the delete of key in bucketType to the embedded
// batchTx and then records it as a pending delete operation. Unlock commits
// the transaction whenever that count is greater than zero, and unsafeCommit
// resets it to zero after committing.
func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) {
t.batchTx.UnsafeDelete(bucketType, key)
// Unlike UnsafeSeqPut, nothing is written to t.buf here; the delete is
// only counted so that Unlock knows a commit is required.
t.pendingDeleteOperations++
}

// UnsafeDeleteBucket forwards the bucket deletion to the embedded batchTx and
// then records it as a pending delete operation. Unlock commits the
// transaction whenever that count is greater than zero, and unsafeCommit
// resets it to zero after committing.
func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) {
t.batchTx.UnsafeDeleteBucket(bucket)
// Counted the same way as a single-key delete: any non-zero count makes
// Unlock commit the transaction.
t.pendingDeleteOperations++
}

0 comments on commit 906e962

Please sign in to comment.