Skip to content

Commit

Permalink
mvcc: Clone the key index for compaction and lock on each item
Browse files Browse the repository at this point in the history
For compaction, clone the original Btree for traversal purposes, so as to
not hold the lock for the duration of compaction. This allows read/write
throughput by not blocking when the index tree is large (> 1M entries).
  • Loading branch information
braintreeps committed Apr 17, 2018
1 parent 46e19d2 commit fd4e132
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 27 deletions.
43 changes: 18 additions & 25 deletions mvcc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package mvcc
import (
"sort"
"sync"

"github.com/google/btree"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -185,27 +184,32 @@ func (ti *treeIndex) RangeSince(key, end []byte, rev int64) []revision {

func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
available := make(map[revision]struct{})
var emptyki []*keyIndex
if ti.lg != nil {
ti.lg.Info("compact tree index", zap.Int64("revision", rev))
} else {
plog.Printf("store.index: compact %d", rev)
}
// TODO: do not hold the lock for long time?
// This is probably OK. Compacting 10M keys takes O(10ms).
ti.Lock()
defer ti.Unlock()
ti.tree.Ascend(compactIndex(rev, available, &emptyki))
for _, ki := range emptyki {
item := ti.tree.Delete(ki)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
clone := ti.tree.Clone()
ti.Unlock()

clone.Ascend(func(item btree.Item) bool {
keyi := item.(*keyIndex)
ti.Lock()
keyi.compact(rev, available)
if keyi.isEmpty() {
item := ti.tree.Delete(keyi)
if item == nil {
if ti.lg != nil {
ti.lg.Panic("failed to delete during compaction")
} else {
plog.Panic("store.index: unexpected delete failure during compaction")
}
}
}
}
ti.Unlock()
return true
})
return available
}

Expand All @@ -222,17 +226,6 @@ func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
return available
}

func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
return func(i btree.Item) bool {
keyi := i.(*keyIndex)
keyi.compact(rev, available)
if keyi.isEmpty() {
*emptyki = append(*emptyki, keyi)
}
return true
}
}

func (ti *treeIndex) Equal(bi index) bool {
b := bi.(*treeIndex)

Expand Down
25 changes: 25 additions & 0 deletions mvcc/index_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package mvcc

import (
"testing"
)

func BenchmarkIndexCompact1(b *testing.B) { benchmarkIndexCompact(b, 1) }
func BenchmarkIndexCompact100(b *testing.B) { benchmarkIndexCompact(b, 100) }
func BenchmarkIndexCompact10000(b *testing.B) { benchmarkIndexCompact(b, 10000) }
func BenchmarkIndexCompact100000(b *testing.B) { benchmarkIndexCompact(b, 100000) }
func BenchmarkIndexCompact1000000(b *testing.B) { benchmarkIndexCompact(b, 1000000) }

func benchmarkIndexCompact(b *testing.B, size int) {
kvindex := newTreeIndex()

bytesN := 64
keys := createBytesSlice(bytesN, size)
for i := 1; i < size; i++ {
kvindex.Put(keys[i], revision {main: int64(i), sub: int64(i)})
}
b.ResetTimer()
for i := 1; i < b.N; i++ {
kvindex.Compact(int64(i))
}
}
4 changes: 2 additions & 2 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,10 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
// ensure that desired compaction is persisted
s.b.ForceCommit()

keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
var j = func(ctx context.Context) {
keep := s.kvindex.Compact(rev)
indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
if ctx.Err() != nil {
s.compactBarrier(ctx, ch)
return
Expand All @@ -261,7 +262,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {

s.fifoSched.Schedule(j)

indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))
return ch, nil
}

Expand Down

0 comments on commit fd4e132

Please sign in to comment.