From b880baba077f45917be1f38961910d6d7438ae2e Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Wed, 5 Feb 2020 08:02:06 -0500 Subject: [PATCH 1/6] internal/cache: move the bulk of allocations off the Go heap Use C malloc/free for the bulk of cache allocations. This required elevating `cache.Value` to a public citizen of the cache package. A distinction is made between manually managed memory and automatically managed memory. Weak handles can only be made from values stored in automatically managed memory. Note that weak handles are only used for the index, filter, and range-del blocks, so the number of weak handles is O(num-tables). A finalizer is set on `*allocCache` and `*Cache` in order to ensure that any outstanding manually allocated memory is released when these objects are collected. When `invariants` are enabled, finalizers are also set on `*Value` and sstable iterators to ensure that we're not leaking manually managed memory. Fixes #11 --- db_test.go | 6 +- internal/cache/alloc.go | 29 +++- internal/cache/alloc_test.go | 8 +- internal/cache/clockpro.go | 250 +++++++++++++++++++---------- internal/cache/clockpro_test.go | 69 +++++--- internal/cache/value.go | 60 +++++++ internal/cache/value_invariants.go | 40 +++++ internal/cache/value_normal.go | 32 ++++ internal/cache/value_notracing.go | 30 ++++ internal/cache/value_tracing.go | 49 ++++++ internal/manual/manual.go | 48 ++++++ internal/manual/manual_32bit.go | 12 ++ internal/manual/manual_64bit.go | 12 ++ internal/manual/manual_nocgo.go | 19 +++ sstable/block.go | 12 +- sstable/data_test.go | 1 + sstable/reader.go | 169 ++++++++++++++----- sstable/reader_test.go | 24 +++ sstable/table_test.go | 7 +- sstable/writer_test.go | 11 +- testdata/event_listener | 2 +- testdata/metrics | 4 +- 22 files changed, 719 insertions(+), 175 deletions(-) create mode 100644 internal/cache/value.go create mode 100644 internal/cache/value_invariants.go create mode 100644 internal/cache/value_normal.go create mode 100644 internal/cache/value_notracing.go create mode 100644 internal/cache/value_tracing.go create mode 100644 internal/manual/manual.go create mode 100644 internal/manual/manual_32bit.go create mode 100644 internal/manual/manual_64bit.go create mode 100644 internal/manual/manual_nocgo.go diff --git a/db_test.go b/db_test.go index 6932a04464..8ca894edb6 100644 --- a/db_test.go +++ b/db_test.go @@ -689,6 +689,7 @@ func TestIterLeak(t *testing.T) { t.Fatal(err) } } else { + defer iter.Close() if err := d.Close(); err == nil { t.Fatalf("expected failure, but found success") } else if !strings.HasPrefix(err.Error(), "leaked iterators:") { @@ -714,7 +715,10 @@ func TestMemTableReservation(t *testing.T) { // Add a block to the cache. Note that the memtable size is larger than the // cache size, so opening the DB should cause this block to be evicted. tmpID := opts.Cache.NewID() - opts.Cache.Set(tmpID, 0, 0, []byte("hello world")) + helloWorld := []byte("hello world") + value := opts.Cache.AllocManual(len(helloWorld)) + copy(value.Buf(), helloWorld) + opts.Cache.Set(tmpID, 0, 0, value).Release() d, err := Open("", opts) if err != nil { diff --git a/internal/cache/alloc.go b/internal/cache/alloc.go index 846a195a3a..58db9d066e 100644 --- a/internal/cache/alloc.go +++ b/internal/cache/alloc.go @@ -5,9 +5,11 @@ package cache import ( + "runtime" "sync" "time" + "github.com/cockroachdb/pebble/internal/manual" "golang.org/x/exp/rand" ) @@ -30,8 +32,17 @@ var allocPool = sync.Pool{ }, } -// allocNew allocates a slice of size n. The use of sync.Pool provides a -// per-cpu cache of allocCache structures to allocate from. +// allocNew allocates a non-garbage collected slice of size n. Every call to +// allocNew() MUST be paired with a call to allocFree(). Failure to do so will +// result in a memory leak. The use of sync.Pool provides a per-cpu cache of +// allocCache structures to allocate from. +// +// TODO(peter): Is the allocCache still necessary for performance? Before the +// introduction of manual memory management, the allocCache dramatically +// reduced GC pressure by reducing allocation bandwidth. It no longer serves +// this purpose because manual.{New,Free} don't produce any GC pressure. Will +// need to run benchmark workloads to see if this can be removed which would +// allow the removal of the one required use of runtime.SetFinalizer. func allocNew(n int) []byte { a := allocPool.Get().(*allocCache) b := a.alloc(n) @@ -73,12 +84,20 @@ func newAllocCache() *allocCache { bufs: make([][]byte, 0, allocCacheCountLimit), } c.rnd.Seed(uint64(time.Now().UnixNano())) + runtime.SetFinalizer(c, freeAllocCache) return c } +func freeAllocCache(obj interface{}) { + c := obj.(*allocCache) + for i := range c.bufs { + manual.Free(c.bufs[i]) + } +} + func (c *allocCache) alloc(n int) []byte { if n < allocCacheMinSize || n >= allocCacheMaxSize { - return make([]byte, n) + return manual.New(n) } class := sizeToClass(n) @@ -92,12 +111,13 @@ func (c *allocCache) alloc(n int) []byte { } } - return make([]byte, n, classToSize(class)) + return manual.New(classToSize(class))[:n] } func (c *allocCache) free(b []byte) { n := cap(b) if n < allocCacheMinSize || n >= allocCacheMaxSize { + manual.Free(b) return } b = b[:n:n] @@ -117,6 +137,7 @@ func (c *allocCache) free(b []byte) { // are biased, but that is fine for the usage here. j := (uint32(len(c.bufs)) * (uint32(c.rnd.Uint64()) & ((1 << 16) - 1))) >> 16 c.size -= cap(c.bufs[j]) + manual.Free(c.bufs[j]) c.bufs[i], c.bufs[j] = nil, c.bufs[i] c.bufs = c.bufs[:i] } diff --git a/internal/cache/alloc_test.go b/internal/cache/alloc_test.go index d5a5405184..a5b83022ed 100644 --- a/internal/cache/alloc_test.go +++ b/internal/cache/alloc_test.go @@ -7,12 +7,14 @@ package cache import ( "testing" "unsafe" + + "github.com/cockroachdb/pebble/internal/manual" ) func TestAllocCache(t *testing.T) { c := newAllocCache() for i := 0; i < 64; i++ { - c.free(make([]byte, 1025)) + c.free(manual.New(1025)) if c.size == 0 { t.Fatalf("expected cache size to be non-zero") } @@ -34,7 +36,7 @@ func TestAllocCache(t *testing.T) { func TestAllocCacheEvict(t *testing.T) { c := newAllocCache() for i := 0; i < allocCacheCountLimit; i++ { - c.free(make([]byte, 1024)) + c.free(manual.New(1024)) } bufs := make([][]byte, allocCacheCountLimit) @@ -61,7 +63,7 @@ func BenchmarkAllocCache(b *testing.B) { // Populate the cache with buffers if one size class. c := newAllocCache() for i := 0; i < allocCacheCountLimit; i++ { - c.free(make([]byte, 1024)) + c.free(manual.New(1024)) } // Benchmark allocating buffers of a different size class. diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 1e47d22dfb..8282a4bed6 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -18,6 +18,7 @@ package cache // import "github.com/cockroachdb/pebble/internal/cache" import ( + "fmt" "runtime" "sync" "sync/atomic" @@ -55,31 +56,6 @@ type key struct { offset uint64 } -type value struct { - buf []byte - // The number of references on the value. When refs drops to 0, the buf - // associated with the value may be reused. This is a form of manual memory - // management. See Cache.Free. - refs int32 -} - -func newValue(b []byte) *value { - if b == nil { - return nil - } - // A value starts with 2 references. One for the cache, and one for the - // handle that will be returned. - return &value{buf: b, refs: 2} -} - -func (v *value) acquire() { - atomic.AddInt32(&v.refs, 1) -} - -func (v *value) release() bool { - return atomic.AddInt32(&v.refs, -1) == 0 -} - type entry struct { key key val unsafe.Pointer @@ -153,37 +129,25 @@ func (e *entry) unlinkFile() *entry { return next } -func (e *entry) setValue(v *value) { +func (e *entry) setValue(v *Value) { if old := e.getValue(); old != nil { - if old.release() { - allocFree(old.buf) - } + old.release() } atomic.StorePointer(&e.val, unsafe.Pointer(v)) } -func (e *entry) getValue() *value { - return (*value)(atomic.LoadPointer(&e.val)) -} - -func (e *entry) Get() []byte { - v := e.getValue() - if v == nil { - return nil - } - atomic.StoreInt32(&e.referenced, 1) - // Record a cache hit because the entry is being used as a WeakHandle and - // successfully avoided a more expensive shard.Get() operation. - atomic.AddInt64(&e.shard.hits, 1) - return v.buf +func (e *entry) getValue() *Value { + return (*Value)(atomic.LoadPointer(&e.val)) } // Handle provides a strong reference to an entry in the cache. The reference // does not pin the entry in the cache, but it does prevent the underlying byte -// slice from being reused. +// slice from being reused. When entry is non-nil, value is initialized to +// entry.val. Over the lifetime of the handle (until Release is called), +// entry.val may change, but value will remain unchanged. type Handle struct { entry *entry - value *value + value *Value } // Get returns the value stored in handle. @@ -199,9 +163,7 @@ func (h Handle) Get() []byte { // Release releases the reference to the cache entry. func (h Handle) Release() { if h.value != nil { - if h.value.release() { - allocFree(h.value.buf) - } + h.value.release() } } @@ -211,26 +173,32 @@ func (h Handle) Release() { // the reference count on the value is incremented which will prevent the // associated buffer from ever being reused until it is GC'd by the Go // runtime. It is not necessary to call Handle.Release() after calling Weak(). -func (h Handle) Weak() WeakHandle { - if h.entry == nil { - return nil // return a nil interface, not (*entry)(nil) +func (h Handle) Weak() *WeakHandle { + if h.value.manual() { + panic("pebble: cannot make manual Value into a WeakHandle") } - // Add a reference to the value which will never be cleared. This is - // necessary because WeakHandle.Get() performs an atomic load of the value, - // but we need to ensure that nothing can concurrently be freeing the buffer - // for reuse. Rather than add additional locking to this code path, we add a - // reference here so that the underlying buffer can never be reused. And we - // rely on the Go runtime to eventually GC the buffer. - h.value.acquire() - return h.entry + return (*WeakHandle)(h.entry) } // WeakHandle provides a "weak" reference to an entry in the cache. A weak // reference allows the entry to be evicted, but also provides fast access -type WeakHandle interface { - // Get retrieves the value associated with the weak handle, returning nil if - // no value is present. - Get() []byte +type WeakHandle entry + +// Get retrieves the value associated with the weak handle, returning nil if no +// value is present. The calls to Get must be balanced with the calls to +// Release. +func (h *WeakHandle) Get() []byte { + e := (*entry)(h) + v := e.getValue() + if v == nil { + return nil + } + + atomic.StoreInt32(&e.referenced, 1) + // Record a cache hit because the entry is being used as a WeakHandle and + // successfully avoided a more expensive shard.Get() operation. + atomic.AddInt64(&e.shard.hits, 1) + return v.buf } type shard struct { @@ -257,7 +225,7 @@ type shard struct { func (c *shard) Get(id, fileNum, offset uint64) Handle { c.mu.RLock() e := c.blocks[key{fileKey{id, fileNum}, offset}] - var value *value + var value *Value if e != nil { value = e.getValue() if value != nil { @@ -276,54 +244,72 @@ func (c *shard) Get(id, fileNum, offset uint64) Handle { return Handle{value: value} } -func (c *shard) Set(id, fileNum, offset uint64, value []byte) Handle { +func (c *shard) Set(id, fileNum, offset uint64, value *Value) Handle { + if n := atomic.LoadInt32(&value.refs); n != 1 && n >= 0 { + panic(fmt.Sprintf("pebble: Value has already been added to the cache: refs=%d", n)) + } + c.mu.Lock() defer c.mu.Unlock() k := key{fileKey{id, fileNum}, offset} e := c.blocks[k] - v := newValue(value) switch { case e == nil: // no cache entry? add it - e = &entry{ptype: etCold, key: k, size: int64(len(value)), shard: c} + e = &entry{ptype: etCold, key: k, size: int64(len(value.buf)), shard: c} e.init() - e.setValue(v) + e.setValue(value) if c.metaAdd(k, e) { + value.trace("add-cold") + value.acquire() // add reference for the cache c.sizeCold += e.size + } else { + value.trace("skip-cold") } case e.getValue() != nil: // cache entry was a hot or cold page - e.setValue(v) + value.acquire() // add reference for the cache + e.setValue(value) atomic.StoreInt32(&e.referenced, 1) - delta := int64(len(value)) - e.size - e.size = int64(len(value)) + delta := int64(len(value.buf)) - e.size + e.size = int64(len(value.buf)) if e.ptype == etHot { + value.trace("add-hot") c.sizeHot += delta } else { + value.trace("add-cold") c.sizeCold += delta } c.evict() default: // cache entry was a test page + c.metaDel(e) + c.sizeTest -= e.size + c.coldSize += e.size if c.coldSize > c.targetSize() { c.coldSize = c.targetSize() } + atomic.StoreInt32(&e.referenced, 0) - e.setValue(v) + e.setValue(value) e.ptype = etHot - c.sizeTest -= e.size - c.metaDel(e) if c.metaAdd(k, e) { + value.trace("add-hot") + value.acquire() // add reference for the cache c.sizeHot += e.size + } else { + value.trace("skip-hot") } } - return Handle{entry: e, value: v} + // Values are initialized with a reference count of 1. That reference count + // is being transferred to the returned Handle. + return Handle{entry: e, value: value} } // Delete deletes the cached value for the specified file and offset. @@ -415,6 +401,11 @@ func (c *shard) metaAdd(key key, e *entry) bool { } func (c *shard) metaDel(e *entry) { + if value := e.getValue(); value != nil { + value.trace("metaDel") + } + e.setValue(nil) + delete(c.blocks, e.key) if e == c.handHot { @@ -543,7 +534,65 @@ type Metrics struct { Misses int64 } -// Cache ... +// Cache implements Pebble's sharded block cache. The Clock-PRO algorithm is +// used for page replacement +// (http://static.usenix.org/event/usenix05/tech/general/full_papers/jiang/jiang_html/html.html). In +// order to provide better concurrency, 2 x NumCPUs shards are created, with +// each shard being given 1/n of the target cache size. The Clock-PRO algorithm +// is run independently on each shard. +// +// Blocks are keyed by an (id, fileNum, offset) triple. The ID is a namespace +// for file numbers and allows a single Cache to be shared between multiple +// Pebble instances. The fileNum and offset refer to an sstable file number and +// the offset of the block within the file. Because sstables are immutable and +// file numbers are never reused, (fileNum,offset) are unique for the lifetime +// of a Pebble instance. +// +// In addition to maintaining a map from (fileNum,offset) to data, each shard +// maintains a map of the cached blocks for a particular fileNum. This allows +// efficient eviction of all of the blocks for a file which is used when an +// sstable is deleted from disk. +// +// Strong vs Weak Handles +// +// When a block is retrieved from the cache, a Handle is returned. The Handle +// maintains a reference to the associated value and will prevent the value +// from being removed. The caller must call Handle.Release() when they are done +// using the value (failure to do so will result in a memory leak). Until +// Handle.Release() is called, the value is pinned in memory. +// +// A Handle can be transformed into a WeakHandle by a call to Handle.Weak(). A +// WeakHandle does not pin the associated value in memory, and instead allows +// the value to be evicted from the cache and reclaimed by the Go GC. The value +// associated with a WeakHandle can be retrieved by WeakHandle.Get() which may +// return nil if the value has been evicted from the cache. WeakHandle's are +// useful for avoiding the overhad of a cache lookup. They are used for sstable +// index, filter, and range-del blocks, which are frequently accessed, almost +// always in the cache, but which we want to allow to be evicted under memory +// pressure. +// +// Memory Management +// +// In order to reduce pressure on the Go GC, manual memory management is +// performed for the majority of blocks in the cache. Manual memory management +// is selected using Cache.AllocManual() to allocate a value, rather than +// Cache.AllocAuto(). Note that manual values cannot be used for +// WeakHandles. The general rule is to use AllocAuto when a WeakHandle will be +// retrieved, and AllocManual in all other cases. +// +// Manual memory management is performed by calling into C.{malloc,free} to +// allocate memory. Cache.Values are reference counted and the memory backing a +// manual value is freed when the reference count drops to 0. +// +// Manual memory management brings the possibility of memory leaks. It is +// imperative that every Handle returned by Cache.{Get,Set} is eventually +// released. The "invariants" build tag enables a leak detection facility that +// places a GC finalizer on cache.Value. When the cache.Value finalizer is run, +// if the underlying buffer is still present a leak has occurred. The "tracing" +// build tag enables tracing of cache.Value reference count manipulation and +// eases finding where a leak has occurred. These two facilities are usually +// used in combination by specifying `-tags invariants,tracing`. Note that +// "tracing" produces a significant slowdown, while "invariants" does not. type Cache struct { maxSize int64 idAlloc uint64 @@ -556,6 +605,17 @@ func New(size int64) *Cache { return newShards(size, 2*runtime.NumCPU()) } +func clearCache(obj interface{}) { + c := obj.(*Cache) + for i := range c.shards { + s := &c.shards[i] + s.mu.Lock() + s.maxSize = 0 + s.evict() + s.mu.Unlock() + } +} + func newShards(size int64, shards int) *Cache { c := &Cache{ maxSize: size, @@ -570,6 +630,11 @@ func newShards(size int64, shards int) *Cache { files: make(map[fileKey]*entry), } } + // TODO(peter): This finalizer is used to clear the cache when the Cache + // itself is GC'd. Investigate making this explicit, and then changing the + // finalizer to only be installed when invariants.Enabled is true and to only + // check that all of the manual memory has been freed. + runtime.SetFinalizer(c, clearCache) return c } @@ -611,8 +676,8 @@ func (c *Cache) Get(id, fileNum, offset uint64) Handle { // Set sets the cache value for the specified file and offset, overwriting an // existing value if present. A Handle is returned which provides faster // retrieval of the cached value than Get (lock-free and avoidance of the map -// lookup). -func (c *Cache) Set(id, fileNum, offset uint64, value []byte) Handle { +// lookup). The value must have been allocated by Cache.Alloc. +func (c *Cache) Set(id, fileNum, offset uint64, value *Value) Handle { return c.getShard(id, fileNum, offset).Set(id, fileNum, offset, value) } @@ -645,16 +710,29 @@ func (c *Cache) Size() int64 { return size } -// Alloc allocates a byte slice of the specified size, possibly reusing -// previously allocated but unused memory. -func (c *Cache) Alloc(n int) []byte { - return allocNew(n) +// AllocManual allocates a byte slice of the specified size, possibly reusing +// previously allocated but unused memory. The memory backing the value is +// manually managed. The caller MUST either add the value to the cache (via +// Cache.Set), or release the value (via Cache.Free). Failure to do so will +// result in a memory leak. +func (c *Cache) AllocManual(n int) *Value { + return newManualValue(n) } -// Free frees the specified slice of memory. The buffer will possibly be -// reused, making it invalid to use the buffer after calling Free. -func (c *Cache) Free(b []byte) { - allocFree(b) +// AllocAuto allocates an automatically managed value using buf as the internal +// buffer. +func (c *Cache) AllocAuto(buf []byte) *Value { + return newAutoValue(buf) +} + +// Free frees the specified value. The buffer associated with the value will +// possibly be reused, making it invalid to use the buffer after calling +// Free. Do not call Free on a value that has been added to the cache. +func (c *Cache) Free(v *Value) { + if n := atomic.LoadInt32(&v.refs); n > 1 { + panic(fmt.Sprintf("pebble: Value has been added to the cache: refs=%d", n)) + } + v.release() } // Reserve N bytes in the cache. This effectively shrinks the size of the cache diff --git a/internal/cache/clockpro_test.go b/internal/cache/clockpro_test.go index 5c535092dd..bf6bc4301c 100644 --- a/internal/cache/clockpro_test.go +++ b/internal/cache/clockpro_test.go @@ -37,7 +37,9 @@ func TestCache(t *testing.T) { var hit bool h := cache.Get(1, uint64(key), 0) if v := h.Get(); v == nil { - cache.Set(1, uint64(key), 0, append([]byte(nil), fields[0][0])) + value := cache.AllocManual(1) + value.Buf()[0] = fields[0][0] + cache.Set(1, uint64(key), 0, value).Release() } else { hit = true if !bytes.Equal(v, fields[0][:1]) { @@ -52,10 +54,21 @@ func TestCache(t *testing.T) { } } +func testManualValue(cache *Cache, s string, repeat int) *Value { + b := bytes.Repeat([]byte(s), repeat) + v := cache.AllocManual(len(b)) + copy(v.Buf(), b) + return v +} + +func testAutoValue(cache *Cache, s string, repeat int) *Value { + return cache.AllocAuto(bytes.Repeat([]byte(s), repeat)) +} + func TestWeakHandle(t *testing.T) { cache := newShards(5, 1) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 5)) - h := cache.Set(1, 0, 0, bytes.Repeat([]byte("b"), 5)) + cache.Set(1, 1, 0, testAutoValue(cache, "a", 5)).Release() + h := cache.Set(1, 0, 0, testAutoValue(cache, "b", 5)) if v := h.Get(); string(v) != "bbbbb" { t.Fatalf("expected bbbbb, but found %v", v) } @@ -64,7 +77,7 @@ func TestWeakHandle(t *testing.T) { if v := w.Get(); string(v) != "bbbbb" { t.Fatalf("expected bbbbb, but found %v", v) } - cache.Set(1, 2, 0, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 2, 0, testManualValue(cache, "a", 5)).Release() if v := w.Get(); v != nil { t.Fatalf("expected nil, but found %s", v) } @@ -72,9 +85,9 @@ func TestWeakHandle(t *testing.T) { func TestCacheDelete(t *testing.T) { cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 0, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 1, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 0, testManualValue(cache, "a", 5)).Release() if expected, size := int64(15), cache.Size(); expected != size { t.Fatalf("expected cache size %d, but found %d", expected, size) } @@ -84,9 +97,13 @@ func TestCacheDelete(t *testing.T) { } if h := cache.Get(1, 0, 0); h.Get() == nil { t.Fatalf("expected to find block 0/0") + } else { + h.Release() } if h := cache.Get(1, 1, 0); h.Get() != nil { t.Fatalf("expected to not find block 1/0") + } else { + h.Release() } // Deleting a non-existing block does nothing. cache.Delete(1, 1, 0) @@ -97,11 +114,11 @@ func TestCacheDelete(t *testing.T) { func TestEvictFile(t *testing.T) { cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 1, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 2, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 1, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 1, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 2, testManualValue(cache, "a", 5)).Release() if expected, size := int64(25), cache.Size(); expected != size { t.Fatalf("expected cache size %d, but found %d", expected, size) } @@ -123,14 +140,14 @@ func TestEvictAll(t *testing.T) { // Verify that it is okay to evict all of the data from a cache. Previously // this would trigger a nil-pointer dereference. cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 101)) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 101)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 101)).Release() + cache.Set(1, 1, 0, testManualValue(cache, "a", 101)).Release() } func TestMultipleDBs(t *testing.T) { cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(2, 0, 0, bytes.Repeat([]byte("b"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "b", 5)).Release() if expected, size := int64(10), cache.Size(); expected != size { t.Fatalf("expected cache size %d, but found %d", expected, size) } @@ -144,31 +161,31 @@ func TestMultipleDBs(t *testing.T) { } h = cache.Get(2, 0, 0) if v := h.Get(); string(v) != "bbbbb" { - t.Fatalf("expected bbbbb, but found %v", v) + t.Fatalf("expected bbbbb, but found %s", v) } } func TestZeroSize(t *testing.T) { cache := newShards(0, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() } func TestReserve(t *testing.T) { cache := newShards(4, 2) - cache.Set(1, 0, 0, []byte("a")) - cache.Set(2, 0, 0, []byte("a")) + cache.Set(1, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "a", 1)).Release() require.EqualValues(t, 2, cache.Size()) r := cache.Reserve(1) require.EqualValues(t, 0, cache.Size()) - cache.Set(1, 0, 0, []byte("a")) - cache.Set(2, 0, 0, []byte("a")) - cache.Set(3, 0, 0, []byte("a")) - cache.Set(4, 0, 0, []byte("a")) + cache.Set(1, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(3, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(4, 0, 0, testManualValue(cache, "a", 1)).Release() require.EqualValues(t, 2, cache.Size()) r() require.EqualValues(t, 2, cache.Size()) - cache.Set(1, 0, 0, []byte("a")) - cache.Set(2, 0, 0, []byte("a")) + cache.Set(1, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "a", 1)).Release() require.EqualValues(t, 4, cache.Size()) } diff --git a/internal/cache/value.go b/internal/cache/value.go new file mode 100644 index 0000000000..d4246bf2e0 --- /dev/null +++ b/internal/cache/value.go @@ -0,0 +1,60 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import "sync/atomic" + +func newAutoValue(b []byte) *Value { + if b == nil { + return nil + } + // A value starts with an invalid reference count. When the value is added to + // the cache, the reference count will be set to 2: one reference for the + // cache, and another for the returned Handle. + return &Value{buf: b, refs: -(1 << 30)} +} + +// Buf returns the buffer associated with the value. The contents of the buffer +// should not be changed once the value has been added to the cache. Instead, a +// new Value should be created and added to the cache to replace the existing +// value. +func (v *Value) Buf() []byte { + if v == nil { + return nil + } + return v.buf +} + +// Truncate the buffer to the specified length. The buffer length should not be +// changed once the value has been added to the cache as there may be +// concurrent readers of the Value. Instead, a new Value should be created and +// added to the cache to replace the existing value. +func (v *Value) Truncate(n int) { + v.buf = v.buf[:n] +} + +func (v *Value) auto() bool { + return atomic.LoadInt32(&v.refs) < 0 +} + +func (v *Value) manual() bool { + return !v.auto() +} + +func (v *Value) acquire() { + atomic.AddInt32(&v.refs, 1) + v.trace("acquire") +} + +func (v *Value) release() { + n := atomic.AddInt32(&v.refs, -1) + v.trace("release") + if n == 0 { + allocFree(v.buf) + // Setting Value.buf to nil is needed for correctness of the leak checking + // that is performed when the "invariants" build tag is enabled. + v.buf = nil + } +} diff --git a/internal/cache/value_invariants.go b/internal/cache/value_invariants.go new file mode 100644 index 0000000000..983f8e0b7a --- /dev/null +++ b/internal/cache/value_invariants.go @@ -0,0 +1,40 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build invariants tracing + +package cache + +import ( + "fmt" + "os" + "runtime" + "sync/atomic" +) + +func checkValue(obj interface{}) { + v := obj.(*Value) + if v.buf != nil { + fmt.Fprintf(os.Stderr, "%p: cache value was not freed: refs=%d\n%s", + v.buf, atomic.LoadInt32(&v.refs), v.traces()) + os.Exit(1) + } +} + +// newManualValue creates a Value with a manually managed buffer of size n. +// +// This definition of newManualValue is used when either the "invariants" or +// "tracing" build tags are specified. It hooks up a finalizer to the returned +// Value that checks for memory leaks when the GC determines the Value is no +// longer reachable. +func newManualValue(n int) *Value { + if n == 0 { + return nil + } + b := allocNew(n) + v := &Value{buf: b, refs: 1} + v.trace("alloc") + runtime.SetFinalizer(v, checkValue) + return v +} diff --git a/internal/cache/value_normal.go b/internal/cache/value_normal.go new file mode 100644 index 0000000000..88f84247c3 --- /dev/null +++ b/internal/cache/value_normal.go @@ -0,0 +1,32 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build !tracing,!invariants + +package cache + +// Value holds a reference counted immutable value. +// +// This is the definition of Value that is used in normal builds. +type Value struct { + buf []byte + // The number of references on the value. When refs drops to 0, the buf + // associated with the value may be reused. This is a form of manual memory + // management. See Cache.Free. + // + // Auto values are distinguished by setting their reference count to + // -(1<<30). + refs int32 +} + +func newManualValue(n int) *Value { + if n == 0 { + return nil + } + b := allocNew(n) + return &Value{buf: b, refs: 1} +} + +func (v *Value) trace(msg string) { +} diff --git a/internal/cache/value_notracing.go b/internal/cache/value_notracing.go new file mode 100644 index 0000000000..a3d373dccd --- /dev/null +++ b/internal/cache/value_notracing.go @@ -0,0 +1,30 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build invariants,!tracing + +package cache + +// Value holds a reference counted immutable value. +// +// This is the definition of Value that is used when the "invariants" build tag +// is specified and the "tracing" build tag is not specified. If the "tracing" +// build tag is specified, the Value definition comes from value_tracing.go. +type Value struct { + buf []byte + // The number of references on the value. When refs drops to 0, the buf + // associated with the value may be reused. This is a form of manual memory + // management. See Cache.Free. + // + // Auto values are distinguished by setting their reference count to + // -(1<<30). + refs int32 +} + +func (v *Value) trace(msg string) { +} + +func (v *Value) traces() string { + return "" +} diff --git a/internal/cache/value_tracing.go b/internal/cache/value_tracing.go new file mode 100644 index 0000000000..ac84fffcc8 --- /dev/null +++ b/internal/cache/value_tracing.go @@ -0,0 +1,49 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build tracing + +package cache + +import ( + "fmt" + "runtime/debug" + "strings" + "sync" + "sync/atomic" +) + +// Value holds a reference counted immutable value. +// +// This is the definition of Value that is used when the "tracing" build tag is +// specified. +type Value struct { + buf []byte + // The number of references on the value. When refs drops to 0, the buf + // associated with the value may be reused. This is a form of manual memory + // management. See Cache.Free. + // + // Auto values are distinguished by setting their reference count to + // -(1<<30). + refs int32 + // Traces recorded by Value.trace. Used for debugging. + tr struct { + sync.Mutex + msgs []string + } +} + +func (v *Value) trace(msg string) { + s := fmt.Sprintf("%s: refs=%d\n%s", msg, atomic.LoadInt32(&v.refs), debug.Stack()) + v.tr.Lock() + v.tr.msgs = append(v.tr.msgs, s) + v.tr.Unlock() +} + +func (v *Value) traces() string { + v.tr.Lock() + s := strings.Join(v.tr.msgs, "\n") + v.tr.Unlock() + return s +} diff --git a/internal/manual/manual.go b/internal/manual/manual.go new file mode 100644 index 0000000000..7f12b7b322 --- /dev/null +++ b/internal/manual/manual.go @@ -0,0 +1,48 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package manual + +// #include +import "C" +import "unsafe" + +// TODO(peter): Rather than relying an C malloc/free, we could fork the Go +// runtime page allocator and allocate large chunks of memory using mmap or +// similar. + +// New allocates a slice of size n. The returned slice is from manually managed +// memory and MUST be released by calling Free. Failure to do so will result in +// a memory leak. +func New(n int) []byte { + if n == 0 { + return make([]byte, 0) + } + // We need to be conscious of the Cgo pointer passing rules: + // + // https://golang.org/cmd/cgo/#hdr-Passing_pointers + // + // ... + // Note: the current implementation has a bug. While Go code is permitted + // to write nil or a C pointer (but not a Go pointer) to C memory, the + // current implementation may sometimes cause a runtime error if the + // contents of the C memory appear to be a Go pointer. Therefore, avoid + // passing uninitialized C memory to Go code if the Go code is going to + // store pointer values in it. Zero out the memory in C before passing it + // to Go. + ptr := C.calloc(C.size_t(n), 1) + // Interpret the C pointer as a pointer to a Go array, then slice. + return (*[maxArrayLen]byte)(unsafe.Pointer(ptr))[:n:n] +} + +// Free frees the specified slice. +func Free(b []byte) { + if cap(b) != 0 { + if len(b) == 0 { + b = b[:cap(b)] + } + ptr := unsafe.Pointer(&b[0]) + C.free(ptr) + } +} diff --git a/internal/manual/manual_32bit.go b/internal/manual/manual_32bit.go new file mode 100644 index 0000000000..3c39ea053b --- /dev/null +++ b/internal/manual/manual_32bit.go @@ -0,0 +1,12 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build 386 amd64p32 arm armbe mips mipsle mips64p32 mips64p32le ppc sparc + +package manual + +const ( + // maxArrayLen is a safe maximum length for slices on this architecture. + maxArrayLen = 1<<31 - 1 +) diff --git a/internal/manual/manual_64bit.go b/internal/manual/manual_64bit.go new file mode 100644 index 0000000000..23d1a28d73 --- /dev/null +++ b/internal/manual/manual_64bit.go @@ -0,0 +1,12 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build amd64 arm64 arm64be ppc64 ppc64le mips64 mips64le s390x sparc64 + +package manual + +const ( + // maxArrayLen is a safe maximum length for slices on this architecture. + maxArrayLen = 1<<50 - 1 +) diff --git a/internal/manual/manual_nocgo.go b/internal/manual/manual_nocgo.go new file mode 100644 index 0000000000..466d1ee634 --- /dev/null +++ b/internal/manual/manual_nocgo.go @@ -0,0 +1,19 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build !cgo + +package manual + +// Provides versions of New and Free when cgo is not available (e.g. cross +// compilation). + +// New allocates a slice of size n. +func New(n int) []byte { + return make([]byte, n) +} + +// Free frees the specified slice. +func Free(b []byte) { +} diff --git a/sstable/block.go b/sstable/block.go index fdf034b906..b18123b708 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -267,6 +267,12 @@ func (i *blockIter) init(cmp Compare, block block, globalSeqNum uint64) error { return nil } +func (i *blockIter) initHandle(cmp Compare, block cache.Handle, globalSeqNum uint64) error { + i.cacheHandle.Release() + i.cacheHandle = block + return i.init(cmp, block.Get(), globalSeqNum) +} + func (i *blockIter) invalidate() { i.clearCache() i.offset = 0 @@ -284,11 +290,6 @@ func (i *blockIter) resetForReuse() blockIter { } } -func (i *blockIter) setCacheHandle(h cache.Handle) { - i.cacheHandle.Release() - i.cacheHandle = h -} - func (i *blockIter) readEntry() { ptr := unsafe.Pointer(uintptr(i.ptr) + uintptr(i.offset)) @@ -832,6 +833,7 @@ func (i *blockIter) Error() error { // package. func (i *blockIter) Close() error { i.cacheHandle.Release() + i.cacheHandle = cache.Handle{} i.val = nil return i.err } diff --git a/sstable/data_test.go b/sstable/data_test.go index 777aaf5116..05b956f9b8 100644 --- a/sstable/data_test.go +++ b/sstable/data_test.go @@ -177,6 +177,7 @@ func runIterCmd(td *datadriven.TestData, r *Reader) string { if err := iter.Error(); err != nil { return err.Error() } + defer iter.Close() var b bytes.Buffer var prefix []byte diff --git a/sstable/reader.go b/sstable/reader.go index 0bf5745a36..7a0baf2ca4 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -10,6 +10,8 @@ import ( "errors" "fmt" "io" + "os" + "runtime" "sort" "sync" "unsafe" @@ -17,6 +19,7 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/crc" + "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/internal/rangedel" "github.com/cockroachdb/pebble/vfs" @@ -80,16 +83,48 @@ var _ base.InternalIterator = (*singleLevelIterator)(nil) var singleLevelIterPool = sync.Pool{ New: func() interface{} { - return &singleLevelIterator{} + i := &singleLevelIterator{} + if invariants.Enabled { + runtime.SetFinalizer(i, checkSingleLevelIterator) + } + return i }, } var twoLevelIterPool = sync.Pool{ New: func() interface{} { - return &twoLevelIterator{} + i := &twoLevelIterator{} + if invariants.Enabled { + runtime.SetFinalizer(i, checkTwoLevelIterator) + } + return i }, } +func checkSingleLevelIterator(obj interface{}) { + i := obj.(*singleLevelIterator) + if p := i.data.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.data.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } + if p := i.index.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.index.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } +} + +func checkTwoLevelIterator(obj interface{}) { + i := obj.(*twoLevelIterator) + if p := i.data.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.data.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } + if p := i.index.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.index.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } +} + // Init initializes a singleLevelIterator for reading from the table. It is // synonmous with Reader.NewIter, but allows for reusing of the iterator // between different Readers. @@ -159,13 +194,12 @@ func (i *singleLevelIterator) loadBlock() bool { i.err = errCorruptIndexEntry return false } - block, err := i.reader.readBlock(i.dataBH, nil /* transform */) + block, err := i.reader.readBlock(i.dataBH, nil /* transform */, false /* weak */) if err != nil { i.err = err return false } - i.data.setCacheHandle(block) - i.err = i.data.init(i.cmp, block.Get(), i.reader.Properties.GlobalSeqNum) + i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum) if i.err != nil { return false } @@ -441,18 +475,23 @@ func (i *singleLevelIterator) SetCloseHook(fn func(i Iterator) error) { i.closeHook = fn } +func firstError(err0, err1 error) error { + if err0 != nil { + return err0 + } + return err1 +} + // Close implements internalIterator.Close, as documented in the pebble // package. func (i *singleLevelIterator) Close() error { + var err error if i.closeHook != nil { - if err := i.closeHook(i); err != nil { - return err - } - } - if err := i.data.Close(); err != nil { - return err + err = firstError(err, i.closeHook(i)) } - err := i.err + err = firstError(err, i.data.Close()) + err = firstError(err, i.index.Close()) + err = firstError(err, i.err) *i = i.resetForReuse() singleLevelIterPool.Put(i) return err @@ -559,13 +598,12 @@ func (i *twoLevelIterator) loadIndex() bool { i.err = errors.New("pebble/table: corrupt top level index entry") return false } - indexBlock, err := i.reader.readBlock(h, nil /* transform */) + indexBlock, err := i.reader.readBlock(h, nil /* transform */, false /* weak */) if err != nil { i.err = err return false } - i.index.setCacheHandle(indexBlock) - i.err = i.index.init(i.cmp, indexBlock.Get(), i.reader.Properties.GlobalSeqNum) + i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum) return i.err == nil } @@ -784,15 +822,13 @@ func (i *twoLevelIterator) skipBackward() (*InternalKey, []byte) { // Close implements internalIterator.Close, as documented in the pebble // package. func (i *twoLevelIterator) Close() error { + var err error if i.closeHook != nil { - if err := i.closeHook(i); err != nil { - return err - } - } - if err := i.data.Close(); err != nil { - return err + err = firstError(err, i.closeHook(i)) } - err := i.err + err = firstError(err, i.data.Close()) + err = firstError(err, i.index.Close()) + err = firstError(err, i.err) *i = twoLevelIterator{ singleLevelIterator: i.singleLevelIterator.resetForReuse(), topLevelIndex: i.topLevelIndex.resetForReuse(), @@ -812,6 +848,10 @@ type twoLevelCompactionIterator struct { // twoLevelCompactionIterator implements the base.InternalIterator interface. var _ base.InternalIterator = (*twoLevelCompactionIterator)(nil) +func (i *twoLevelCompactionIterator) Close() error { + return i.twoLevelIterator.Close() +} + func (i *twoLevelCompactionIterator) SeekGE(key []byte) (*InternalKey, []byte) { panic("pebble: SeekGE unimplemented") } @@ -874,7 +914,7 @@ func (i *twoLevelCompactionIterator) skipForward( type weakCachedBlock struct { bh BlockHandle mu sync.RWMutex - handle cache.WeakHandle + handle *cache.WeakHandle } type blockTransform func([]byte) ([]byte, error) @@ -1040,7 +1080,15 @@ func (r *Reader) get(key []byte) (value []byte, err error) { } return nil, err } - return value, i.Close() + + // The value will be "freed" when the iterator is closed, so make a copy + // which will outlast the lifetime of the iterator. + newValue := make([]byte, len(value)) + copy(newValue, value) + if err := i.Close(); err != nil { + return nil, err + } + return newValue, nil } // NewIter returns an iterator for the contents of the table. @@ -1121,7 +1169,7 @@ func (r *Reader) readWeakCachedBlock(w *weakCachedBlock, transform blockTransfor // Slow-path: read the index block from disk. This checks the cache again, // but that is ok because somebody else might have inserted it for us. - h, err := r.readBlock(w.bh, transform) + h, err := r.readBlock(w.bh, transform, true /* weak */) if err != nil { return nil, err } @@ -1135,24 +1183,35 @@ func (r *Reader) readWeakCachedBlock(w *weakCachedBlock, transform blockTransfor } // readBlock reads and decompresses a block from disk into memory. -func (r *Reader) readBlock(bh BlockHandle, transform blockTransform) (cache.Handle, error) { +func (r *Reader) readBlock( + bh BlockHandle, transform blockTransform, weak bool, +) (cache.Handle, error) { if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil { return h, nil } - b := r.opts.Cache.Alloc(int(bh.Length + blockTrailerLen)) + var v *cache.Value + if weak { + v = r.opts.Cache.AllocAuto(make([]byte, int(bh.Length+blockTrailerLen))) + } else { + v = r.opts.Cache.AllocManual(int(bh.Length + blockTrailerLen)) + } + b := v.Buf() if _, err := r.file.ReadAt(b, int64(bh.Offset)); err != nil { + r.opts.Cache.Free(v) return cache.Handle{}, err } checksum0 := binary.LittleEndian.Uint32(b[bh.Length+1:]) checksum1 := crc.New(b[:bh.Length+1]).Value() if checksum0 != checksum1 { + r.opts.Cache.Free(v) return cache.Handle{}, errors.New("pebble/table: invalid table (checksum mismatch)") } typ := b[bh.Length] b = b[:bh.Length] + v.Truncate(len(b)) switch typ { case noCompressionBlockType: @@ -1160,29 +1219,55 @@ func (r *Reader) readBlock(bh BlockHandle, transform blockTransform) (cache.Hand case snappyCompressionBlockType: decodedLen, err := snappy.DecodedLen(b) if err != nil { + r.opts.Cache.Free(v) return cache.Handle{}, err } - decoded := r.opts.Cache.Alloc(decodedLen) - decoded, err = snappy.Decode(decoded, b) + var decoded *cache.Value + if weak { + decoded = r.opts.Cache.AllocAuto(make([]byte, decodedLen)) + } else { + decoded = r.opts.Cache.AllocManual(decodedLen) + } + decodedBuf := decoded.Buf() + result, err := snappy.Decode(decodedBuf, b) + r.opts.Cache.Free(v) if err != nil { + r.opts.Cache.Free(decoded) return cache.Handle{}, err } - r.opts.Cache.Free(b) - b = decoded + if len(result) != 0 && + (len(result) != len(decodedBuf) || &result[0] != &decodedBuf[0]) { + r.opts.Cache.Free(decoded) + return cache.Handle{}, fmt.Errorf("pebble/table: snappy decoded into unexpected buffer: %p != %p", + result, decodedBuf) + } + v, b = decoded, decodedBuf default: + r.opts.Cache.Free(v) return cache.Handle{}, fmt.Errorf("pebble/table: unknown block compression: %d", typ) } if transform != nil { - // Transforming blocks is rare, so we don't bother to use cache.Alloc. + // Transforming blocks is rare, so the extra copy of the transformed data + // is not problematic. var err error b, err = transform(b) if err != nil { + r.opts.Cache.Free(v) return cache.Handle{}, err } + var newV *cache.Value + if weak { + newV = r.opts.Cache.AllocAuto(b) + } else { + newV = r.opts.Cache.AllocManual(len(b)) + copy(newV.Buf(), b) + } + r.opts.Cache.Free(v) + v = newV } - h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, b) + h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, v) return h, nil } @@ -1229,18 +1314,19 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { } func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { - b, err := r.readBlock(metaindexBH, nil /* transform */) + b, err := r.readBlock(metaindexBH, nil /* transform */, false /* weak */) if err != nil { return err } data := b.Get() + defer b.Release() + if uint64(len(data)) != metaindexBH.Length { return fmt.Errorf("pebble/table: unexpected metaindex block size: %d vs %d", len(data), metaindexBH.Length) } i, err := newRawBlockIter(bytes.Compare, data) - b.Release() if err != nil { return err } @@ -1258,13 +1344,12 @@ func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { } if bh, ok := meta[metaPropertiesName]; ok { - b, err = r.readBlock(bh, nil /* transform */) + b, err = r.readBlock(bh, nil /* transform */, false /* weak */) if err != nil { return err } - data := b.Get() r.propertiesBH = bh - err := r.Properties.load(data, bh.Offset) + err := r.Properties.load(b.Get(), bh.Offset) b.Release() if err != nil { return err @@ -1350,7 +1435,7 @@ func (r *Reader) Layout() (*Layout, error) { } l.Index = append(l.Index, indexBH) - subIndex, err := r.readBlock(indexBH, nil /* transform */) + subIndex, err := r.readBlock(indexBH, nil /* transform */, false /* weak */) if err != nil { return nil, err } @@ -1415,7 +1500,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */) + startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */, false /* weak */) if err != nil { return 0, err } @@ -1434,7 +1519,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */) + endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */, false /* weak */) if err != nil { return 0, err } @@ -1616,7 +1701,7 @@ func (l *Layout) Describe( continue } - h, err := r.readBlock(b.BlockHandle, nil /* transform */) + h, err := r.readBlock(b.BlockHandle, nil /* transform */, false /* weak */) if err != nil { fmt.Fprintf(w, " [err: %s]\n", err) continue diff --git a/sstable/reader_test.go b/sstable/reader_test.go index cc6a8de2c0..99cfdee284 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -343,6 +343,7 @@ func TestBytesIteratedCompressed(t *testing.T) { r := buildTestTable(t, numEntries, blockSize, indexBlockSize, SnappyCompression) var bytesIterated, prevIterated uint64 citer := r.NewCompactionIter(&bytesIterated) + for key, _ := citer.First(); key != nil; key, _ = citer.Next() { if bytesIterated < prevIterated { t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated) @@ -355,6 +356,13 @@ func TestBytesIteratedCompressed(t *testing.T) { if bytesIterated < expected*99/100 || bytesIterated > expected*101/100 { t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected) } + + if err := citer.Close(); err != nil { + t.Fatal(err) + } + if err := r.Close(); err != nil { + t.Fatal(err) + } } } } @@ -368,6 +376,7 @@ func TestBytesIteratedUncompressed(t *testing.T) { r := buildTestTable(t, numEntries, blockSize, indexBlockSize, NoCompression) var bytesIterated, prevIterated uint64 citer := r.NewCompactionIter(&bytesIterated) + for key, _ := citer.First(); key != nil; key, _ = citer.Next() { if bytesIterated < prevIterated { t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated) @@ -380,6 +389,13 @@ func TestBytesIteratedUncompressed(t *testing.T) { t.Fatalf("bytesIterated: got %d, want %d (blockSize=%d indexBlockSize=%d numEntries=%d)", bytesIterated, expected, blockSize, indexBlockSize, numEntries) } + + if err := citer.Close(); err != nil { + t.Fatal(err) + } + if err := r.Close(); err != nil { + t.Fatal(err) + } } } } @@ -483,6 +499,8 @@ func BenchmarkTableIterSeekGE(b *testing.B) { for i := 0; i < b.N; i++ { it.SeekGE(keys[rng.Intn(len(keys))]) } + + it.Close() }) } } @@ -501,6 +519,8 @@ func BenchmarkTableIterSeekLT(b *testing.B) { for i := 0; i < b.N; i++ { it.SeekLT(keys[rng.Intn(len(keys))]) } + + it.Close() }) } } @@ -527,6 +547,8 @@ func BenchmarkTableIterNext(b *testing.B) { if testing.Verbose() { fmt.Fprint(ioutil.Discard, sum) } + + it.Close() }) } } @@ -553,6 +575,8 @@ func BenchmarkTableIterPrev(b *testing.B) { if testing.Verbose() { fmt.Fprint(ioutil.Discard, sum) } + + it.Close() }) } } diff --git a/sstable/table_test.go b/sstable/table_test.go index 4404cf52ed..c0fa57e0c6 100644 --- a/sstable/table_test.go +++ b/sstable/table_test.go @@ -673,6 +673,9 @@ func TestReaderGlobalSeqNum(t *testing.T) { t.Fatalf("expected %d, but found %d", globalSeqNum, i.Key().SeqNum()) } } + if err := i.Close(); err != nil { + t.Fatal(err) + } } func TestMetaIndexEntriesSorted(t *testing.T) { @@ -687,12 +690,12 @@ func TestMetaIndexEntriesSorted(t *testing.T) { t.Fatal(err) } - b, err := r.readBlock(r.metaIndexBH, nil /* transform */) + b, err := r.readBlock(r.metaIndexBH, nil /* transform */, false /* weak */) if err != nil { t.Fatal(err) } i, err := newRawBlockIter(bytes.Compare, b.Get()) - b.Release() + defer b.Release() if err != nil { t.Fatal(err) } diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 6dbd4413da..64d1306c54 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -99,7 +99,12 @@ func TestWriterClearCache(t *testing.T) { opts := ReaderOptions{Cache: cache.New(64 << 20)} writerOpts := WriterOptions{Cache: opts.Cache} cacheOpts := &cacheOpts{cacheID: 1, fileNum: 1} - invalidData := []byte("invalid data") + invalidData := func() *cache.Value { + invalid := []byte("invalid data") + v := opts.Cache.AllocManual(len(invalid)) + copy(v.Buf(), invalid) + return v + } build := func(name string) { f, err := mem.Create(name) @@ -149,7 +154,7 @@ func TestWriterClearCache(t *testing.T) { // Poison the cache for each of the blocks. poison := func(bh BlockHandle) { - opts.Cache.Set(cacheOpts.cacheID, cacheOpts.fileNum, bh.Offset, invalidData) + opts.Cache.Set(cacheOpts.cacheID, cacheOpts.fileNum, bh.Offset, invalidData()).Release() } foreachBH(layout, poison) @@ -161,7 +166,7 @@ func TestWriterClearCache(t *testing.T) { check := func(bh BlockHandle) { h := opts.Cache.Get(cacheOpts.cacheID, cacheOpts.fileNum, bh.Offset) if h.Get() != nil { - t.Fatalf("%d: expected cache to be cleared, but found %q", bh.Offset, invalidData) + t.Fatalf("%d: expected cache to be cleared, but found %q", bh.Offset, h.Get()) } } foreachBH(layout, check) diff --git a/testdata/event_listener b/testdata/event_listener index fab0e691ee..6ea3024580 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -169,7 +169,7 @@ compact 1 1.6 K (size == estimated-debt) zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.5 K 5.9% (score == hit-rate) - tcache 1 688 B 0.0% (score == hit-rate) + tcache 1 664 B 0.0% (score == hit-rate) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/metrics b/testdata/metrics index 051624895a..73a5deadd0 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -33,7 +33,7 @@ compact 0 826 B (size == estimated-debt) zmemtbl 1 256 K ztbl 0 0 B bcache 3 732 B 0.0% (score == hit-rate) - tcache 1 688 B 0.0% (score == hit-rate) + tcache 1 664 B 0.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility) @@ -130,7 +130,7 @@ compact 1 0 B (size == estimated-debt) zmemtbl 1 256 K ztbl 1 826 B bcache 4 753 B 27.3% (score == hit-rate) - tcache 1 688 B 60.0% (score == hit-rate) + tcache 1 664 B 60.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility) From e8986be9483b49cd9b9cfd081505ead0ae31b198 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Sat, 8 Feb 2020 09:40:21 -0500 Subject: [PATCH 2/6] internal/cache: optimize allocation of manual Values For manual Values, allocate the `Value` at the start of the backing buffer. For large block caches, the number of Values can reach 500k-1m or higher, and consume a significant fraction of the total allocated objects in the Go heap. --- internal/cache/value.go | 5 +--- internal/cache/value_invariants.go | 8 +++++++ internal/cache/value_normal.go | 38 +++++++++++++++++------------- internal/cache/value_notracing.go | 12 ++++++---- internal/manual/manual.go | 2 +- internal/manual/manual_32bit.go | 4 ++-- internal/manual/manual_64bit.go | 4 ++-- 7 files changed, 44 insertions(+), 29 deletions(-) diff --git a/internal/cache/value.go b/internal/cache/value.go index d4246bf2e0..7adcbf2a00 100644 --- a/internal/cache/value.go +++ b/internal/cache/value.go @@ -52,9 +52,6 @@ func (v *Value) release() { n := atomic.AddInt32(&v.refs, -1) v.trace("release") if n == 0 { - allocFree(v.buf) - // Setting Value.buf to nil is needed for correctness of the leak checking - // that is performed when the "invariants" build tag is enabled. - v.buf = nil + v.free() } } diff --git a/internal/cache/value_invariants.go b/internal/cache/value_invariants.go index 983f8e0b7a..eca839f572 100644 --- a/internal/cache/value_invariants.go +++ b/internal/cache/value_invariants.go @@ -38,3 +38,11 @@ func newManualValue(n int) *Value { runtime.SetFinalizer(v, checkValue) return v } + +func (v *Value) free() { + allocFree(v.buf) + // Setting Value.buf to nil is needed for correctness of the leak checking + // that is performed when the "invariants" or "tracing" build tags are + // enabled. + v.buf = nil +} diff --git a/internal/cache/value_normal.go b/internal/cache/value_normal.go index 88f84247c3..0f53e6adf0 100644 --- a/internal/cache/value_normal.go +++ b/internal/cache/value_normal.go @@ -6,27 +6,33 @@ package cache -// Value holds a reference counted immutable value. -// -// This is the definition of Value that is used in normal builds. -type Value struct { - buf []byte - // The number of references on the value. When refs drops to 0, the buf - // associated with the value may be reused. This is a form of manual memory - // management. See Cache.Free. - // - // Auto values are distinguished by setting their reference count to - // -(1<<30). - refs int32 -} +import ( + "unsafe" + + "github.com/cockroachdb/pebble/internal/manual" +) + +const valueSize = int(unsafe.Sizeof(Value{})) func newManualValue(n int) *Value { if n == 0 { return nil } - b := allocNew(n) - return &Value{buf: b, refs: 1} + // When we're not performing leak detection, the lifetime of the returned + // Value is exactly the lifetime of the backing buffer and we can manually + // allocate both. + b := allocNew(valueSize + n) + v := (*Value)(unsafe.Pointer(&b[0])) + v.buf = b[valueSize:] + v.refs = 1 + return v } -func (v *Value) trace(msg string) { +func (v *Value) free() { + // When we're not performing leak detection, the Value and buffer were + // allocated contiguously. + n := valueSize + cap(v.buf) + buf := (*[manual.MaxArrayLen]byte)(unsafe.Pointer(v))[:n:n] + v.buf = nil + allocFree(buf) } diff --git a/internal/cache/value_notracing.go b/internal/cache/value_notracing.go index a3d373dccd..01b01ef9b0 100644 --- a/internal/cache/value_notracing.go +++ b/internal/cache/value_notracing.go @@ -2,15 +2,16 @@ // of this source code is governed by a BSD-style license that can be found in // the LICENSE file. -// +build invariants,!tracing +// +build !tracing package cache // Value holds a reference counted immutable value. // -// This is the definition of Value that is used when the "invariants" build tag -// is specified and the "tracing" build tag is not specified. If the "tracing" -// build tag is specified, the Value definition comes from value_tracing.go. +// This is the definition of Value that is used in normal builds and when the +// "invariants" build tag is specified and the "tracing" build tag is not +// specified. If the "tracing" build tag is specified, the Value definition +// comes from value_tracing.go. type Value struct { buf []byte // The number of references on the value. When refs drops to 0, the buf @@ -28,3 +29,6 @@ func (v *Value) trace(msg string) { func (v *Value) traces() string { return "" } + +// Silence unused warning. +var _ = (*Value).traces diff --git a/internal/manual/manual.go b/internal/manual/manual.go index 7f12b7b322..b6d393869a 100644 --- a/internal/manual/manual.go +++ b/internal/manual/manual.go @@ -33,7 +33,7 @@ func New(n int) []byte { // to Go. ptr := C.calloc(C.size_t(n), 1) // Interpret the C pointer as a pointer to a Go array, then slice. - return (*[maxArrayLen]byte)(unsafe.Pointer(ptr))[:n:n] + return (*[MaxArrayLen]byte)(unsafe.Pointer(ptr))[:n:n] } // Free frees the specified slice. diff --git a/internal/manual/manual_32bit.go b/internal/manual/manual_32bit.go index 3c39ea053b..9d1add4905 100644 --- a/internal/manual/manual_32bit.go +++ b/internal/manual/manual_32bit.go @@ -7,6 +7,6 @@ package manual const ( - // maxArrayLen is a safe maximum length for slices on this architecture. - maxArrayLen = 1<<31 - 1 + // MaxArrayLen is a safe maximum length for slices on this architecture. + MaxArrayLen = 1<<31 - 1 ) diff --git a/internal/manual/manual_64bit.go b/internal/manual/manual_64bit.go index 23d1a28d73..f03846953a 100644 --- a/internal/manual/manual_64bit.go +++ b/internal/manual/manual_64bit.go @@ -7,6 +7,6 @@ package manual const ( - // maxArrayLen is a safe maximum length for slices on this architecture. - maxArrayLen = 1<<50 - 1 + // MaxArrayLen is a safe maximum length for slices on this architecture. + MaxArrayLen = 1<<50 - 1 ) From 1bdb288cbaa5e152111e1e3d91a5cc3ad6132121 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Sat, 8 Feb 2020 11:07:11 -0500 Subject: [PATCH 3/6] internal/cache: optimize allocation of entry objects Each entry in the cache has an associated `entry` struct. For large block caches, the number of entries can reach 500k-1m or higher, and consume a significant fraction of the total allocated objects in the Go heap. For manually managed values, the lifetime of the associated `entry` is known and the `*entry` never escapes outside of the cache package. This allows us to manually manage the memory for these entries, which are the majority of entries in the cache. --- internal/cache/clockpro.go | 192 ++++++++++------------------- internal/cache/entry.go | 157 +++++++++++++++++++++++ internal/cache/entry_invariants.go | 18 +++ internal/cache/entry_normal.go | 80 ++++++++++++ internal/cache/value_normal.go | 2 +- 5 files changed, 322 insertions(+), 127 deletions(-) create mode 100644 internal/cache/entry.go create mode 100644 internal/cache/entry_invariants.go create mode 100644 internal/cache/entry_normal.go diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 8282a4bed6..58d34e8f2c 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -22,29 +22,10 @@ import ( "runtime" "sync" "sync/atomic" - "unsafe" -) - -type entryType int8 -const ( - etTest entryType = iota - etCold - etHot + "github.com/cockroachdb/pebble/internal/invariants" ) -func (p entryType) String() string { - switch p { - case etTest: - return "test" - case etCold: - return "cold" - case etHot: - return "hot" - } - return "unknown" -} - type fileKey struct { // id is the namespace for fileNums. id uint64 @@ -56,90 +37,6 @@ type key struct { offset uint64 } -type entry struct { - key key - val unsafe.Pointer - blockLink struct { - next *entry - prev *entry - } - fileLink struct { - next *entry - prev *entry - } - size int64 - ptype entryType - // referenced is atomically set to indicate that this entry has been accessed - // since the last time one of the clock hands swept it. - referenced int32 - shard *shard -} - -func (e *entry) init() *entry { - e.blockLink.next = e - e.blockLink.prev = e - e.fileLink.next = e - e.fileLink.prev = e - return e -} - -func (e *entry) next() *entry { - if e == nil { - return nil - } - return e.blockLink.next -} - -func (e *entry) prev() *entry { - if e == nil { - return nil - } - return e.blockLink.prev -} - -func (e *entry) link(s *entry) { - s.blockLink.prev = e.blockLink.prev - s.blockLink.prev.blockLink.next = s - s.blockLink.next = e - s.blockLink.next.blockLink.prev = s -} - -func (e *entry) unlink() *entry { - next := e.blockLink.next - e.blockLink.prev.blockLink.next = e.blockLink.next - e.blockLink.next.blockLink.prev = e.blockLink.prev - e.blockLink.prev = e - e.blockLink.next = e - return next -} - -func (e *entry) linkFile(s *entry) { - s.fileLink.prev = e.fileLink.prev - s.fileLink.prev.fileLink.next = s - s.fileLink.next = e - s.fileLink.next.fileLink.prev = s -} - -func (e *entry) unlinkFile() *entry { - next := e.fileLink.next - e.fileLink.prev.fileLink.next = e.fileLink.next - e.fileLink.next.fileLink.prev = e.fileLink.prev - e.fileLink.prev = e - e.fileLink.next = e - return next -} - -func (e *entry) setValue(v *Value) { - if old := e.getValue(); old != nil { - old.release() - } - atomic.StorePointer(&e.val, unsafe.Pointer(v)) -} - -func (e *entry) getValue() *Value { - return (*Value)(atomic.LoadPointer(&e.val)) -} - // Handle provides a strong reference to an entry in the cache. The reference // does not pin the entry in the cache, but it does prevent the underlying byte // slice from being reused. When entry is non-nil, value is initialized to @@ -209,7 +106,7 @@ type shard struct { reservedSize int64 maxSize int64 - coldSize int64 + coldTarget int64 blocks map[key]*entry // fileNum+offset -> block files map[fileKey]*entry // fileNum -> list of blocks @@ -238,10 +135,16 @@ func (c *shard) Get(id, fileNum, offset uint64) Handle { c.mu.RUnlock() if value == nil { atomic.AddInt64(&c.misses, 1) - } else { - atomic.AddInt64(&c.hits, 1) + return Handle{} + } + atomic.AddInt64(&c.hits, 1) + + // Enforce the restriction that manually managed values cannot be converted + // to weak handles. + if e.manual { + e = nil } - return Handle{value: value} + return Handle{entry: e, value: value} } func (c *shard) Set(id, fileNum, offset uint64, value *Value) Handle { @@ -254,12 +157,15 @@ func (c *shard) Set(id, fileNum, offset uint64, value *Value) Handle { k := key{fileKey{id, fileNum}, offset} e := c.blocks[k] + if e != nil && e.manual != value.manual() { + panic(fmt.Sprintf("pebble: inconsistent caching of manual Value: entry=%t vs value=%t", + e.manual, value.manual())) + } switch { case e == nil: // no cache entry? add it - e = &entry{ptype: etCold, key: k, size: int64(len(value.buf)), shard: c} - e.init() + e = newEntry(c, k, int64(len(value.buf)), value.manual()) e.setValue(value) if c.metaAdd(k, e) { value.trace("add-cold") @@ -267,6 +173,7 @@ func (c *shard) Set(id, fileNum, offset uint64, value *Value) Handle { c.sizeCold += e.size } else { value.trace("skip-cold") + e.free() } case e.getValue() != nil: @@ -287,12 +194,12 @@ func (c *shard) Set(id, fileNum, offset uint64, value *Value) Handle { default: // cache entry was a test page - c.metaDel(e) c.sizeTest -= e.size + c.metaDel(e) - c.coldSize += e.size - if c.coldSize > c.targetSize() { - c.coldSize = c.targetSize() + c.coldTarget += e.size + if c.coldTarget > c.targetSize() { + c.coldTarget = c.targetSize() } atomic.StoreInt32(&e.referenced, 0) @@ -304,9 +211,15 @@ func (c *shard) Set(id, fileNum, offset uint64, value *Value) Handle { c.sizeHot += e.size } else { value.trace("skip-hot") + e.free() } } + // Enforce the restriction that manually managed values cannot be converted + // to weak handles. + if e.manual { + e = nil + } // Values are initialized with a reference count of 1. That reference count // is being transferred to the returned Handle. return Handle{entry: e, value: value} @@ -400,6 +313,9 @@ func (c *shard) metaAdd(key key, e *entry) bool { return true } +// Remove the entry from the cache. This removes the entry from the blocks map, +// the files map, and ensures that hand{Hot,Cold,Test} are not pointing at the +// entry. func (c *shard) metaDel(e *entry) { if value := e.getValue(); value != nil { value.trace("metaDel") @@ -430,6 +346,31 @@ func (c *shard) metaDel(e *entry) { } else { c.files[e.key.fileKey] = next } + + c.metaCheck(e) +} + +// Check that the specified entry is not referenced by the cache. +func (c *shard) metaCheck(e *entry) { + if invariants.Enabled { + for _, t := range c.blocks { + if e == t { + panic("not reached") + } + } + for _, t := range c.files { + if e == t { + panic("not reached") + } + } + // NB: c.hand{Hot,Cold,Test} are pointers into a single linked list. We + // only have to traverse one of them to check all of them. + for t := c.handHot.next(); t != c.handHot; t = t.next() { + if e == t { + panic("not reached") + } + } + } } func (c *shard) metaEvict(e *entry) { @@ -442,6 +383,7 @@ func (c *shard) metaEvict(e *entry) { c.sizeTest -= e.size } c.metaDel(e) + e.free() } func (c *shard) evict() { @@ -471,7 +413,7 @@ func (c *shard) runHandCold() { c.handCold = c.handCold.next() - for c.targetSize()-c.coldSize <= c.sizeHot && c.handHot != nil { + for c.targetSize()-c.coldTarget <= c.sizeHot && c.handHot != nil { c.runHandHot() } } @@ -508,15 +450,13 @@ func (c *shard) runHandTest() { e := c.handTest if e.ptype == etTest { - prev := c.handTest.prev() - c.metaDel(c.handTest) - c.handTest = prev - c.sizeTest -= e.size - c.coldSize -= e.size - if c.coldSize < 0 { - c.coldSize = 0 + c.coldTarget -= e.size + if c.coldTarget < 0 { + c.coldTarget = 0 } + c.metaDel(e) + e.free() } c.handTest = c.handTest.next() @@ -624,10 +564,10 @@ func newShards(size int64, shards int) *Cache { } for i := range c.shards { c.shards[i] = shard{ - maxSize: size / int64(len(c.shards)), - coldSize: size / int64(len(c.shards)), - blocks: make(map[key]*entry), - files: make(map[fileKey]*entry), + maxSize: size / int64(len(c.shards)), + coldTarget: size / int64(len(c.shards)), + blocks: make(map[key]*entry), + files: make(map[fileKey]*entry), } } // TODO(peter): This finalizer is used to clear the cache when the Cache diff --git a/internal/cache/entry.go b/internal/cache/entry.go new file mode 100644 index 0000000000..b7a98170b1 --- /dev/null +++ b/internal/cache/entry.go @@ -0,0 +1,157 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "sync/atomic" + "unsafe" +) + +type entryType int8 + +const ( + etTest entryType = iota + etCold + etHot +) + +func (p entryType) String() string { + switch p { + case etTest: + return "test" + case etCold: + return "cold" + case etHot: + return "hot" + } + return "unknown" +} + +// entry holds the metadata for a cache entry. If the value stored in an entry +// is manually managed, the entry will also use manual memory management. +// +// Using manual memory management for entries is technically a volation of the +// Cgo pointer rules: +// +// https://golang.org/cmd/cgo/#hdr-Passing_pointers +// +// Specifically, Go pointers should not be stored in C allocated memory. The +// reason for this rule is that the Go GC will not look at C allocated memory +// to find pointers to Go objects. If the only reference to a Go object is +// stored in C allocated memory, the object will be reclaimed. The blockLink, +// fileLink, and shard fields of the entry struct may all point to Go objects, +// thus the violation. What makes this "safe" is that the Cache guarantees that +// there are other pointers to the entry and shard which will keep them +// alive. In particular, every Go allocated entry in the cache is referenced by +// the shard.entries map. And every shard is referenced by the Cache.shards +// map. +type entry struct { + key key + val unsafe.Pointer + blockLink struct { + next *entry + prev *entry + } + fileLink struct { + next *entry + prev *entry + } + size int64 + ptype entryType + // Is the memory for the entry manually managed? A manually managed entry can + // only store manually managed values (Value.manual() is true). + manual bool + // referenced is atomically set to indicate that this entry has been accessed + // since the last time one of the clock hands swept it. + referenced int32 + shard *shard +} + +const entrySize = int(unsafe.Sizeof(entry{})) + +func newEntry(s *shard, key key, size int64, manual bool) *entry { + var e *entry + if manual { + e = entryAllocNew() + } else { + e = &entry{} + } + *e = entry{ + key: key, + size: size, + ptype: etCold, + manual: manual, + shard: s, + } + e.blockLink.next = e + e.blockLink.prev = e + e.fileLink.next = e + e.fileLink.prev = e + return e +} + +func (e *entry) free() { + if e.manual { + *e = entry{} + entryAllocFree(e) + } +} + +func (e *entry) next() *entry { + if e == nil { + return nil + } + return e.blockLink.next +} + +func (e *entry) prev() *entry { + if e == nil { + return nil + } + return e.blockLink.prev +} + +func (e *entry) link(s *entry) { + s.blockLink.prev = e.blockLink.prev + s.blockLink.prev.blockLink.next = s + s.blockLink.next = e + s.blockLink.next.blockLink.prev = s +} + +func (e *entry) unlink() *entry { + next := e.blockLink.next + e.blockLink.prev.blockLink.next = e.blockLink.next + e.blockLink.next.blockLink.prev = e.blockLink.prev + e.blockLink.prev = e + e.blockLink.next = e + return next +} + +func (e *entry) linkFile(s *entry) { + s.fileLink.prev = e.fileLink.prev + s.fileLink.prev.fileLink.next = s + s.fileLink.next = e + s.fileLink.next.fileLink.prev = s +} + +func (e *entry) unlinkFile() *entry { + next := e.fileLink.next + e.fileLink.prev.fileLink.next = e.fileLink.next + e.fileLink.next.fileLink.prev = e.fileLink.prev + e.fileLink.prev = e + e.fileLink.next = e + return next +} + +func (e *entry) setValue(v *Value) { + if old := e.getValue(); old != nil { + old.release() + } + atomic.StorePointer(&e.val, unsafe.Pointer(v)) +} + +func (e *entry) getValue() *Value { + return (*Value)(atomic.LoadPointer(&e.val)) +} diff --git a/internal/cache/entry_invariants.go b/internal/cache/entry_invariants.go new file mode 100644 index 0000000000..e4850e4fa7 --- /dev/null +++ b/internal/cache/entry_invariants.go @@ -0,0 +1,18 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. +// +// +build invariants tracing + +package cache + +// When the "invariants" or "tracing" build tags are enabled, we need to +// allocate entries using the Go allocator so entry.val properly maintains a +// reference to the Value. + +func entryAllocNew() *entry { + return &entry{} +} + +func entryAllocFree(e *entry) { +} diff --git a/internal/cache/entry_normal.go b/internal/cache/entry_normal.go new file mode 100644 index 0000000000..fcc83b9585 --- /dev/null +++ b/internal/cache/entry_normal.go @@ -0,0 +1,80 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. +// +// +build !invariants,!tracing + +package cache + +import ( + "runtime" + "sync" + "unsafe" + + "github.com/cockroachdb/pebble/internal/manual" +) + +const ( + entryAllocCacheLimit = 128 +) + +var entryAllocPool = sync.Pool{ + New: func() interface{} { + return newEntryAllocCache() + }, +} + +func entryAllocNew() *entry { + a := entryAllocPool.Get().(*entryAllocCache) + e := a.alloc() + entryAllocPool.Put(a) + return e +} + +func entryAllocFree(e *entry) { + a := entryAllocPool.Get().(*entryAllocCache) + a.free(e) + entryAllocPool.Put(a) +} + +type entryAllocCache struct { + entries []*entry +} + +func newEntryAllocCache() *entryAllocCache { + c := &entryAllocCache{} + runtime.SetFinalizer(c, freeEntryAllocCache) + return c +} + +func freeEntryAllocCache(obj interface{}) { + c := obj.(*entryAllocCache) + for i, e := range c.entries { + c.dealloc(e) + c.entries[i] = nil + } +} + +func (c *entryAllocCache) alloc() *entry { + n := len(c.entries) + if n == 0 { + b := manual.New(entrySize) + return (*entry)(unsafe.Pointer(&b[0])) + } + e := c.entries[n-1] + c.entries = c.entries[:n-1] + return e +} + +func (c *entryAllocCache) dealloc(e *entry) { + buf := (*[manual.MaxArrayLen]byte)(unsafe.Pointer(e))[:entrySize:entrySize] + manual.Free(buf) +} + +func (c *entryAllocCache) free(e *entry) { + if len(c.entries) == entryAllocCacheLimit { + c.dealloc(e) + return + } + c.entries = append(c.entries, e) +} diff --git a/internal/cache/value_normal.go b/internal/cache/value_normal.go index 0f53e6adf0..afee2ce7be 100644 --- a/internal/cache/value_normal.go +++ b/internal/cache/value_normal.go @@ -2,7 +2,7 @@ // of this source code is governed by a BSD-style license that can be found in // the LICENSE file. -// +build !tracing,!invariants +// +build !invariants,!tracing package cache From 73078fcde96e8dfdce10b0913d3c7ce254ffc2eb Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Sat, 8 Feb 2020 10:22:25 -0500 Subject: [PATCH 4/6] ci: add no-invariants test configuration The "invariants" build tag causes different code paths to be utilized. Add a no-invariants test configuration for testing the invariant-less code paths. Change the race builds to not specify the "invariants" tags as doing so causes them to frequently flake on the CI machines. This appears to be due to the size of the machines (RAM available) and not something more serious. --- .travis.yml | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/.travis.yml b/.travis.yml index 224ef1e0d7..04d9fb885e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,7 +17,11 @@ matrix: - name: "go1.13.x-linux-race" go: 1.13.x os: linux - script: make testrace + script: make testrace TAGS= + - name: "go1.13.x-linux-no-invariants" + go: 1.13.x + os: linux + script: make test TAGS= - name: "go1.13.x-darwin" go: 1.13.x os: osx @@ -26,11 +30,6 @@ matrix: go: 1.13.x os: windows script: go test ./... - - name: "go1.13.x-freebsd" - go: 1.13.x - os: linux - # NB: "env: GOOS=freebsd" does not have the desired effect. - script: GOOS=freebsd go build -v ./... notifications: email: From c1fa89cf69a0c56e9b270b1730299ec87d7952db Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Fri, 7 Feb 2020 13:59:08 -0500 Subject: [PATCH 5/6] docs: add doc describing manual memory management --- docs/memory.md | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 docs/memory.md diff --git a/docs/memory.md b/docs/memory.md new file mode 100644 index 0000000000..b7cc1a2d02 --- /dev/null +++ b/docs/memory.md @@ -0,0 +1,86 @@ +# Memory Management + +## Background + +Pebble has two significant sources of memory usage: MemTables and the +Block Cache. MemTables buffer data that has been written to the WAL +but not yet flushed to an SSTable. The Block Cache provides a cache of +uncompressed SSTable data blocks. + +Originally, Pebble used regular Go memory allocation for the memory +backing both MemTables and the Block Cache. This was problematic as it +put significant pressure on the Go GC. The higher the bandwidth of +memory allocations, the more work GC has to do to reclaim the +memory. In order to lessen the pressure on the Go GC, an "allocation +cache" was introduced to the Block Cache which allowed reusing the +memory backing cached blocks in most circumstances. This produced a +dramatic reduction in GC pressure and a measurable performance +improvement in CockroachDB workloads. + +Unfortunately, the use of Go allocated memory still caused a +problem. CockroachDB running on top of Pebble often resulted in an RSS +(resident set size) 2x what it was when using RocksDB. The cause of +this effect is due to the Go runtime's heuristic for triggering GC: + +> A collection is triggered when the ratio of freshly allocated data +> to live data remaining after the previous collection reaches this +> percentage. + +This percentage can be configured by the +[`GOGC`](https://golang.org/pkg/runtime/) environment variable or by +calling +[`debug.SetGCPercent`](https://golang.org/pkg/runtime/debug/#SetGCPercent). The +default value is `100`, which means that GC is triggered when the +freshly allocated data is equal to the amount of live data at the end +of the last collection period. This generally works well in practice, +but the Pebble Block Cache is often configured to be 10s of gigabytes +in size. Waiting for 10s of gigabytes of data to be allocated before +triggering a GC results in very large Go heap sizes. + +## Manual Memory Management + +Attempting to adjust `GOGC` to account for the significant amount of +memory used by the Block Cache is fraught. What value should be used? +`10%`? `20%`? Should the setting be tuned dynamically? Rather than +introducing a heuristic which may have cascading effects on the +application using Pebble, we decided to move the Block Cache and +MemTable memory out of the Go heap. This is done by using the C memory +allocator, though it could also be done by providing a simple memory +allocator in Go which uses `mmap` to allocate memory. + +In order to support manual memory management for the Block Cache and +MemTables, Pebble needs to precisely track their lifetime. This was +already being done for the MemTable in order to account for its memory +usage in metrics. It was mostly being done for the Block Cache. Values +stores in the Block Cache are reference counted and are returned to +the "alloc cache" when their reference count falls +to 0. Unfortunately, this tracking wasn't precise and there were +numerous cases where the cache values were being leaked. This was +acceptable in a world where the Go GC would clean up after us. It is +unacceptable if the leak becomes permanent. + +## Leak Detection + +In order to find all of the cache value leaks, Pebble has a leak +detection facility built on top of +[`runtime.SetFinalizer`](https://golang.org/pkg/runtime/#SetFinalizer). A +finalizer is a function associated with an object which is run when +the object is no longer reachable. On the surface, this sounds perfect +as a facility for performing all memory reclamation. Unfortunately, +finalizers are generally frowned upon by the Go implementors, and come +with very loose guarantees: + +> The finalizer is scheduled to run at some arbitrary time after the +> program can no longer reach the object to which obj points. There is +> no guarantee that finalizers will run before a program exits, so +> typically they are useful only for releasing non-memory resources +> associated with an object during a long-running program + +This language is somewhat frightening, but in practice finalizers are +run at the end of every GC period. Pebble does not use finalizers for +correctness, but instead uses them for its leak detection facility. In +the block cache, a finalizer is associated with the Go allocated +`cache.Value` object. When the finalizer is run, it checks that the +buffer backing the `cache.Value` has been freed. This leak detection +facility is enabled by the `"invariants"` build tag which is enabled +by the Pebble unit tests. From 0f2704f939f61c83b627ee1cef57c818da74aa9e Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Sat, 8 Feb 2020 14:34:07 -0500 Subject: [PATCH 6/6] internal/cache: add robinHoodMap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `robinHoodMap` which is a hash map implemented using Robin Hood hashing. `robinHoodMap` is specialized to use `key` and `*entry` as the key and value for the map respectively. The memory for the backing array is manually allocated. This lowers GC pressure by moving the allocated memory out of the Go heap, and by moving all of `*entry` pointers out of the Go heap. Use `robinHoodMap` for both the `shard.blocks` and `shard.files` maps. Old is Go-map, new is RobinHood-map. name old time/op new time/op delta MapInsert-16 144ns ± 5% 101ns ± 8% -29.88% (p=0.000 n=10+9) MapLookupHit-16 137ns ± 1% 47ns ± 1% -65.58% (p=0.000 n=8+10) MapLookupMiss-16 89.9ns ± 4% 48.6ns ± 3% -45.95% (p=0.000 n=10+9) --- internal/cache/clockpro.go | 93 ++++++--- internal/cache/entry.go | 19 +- internal/cache/entry_normal.go | 1 + internal/cache/robin_hood.go | 318 ++++++++++++++++++++++++++++++ internal/cache/robin_hood_test.go | 238 ++++++++++++++++++++++ 5 files changed, 638 insertions(+), 31 deletions(-) create mode 100644 internal/cache/robin_hood.go create mode 100644 internal/cache/robin_hood_test.go diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 58d34e8f2c..1f95c6b667 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -19,7 +19,9 @@ package cache // import "github.com/cockroachdb/pebble/internal/cache" import ( "fmt" + "os" "runtime" + "runtime/debug" "sync" "sync/atomic" @@ -37,6 +39,17 @@ type key struct { offset uint64 } +// file returns the "file key" for the receiver. This is the key used for the +// shard.files map. +func (k key) file() key { + k.offset = 0 + return k +} + +func (k key) String() string { + return fmt.Sprintf("%d/%d/%d", k.id, k.fileNum, k.offset) +} + // Handle provides a strong reference to an entry in the cache. The reference // does not pin the entry in the cache, but it does prevent the underlying byte // slice from being reused. When entry is non-nil, value is initialized to @@ -107,8 +120,17 @@ type shard struct { reservedSize int64 maxSize int64 coldTarget int64 - blocks map[key]*entry // fileNum+offset -> block - files map[fileKey]*entry // fileNum -> list of blocks + blocks robinHoodMap // fileNum+offset -> block + files robinHoodMap // fileNum -> list of blocks + + // The blocks and files maps store values in manually managed memory that is + // invisible to the Go GC. This is fine for Value and entry objects that are + // stored in manually managed memory. Auto Values and the associated auto + // entries need to have a reference that the Go GC is aware of to prevent + // them from being reclaimed. The entries map provides this reference. When + // the "invariants" build tag is set, all Value and entry objects are Go + // allocated and the entries map will contain a reference to every entry. + entries map[*entry]struct{} handHot *entry handCold *entry @@ -121,7 +143,7 @@ type shard struct { func (c *shard) Get(id, fileNum, offset uint64) Handle { c.mu.RLock() - e := c.blocks[key{fileKey{id, fileNum}, offset}] + e := c.blocks.Get(key{fileKey{id, fileNum}, offset}) var value *Value if e != nil { value = e.getValue() @@ -156,7 +178,7 @@ func (c *shard) Set(id, fileNum, offset uint64, value *Value) Handle { defer c.mu.Unlock() k := key{fileKey{id, fileNum}, offset} - e := c.blocks[k] + e := c.blocks.Get(k) if e != nil && e.manual != value.manual() { panic(fmt.Sprintf("pebble: inconsistent caching of manual Value: entry=%t vs value=%t", e.manual, value.manual())) @@ -230,7 +252,7 @@ func (c *shard) Delete(id, fileNum, offset uint64) { c.mu.Lock() defer c.mu.Unlock() - e := c.blocks[key{fileKey{id, fileNum}, offset}] + e := c.blocks.Get(key{fileKey{id, fileNum}, offset}) if e == nil { return } @@ -242,7 +264,8 @@ func (c *shard) EvictFile(id, fileNum uint64) { c.mu.Lock() defer c.mu.Unlock() - blocks := c.files[fileKey{id, fileNum}] + fkey := key{fileKey{id, fileNum}, 0} + blocks := c.files.Get(fkey) if blocks == nil { return } @@ -290,7 +313,12 @@ func (c *shard) metaAdd(key key, e *entry) bool { return false } - c.blocks[key] = e + c.blocks.Put(key, e) + if !e.managed { + // Go allocated entries need to be referenced from Go memory. The entries + // map provides that reference. + c.entries[e] = struct{}{} + } if c.handHot == nil { // first element @@ -305,8 +333,9 @@ func (c *shard) metaAdd(key key, e *entry) bool { c.handCold = c.handCold.prev() } - if fileBlocks := c.files[key.fileKey]; fileBlocks == nil { - c.files[key.fileKey] = e + fkey := key.file() + if fileBlocks := c.files.Get(fkey); fileBlocks == nil { + c.files.Put(fkey, e) } else { fileBlocks.linkFile(e) } @@ -322,7 +351,12 @@ func (c *shard) metaDel(e *entry) { } e.setValue(nil) - delete(c.blocks, e.key) + c.blocks.Delete(e.key) + if !e.managed { + // Go allocated entries need to be referenced from Go memory. The entries + // map provides that reference. + delete(c.entries, e) + } if e == c.handHot { c.handHot = c.handHot.prev() @@ -341,10 +375,11 @@ func (c *shard) metaDel(e *entry) { c.handTest = nil } + fkey := e.key.file() if next := e.unlinkFile(); e == next { - delete(c.files, e.key.fileKey) + c.files.Delete(fkey) } else { - c.files[e.key.fileKey] = next + c.files.Put(fkey, next) } c.metaCheck(e) @@ -353,21 +388,28 @@ func (c *shard) metaDel(e *entry) { // Check that the specified entry is not referenced by the cache. func (c *shard) metaCheck(e *entry) { if invariants.Enabled { - for _, t := range c.blocks { - if e == t { - panic("not reached") - } + if _, ok := c.entries[e]; ok { + fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in entries map\n%s", + e, e.key, debug.Stack()) + os.Exit(1) } - for _, t := range c.files { - if e == t { - panic("not reached") - } + if c.blocks.findByValue(e) != nil { + fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in blocks map\n%s\n%s", + e, e.key, &c.blocks, debug.Stack()) + os.Exit(1) + } + if c.files.findByValue(e) != nil { + fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in files map\n%s\n%s", + e, e.key, &c.files, debug.Stack()) + os.Exit(1) } // NB: c.hand{Hot,Cold,Test} are pointers into a single linked list. We // only have to traverse one of them to check all of them. for t := c.handHot.next(); t != c.handHot; t = t.next() { if e == t { - panic("not reached") + fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in blocks list\n%s", + e, e.key, debug.Stack()) + os.Exit(1) } } } @@ -552,6 +594,8 @@ func clearCache(obj interface{}) { s.mu.Lock() s.maxSize = 0 s.evict() + s.blocks.free() + s.files.free() s.mu.Unlock() } } @@ -566,9 +610,10 @@ func newShards(size int64, shards int) *Cache { c.shards[i] = shard{ maxSize: size / int64(len(c.shards)), coldTarget: size / int64(len(c.shards)), - blocks: make(map[key]*entry), - files: make(map[fileKey]*entry), + entries: make(map[*entry]struct{}), } + c.shards[i].blocks.init(16) + c.shards[i].files.init(16) } // TODO(peter): This finalizer is used to clear the cache when the Cache // itself is GC'd. Investigate making this explicit, and then changing the @@ -702,7 +747,7 @@ func (c *Cache) Metrics() Metrics { for i := range c.shards { s := &c.shards[i] s.mu.RLock() - m.Count += int64(len(s.blocks)) + m.Count += int64(s.blocks.Count()) m.Size += s.sizeHot + s.sizeCold s.mu.RUnlock() m.Hits += atomic.LoadInt64(&s.hits) diff --git a/internal/cache/entry.go b/internal/cache/entry.go index b7a98170b1..444d7386ab 100644 --- a/internal/cache/entry.go +++ b/internal/cache/entry.go @@ -60,9 +60,13 @@ type entry struct { } size int64 ptype entryType - // Is the memory for the entry manually managed? A manually managed entry can - // only store manually managed values (Value.manual() is true). + // Can the entry hold a manual Value? Only a manually managed entry can store + // manually managed values (Value.manual() is true). manual bool + // Was the entry allocated using the Go allocator or the manual + // allocator. This can differ from the setting of the manual field due when + // the "invariants" build tag is set. + managed bool // referenced is atomically set to indicate that this entry has been accessed // since the last time one of the clock hands swept it. referenced int32 @@ -79,11 +83,12 @@ func newEntry(s *shard, key key, size int64, manual bool) *entry { e = &entry{} } *e = entry{ - key: key, - size: size, - ptype: etCold, - manual: manual, - shard: s, + key: key, + size: size, + ptype: etCold, + manual: manual, + managed: e.managed, + shard: s, } e.blockLink.next = e e.blockLink.prev = e diff --git a/internal/cache/entry_normal.go b/internal/cache/entry_normal.go index fcc83b9585..890d0413ad 100644 --- a/internal/cache/entry_normal.go +++ b/internal/cache/entry_normal.go @@ -28,6 +28,7 @@ func entryAllocNew() *entry { a := entryAllocPool.Get().(*entryAllocCache) e := a.alloc() entryAllocPool.Put(a) + e.managed = true return e } diff --git a/internal/cache/robin_hood.go b/internal/cache/robin_hood.go new file mode 100644 index 0000000000..7ad32fc2c3 --- /dev/null +++ b/internal/cache/robin_hood.go @@ -0,0 +1,318 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "fmt" + "math/bits" + "os" + "runtime" + "runtime/debug" + "strings" + "time" + "unsafe" + + "github.com/cockroachdb/pebble/internal/invariants" + "github.com/cockroachdb/pebble/internal/manual" +) + +var hashSeed = uint64(time.Now().UnixNano()) + +// Fibonacci hash: https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/ +func robinHoodHash(k key, shift uint32) uint32 { + const m = 11400714819323198485 + h := hashSeed + h ^= k.id * m + h ^= k.fileNum * m + h ^= k.offset * m + return uint32(h >> shift) +} + +type robinHoodEntry struct { + key key + // Note that value may point to a Go allocated object, even though the memory + // for the entry itself is manually managed. This is technically a volation + // of the Cgo pointer rules: + // + // https://golang.org/cmd/cgo/#hdr-Passing_pointers + // + // Specifically, Go pointers should not be stored in C allocated memory. The + // reason for this rule is that the Go GC will not look at C allocated memory + // to find pointers to Go objects. If the only reference to a Go object is + // stored in C allocated memory, the object will be reclaimed. What makes + // this "safe" is that the Cache guarantees that there are other pointers to + // the entry and shard which will keep them alive. In particular, every Go + // allocated entry in the cache is referenced by the shard.entries map. And + // every shard is referenced by the Cache.shards map. + value *entry + // The distance the entry is from its desired position. + dist uint32 +} + +type robinHoodEntries struct { + ptr unsafe.Pointer + len uint32 +} + +func newRobinHoodEntries(n uint32) robinHoodEntries { + size := uintptr(n) * unsafe.Sizeof(robinHoodEntry{}) + return robinHoodEntries{ + ptr: unsafe.Pointer(&(manual.New(int(size)))[0]), + len: n, + } +} + +func (e robinHoodEntries) at(i uint32) *robinHoodEntry { + return (*robinHoodEntry)(unsafe.Pointer(uintptr(e.ptr) + + uintptr(i)*unsafe.Sizeof(robinHoodEntry{}))) +} + +func (e robinHoodEntries) free() { + size := uintptr(e.len) * unsafe.Sizeof(robinHoodEntry{}) + buf := (*[manual.MaxArrayLen]byte)(e.ptr)[:size:size] + manual.Free(buf) +} + +// robinHoodMap is an implementation of Robin Hood hashing. Robin Hood hashing +// is an open-address hash table using linear probing. The twist is that the +// linear probe distance is reduced by moving existing entries when inserting +// and deleting. This is accomplished by keeping track of how far an entry is +// from its "desired" slot (hash of key modulo number of slots). During +// insertion, if the new entry being inserted is farther from its desired slot +// than the target entry, we swap the target and new entry. This effectively +// steals from the "rich" target entry and gives to the "poor" new entry (thus +// the origin of the name). +// +// An extension over the base Robin Hood hashing idea comes from +// https://probablydance.com/2017/02/26/i-wrote-the-fastest-hashtable/. A cap +// is placed on the max distance an entry can be from its desired slot. When +// this threshold is reached during insertion, the size of the table is doubled +// and insertion is restarted. Additionally, the entries slice is given "max +// dist" extra entries on the end. The very last entry in the entries slice is +// never used and acts as a sentinel which terminates loops. The previous +// maxDist-1 entries act as the extra entries. For example, if the size of the +// table is 2, maxDist is computed as 4 and the actual size of the entry slice +// is 6. +// +// +---+---+---+---+---+---+ +// | 0 | 1 | 2 | 3 | 4 | 5 | +// +---+---+---+---+---+---+ +// ^ +// size +// +// In this scenario, the target entry for a key will always be in the range +// [0,1]. Valid entries may reside in the range [0,4] due to the linear probing +// of up to maxDist entries. The entry at index 5 will never contain a value, +// and instead acts as a sentinel (its distance is always 0). The max distance +// threshold is set to log2(num-entries). This ensures that retrieval is O(log +// N), though note that N is the number of total entries, not the count of +// valid entries. +// +// Deletion is implemented via the backward shift delete mechanism instead of +// tombstones. This preserves the performance of the table in the presence of +// deletions. See +// http://codecapsule.com/2013/11/17/robin-hood-hashing-backward-shift-deletion +// for details. +type robinHoodMap struct { + entries robinHoodEntries + size uint32 + shift uint32 + count uint32 + maxDist uint32 +} + +func maxDistForSize(size uint32) uint32 { + desired := uint32(bits.Len32(size)) + if desired < 4 { + desired = 4 + } + return desired +} + +func newRobinHoodMap(initialCapacity int) *robinHoodMap { + m := &robinHoodMap{} + m.init(initialCapacity) + runtime.SetFinalizer(m, clearRobinHoodMap) + return m +} + +func clearRobinHoodMap(obj interface{}) { + m := obj.(*robinHoodMap) + m.free() +} + +func (m *robinHoodMap) init(initialCapacity int) { + if initialCapacity < 1 { + initialCapacity = 1 + } + targetSize := 1 << (uint(bits.Len(uint(2*initialCapacity-1))) - 1) + m.rehash(uint32(targetSize)) +} + +func (m *robinHoodMap) free() { + if m.entries.ptr != nil { + m.entries.free() + m.entries.ptr = nil + } +} + +func (m *robinHoodMap) rehash(size uint32) { + oldEntries := m.entries + + m.size = size + m.shift = uint32(64 - bits.Len32(m.size-1)) + m.maxDist = maxDistForSize(size) + m.entries = newRobinHoodEntries(size + m.maxDist) + m.count = 0 + + for i := uint32(0); i < oldEntries.len; i++ { + e := oldEntries.at(i) + if e.value != nil { + m.Put(e.key, e.value) + } + } + + if oldEntries.ptr != nil { + oldEntries.free() + } +} + +// Find an entry containing the specified value. This is intended to be used +// from debug and test code. +func (m *robinHoodMap) findByValue(v *entry) *robinHoodEntry { + for i := uint32(0); i < m.entries.len; i++ { + e := m.entries.at(i) + if e.value == v { + return e + } + } + return nil +} + +func (m *robinHoodMap) Count() int { + return int(m.count) +} + +func (m *robinHoodMap) Put(k key, v *entry) { + maybeExists := true + n := robinHoodEntry{key: k, value: v, dist: 0} + for i := robinHoodHash(k, m.shift); ; i++ { + e := m.entries.at(i) + if maybeExists && k == e.key { + // Entry already exists: overwrite. + e.value = n.value + m.checkEntry(i) + return + } + + if e.value == nil { + // Found an empty entry: insert here. + *e = n + m.count++ + m.checkEntry(i) + return + } + + if e.dist < n.dist { + // Swap the new entry with the current entry because the current is + // rich. We then continue to loop, looking for a new location for the + // current entry. Note that this is also the not-found condition for + // retrieval, which means that "k" is not present in the map. See Get(). + n, *e = *e, n + m.checkEntry(i) + maybeExists = false + } + + // The new entry gradually moves away from its ideal position. + n.dist++ + + // If we've reached the max distance threshold, grow the table and restart + // the insertion. + if n.dist == m.maxDist { + m.rehash(2 * m.size) + i = robinHoodHash(n.key, m.shift) - 1 + n.dist = 0 + maybeExists = false + } + } +} + +func (m *robinHoodMap) Get(k key) *entry { + var dist uint32 + for i := robinHoodHash(k, m.shift); ; i++ { + e := m.entries.at(i) + if k == e.key { + // Found. + return e.value + } + if e.dist < dist { + // Not found. + return nil + } + dist++ + } +} + +func (m *robinHoodMap) Delete(k key) { + var dist uint32 + for i := robinHoodHash(k, m.shift); ; i++ { + e := m.entries.at(i) + if k == e.key { + m.checkEntry(i) + // We found the entry to delete. Shift the following entries backwards + // until the next empty value or entry with a zero distance. Note that + // empty values are guaranteed to have "dist == 0". + m.count-- + for j := i + 1; ; j++ { + t := m.entries.at(j) + if t.dist == 0 { + *e = robinHoodEntry{} + return + } + e.key = t.key + e.value = t.value + e.dist = t.dist - 1 + e = t + m.checkEntry(j) + } + } + if dist > e.dist { + // Not found. + return + } + dist++ + } +} + +func (m *robinHoodMap) checkEntry(i uint32) { + if invariants.Enabled { + e := m.entries.at(i) + if e.value != nil { + pos := robinHoodHash(e.key, m.shift) + if (uint32(i) - pos) != e.dist { + fmt.Fprintf(os.Stderr, "%d: invalid dist=%d, expected %d: %s\n%s", + i, e.dist, uint32(i)-pos, e.key, debug.Stack()) + os.Exit(1) + } + if e.dist > m.maxDist { + fmt.Fprintf(os.Stderr, "%d: invalid dist=%d > maxDist=%d: %s\n%s", + i, e.dist, m.maxDist, e.key, debug.Stack()) + os.Exit(1) + } + } + } +} + +func (m *robinHoodMap) String() string { + var buf strings.Builder + fmt.Fprintf(&buf, "count: %d\n", m.count) + for i := uint32(0); i < m.entries.len; i++ { + e := m.entries.at(i) + if e.value != nil { + fmt.Fprintf(&buf, "%d: [%s,%p,%d]\n", i, e.key, e.value, e.dist) + } + } + return buf.String() +} diff --git a/internal/cache/robin_hood_test.go b/internal/cache/robin_hood_test.go new file mode 100644 index 0000000000..af952530ba --- /dev/null +++ b/internal/cache/robin_hood_test.go @@ -0,0 +1,238 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "fmt" + "io/ioutil" + "runtime" + "testing" + "time" + + "golang.org/x/exp/rand" +) + +func TestRobinHoodMap(t *testing.T) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + rhMap := newRobinHoodMap(0) + goMap := make(map[key]*entry) + + randomKey := func() key { + n := rng.Intn(len(goMap)) + for k := range goMap { + if n == 0 { + return k + } + n-- + } + return key{} + } + + ops := 10000 + rng.Intn(10000) + for i := 0; i < ops; i++ { + var which float64 + if len(goMap) > 0 { + which = rng.Float64() + } + + switch { + case which < 0.4: + // 40% insert. + var k key + k.id = rng.Uint64() + k.fileNum = rng.Uint64() + k.offset = rng.Uint64() + e := &entry{} + goMap[k] = e + rhMap.Put(k, e) + if len(goMap) != int(rhMap.Count()) { + t.Fatalf("map sizes differ: %d != %d", len(goMap), rhMap.Count()) + } + + case which < 0.1: + // 10% overwrite. + k := randomKey() + e := &entry{} + goMap[k] = e + rhMap.Put(k, e) + if len(goMap) != int(rhMap.Count()) { + t.Fatalf("map sizes differ: %d != %d", len(goMap), rhMap.Count()) + } + + case which < 0.75: + // 25% delete. + k := randomKey() + delete(goMap, k) + rhMap.Delete(k) + if len(goMap) != int(rhMap.Count()) { + t.Fatalf("map sizes differ: %d != %d", len(goMap), rhMap.Count()) + } + + default: + // 25% lookup. + k := randomKey() + v := goMap[k] + u := rhMap.Get(k) + if v != u { + t.Fatalf("%s: expected %p, but found %p", k, v, u) + } + } + } + + t.Logf("map size: %d", len(goMap)) +} + +const benchSize = 1 << 20 + +func BenchmarkGoMapInsert(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + for i := range keys { + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + } + b.ResetTimer() + + var m map[key]*entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if m == nil || j == len(keys) { + b.StopTimer() + m = make(map[key]*entry, len(keys)) + j = 0 + b.StartTimer() + } + m[keys[j]] = nil + } +} + +func BenchmarkRobinHoodInsert(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + for i := range keys { + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + } + e := &entry{} + b.ResetTimer() + + var m *robinHoodMap + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if m == nil || j == len(keys) { + b.StopTimer() + m = newRobinHoodMap(len(keys)) + j = 0 + b.StartTimer() + } + m.Put(keys[j], e) + } + + runtime.KeepAlive(e) +} + +func BenchmarkGoMapLookupHit(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + m := make(map[key]*entry, len(keys)) + e := &entry{} + for i := range keys { + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + m[keys[i]] = e + } + b.ResetTimer() + + var p *entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if j == len(keys) { + j = 0 + } + p = m[keys[j]] + } + + if testing.Verbose() { + fmt.Fprintln(ioutil.Discard, p) + } +} + +func BenchmarkRobinHoodLookupHit(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + m := newRobinHoodMap(len(keys)) + e := &entry{} + for i := range keys { + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + m.Put(keys[i], e) + } + b.ResetTimer() + + var p *entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if j == len(keys) { + j = 0 + } + p = m.Get(keys[j]) + } + + if testing.Verbose() { + fmt.Fprintln(ioutil.Discard, p) + } + runtime.KeepAlive(e) +} + +func BenchmarkGoMapLookupMiss(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + m := make(map[key]*entry, len(keys)) + e := &entry{} + for i := range keys { + keys[i].id = 1 + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + m[keys[i]] = e + keys[i].id = 2 + } + b.ResetTimer() + + var p *entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if j == len(keys) { + j = 0 + } + p = m[keys[j]] + } + + if testing.Verbose() { + fmt.Fprintln(ioutil.Discard, p) + } +} + +func BenchmarkRobinHoodLookupMiss(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + m := newRobinHoodMap(len(keys)) + e := &entry{} + for i := range keys { + keys[i].id = 1 + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + m.Put(keys[i], e) + keys[i].id = 2 + } + b.ResetTimer() + + var p *entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if j == len(keys) { + j = 0 + } + p = m.Get(keys[j]) + } + + if testing.Verbose() { + fmt.Fprintln(ioutil.Discard, p) + } + runtime.KeepAlive(e) +}