diff --git a/.travis.yml b/.travis.yml index 224ef1e0d7..04d9fb885e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,7 +17,11 @@ matrix: - name: "go1.13.x-linux-race" go: 1.13.x os: linux - script: make testrace + script: make testrace TAGS= + - name: "go1.13.x-linux-no-invariants" + go: 1.13.x + os: linux + script: make test TAGS= - name: "go1.13.x-darwin" go: 1.13.x os: osx @@ -26,11 +30,6 @@ matrix: go: 1.13.x os: windows script: go test ./... - - name: "go1.13.x-freebsd" - go: 1.13.x - os: linux - # NB: "env: GOOS=freebsd" does not have the desired effect. - script: GOOS=freebsd go build -v ./... notifications: email: diff --git a/db_test.go b/db_test.go index 6932a04464..8ca894edb6 100644 --- a/db_test.go +++ b/db_test.go @@ -689,6 +689,7 @@ func TestIterLeak(t *testing.T) { t.Fatal(err) } } else { + defer iter.Close() if err := d.Close(); err == nil { t.Fatalf("expected failure, but found success") } else if !strings.HasPrefix(err.Error(), "leaked iterators:") { @@ -714,7 +715,10 @@ func TestMemTableReservation(t *testing.T) { // Add a block to the cache. Note that the memtable size is larger than the // cache size, so opening the DB should cause this block to be evicted. tmpID := opts.Cache.NewID() - opts.Cache.Set(tmpID, 0, 0, []byte("hello world")) + helloWorld := []byte("hello world") + value := opts.Cache.AllocManual(len(helloWorld)) + copy(value.Buf(), helloWorld) + opts.Cache.Set(tmpID, 0, 0, value).Release() d, err := Open("", opts) if err != nil { diff --git a/docs/memory.md b/docs/memory.md new file mode 100644 index 0000000000..b7cc1a2d02 --- /dev/null +++ b/docs/memory.md @@ -0,0 +1,86 @@ +# Memory Management + +## Background + +Pebble has two significant sources of memory usage: MemTables and the +Block Cache. MemTables buffer data that has been written to the WAL +but not yet flushed to an SSTable. The Block Cache provides a cache of +uncompressed SSTable data blocks. + +Originally, Pebble used regular Go memory allocation for the memory +backing both MemTables and the Block Cache. This was problematic as it +put significant pressure on the Go GC. The higher the bandwidth of +memory allocations, the more work GC has to do to reclaim the +memory. In order to lessen the pressure on the Go GC, an "allocation +cache" was introduced to the Block Cache which allowed reusing the +memory backing cached blocks in most circumstances. This produced a +dramatic reduction in GC pressure and a measurable performance +improvement in CockroachDB workloads. + +Unfortunately, the use of Go allocated memory still caused a +problem. CockroachDB running on top of Pebble often resulted in an RSS +(resident set size) 2x what it was when using RocksDB. The cause of +this effect is due to the Go runtime's heuristic for triggering GC: + +> A collection is triggered when the ratio of freshly allocated data +> to live data remaining after the previous collection reaches this +> percentage. + +This percentage can be configured by the +[`GOGC`](https://golang.org/pkg/runtime/) environment variable or by +calling +[`debug.SetGCPercent`](https://golang.org/pkg/runtime/debug/#SetGCPercent). The +default value is `100`, which means that GC is triggered when the +freshly allocated data is equal to the amount of live data at the end +of the last collection period. This generally works well in practice, +but the Pebble Block Cache is often configured to be 10s of gigabytes +in size. Waiting for 10s of gigabytes of data to be allocated before +triggering a GC results in very large Go heap sizes. 
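For concreteness, this is roughly what tuning the trigger ratio looks like. The snippet below is an editor-added sketch rather than anything Pebble does; the value `20` is an arbitrary example:

```go
package main

import (
	"fmt"
	"runtime/debug"
)

func main() {
	// With the default GOGC=100, the heap can grow to roughly twice the live
	// size before a collection is triggered. If the live heap includes a
	// block cache that is tens of gigabytes in size, the peak heap is very
	// large. Lowering the percentage trades more frequent collections for a
	// smaller peak heap.
	old := debug.SetGCPercent(20)
	fmt.Printf("GC trigger ratio lowered from %d%% to 20%%\n", old)
}
```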
+ +## Manual Memory Management + +Attempting to adjust `GOGC` to account for the significant amount of +memory used by the Block Cache is fraught. What value should be used? +`10%`? `20%`? Should the setting be tuned dynamically? Rather than +introducing a heuristic which may have cascading effects on the +application using Pebble, we decided to move the Block Cache and +MemTable memory out of the Go heap. This is done by using the C memory +allocator, though it could also be done by providing a simple memory +allocator in Go which uses `mmap` to allocate memory. + +In order to support manual memory management for the Block Cache and +MemTables, Pebble needs to precisely track their lifetime. This was +already being done for the MemTable in order to account for its memory +usage in metrics. It was mostly being done for the Block Cache. Values +stored in the Block Cache are reference counted and are returned to +the "alloc cache" when their reference count falls +to 0. Unfortunately, this tracking wasn't precise and there were +numerous cases where the cache values were being leaked. This was +acceptable in a world where the Go GC would clean up after us. It is +unacceptable if the leak becomes permanent. + +## Leak Detection + +In order to find all of the cache value leaks, Pebble has a leak +detection facility built on top of +[`runtime.SetFinalizer`](https://golang.org/pkg/runtime/#SetFinalizer). A +finalizer is a function associated with an object which is run when +the object is no longer reachable. On the surface, this sounds perfect +as a facility for performing all memory reclamation. Unfortunately, +finalizers are generally frowned upon by the Go implementors, and come +with very loose guarantees: + +> The finalizer is scheduled to run at some arbitrary time after the +> program can no longer reach the object to which obj points. There is +> no guarantee that finalizers will run before a program exits, so +> typically they are useful only for releasing non-memory resources +> associated with an object during a long-running program. + +This language is somewhat frightening, but in practice finalizers are +run at the end of every GC period. Pebble does not use finalizers for +correctness, but instead uses them for its leak detection facility. In +the block cache, a finalizer is associated with the Go allocated +`cache.Value` object. When the finalizer is run, it checks that the +buffer backing the `cache.Value` has been freed. This leak detection +facility is enabled by the `"invariants"` build tag which is enabled +by the Pebble unit tests. diff --git a/internal/cache/alloc.go b/internal/cache/alloc.go index 846a195a3a..58db9d066e 100644 --- a/internal/cache/alloc.go +++ b/internal/cache/alloc.go @@ -5,9 +5,11 @@ package cache import ( + "runtime" "sync" "time" + "github.com/cockroachdb/pebble/internal/manual" "golang.org/x/exp/rand" ) @@ -30,8 +32,17 @@ var allocPool = sync.Pool{ }, } -// allocNew allocates a slice of size n. The use of sync.Pool provides a -// per-cpu cache of allocCache structures to allocate from. +// allocNew allocates a non-garbage collected slice of size n. Every call to +// allocNew() MUST be paired with a call to allocFree(). Failure to do so will +// result in a memory leak. The use of sync.Pool provides a per-cpu cache of +// allocCache structures to allocate from. +// +// TODO(peter): Is the allocCache still necessary for performance?
Before the +// introduction of manual memory management, the allocCache dramatically +// reduced GC pressure by reducing allocation bandwidth. It no longer serves +// this purpose because manual.{New,Free} don't produce any GC pressure. Will +// need to run benchmark workloads to see if this can be removed which would +// allow the removal of the one required use of runtime.SetFinalizer. func allocNew(n int) []byte { a := allocPool.Get().(*allocCache) b := a.alloc(n) @@ -73,12 +84,20 @@ func newAllocCache() *allocCache { bufs: make([][]byte, 0, allocCacheCountLimit), } c.rnd.Seed(uint64(time.Now().UnixNano())) + runtime.SetFinalizer(c, freeAllocCache) return c } +func freeAllocCache(obj interface{}) { + c := obj.(*allocCache) + for i := range c.bufs { + manual.Free(c.bufs[i]) + } +} + func (c *allocCache) alloc(n int) []byte { if n < allocCacheMinSize || n >= allocCacheMaxSize { - return make([]byte, n) + return manual.New(n) } class := sizeToClass(n) @@ -92,12 +111,13 @@ func (c *allocCache) alloc(n int) []byte { } } - return make([]byte, n, classToSize(class)) + return manual.New(classToSize(class))[:n] } func (c *allocCache) free(b []byte) { n := cap(b) if n < allocCacheMinSize || n >= allocCacheMaxSize { + manual.Free(b) return } b = b[:n:n] @@ -117,6 +137,7 @@ func (c *allocCache) free(b []byte) { // are biased, but that is fine for the usage here. j := (uint32(len(c.bufs)) * (uint32(c.rnd.Uint64()) & ((1 << 16) - 1))) >> 16 c.size -= cap(c.bufs[j]) + manual.Free(c.bufs[j]) c.bufs[i], c.bufs[j] = nil, c.bufs[i] c.bufs = c.bufs[:i] } diff --git a/internal/cache/alloc_test.go b/internal/cache/alloc_test.go index d5a5405184..a5b83022ed 100644 --- a/internal/cache/alloc_test.go +++ b/internal/cache/alloc_test.go @@ -7,12 +7,14 @@ package cache import ( "testing" "unsafe" + + "github.com/cockroachdb/pebble/internal/manual" ) func TestAllocCache(t *testing.T) { c := newAllocCache() for i := 0; i < 64; i++ { - c.free(make([]byte, 1025)) + c.free(manual.New(1025)) if c.size == 0 { t.Fatalf("expected cache size to be non-zero") } @@ -34,7 +36,7 @@ func TestAllocCache(t *testing.T) { func TestAllocCacheEvict(t *testing.T) { c := newAllocCache() for i := 0; i < allocCacheCountLimit; i++ { - c.free(make([]byte, 1024)) + c.free(manual.New(1024)) } bufs := make([][]byte, allocCacheCountLimit) @@ -61,7 +63,7 @@ func BenchmarkAllocCache(b *testing.B) { // Populate the cache with buffers if one size class. c := newAllocCache() for i := 0; i < allocCacheCountLimit; i++ { - c.free(make([]byte, 1024)) + c.free(manual.New(1024)) } // Benchmark allocating buffers of a different size class. diff --git a/internal/cache/clockpro.go b/internal/cache/clockpro.go index 1e47d22dfb..1f95c6b667 100644 --- a/internal/cache/clockpro.go +++ b/internal/cache/clockpro.go @@ -18,32 +18,16 @@ package cache // import "github.com/cockroachdb/pebble/internal/cache" import ( + "fmt" + "os" "runtime" + "runtime/debug" "sync" "sync/atomic" - "unsafe" -) - -type entryType int8 -const ( - etTest entryType = iota - etCold - etHot + "github.com/cockroachdb/pebble/internal/invariants" ) -func (p entryType) String() string { - switch p { - case etTest: - return "test" - case etCold: - return "cold" - case etHot: - return "hot" - } - return "unknown" -} - type fileKey struct { // id is the namespace for fileNums. id uint64 @@ -55,135 +39,25 @@ type key struct { offset uint64 } -type value struct { - buf []byte - // The number of references on the value. When refs drops to 0, the buf - // associated with the value may be reused. 
This is a form of manual memory - // management. See Cache.Free. - refs int32 -} - -func newValue(b []byte) *value { - if b == nil { - return nil - } - // A value starts with 2 references. One for the cache, and one for the - // handle that will be returned. - return &value{buf: b, refs: 2} -} - -func (v *value) acquire() { - atomic.AddInt32(&v.refs, 1) -} - -func (v *value) release() bool { - return atomic.AddInt32(&v.refs, -1) == 0 -} - -type entry struct { - key key - val unsafe.Pointer - blockLink struct { - next *entry - prev *entry - } - fileLink struct { - next *entry - prev *entry - } - size int64 - ptype entryType - // referenced is atomically set to indicate that this entry has been accessed - // since the last time one of the clock hands swept it. - referenced int32 - shard *shard -} - -func (e *entry) init() *entry { - e.blockLink.next = e - e.blockLink.prev = e - e.fileLink.next = e - e.fileLink.prev = e - return e -} - -func (e *entry) next() *entry { - if e == nil { - return nil - } - return e.blockLink.next -} - -func (e *entry) prev() *entry { - if e == nil { - return nil - } - return e.blockLink.prev -} - -func (e *entry) link(s *entry) { - s.blockLink.prev = e.blockLink.prev - s.blockLink.prev.blockLink.next = s - s.blockLink.next = e - s.blockLink.next.blockLink.prev = s -} - -func (e *entry) unlink() *entry { - next := e.blockLink.next - e.blockLink.prev.blockLink.next = e.blockLink.next - e.blockLink.next.blockLink.prev = e.blockLink.prev - e.blockLink.prev = e - e.blockLink.next = e - return next -} - -func (e *entry) linkFile(s *entry) { - s.fileLink.prev = e.fileLink.prev - s.fileLink.prev.fileLink.next = s - s.fileLink.next = e - s.fileLink.next.fileLink.prev = s -} - -func (e *entry) unlinkFile() *entry { - next := e.fileLink.next - e.fileLink.prev.fileLink.next = e.fileLink.next - e.fileLink.next.fileLink.prev = e.fileLink.prev - e.fileLink.prev = e - e.fileLink.next = e - return next -} - -func (e *entry) setValue(v *value) { - if old := e.getValue(); old != nil { - if old.release() { - allocFree(old.buf) - } - } - atomic.StorePointer(&e.val, unsafe.Pointer(v)) +// file returns the "file key" for the receiver. This is the key used for the +// shard.files map. +func (k key) file() key { + k.offset = 0 + return k } -func (e *entry) getValue() *value { - return (*value)(atomic.LoadPointer(&e.val)) -} - -func (e *entry) Get() []byte { - v := e.getValue() - if v == nil { - return nil - } - atomic.StoreInt32(&e.referenced, 1) - // Record a cache hit because the entry is being used as a WeakHandle and - // successfully avoided a more expensive shard.Get() operation. - atomic.AddInt64(&e.shard.hits, 1) - return v.buf +func (k key) String() string { + return fmt.Sprintf("%d/%d/%d", k.id, k.fileNum, k.offset) } // Handle provides a strong reference to an entry in the cache. The reference // does not pin the entry in the cache, but it does prevent the underlying byte -// slice from being reused. +// slice from being reused. When entry is non-nil, value is initialized to +// entry.val. Over the lifetime of the handle (until Release is called), +// entry.val may change, but value will remain unchanged. type Handle struct { entry *entry - value *value + value *Value } // Get returns the value stored in handle. @@ -199,9 +73,7 @@ func (h Handle) Get() []byte { // Release releases the reference to the cache entry. 
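// Editor's illustration (not part of the change): the intended calling
// pattern for a strong Handle, shown godoc-style. The variables c, id,
// fileNum and offset are hypothetical; the methods are the ones defined in
// this file. Get pins the underlying buffer, and Release must be called once
// the caller is done with it:
//
//	h := c.Get(id, fileNum, offset)
//	if b := h.Get(); b != nil {
//		// ... read b; the buffer cannot be recycled while h is held ...
//	}
//	h.Release()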
func (h Handle) Release() { if h.value != nil { - if h.value.release() { - allocFree(h.value.buf) - } + h.value.release() } } @@ -211,26 +83,32 @@ func (h Handle) Release() { // the reference count on the value is incremented which will prevent the // associated buffer from ever being reused until it is GC'd by the Go // runtime. It is not necessary to call Handle.Release() after calling Weak(). -func (h Handle) Weak() WeakHandle { - if h.entry == nil { - return nil // return a nil interface, not (*entry)(nil) +func (h Handle) Weak() *WeakHandle { + if h.value.manual() { + panic("pebble: cannot make manual Value into a WeakHandle") } - // Add a reference to the value which will never be cleared. This is - // necessary because WeakHandle.Get() performs an atomic load of the value, - // but we need to ensure that nothing can concurrently be freeing the buffer - // for reuse. Rather than add additional locking to this code path, we add a - // reference here so that the underlying buffer can never be reused. And we - // rely on the Go runtime to eventually GC the buffer. - h.value.acquire() - return h.entry + return (*WeakHandle)(h.entry) } // WeakHandle provides a "weak" reference to an entry in the cache. A weak // reference allows the entry to be evicted, but also provides fast access -type WeakHandle interface { - // Get retrieves the value associated with the weak handle, returning nil if - // no value is present. - Get() []byte +type WeakHandle entry + +// Get retrieves the value associated with the weak handle, returning nil if no +// value is present. The calls to Get must be balanced with the calls to +// Release. +func (h *WeakHandle) Get() []byte { + e := (*entry)(h) + v := e.getValue() + if v == nil { + return nil + } + + atomic.StoreInt32(&e.referenced, 1) + // Record a cache hit because the entry is being used as a WeakHandle and + // successfully avoided a more expensive shard.Get() operation. + atomic.AddInt64(&e.shard.hits, 1) + return v.buf } type shard struct { @@ -241,9 +119,18 @@ type shard struct { reservedSize int64 maxSize int64 - coldSize int64 - blocks map[key]*entry // fileNum+offset -> block - files map[fileKey]*entry // fileNum -> list of blocks + coldTarget int64 + blocks robinHoodMap // fileNum+offset -> block + files robinHoodMap // fileNum -> list of blocks + + // The blocks and files maps store values in manually managed memory that is + // invisible to the Go GC. This is fine for Value and entry objects that are + // stored in manually managed memory. Auto Values and the associated auto + // entries need to have a reference that the Go GC is aware of to prevent + // them from being reclaimed. The entries map provides this reference. When + // the "invariants" build tag is set, all Value and entry objects are Go + // allocated and the entries map will contain a reference to every entry. 
+ entries map[*entry]struct{} handHot *entry handCold *entry @@ -256,8 +143,8 @@ type shard struct { func (c *shard) Get(id, fileNum, offset uint64) Handle { c.mu.RLock() - e := c.blocks[key{fileKey{id, fileNum}, offset}] - var value *value + e := c.blocks.Get(key{fileKey{id, fileNum}, offset}) + var value *Value if e != nil { value = e.getValue() if value != nil { @@ -270,60 +157,94 @@ func (c *shard) Get(id, fileNum, offset uint64) Handle { c.mu.RUnlock() if value == nil { atomic.AddInt64(&c.misses, 1) - } else { - atomic.AddInt64(&c.hits, 1) + return Handle{} + } + atomic.AddInt64(&c.hits, 1) + + // Enforce the restriction that manually managed values cannot be converted + // to weak handles. + if e.manual { + e = nil } - return Handle{value: value} + return Handle{entry: e, value: value} } -func (c *shard) Set(id, fileNum, offset uint64, value []byte) Handle { +func (c *shard) Set(id, fileNum, offset uint64, value *Value) Handle { + if n := atomic.LoadInt32(&value.refs); n != 1 && n >= 0 { + panic(fmt.Sprintf("pebble: Value has already been added to the cache: refs=%d", n)) + } + c.mu.Lock() defer c.mu.Unlock() k := key{fileKey{id, fileNum}, offset} - e := c.blocks[k] - v := newValue(value) + e := c.blocks.Get(k) + if e != nil && e.manual != value.manual() { + panic(fmt.Sprintf("pebble: inconsistent caching of manual Value: entry=%t vs value=%t", + e.manual, value.manual())) + } switch { case e == nil: // no cache entry? add it - e = &entry{ptype: etCold, key: k, size: int64(len(value)), shard: c} - e.init() - e.setValue(v) + e = newEntry(c, k, int64(len(value.buf)), value.manual()) + e.setValue(value) if c.metaAdd(k, e) { + value.trace("add-cold") + value.acquire() // add reference for the cache c.sizeCold += e.size + } else { + value.trace("skip-cold") + e.free() } case e.getValue() != nil: // cache entry was a hot or cold page - e.setValue(v) + value.acquire() // add reference for the cache + e.setValue(value) atomic.StoreInt32(&e.referenced, 1) - delta := int64(len(value)) - e.size - e.size = int64(len(value)) + delta := int64(len(value.buf)) - e.size + e.size = int64(len(value.buf)) if e.ptype == etHot { + value.trace("add-hot") c.sizeHot += delta } else { + value.trace("add-cold") c.sizeCold += delta } c.evict() default: // cache entry was a test page - c.coldSize += e.size - if c.coldSize > c.targetSize() { - c.coldSize = c.targetSize() + c.sizeTest -= e.size + c.metaDel(e) + + c.coldTarget += e.size + if c.coldTarget > c.targetSize() { + c.coldTarget = c.targetSize() } + atomic.StoreInt32(&e.referenced, 0) - e.setValue(v) + e.setValue(value) e.ptype = etHot - c.sizeTest -= e.size - c.metaDel(e) if c.metaAdd(k, e) { + value.trace("add-hot") + value.acquire() // add reference for the cache c.sizeHot += e.size + } else { + value.trace("skip-hot") + e.free() } } - return Handle{entry: e, value: v} + // Enforce the restriction that manually managed values cannot be converted + // to weak handles. + if e.manual { + e = nil + } + // Values are initialized with a reference count of 1. That reference count + // is being transferred to the returned Handle. + return Handle{entry: e, value: value} } // Delete deletes the cached value for the specified file and offset. 
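// Editor's sketch (not part of the patch): what the reference counting in
// shard.Set above looks like from a caller's point of view. populateBlock is
// a hypothetical helper; the methods are the exported Cache API introduced in
// this change. A manually allocated Value starts with refs == 1; Set acquires
// an additional reference for the cache and transfers the caller's reference
// to the returned Handle, so the caller releases the Handle exactly once.
func populateBlock(c *Cache, id, fileNum, offset uint64, b []byte) {
	v := c.AllocManual(len(b)) // refs == 1, owned by the caller
	copy(v.Buf(), b)
	h := c.Set(id, fileNum, offset, v) // typically refs == 2: the cache and h
	h.Release()                        // the cache's reference keeps the value alive
}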
@@ -331,7 +252,7 @@ func (c *shard) Delete(id, fileNum, offset uint64) { c.mu.Lock() defer c.mu.Unlock() - e := c.blocks[key{fileKey{id, fileNum}, offset}] + e := c.blocks.Get(key{fileKey{id, fileNum}, offset}) if e == nil { return } @@ -343,7 +264,8 @@ func (c *shard) EvictFile(id, fileNum uint64) { c.mu.Lock() defer c.mu.Unlock() - blocks := c.files[fileKey{id, fileNum}] + fkey := key{fileKey{id, fileNum}, 0} + blocks := c.files.Get(fkey) if blocks == nil { return } @@ -391,7 +313,12 @@ func (c *shard) metaAdd(key key, e *entry) bool { return false } - c.blocks[key] = e + c.blocks.Put(key, e) + if !e.managed { + // Go allocated entries need to be referenced from Go memory. The entries + // map provides that reference. + c.entries[e] = struct{}{} + } if c.handHot == nil { // first element @@ -406,16 +333,30 @@ func (c *shard) metaAdd(key key, e *entry) bool { c.handCold = c.handCold.prev() } - if fileBlocks := c.files[key.fileKey]; fileBlocks == nil { - c.files[key.fileKey] = e + fkey := key.file() + if fileBlocks := c.files.Get(fkey); fileBlocks == nil { + c.files.Put(fkey, e) } else { fileBlocks.linkFile(e) } return true } +// Remove the entry from the cache. This removes the entry from the blocks map, +// the files map, and ensures that hand{Hot,Cold,Test} are not pointing at the +// entry. func (c *shard) metaDel(e *entry) { - delete(c.blocks, e.key) + if value := e.getValue(); value != nil { + value.trace("metaDel") + } + e.setValue(nil) + + c.blocks.Delete(e.key) + if !e.managed { + // Go allocated entries need to be referenced from Go memory. The entries + // map provides that reference. + delete(c.entries, e) + } if e == c.handHot { c.handHot = c.handHot.prev() @@ -434,10 +375,43 @@ func (c *shard) metaDel(e *entry) { c.handTest = nil } + fkey := e.key.file() if next := e.unlinkFile(); e == next { - delete(c.files, e.key.fileKey) + c.files.Delete(fkey) } else { - c.files[e.key.fileKey] = next + c.files.Put(fkey, next) + } + + c.metaCheck(e) +} + +// Check that the specified entry is not referenced by the cache. +func (c *shard) metaCheck(e *entry) { + if invariants.Enabled { + if _, ok := c.entries[e]; ok { + fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in entries map\n%s", + e, e.key, debug.Stack()) + os.Exit(1) + } + if c.blocks.findByValue(e) != nil { + fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in blocks map\n%s\n%s", + e, e.key, &c.blocks, debug.Stack()) + os.Exit(1) + } + if c.files.findByValue(e) != nil { + fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in files map\n%s\n%s", + e, e.key, &c.files, debug.Stack()) + os.Exit(1) + } + // NB: c.hand{Hot,Cold,Test} are pointers into a single linked list. We + // only have to traverse one of them to check all of them. 
+ for t := c.handHot.next(); t != c.handHot; t = t.next() { + if e == t { + fmt.Fprintf(os.Stderr, "%p: %s unexpectedly found in blocks list\n%s", + e, e.key, debug.Stack()) + os.Exit(1) + } + } } } @@ -451,6 +425,7 @@ func (c *shard) metaEvict(e *entry) { c.sizeTest -= e.size } c.metaDel(e) + e.free() } func (c *shard) evict() { @@ -480,7 +455,7 @@ func (c *shard) runHandCold() { c.handCold = c.handCold.next() - for c.targetSize()-c.coldSize <= c.sizeHot && c.handHot != nil { + for c.targetSize()-c.coldTarget <= c.sizeHot && c.handHot != nil { c.runHandHot() } } @@ -517,15 +492,13 @@ func (c *shard) runHandTest() { e := c.handTest if e.ptype == etTest { - prev := c.handTest.prev() - c.metaDel(c.handTest) - c.handTest = prev - c.sizeTest -= e.size - c.coldSize -= e.size - if c.coldSize < 0 { - c.coldSize = 0 + c.coldTarget -= e.size + if c.coldTarget < 0 { + c.coldTarget = 0 } + c.metaDel(e) + e.free() } c.handTest = c.handTest.next() @@ -543,7 +516,65 @@ type Metrics struct { Misses int64 } -// Cache ... +// Cache implements Pebble's sharded block cache. The Clock-PRO algorithm is +// used for page replacement +// (http://static.usenix.org/event/usenix05/tech/general/full_papers/jiang/jiang_html/html.html). In +// order to provide better concurrency, 2 x NumCPUs shards are created, with +// each shard being given 1/n of the target cache size. The Clock-PRO algorithm +// is run independently on each shard. +// +// Blocks are keyed by an (id, fileNum, offset) triple. The ID is a namespace +// for file numbers and allows a single Cache to be shared between multiple +// Pebble instances. The fileNum and offset refer to an sstable file number and +// the offset of the block within the file. Because sstables are immutable and +// file numbers are never reused, (fileNum,offset) are unique for the lifetime +// of a Pebble instance. +// +// In addition to maintaining a map from (fileNum,offset) to data, each shard +// maintains a map of the cached blocks for a particular fileNum. This allows +// efficient eviction of all of the blocks for a file which is used when an +// sstable is deleted from disk. +// +// Strong vs Weak Handles +// +// When a block is retrieved from the cache, a Handle is returned. The Handle +// maintains a reference to the associated value and will prevent the value +// from being removed. The caller must call Handle.Release() when they are done +// using the value (failure to do so will result in a memory leak). Until +// Handle.Release() is called, the value is pinned in memory. +// +// A Handle can be transformed into a WeakHandle by a call to Handle.Weak(). A +// WeakHandle does not pin the associated value in memory, and instead allows +// the value to be evicted from the cache and reclaimed by the Go GC. The value +// associated with a WeakHandle can be retrieved by WeakHandle.Get() which may +// return nil if the value has been evicted from the cache. WeakHandle's are +// useful for avoiding the overhad of a cache lookup. They are used for sstable +// index, filter, and range-del blocks, which are frequently accessed, almost +// always in the cache, but which we want to allow to be evicted under memory +// pressure. +// +// Memory Management +// +// In order to reduce pressure on the Go GC, manual memory management is +// performed for the majority of blocks in the cache. Manual memory management +// is selected using Cache.AllocManual() to allocate a value, rather than +// Cache.AllocAuto(). Note that manual values cannot be used for +// WeakHandles. 
The general rule is to use AllocAuto when a WeakHandle will be +// retrieved, and AllocManual in all other cases. +// +// Manual memory management is performed by calling into C.{malloc,free} to +// allocate memory. Cache.Values are reference counted and the memory backing a +// manual value is freed when the reference count drops to 0. +// +// Manual memory management brings the possibility of memory leaks. It is +// imperative that every Handle returned by Cache.{Get,Set} is eventually +// released. The "invariants" build tag enables a leak detection facility that +// places a GC finalizer on cache.Value. When the cache.Value finalizer is run, +// if the underlying buffer is still present a leak has occurred. The "tracing" +// build tag enables tracing of cache.Value reference count manipulation and +// eases finding where a leak has occurred. These two facilities are usually +// used in combination by specifying `-tags invariants,tracing`. Note that +// "tracing" produces a significant slowdown, while "invariants" does not. type Cache struct { maxSize int64 idAlloc uint64 @@ -556,6 +587,19 @@ func New(size int64) *Cache { return newShards(size, 2*runtime.NumCPU()) } +func clearCache(obj interface{}) { + c := obj.(*Cache) + for i := range c.shards { + s := &c.shards[i] + s.mu.Lock() + s.maxSize = 0 + s.evict() + s.blocks.free() + s.files.free() + s.mu.Unlock() + } +} + func newShards(size int64, shards int) *Cache { c := &Cache{ maxSize: size, @@ -564,12 +608,18 @@ func newShards(size int64, shards int) *Cache { } for i := range c.shards { c.shards[i] = shard{ - maxSize: size / int64(len(c.shards)), - coldSize: size / int64(len(c.shards)), - blocks: make(map[key]*entry), - files: make(map[fileKey]*entry), + maxSize: size / int64(len(c.shards)), + coldTarget: size / int64(len(c.shards)), + entries: make(map[*entry]struct{}), } - } + c.shards[i].blocks.init(16) + c.shards[i].files.init(16) + } + // TODO(peter): This finalizer is used to clear the cache when the Cache + // itself is GC'd. Investigate making this explicit, and then changing the + // finalizer to only be installed when invariants.Enabled is true and to only + // check that all of the manual memory has been freed. + runtime.SetFinalizer(c, clearCache) return c } @@ -611,8 +661,8 @@ func (c *Cache) Get(id, fileNum, offset uint64) Handle { // Set sets the cache value for the specified file and offset, overwriting an // existing value if present. A Handle is returned which provides faster // retrieval of the cached value than Get (lock-free and avoidance of the map -// lookup). -func (c *Cache) Set(id, fileNum, offset uint64, value []byte) Handle { +// lookup). The value must have been allocated by Cache.Alloc. +func (c *Cache) Set(id, fileNum, offset uint64, value *Value) Handle { return c.getShard(id, fileNum, offset).Set(id, fileNum, offset, value) } @@ -645,16 +695,29 @@ func (c *Cache) Size() int64 { return size } -// Alloc allocates a byte slice of the specified size, possibly reusing -// previously allocated but unused memory. -func (c *Cache) Alloc(n int) []byte { - return allocNew(n) +// AllocManual allocates a byte slice of the specified size, possibly reusing +// previously allocated but unused memory. The memory backing the value is +// manually managed. The caller MUST either add the value to the cache (via +// Cache.Set), or release the value (via Cache.Free). Failure to do so will +// result in a memory leak. 
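// Editor's illustration (godoc-style, not part of the patch) of the two legal
// outcomes for a manually allocated value; c, id, fileNum, offset, src and
// cacheIt are hypothetical:
//
//	v := c.AllocManual(len(src))
//	copy(v.Buf(), src)
//	if cacheIt {
//		c.Set(id, fileNum, offset, v).Release() // ownership passes to the cache
//	} else {
//		c.Free(v) // never Free a value that was added to the cache
//	}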
+func (c *Cache) AllocManual(n int) *Value { + return newManualValue(n) } -// Free frees the specified slice of memory. The buffer will possibly be -// reused, making it invalid to use the buffer after calling Free. -func (c *Cache) Free(b []byte) { - allocFree(b) +// AllocAuto allocates an automatically managed value using buf as the internal +// buffer. +func (c *Cache) AllocAuto(buf []byte) *Value { + return newAutoValue(buf) +} + +// Free frees the specified value. The buffer associated with the value will +// possibly be reused, making it invalid to use the buffer after calling +// Free. Do not call Free on a value that has been added to the cache. +func (c *Cache) Free(v *Value) { + if n := atomic.LoadInt32(&v.refs); n > 1 { + panic(fmt.Sprintf("pebble: Value has been added to the cache: refs=%d", n)) + } + v.release() } // Reserve N bytes in the cache. This effectively shrinks the size of the cache @@ -684,7 +747,7 @@ func (c *Cache) Metrics() Metrics { for i := range c.shards { s := &c.shards[i] s.mu.RLock() - m.Count += int64(len(s.blocks)) + m.Count += int64(s.blocks.Count()) m.Size += s.sizeHot + s.sizeCold s.mu.RUnlock() m.Hits += atomic.LoadInt64(&s.hits) diff --git a/internal/cache/clockpro_test.go b/internal/cache/clockpro_test.go index 5c535092dd..bf6bc4301c 100644 --- a/internal/cache/clockpro_test.go +++ b/internal/cache/clockpro_test.go @@ -37,7 +37,9 @@ func TestCache(t *testing.T) { var hit bool h := cache.Get(1, uint64(key), 0) if v := h.Get(); v == nil { - cache.Set(1, uint64(key), 0, append([]byte(nil), fields[0][0])) + value := cache.AllocManual(1) + value.Buf()[0] = fields[0][0] + cache.Set(1, uint64(key), 0, value).Release() } else { hit = true if !bytes.Equal(v, fields[0][:1]) { @@ -52,10 +54,21 @@ func TestCache(t *testing.T) { } } +func testManualValue(cache *Cache, s string, repeat int) *Value { + b := bytes.Repeat([]byte(s), repeat) + v := cache.AllocManual(len(b)) + copy(v.Buf(), b) + return v +} + +func testAutoValue(cache *Cache, s string, repeat int) *Value { + return cache.AllocAuto(bytes.Repeat([]byte(s), repeat)) +} + func TestWeakHandle(t *testing.T) { cache := newShards(5, 1) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 5)) - h := cache.Set(1, 0, 0, bytes.Repeat([]byte("b"), 5)) + cache.Set(1, 1, 0, testAutoValue(cache, "a", 5)).Release() + h := cache.Set(1, 0, 0, testAutoValue(cache, "b", 5)) if v := h.Get(); string(v) != "bbbbb" { t.Fatalf("expected bbbbb, but found %v", v) } @@ -64,7 +77,7 @@ func TestWeakHandle(t *testing.T) { if v := w.Get(); string(v) != "bbbbb" { t.Fatalf("expected bbbbb, but found %v", v) } - cache.Set(1, 2, 0, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 2, 0, testManualValue(cache, "a", 5)).Release() if v := w.Get(); v != nil { t.Fatalf("expected nil, but found %s", v) } @@ -72,9 +85,9 @@ func TestWeakHandle(t *testing.T) { func TestCacheDelete(t *testing.T) { cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 0, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 1, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 0, testManualValue(cache, "a", 5)).Release() if expected, size := int64(15), cache.Size(); expected != size { t.Fatalf("expected cache size %d, but found %d", expected, size) } @@ -84,9 +97,13 @@ func TestCacheDelete(t *testing.T) { } if h := cache.Get(1, 0, 0); h.Get() == nil { t.Fatalf("expected to find block 0/0") + } else { + h.Release() } if 
h := cache.Get(1, 1, 0); h.Get() != nil { t.Fatalf("expected to not find block 1/0") + } else { + h.Release() } // Deleting a non-existing block does nothing. cache.Delete(1, 1, 0) @@ -97,11 +114,11 @@ func TestCacheDelete(t *testing.T) { func TestEvictFile(t *testing.T) { cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 1, bytes.Repeat([]byte("a"), 5)) - cache.Set(1, 2, 2, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 1, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 1, testManualValue(cache, "a", 5)).Release() + cache.Set(1, 2, 2, testManualValue(cache, "a", 5)).Release() if expected, size := int64(25), cache.Size(); expected != size { t.Fatalf("expected cache size %d, but found %d", expected, size) } @@ -123,14 +140,14 @@ func TestEvictAll(t *testing.T) { // Verify that it is okay to evict all of the data from a cache. Previously // this would trigger a nil-pointer dereference. cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 101)) - cache.Set(1, 1, 0, bytes.Repeat([]byte("a"), 101)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 101)).Release() + cache.Set(1, 1, 0, testManualValue(cache, "a", 101)).Release() } func TestMultipleDBs(t *testing.T) { cache := newShards(100, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) - cache.Set(2, 0, 0, bytes.Repeat([]byte("b"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "b", 5)).Release() if expected, size := int64(10), cache.Size(); expected != size { t.Fatalf("expected cache size %d, but found %d", expected, size) } @@ -144,31 +161,31 @@ func TestMultipleDBs(t *testing.T) { } h = cache.Get(2, 0, 0) if v := h.Get(); string(v) != "bbbbb" { - t.Fatalf("expected bbbbb, but found %v", v) + t.Fatalf("expected bbbbb, but found %s", v) } } func TestZeroSize(t *testing.T) { cache := newShards(0, 1) - cache.Set(1, 0, 0, bytes.Repeat([]byte("a"), 5)) + cache.Set(1, 0, 0, testManualValue(cache, "a", 5)).Release() } func TestReserve(t *testing.T) { cache := newShards(4, 2) - cache.Set(1, 0, 0, []byte("a")) - cache.Set(2, 0, 0, []byte("a")) + cache.Set(1, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "a", 1)).Release() require.EqualValues(t, 2, cache.Size()) r := cache.Reserve(1) require.EqualValues(t, 0, cache.Size()) - cache.Set(1, 0, 0, []byte("a")) - cache.Set(2, 0, 0, []byte("a")) - cache.Set(3, 0, 0, []byte("a")) - cache.Set(4, 0, 0, []byte("a")) + cache.Set(1, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(3, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(4, 0, 0, testManualValue(cache, "a", 1)).Release() require.EqualValues(t, 2, cache.Size()) r() require.EqualValues(t, 2, cache.Size()) - cache.Set(1, 0, 0, []byte("a")) - cache.Set(2, 0, 0, []byte("a")) + cache.Set(1, 0, 0, testManualValue(cache, "a", 1)).Release() + cache.Set(2, 0, 0, testManualValue(cache, "a", 1)).Release() require.EqualValues(t, 4, cache.Size()) } diff --git a/internal/cache/entry.go b/internal/cache/entry.go new file mode 100644 index 0000000000..444d7386ab --- /dev/null +++ b/internal/cache/entry.go @@ -0,0 +1,162 @@ +// Copyright 2020 The LevelDB-Go and Pebble 
Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "sync/atomic" + "unsafe" +) + +type entryType int8 + +const ( + etTest entryType = iota + etCold + etHot +) + +func (p entryType) String() string { + switch p { + case etTest: + return "test" + case etCold: + return "cold" + case etHot: + return "hot" + } + return "unknown" +} + +// entry holds the metadata for a cache entry. If the value stored in an entry +// is manually managed, the entry will also use manual memory management. +// +// Using manual memory management for entries is technically a volation of the +// Cgo pointer rules: +// +// https://golang.org/cmd/cgo/#hdr-Passing_pointers +// +// Specifically, Go pointers should not be stored in C allocated memory. The +// reason for this rule is that the Go GC will not look at C allocated memory +// to find pointers to Go objects. If the only reference to a Go object is +// stored in C allocated memory, the object will be reclaimed. The blockLink, +// fileLink, and shard fields of the entry struct may all point to Go objects, +// thus the violation. What makes this "safe" is that the Cache guarantees that +// there are other pointers to the entry and shard which will keep them +// alive. In particular, every Go allocated entry in the cache is referenced by +// the shard.entries map. And every shard is referenced by the Cache.shards +// map. +type entry struct { + key key + val unsafe.Pointer + blockLink struct { + next *entry + prev *entry + } + fileLink struct { + next *entry + prev *entry + } + size int64 + ptype entryType + // Can the entry hold a manual Value? Only a manually managed entry can store + // manually managed values (Value.manual() is true). + manual bool + // Was the entry allocated using the Go allocator or the manual + // allocator. This can differ from the setting of the manual field due when + // the "invariants" build tag is set. + managed bool + // referenced is atomically set to indicate that this entry has been accessed + // since the last time one of the clock hands swept it. 
+ referenced int32 + shard *shard +} + +const entrySize = int(unsafe.Sizeof(entry{})) + +func newEntry(s *shard, key key, size int64, manual bool) *entry { + var e *entry + if manual { + e = entryAllocNew() + } else { + e = &entry{} + } + *e = entry{ + key: key, + size: size, + ptype: etCold, + manual: manual, + managed: e.managed, + shard: s, + } + e.blockLink.next = e + e.blockLink.prev = e + e.fileLink.next = e + e.fileLink.prev = e + return e +} + +func (e *entry) free() { + if e.manual { + *e = entry{} + entryAllocFree(e) + } +} + +func (e *entry) next() *entry { + if e == nil { + return nil + } + return e.blockLink.next +} + +func (e *entry) prev() *entry { + if e == nil { + return nil + } + return e.blockLink.prev +} + +func (e *entry) link(s *entry) { + s.blockLink.prev = e.blockLink.prev + s.blockLink.prev.blockLink.next = s + s.blockLink.next = e + s.blockLink.next.blockLink.prev = s +} + +func (e *entry) unlink() *entry { + next := e.blockLink.next + e.blockLink.prev.blockLink.next = e.blockLink.next + e.blockLink.next.blockLink.prev = e.blockLink.prev + e.blockLink.prev = e + e.blockLink.next = e + return next +} + +func (e *entry) linkFile(s *entry) { + s.fileLink.prev = e.fileLink.prev + s.fileLink.prev.fileLink.next = s + s.fileLink.next = e + s.fileLink.next.fileLink.prev = s +} + +func (e *entry) unlinkFile() *entry { + next := e.fileLink.next + e.fileLink.prev.fileLink.next = e.fileLink.next + e.fileLink.next.fileLink.prev = e.fileLink.prev + e.fileLink.prev = e + e.fileLink.next = e + return next +} + +func (e *entry) setValue(v *Value) { + if old := e.getValue(); old != nil { + old.release() + } + atomic.StorePointer(&e.val, unsafe.Pointer(v)) +} + +func (e *entry) getValue() *Value { + return (*Value)(atomic.LoadPointer(&e.val)) +} diff --git a/internal/cache/entry_invariants.go b/internal/cache/entry_invariants.go new file mode 100644 index 0000000000..e4850e4fa7 --- /dev/null +++ b/internal/cache/entry_invariants.go @@ -0,0 +1,18 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. +// +// +build invariants tracing + +package cache + +// When the "invariants" or "tracing" build tags are enabled, we need to +// allocate entries using the Go allocator so entry.val properly maintains a +// reference to the Value. + +func entryAllocNew() *entry { + return &entry{} +} + +func entryAllocFree(e *entry) { +} diff --git a/internal/cache/entry_normal.go b/internal/cache/entry_normal.go new file mode 100644 index 0000000000..890d0413ad --- /dev/null +++ b/internal/cache/entry_normal.go @@ -0,0 +1,81 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. 
+// +// +build !invariants,!tracing + +package cache + +import ( + "runtime" + "sync" + "unsafe" + + "github.com/cockroachdb/pebble/internal/manual" +) + +const ( + entryAllocCacheLimit = 128 +) + +var entryAllocPool = sync.Pool{ + New: func() interface{} { + return newEntryAllocCache() + }, +} + +func entryAllocNew() *entry { + a := entryAllocPool.Get().(*entryAllocCache) + e := a.alloc() + entryAllocPool.Put(a) + e.managed = true + return e +} + +func entryAllocFree(e *entry) { + a := entryAllocPool.Get().(*entryAllocCache) + a.free(e) + entryAllocPool.Put(a) +} + +type entryAllocCache struct { + entries []*entry +} + +func newEntryAllocCache() *entryAllocCache { + c := &entryAllocCache{} + runtime.SetFinalizer(c, freeEntryAllocCache) + return c +} + +func freeEntryAllocCache(obj interface{}) { + c := obj.(*entryAllocCache) + for i, e := range c.entries { + c.dealloc(e) + c.entries[i] = nil + } +} + +func (c *entryAllocCache) alloc() *entry { + n := len(c.entries) + if n == 0 { + b := manual.New(entrySize) + return (*entry)(unsafe.Pointer(&b[0])) + } + e := c.entries[n-1] + c.entries = c.entries[:n-1] + return e +} + +func (c *entryAllocCache) dealloc(e *entry) { + buf := (*[manual.MaxArrayLen]byte)(unsafe.Pointer(e))[:entrySize:entrySize] + manual.Free(buf) +} + +func (c *entryAllocCache) free(e *entry) { + if len(c.entries) == entryAllocCacheLimit { + c.dealloc(e) + return + } + c.entries = append(c.entries, e) +} diff --git a/internal/cache/robin_hood.go b/internal/cache/robin_hood.go new file mode 100644 index 0000000000..7ad32fc2c3 --- /dev/null +++ b/internal/cache/robin_hood.go @@ -0,0 +1,318 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "fmt" + "math/bits" + "os" + "runtime" + "runtime/debug" + "strings" + "time" + "unsafe" + + "github.com/cockroachdb/pebble/internal/invariants" + "github.com/cockroachdb/pebble/internal/manual" +) + +var hashSeed = uint64(time.Now().UnixNano()) + +// Fibonacci hash: https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/ +func robinHoodHash(k key, shift uint32) uint32 { + const m = 11400714819323198485 + h := hashSeed + h ^= k.id * m + h ^= k.fileNum * m + h ^= k.offset * m + return uint32(h >> shift) +} + +type robinHoodEntry struct { + key key + // Note that value may point to a Go allocated object, even though the memory + // for the entry itself is manually managed. This is technically a volation + // of the Cgo pointer rules: + // + // https://golang.org/cmd/cgo/#hdr-Passing_pointers + // + // Specifically, Go pointers should not be stored in C allocated memory. The + // reason for this rule is that the Go GC will not look at C allocated memory + // to find pointers to Go objects. If the only reference to a Go object is + // stored in C allocated memory, the object will be reclaimed. What makes + // this "safe" is that the Cache guarantees that there are other pointers to + // the entry and shard which will keep them alive. In particular, every Go + // allocated entry in the cache is referenced by the shard.entries map. And + // every shard is referenced by the Cache.shards map. + value *entry + // The distance the entry is from its desired position. 
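// For example (editor's note): an entry whose key hashes to slot 3 but is
// stored at slot 5 has dist == 2, while dist == 0 means the entry occupies
// its desired slot.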
+ dist uint32 +} + +type robinHoodEntries struct { + ptr unsafe.Pointer + len uint32 +} + +func newRobinHoodEntries(n uint32) robinHoodEntries { + size := uintptr(n) * unsafe.Sizeof(robinHoodEntry{}) + return robinHoodEntries{ + ptr: unsafe.Pointer(&(manual.New(int(size)))[0]), + len: n, + } +} + +func (e robinHoodEntries) at(i uint32) *robinHoodEntry { + return (*robinHoodEntry)(unsafe.Pointer(uintptr(e.ptr) + + uintptr(i)*unsafe.Sizeof(robinHoodEntry{}))) +} + +func (e robinHoodEntries) free() { + size := uintptr(e.len) * unsafe.Sizeof(robinHoodEntry{}) + buf := (*[manual.MaxArrayLen]byte)(e.ptr)[:size:size] + manual.Free(buf) +} + +// robinHoodMap is an implementation of Robin Hood hashing. Robin Hood hashing +// is an open-address hash table using linear probing. The twist is that the +// linear probe distance is reduced by moving existing entries when inserting +// and deleting. This is accomplished by keeping track of how far an entry is +// from its "desired" slot (hash of key modulo number of slots). During +// insertion, if the new entry being inserted is farther from its desired slot +// than the target entry, we swap the target and new entry. This effectively +// steals from the "rich" target entry and gives to the "poor" new entry (thus +// the origin of the name). +// +// An extension over the base Robin Hood hashing idea comes from +// https://probablydance.com/2017/02/26/i-wrote-the-fastest-hashtable/. A cap +// is placed on the max distance an entry can be from its desired slot. When +// this threshold is reached during insertion, the size of the table is doubled +// and insertion is restarted. Additionally, the entries slice is given "max +// dist" extra entries on the end. The very last entry in the entries slice is +// never used and acts as a sentinel which terminates loops. The previous +// maxDist-1 entries act as the extra entries. For example, if the size of the +// table is 2, maxDist is computed as 4 and the actual size of the entry slice +// is 6. +// +// +---+---+---+---+---+---+ +// | 0 | 1 | 2 | 3 | 4 | 5 | +// +---+---+---+---+---+---+ +// ^ +// size +// +// In this scenario, the target entry for a key will always be in the range +// [0,1]. Valid entries may reside in the range [0,4] due to the linear probing +// of up to maxDist entries. The entry at index 5 will never contain a value, +// and instead acts as a sentinel (its distance is always 0). The max distance +// threshold is set to log2(num-entries). This ensures that retrieval is O(log +// N), though note that N is the number of total entries, not the count of +// valid entries. +// +// Deletion is implemented via the backward shift delete mechanism instead of +// tombstones. This preserves the performance of the table in the presence of +// deletions. See +// http://codecapsule.com/2013/11/17/robin-hood-hashing-backward-shift-deletion +// for details. 
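// Editor's worked example of the stealing rule described above (hypothetical
// keys; the number in parentheses is the key's desired slot, and x/2 denotes
// key x stored with dist == 2):
//
//	Put x(0): slot 0 is empty                          -> [x/0, _,   _,   _  ]
//	Put y(0): x at slot 0 has dist 0, which is not
//	          less than y's dist 0, so probe to slot 1 -> [x/0, y/1, _,   _  ]
//	Put z(1): y at slot 1 has dist 1, which is not
//	          less than z's dist 0, so probe to slot 2 -> [x/0, y/1, z/1, _  ]
//	Put w(0): slots 0 and 1 are not stolen, but at slot 2 w's dist 2 exceeds
//	          z's dist 1, so w steals slot 2 and the displaced z probes on to
//	          slot 3                                   -> [x/0, y/1, w/2, z/2]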
+type robinHoodMap struct { + entries robinHoodEntries + size uint32 + shift uint32 + count uint32 + maxDist uint32 +} + +func maxDistForSize(size uint32) uint32 { + desired := uint32(bits.Len32(size)) + if desired < 4 { + desired = 4 + } + return desired +} + +func newRobinHoodMap(initialCapacity int) *robinHoodMap { + m := &robinHoodMap{} + m.init(initialCapacity) + runtime.SetFinalizer(m, clearRobinHoodMap) + return m +} + +func clearRobinHoodMap(obj interface{}) { + m := obj.(*robinHoodMap) + m.free() +} + +func (m *robinHoodMap) init(initialCapacity int) { + if initialCapacity < 1 { + initialCapacity = 1 + } + targetSize := 1 << (uint(bits.Len(uint(2*initialCapacity-1))) - 1) + m.rehash(uint32(targetSize)) +} + +func (m *robinHoodMap) free() { + if m.entries.ptr != nil { + m.entries.free() + m.entries.ptr = nil + } +} + +func (m *robinHoodMap) rehash(size uint32) { + oldEntries := m.entries + + m.size = size + m.shift = uint32(64 - bits.Len32(m.size-1)) + m.maxDist = maxDistForSize(size) + m.entries = newRobinHoodEntries(size + m.maxDist) + m.count = 0 + + for i := uint32(0); i < oldEntries.len; i++ { + e := oldEntries.at(i) + if e.value != nil { + m.Put(e.key, e.value) + } + } + + if oldEntries.ptr != nil { + oldEntries.free() + } +} + +// Find an entry containing the specified value. This is intended to be used +// from debug and test code. +func (m *robinHoodMap) findByValue(v *entry) *robinHoodEntry { + for i := uint32(0); i < m.entries.len; i++ { + e := m.entries.at(i) + if e.value == v { + return e + } + } + return nil +} + +func (m *robinHoodMap) Count() int { + return int(m.count) +} + +func (m *robinHoodMap) Put(k key, v *entry) { + maybeExists := true + n := robinHoodEntry{key: k, value: v, dist: 0} + for i := robinHoodHash(k, m.shift); ; i++ { + e := m.entries.at(i) + if maybeExists && k == e.key { + // Entry already exists: overwrite. + e.value = n.value + m.checkEntry(i) + return + } + + if e.value == nil { + // Found an empty entry: insert here. + *e = n + m.count++ + m.checkEntry(i) + return + } + + if e.dist < n.dist { + // Swap the new entry with the current entry because the current is + // rich. We then continue to loop, looking for a new location for the + // current entry. Note that this is also the not-found condition for + // retrieval, which means that "k" is not present in the map. See Get(). + n, *e = *e, n + m.checkEntry(i) + maybeExists = false + } + + // The new entry gradually moves away from its ideal position. + n.dist++ + + // If we've reached the max distance threshold, grow the table and restart + // the insertion. + if n.dist == m.maxDist { + m.rehash(2 * m.size) + i = robinHoodHash(n.key, m.shift) - 1 + n.dist = 0 + maybeExists = false + } + } +} + +func (m *robinHoodMap) Get(k key) *entry { + var dist uint32 + for i := robinHoodHash(k, m.shift); ; i++ { + e := m.entries.at(i) + if k == e.key { + // Found. + return e.value + } + if e.dist < dist { + // Not found. + return nil + } + dist++ + } +} + +func (m *robinHoodMap) Delete(k key) { + var dist uint32 + for i := robinHoodHash(k, m.shift); ; i++ { + e := m.entries.at(i) + if k == e.key { + m.checkEntry(i) + // We found the entry to delete. Shift the following entries backwards + // until the next empty value or entry with a zero distance. Note that + // empty values are guaranteed to have "dist == 0". 
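// Editor's illustration of the backward shift performed below: deleting a
// from [a/0, b/1, c/1, empty] (key/dist pairs) moves each follower back one
// slot with its distance reduced by one, stopping at the first slot whose
// distance is already zero (here the empty slot), leaving
// [b/0, c/0, empty, empty].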
+ m.count-- + for j := i + 1; ; j++ { + t := m.entries.at(j) + if t.dist == 0 { + *e = robinHoodEntry{} + return + } + e.key = t.key + e.value = t.value + e.dist = t.dist - 1 + e = t + m.checkEntry(j) + } + } + if dist > e.dist { + // Not found. + return + } + dist++ + } +} + +func (m *robinHoodMap) checkEntry(i uint32) { + if invariants.Enabled { + e := m.entries.at(i) + if e.value != nil { + pos := robinHoodHash(e.key, m.shift) + if (uint32(i) - pos) != e.dist { + fmt.Fprintf(os.Stderr, "%d: invalid dist=%d, expected %d: %s\n%s", + i, e.dist, uint32(i)-pos, e.key, debug.Stack()) + os.Exit(1) + } + if e.dist > m.maxDist { + fmt.Fprintf(os.Stderr, "%d: invalid dist=%d > maxDist=%d: %s\n%s", + i, e.dist, m.maxDist, e.key, debug.Stack()) + os.Exit(1) + } + } + } +} + +func (m *robinHoodMap) String() string { + var buf strings.Builder + fmt.Fprintf(&buf, "count: %d\n", m.count) + for i := uint32(0); i < m.entries.len; i++ { + e := m.entries.at(i) + if e.value != nil { + fmt.Fprintf(&buf, "%d: [%s,%p,%d]\n", i, e.key, e.value, e.dist) + } + } + return buf.String() +} diff --git a/internal/cache/robin_hood_test.go b/internal/cache/robin_hood_test.go new file mode 100644 index 0000000000..af952530ba --- /dev/null +++ b/internal/cache/robin_hood_test.go @@ -0,0 +1,238 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import ( + "fmt" + "io/ioutil" + "runtime" + "testing" + "time" + + "golang.org/x/exp/rand" +) + +func TestRobinHoodMap(t *testing.T) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + rhMap := newRobinHoodMap(0) + goMap := make(map[key]*entry) + + randomKey := func() key { + n := rng.Intn(len(goMap)) + for k := range goMap { + if n == 0 { + return k + } + n-- + } + return key{} + } + + ops := 10000 + rng.Intn(10000) + for i := 0; i < ops; i++ { + var which float64 + if len(goMap) > 0 { + which = rng.Float64() + } + + switch { + case which < 0.4: + // 40% insert. + var k key + k.id = rng.Uint64() + k.fileNum = rng.Uint64() + k.offset = rng.Uint64() + e := &entry{} + goMap[k] = e + rhMap.Put(k, e) + if len(goMap) != int(rhMap.Count()) { + t.Fatalf("map sizes differ: %d != %d", len(goMap), rhMap.Count()) + } + + case which < 0.1: + // 10% overwrite. + k := randomKey() + e := &entry{} + goMap[k] = e + rhMap.Put(k, e) + if len(goMap) != int(rhMap.Count()) { + t.Fatalf("map sizes differ: %d != %d", len(goMap), rhMap.Count()) + } + + case which < 0.75: + // 25% delete. + k := randomKey() + delete(goMap, k) + rhMap.Delete(k) + if len(goMap) != int(rhMap.Count()) { + t.Fatalf("map sizes differ: %d != %d", len(goMap), rhMap.Count()) + } + + default: + // 25% lookup. 
+ k := randomKey() + v := goMap[k] + u := rhMap.Get(k) + if v != u { + t.Fatalf("%s: expected %p, but found %p", k, v, u) + } + } + } + + t.Logf("map size: %d", len(goMap)) +} + +const benchSize = 1 << 20 + +func BenchmarkGoMapInsert(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + for i := range keys { + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + } + b.ResetTimer() + + var m map[key]*entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if m == nil || j == len(keys) { + b.StopTimer() + m = make(map[key]*entry, len(keys)) + j = 0 + b.StartTimer() + } + m[keys[j]] = nil + } +} + +func BenchmarkRobinHoodInsert(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + for i := range keys { + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + } + e := &entry{} + b.ResetTimer() + + var m *robinHoodMap + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if m == nil || j == len(keys) { + b.StopTimer() + m = newRobinHoodMap(len(keys)) + j = 0 + b.StartTimer() + } + m.Put(keys[j], e) + } + + runtime.KeepAlive(e) +} + +func BenchmarkGoMapLookupHit(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + m := make(map[key]*entry, len(keys)) + e := &entry{} + for i := range keys { + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + m[keys[i]] = e + } + b.ResetTimer() + + var p *entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if j == len(keys) { + j = 0 + } + p = m[keys[j]] + } + + if testing.Verbose() { + fmt.Fprintln(ioutil.Discard, p) + } +} + +func BenchmarkRobinHoodLookupHit(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + m := newRobinHoodMap(len(keys)) + e := &entry{} + for i := range keys { + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + m.Put(keys[i], e) + } + b.ResetTimer() + + var p *entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if j == len(keys) { + j = 0 + } + p = m.Get(keys[j]) + } + + if testing.Verbose() { + fmt.Fprintln(ioutil.Discard, p) + } + runtime.KeepAlive(e) +} + +func BenchmarkGoMapLookupMiss(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + m := make(map[key]*entry, len(keys)) + e := &entry{} + for i := range keys { + keys[i].id = 1 + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + m[keys[i]] = e + keys[i].id = 2 + } + b.ResetTimer() + + var p *entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if j == len(keys) { + j = 0 + } + p = m[keys[j]] + } + + if testing.Verbose() { + fmt.Fprintln(ioutil.Discard, p) + } +} + +func BenchmarkRobinHoodLookupMiss(b *testing.B) { + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + keys := make([]key, benchSize) + m := newRobinHoodMap(len(keys)) + e := &entry{} + for i := range keys { + keys[i].id = 1 + keys[i].fileNum = uint64(rng.Intn(1 << 20)) + keys[i].offset = uint64(rng.Intn(1 << 20)) + m.Put(keys[i], e) + keys[i].id = 2 + } + b.ResetTimer() + + var p *entry + for i, j := 0, 0; i < b.N; i, j = i+1, j+1 { + if j == len(keys) { + j = 0 + } + p = m.Get(keys[j]) + } + + if testing.Verbose() { + fmt.Fprintln(ioutil.Discard, p) + } + runtime.KeepAlive(e) +} diff --git 
a/internal/cache/value.go b/internal/cache/value.go new file mode 100644 index 0000000000..7adcbf2a00 --- /dev/null +++ b/internal/cache/value.go @@ -0,0 +1,57 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package cache + +import "sync/atomic" + +func newAutoValue(b []byte) *Value { + if b == nil { + return nil + } + // A value starts with an invalid reference count. When the value is added to + // the cache, the reference count will be set to 2: one reference for the + // cache, and another for the returned Handle. + return &Value{buf: b, refs: -(1 << 30)} +} + +// Buf returns the buffer associated with the value. The contents of the buffer +// should not be changed once the value has been added to the cache. Instead, a +// new Value should be created and added to the cache to replace the existing +// value. +func (v *Value) Buf() []byte { + if v == nil { + return nil + } + return v.buf +} + +// Truncate the buffer to the specified length. The buffer length should not be +// changed once the value has been added to the cache as there may be +// concurrent readers of the Value. Instead, a new Value should be created and +// added to the cache to replace the existing value. +func (v *Value) Truncate(n int) { + v.buf = v.buf[:n] +} + +func (v *Value) auto() bool { + return atomic.LoadInt32(&v.refs) < 0 +} + +func (v *Value) manual() bool { + return !v.auto() +} + +func (v *Value) acquire() { + atomic.AddInt32(&v.refs, 1) + v.trace("acquire") +} + +func (v *Value) release() { + n := atomic.AddInt32(&v.refs, -1) + v.trace("release") + if n == 0 { + v.free() + } +} diff --git a/internal/cache/value_invariants.go b/internal/cache/value_invariants.go new file mode 100644 index 0000000000..eca839f572 --- /dev/null +++ b/internal/cache/value_invariants.go @@ -0,0 +1,48 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build invariants tracing + +package cache + +import ( + "fmt" + "os" + "runtime" + "sync/atomic" +) + +func checkValue(obj interface{}) { + v := obj.(*Value) + if v.buf != nil { + fmt.Fprintf(os.Stderr, "%p: cache value was not freed: refs=%d\n%s", + v.buf, atomic.LoadInt32(&v.refs), v.traces()) + os.Exit(1) + } +} + +// newManualValue creates a Value with a manually managed buffer of size n. +// +// This definition of newManualValue is used when either the "invariants" or +// "tracing" build tags are specified. It hooks up a finalizer to the returned +// Value that checks for memory leaks when the GC determines the Value is no +// longer reachable. +func newManualValue(n int) *Value { + if n == 0 { + return nil + } + b := allocNew(n) + v := &Value{buf: b, refs: 1} + v.trace("alloc") + runtime.SetFinalizer(v, checkValue) + return v +} + +func (v *Value) free() { + allocFree(v.buf) + // Setting Value.buf to nil is needed for correctness of the leak checking + // that is performed when the "invariants" or "tracing" build tags are + // enabled. + v.buf = nil +} diff --git a/internal/cache/value_normal.go b/internal/cache/value_normal.go new file mode 100644 index 0000000000..afee2ce7be --- /dev/null +++ b/internal/cache/value_normal.go @@ -0,0 +1,38 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. 
Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build !invariants,!tracing + +package cache + +import ( + "unsafe" + + "github.com/cockroachdb/pebble/internal/manual" +) + +const valueSize = int(unsafe.Sizeof(Value{})) + +func newManualValue(n int) *Value { + if n == 0 { + return nil + } + // When we're not performing leak detection, the lifetime of the returned + // Value is exactly the lifetime of the backing buffer and we can manually + // allocate both. + b := allocNew(valueSize + n) + v := (*Value)(unsafe.Pointer(&b[0])) + v.buf = b[valueSize:] + v.refs = 1 + return v +} + +func (v *Value) free() { + // When we're not performing leak detection, the Value and buffer were + // allocated contiguously. + n := valueSize + cap(v.buf) + buf := (*[manual.MaxArrayLen]byte)(unsafe.Pointer(v))[:n:n] + v.buf = nil + allocFree(buf) +} diff --git a/internal/cache/value_notracing.go b/internal/cache/value_notracing.go new file mode 100644 index 0000000000..01b01ef9b0 --- /dev/null +++ b/internal/cache/value_notracing.go @@ -0,0 +1,34 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build !tracing + +package cache + +// Value holds a reference counted immutable value. +// +// This is the definition of Value that is used in normal builds and when the +// "invariants" build tag is specified and the "tracing" build tag is not +// specified. If the "tracing" build tag is specified, the Value definition +// comes from value_tracing.go. +type Value struct { + buf []byte + // The number of references on the value. When refs drops to 0, the buf + // associated with the value may be reused. This is a form of manual memory + // management. See Cache.Free. + // + // Auto values are distinguished by setting their reference count to + // -(1<<30). + refs int32 +} + +func (v *Value) trace(msg string) { +} + +func (v *Value) traces() string { + return "" +} + +// Silence unused warning. +var _ = (*Value).traces diff --git a/internal/cache/value_tracing.go b/internal/cache/value_tracing.go new file mode 100644 index 0000000000..ac84fffcc8 --- /dev/null +++ b/internal/cache/value_tracing.go @@ -0,0 +1,49 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build tracing + +package cache + +import ( + "fmt" + "runtime/debug" + "strings" + "sync" + "sync/atomic" +) + +// Value holds a reference counted immutable value. +// +// This is the definition of Value that is used when the "tracing" build tag is +// specified. +type Value struct { + buf []byte + // The number of references on the value. When refs drops to 0, the buf + // associated with the value may be reused. This is a form of manual memory + // management. See Cache.Free. + // + // Auto values are distinguished by setting their reference count to + // -(1<<30). + refs int32 + // Traces recorded by Value.trace. Used for debugging. 
+ tr struct { + sync.Mutex + msgs []string + } +} + +func (v *Value) trace(msg string) { + s := fmt.Sprintf("%s: refs=%d\n%s", msg, atomic.LoadInt32(&v.refs), debug.Stack()) + v.tr.Lock() + v.tr.msgs = append(v.tr.msgs, s) + v.tr.Unlock() +} + +func (v *Value) traces() string { + v.tr.Lock() + s := strings.Join(v.tr.msgs, "\n") + v.tr.Unlock() + return s +} diff --git a/internal/manual/manual.go b/internal/manual/manual.go new file mode 100644 index 0000000000..b6d393869a --- /dev/null +++ b/internal/manual/manual.go @@ -0,0 +1,48 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package manual + +// #include <stdlib.h> +import "C" +import "unsafe" + +// TODO(peter): Rather than relying on C malloc/free, we could fork the Go +// runtime page allocator and allocate large chunks of memory using mmap or +// similar. + +// New allocates a slice of size n. The returned slice is from manually managed +// memory and MUST be released by calling Free. Failure to do so will result in +// a memory leak. +func New(n int) []byte { + if n == 0 { + return make([]byte, 0) + } + // We need to be conscious of the Cgo pointer passing rules: + // + // https://golang.org/cmd/cgo/#hdr-Passing_pointers + // + // ... + // Note: the current implementation has a bug. While Go code is permitted + // to write nil or a C pointer (but not a Go pointer) to C memory, the + // current implementation may sometimes cause a runtime error if the + // contents of the C memory appear to be a Go pointer. Therefore, avoid + // passing uninitialized C memory to Go code if the Go code is going to + // store pointer values in it. Zero out the memory in C before passing it + // to Go. + ptr := C.calloc(C.size_t(n), 1) + // Interpret the C pointer as a pointer to a Go array, then slice. + return (*[MaxArrayLen]byte)(unsafe.Pointer(ptr))[:n:n] +} + +// Free frees the specified slice. +func Free(b []byte) { + if cap(b) != 0 { + if len(b) == 0 { + b = b[:cap(b)] + } + ptr := unsafe.Pointer(&b[0]) + C.free(ptr) + } +} diff --git a/internal/manual/manual_32bit.go b/internal/manual/manual_32bit.go new file mode 100644 index 0000000000..9d1add4905 --- /dev/null +++ b/internal/manual/manual_32bit.go @@ -0,0 +1,12 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build 386 amd64p32 arm armbe mips mipsle mips64p32 mips64p32le ppc sparc + +package manual + +const ( + // MaxArrayLen is a safe maximum length for slices on this architecture. + MaxArrayLen = 1<<31 - 1 +) diff --git a/internal/manual/manual_64bit.go b/internal/manual/manual_64bit.go new file mode 100644 index 0000000000..f03846953a --- /dev/null +++ b/internal/manual/manual_64bit.go @@ -0,0 +1,12 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build amd64 arm64 arm64be ppc64 ppc64le mips64 mips64le s390x sparc64 + +package manual + +const ( + // MaxArrayLen is a safe maximum length for slices on this architecture.
+ MaxArrayLen = 1<<50 - 1 +) diff --git a/internal/manual/manual_nocgo.go b/internal/manual/manual_nocgo.go new file mode 100644 index 0000000000..466d1ee634 --- /dev/null +++ b/internal/manual/manual_nocgo.go @@ -0,0 +1,19 @@ +// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +// +build !cgo + +package manual + +// Provides versions of New and Free when cgo is not available (e.g. cross +// compilation). + +// New allocates a slice of size n. +func New(n int) []byte { + return make([]byte, n) +} + +// Free frees the specified slice. +func Free(b []byte) { +} diff --git a/sstable/block.go b/sstable/block.go index fdf034b906..b18123b708 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -267,6 +267,12 @@ func (i *blockIter) init(cmp Compare, block block, globalSeqNum uint64) error { return nil } +func (i *blockIter) initHandle(cmp Compare, block cache.Handle, globalSeqNum uint64) error { + i.cacheHandle.Release() + i.cacheHandle = block + return i.init(cmp, block.Get(), globalSeqNum) +} + func (i *blockIter) invalidate() { i.clearCache() i.offset = 0 @@ -284,11 +290,6 @@ func (i *blockIter) resetForReuse() blockIter { } } -func (i *blockIter) setCacheHandle(h cache.Handle) { - i.cacheHandle.Release() - i.cacheHandle = h -} - func (i *blockIter) readEntry() { ptr := unsafe.Pointer(uintptr(i.ptr) + uintptr(i.offset)) @@ -832,6 +833,7 @@ func (i *blockIter) Error() error { // package. func (i *blockIter) Close() error { i.cacheHandle.Release() + i.cacheHandle = cache.Handle{} i.val = nil return i.err } diff --git a/sstable/data_test.go b/sstable/data_test.go index 777aaf5116..05b956f9b8 100644 --- a/sstable/data_test.go +++ b/sstable/data_test.go @@ -177,6 +177,7 @@ func runIterCmd(td *datadriven.TestData, r *Reader) string { if err := iter.Error(); err != nil { return err.Error() } + defer iter.Close() var b bytes.Buffer var prefix []byte diff --git a/sstable/reader.go b/sstable/reader.go index 0bf5745a36..7a0baf2ca4 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -10,6 +10,8 @@ import ( "errors" "fmt" "io" + "os" + "runtime" "sort" "sync" "unsafe" @@ -17,6 +19,7 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/crc" + "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/internal/rangedel" "github.com/cockroachdb/pebble/vfs" @@ -80,16 +83,48 @@ var _ base.InternalIterator = (*singleLevelIterator)(nil) var singleLevelIterPool = sync.Pool{ New: func() interface{} { - return &singleLevelIterator{} + i := &singleLevelIterator{} + if invariants.Enabled { + runtime.SetFinalizer(i, checkSingleLevelIterator) + } + return i }, } var twoLevelIterPool = sync.Pool{ New: func() interface{} { - return &twoLevelIterator{} + i := &twoLevelIterator{} + if invariants.Enabled { + runtime.SetFinalizer(i, checkTwoLevelIterator) + } + return i }, } +func checkSingleLevelIterator(obj interface{}) { + i := obj.(*singleLevelIterator) + if p := i.data.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.data.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } + if p := i.index.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.index.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } +} + +func checkTwoLevelIterator(obj interface{}) { + i := 
obj.(*twoLevelIterator) + if p := i.data.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "twoLevelIterator.data.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } + if p := i.index.cacheHandle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "twoLevelIterator.index.cacheHandle is not nil: %p\n", p) + os.Exit(1) + } +} + // Init initializes a singleLevelIterator for reading from the table. It is // synonmous with Reader.NewIter, but allows for reusing of the iterator // between different Readers. @@ -159,13 +194,12 @@ func (i *singleLevelIterator) loadBlock() bool { i.err = errCorruptIndexEntry return false } - block, err := i.reader.readBlock(i.dataBH, nil /* transform */) + block, err := i.reader.readBlock(i.dataBH, nil /* transform */, false /* weak */) if err != nil { i.err = err return false } - i.data.setCacheHandle(block) - i.err = i.data.init(i.cmp, block.Get(), i.reader.Properties.GlobalSeqNum) + i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum) if i.err != nil { return false } @@ -441,18 +475,23 @@ func (i *singleLevelIterator) SetCloseHook(fn func(i Iterator) error) { i.closeHook = fn } +func firstError(err0, err1 error) error { + if err0 != nil { + return err0 + } + return err1 +} + // Close implements internalIterator.Close, as documented in the pebble // package. func (i *singleLevelIterator) Close() error { + var err error if i.closeHook != nil { - if err := i.closeHook(i); err != nil { - return err - } - } - if err := i.data.Close(); err != nil { - return err + err = firstError(err, i.closeHook(i)) } - err := i.err + err = firstError(err, i.data.Close()) + err = firstError(err, i.index.Close()) + err = firstError(err, i.err) *i = i.resetForReuse() singleLevelIterPool.Put(i) return err @@ -559,13 +598,12 @@ func (i *twoLevelIterator) loadIndex() bool { i.err = errors.New("pebble/table: corrupt top level index entry") return false } - indexBlock, err := i.reader.readBlock(h, nil /* transform */) + indexBlock, err := i.reader.readBlock(h, nil /* transform */, false /* weak */) if err != nil { i.err = err return false } - i.index.setCacheHandle(indexBlock) - i.err = i.index.init(i.cmp, indexBlock.Get(), i.reader.Properties.GlobalSeqNum) + i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum) return i.err == nil } @@ -784,15 +822,13 @@ func (i *twoLevelIterator) skipBackward() (*InternalKey, []byte) { // Close implements internalIterator.Close, as documented in the pebble // package. func (i *twoLevelIterator) Close() error { + var err error if i.closeHook != nil { - if err := i.closeHook(i); err != nil { - return err - } - } - if err := i.data.Close(); err != nil { - return err + err = firstError(err, i.closeHook(i)) } - err := i.err + err = firstError(err, i.data.Close()) + err = firstError(err, i.index.Close()) + err = firstError(err, i.err) *i = twoLevelIterator{ singleLevelIterator: i.singleLevelIterator.resetForReuse(), topLevelIndex: i.topLevelIndex.resetForReuse(), @@ -812,6 +848,10 @@ type twoLevelCompactionIterator struct { // twoLevelCompactionIterator implements the base.InternalIterator interface.
var _ base.InternalIterator = (*twoLevelCompactionIterator)(nil) +func (i *twoLevelCompactionIterator) Close() error { + return i.twoLevelIterator.Close() +} + func (i *twoLevelCompactionIterator) SeekGE(key []byte) (*InternalKey, []byte) { panic("pebble: SeekGE unimplemented") } @@ -874,7 +914,7 @@ func (i *twoLevelCompactionIterator) skipForward( type weakCachedBlock struct { bh BlockHandle mu sync.RWMutex - handle cache.WeakHandle + handle *cache.WeakHandle } type blockTransform func([]byte) ([]byte, error) @@ -1040,7 +1080,15 @@ func (r *Reader) get(key []byte) (value []byte, err error) { } return nil, err } - return value, i.Close() + + // The value will be "freed" when the iterator is closed, so make a copy + // which will outlast the lifetime of the iterator. + newValue := make([]byte, len(value)) + copy(newValue, value) + if err := i.Close(); err != nil { + return nil, err + } + return newValue, nil } // NewIter returns an iterator for the contents of the table. @@ -1121,7 +1169,7 @@ func (r *Reader) readWeakCachedBlock(w *weakCachedBlock, transform blockTransfor // Slow-path: read the index block from disk. This checks the cache again, // but that is ok because somebody else might have inserted it for us. - h, err := r.readBlock(w.bh, transform) + h, err := r.readBlock(w.bh, transform, true /* weak */) if err != nil { return nil, err } @@ -1135,24 +1183,35 @@ func (r *Reader) readWeakCachedBlock(w *weakCachedBlock, transform blockTransfor } // readBlock reads and decompresses a block from disk into memory. -func (r *Reader) readBlock(bh BlockHandle, transform blockTransform) (cache.Handle, error) { +func (r *Reader) readBlock( + bh BlockHandle, transform blockTransform, weak bool, +) (cache.Handle, error) { if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil { return h, nil } - b := r.opts.Cache.Alloc(int(bh.Length + blockTrailerLen)) + var v *cache.Value + if weak { + v = r.opts.Cache.AllocAuto(make([]byte, int(bh.Length+blockTrailerLen))) + } else { + v = r.opts.Cache.AllocManual(int(bh.Length + blockTrailerLen)) + } + b := v.Buf() if _, err := r.file.ReadAt(b, int64(bh.Offset)); err != nil { + r.opts.Cache.Free(v) return cache.Handle{}, err } checksum0 := binary.LittleEndian.Uint32(b[bh.Length+1:]) checksum1 := crc.New(b[:bh.Length+1]).Value() if checksum0 != checksum1 { + r.opts.Cache.Free(v) return cache.Handle{}, errors.New("pebble/table: invalid table (checksum mismatch)") } typ := b[bh.Length] b = b[:bh.Length] + v.Truncate(len(b)) switch typ { case noCompressionBlockType: @@ -1160,29 +1219,55 @@ func (r *Reader) readBlock(bh BlockHandle, transform blockTransform) (cache.Hand case snappyCompressionBlockType: decodedLen, err := snappy.DecodedLen(b) if err != nil { + r.opts.Cache.Free(v) return cache.Handle{}, err } - decoded := r.opts.Cache.Alloc(decodedLen) - decoded, err = snappy.Decode(decoded, b) + var decoded *cache.Value + if weak { + decoded = r.opts.Cache.AllocAuto(make([]byte, decodedLen)) + } else { + decoded = r.opts.Cache.AllocManual(decodedLen) + } + decodedBuf := decoded.Buf() + result, err := snappy.Decode(decodedBuf, b) + r.opts.Cache.Free(v) if err != nil { + r.opts.Cache.Free(decoded) return cache.Handle{}, err } - r.opts.Cache.Free(b) - b = decoded + if len(result) != 0 && + (len(result) != len(decodedBuf) || &result[0] != &decodedBuf[0]) { + r.opts.Cache.Free(decoded) + return cache.Handle{}, fmt.Errorf("pebble/table: snappy decoded into unexpected buffer: %p != %p", + result, decodedBuf) + } + v, b = decoded, decodedBuf default: + 
r.opts.Cache.Free(v) return cache.Handle{}, fmt.Errorf("pebble/table: unknown block compression: %d", typ) } if transform != nil { - // Transforming blocks is rare, so we don't bother to use cache.Alloc. + // Transforming blocks is rare, so the extra copy of the transformed data + // is not problematic. var err error b, err = transform(b) if err != nil { + r.opts.Cache.Free(v) return cache.Handle{}, err } + var newV *cache.Value + if weak { + newV = r.opts.Cache.AllocAuto(b) + } else { + newV = r.opts.Cache.AllocManual(len(b)) + copy(newV.Buf(), b) + } + r.opts.Cache.Free(v) + v = newV } - h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, b) + h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, v) return h, nil } @@ -1229,18 +1314,19 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { } func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { - b, err := r.readBlock(metaindexBH, nil /* transform */) + b, err := r.readBlock(metaindexBH, nil /* transform */, false /* weak */) if err != nil { return err } data := b.Get() + defer b.Release() + if uint64(len(data)) != metaindexBH.Length { return fmt.Errorf("pebble/table: unexpected metaindex block size: %d vs %d", len(data), metaindexBH.Length) } i, err := newRawBlockIter(bytes.Compare, data) - b.Release() if err != nil { return err } @@ -1258,13 +1344,12 @@ func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { } if bh, ok := meta[metaPropertiesName]; ok { - b, err = r.readBlock(bh, nil /* transform */) + b, err = r.readBlock(bh, nil /* transform */, false /* weak */) if err != nil { return err } - data := b.Get() r.propertiesBH = bh - err := r.Properties.load(data, bh.Offset) + err := r.Properties.load(b.Get(), bh.Offset) b.Release() if err != nil { return err @@ -1350,7 +1435,7 @@ func (r *Reader) Layout() (*Layout, error) { } l.Index = append(l.Index, indexBH) - subIndex, err := r.readBlock(indexBH, nil /* transform */) + subIndex, err := r.readBlock(indexBH, nil /* transform */, false /* weak */) if err != nil { return nil, err } @@ -1415,7 +1500,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */) + startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */, false /* weak */) if err != nil { return 0, err } @@ -1434,7 +1519,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */) + endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */, false /* weak */) if err != nil { return 0, err } @@ -1616,7 +1701,7 @@ func (l *Layout) Describe( continue } - h, err := r.readBlock(b.BlockHandle, nil /* transform */) + h, err := r.readBlock(b.BlockHandle, nil /* transform */, false /* weak */) if err != nil { fmt.Fprintf(w, " [err: %s]\n", err) continue diff --git a/sstable/reader_test.go b/sstable/reader_test.go index cc6a8de2c0..99cfdee284 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -343,6 +343,7 @@ func TestBytesIteratedCompressed(t *testing.T) { r := buildTestTable(t, numEntries, blockSize, indexBlockSize, SnappyCompression) var bytesIterated, prevIterated uint64 citer := r.NewCompactionIter(&bytesIterated) + for key, _ := citer.First(); key != nil; key, _ = citer.Next() { if bytesIterated < prevIterated { t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, 
prevIterated) @@ -355,6 +356,13 @@ func TestBytesIteratedCompressed(t *testing.T) { if bytesIterated < expected*99/100 || bytesIterated > expected*101/100 { t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected) } + + if err := citer.Close(); err != nil { + t.Fatal(err) + } + if err := r.Close(); err != nil { + t.Fatal(err) + } } } } @@ -368,6 +376,7 @@ func TestBytesIteratedUncompressed(t *testing.T) { r := buildTestTable(t, numEntries, blockSize, indexBlockSize, NoCompression) var bytesIterated, prevIterated uint64 citer := r.NewCompactionIter(&bytesIterated) + for key, _ := citer.First(); key != nil; key, _ = citer.Next() { if bytesIterated < prevIterated { t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated) @@ -380,6 +389,13 @@ func TestBytesIteratedUncompressed(t *testing.T) { t.Fatalf("bytesIterated: got %d, want %d (blockSize=%d indexBlockSize=%d numEntries=%d)", bytesIterated, expected, blockSize, indexBlockSize, numEntries) } + + if err := citer.Close(); err != nil { + t.Fatal(err) + } + if err := r.Close(); err != nil { + t.Fatal(err) + } } } } @@ -483,6 +499,8 @@ func BenchmarkTableIterSeekGE(b *testing.B) { for i := 0; i < b.N; i++ { it.SeekGE(keys[rng.Intn(len(keys))]) } + + it.Close() }) } } @@ -501,6 +519,8 @@ func BenchmarkTableIterSeekLT(b *testing.B) { for i := 0; i < b.N; i++ { it.SeekLT(keys[rng.Intn(len(keys))]) } + + it.Close() }) } } @@ -527,6 +547,8 @@ func BenchmarkTableIterNext(b *testing.B) { if testing.Verbose() { fmt.Fprint(ioutil.Discard, sum) } + + it.Close() }) } } @@ -553,6 +575,8 @@ func BenchmarkTableIterPrev(b *testing.B) { if testing.Verbose() { fmt.Fprint(ioutil.Discard, sum) } + + it.Close() }) } } diff --git a/sstable/table_test.go b/sstable/table_test.go index 4404cf52ed..c0fa57e0c6 100644 --- a/sstable/table_test.go +++ b/sstable/table_test.go @@ -673,6 +673,9 @@ func TestReaderGlobalSeqNum(t *testing.T) { t.Fatalf("expected %d, but found %d", globalSeqNum, i.Key().SeqNum()) } } + if err := i.Close(); err != nil { + t.Fatal(err) + } } func TestMetaIndexEntriesSorted(t *testing.T) { @@ -687,12 +690,12 @@ func TestMetaIndexEntriesSorted(t *testing.T) { t.Fatal(err) } - b, err := r.readBlock(r.metaIndexBH, nil /* transform */) + b, err := r.readBlock(r.metaIndexBH, nil /* transform */, false /* weak */) if err != nil { t.Fatal(err) } i, err := newRawBlockIter(bytes.Compare, b.Get()) - b.Release() + defer b.Release() if err != nil { t.Fatal(err) } diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 6dbd4413da..64d1306c54 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -99,7 +99,12 @@ func TestWriterClearCache(t *testing.T) { opts := ReaderOptions{Cache: cache.New(64 << 20)} writerOpts := WriterOptions{Cache: opts.Cache} cacheOpts := &cacheOpts{cacheID: 1, fileNum: 1} - invalidData := []byte("invalid data") + invalidData := func() *cache.Value { + invalid := []byte("invalid data") + v := opts.Cache.AllocManual(len(invalid)) + copy(v.Buf(), invalid) + return v + } build := func(name string) { f, err := mem.Create(name) @@ -149,7 +154,7 @@ func TestWriterClearCache(t *testing.T) { // Poison the cache for each of the blocks. 
poison := func(bh BlockHandle) { - opts.Cache.Set(cacheOpts.cacheID, cacheOpts.fileNum, bh.Offset, invalidData) + opts.Cache.Set(cacheOpts.cacheID, cacheOpts.fileNum, bh.Offset, invalidData()).Release() } foreachBH(layout, poison) @@ -161,7 +166,7 @@ func TestWriterClearCache(t *testing.T) { check := func(bh BlockHandle) { h := opts.Cache.Get(cacheOpts.cacheID, cacheOpts.fileNum, bh.Offset) if h.Get() != nil { - t.Fatalf("%d: expected cache to be cleared, but found %q", bh.Offset, invalidData) + t.Fatalf("%d: expected cache to be cleared, but found %q", bh.Offset, h.Get()) } } foreachBH(layout, check) diff --git a/testdata/event_listener b/testdata/event_listener index fab0e691ee..6ea3024580 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -169,7 +169,7 @@ compact 1 1.6 K (size == estimated-debt) zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.5 K 5.9% (score == hit-rate) - tcache 1 688 B 0.0% (score == hit-rate) + tcache 1 664 B 0.0% (score == hit-rate) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/metrics b/testdata/metrics index 051624895a..73a5deadd0 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -33,7 +33,7 @@ compact 0 826 B (size == estimated-debt) zmemtbl 1 256 K ztbl 0 0 B bcache 3 732 B 0.0% (score == hit-rate) - tcache 1 688 B 0.0% (score == hit-rate) + tcache 1 664 B 0.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility) @@ -130,7 +130,7 @@ compact 1 0 B (size == estimated-debt) zmemtbl 1 256 K ztbl 1 826 B bcache 4 753 B 27.3% (score == hit-rate) - tcache 1 688 B 60.0% (score == hit-rate) + tcache 1 664 B 60.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility)
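The pattern exercised by the test and reader changes above (allocate a manually managed cache.Value, fill its buffer, hand it to the cache, and release every Handle) is easy to get wrong, so a minimal sketch of the intended call sequence follows. It is not part of the diff: the package main wrapper, the cache size, and the (cacheID, fileNum, offset) values are illustrative, and internal/cache is only importable from within the pebble module.

package main

import "github.com/cockroachdb/pebble/internal/cache"

func main() {
	c := cache.New(64 << 20) // a 64 MB cache, matching the size used in writer_test.go

	// AllocManual returns a *cache.Value backed by manually managed memory.
	// The value must either be handed to the cache via Set or returned with
	// Free; otherwise the buffer leaks, and the "invariants"/"tracing" builds
	// will report the leak via the finalizer in value_invariants.go.
	data := []byte("hello world")
	v := c.AllocManual(len(data))
	copy(v.Buf(), data)

	// Set stores the value under (cacheID, fileNum, offset) and returns a
	// Handle holding a reference for the caller; releasing that handle leaves
	// the cache's own reference intact.
	c.Set(1 /* cacheID */, 1 /* fileNum */, 0 /* offset */, v).Release()

	// Readers receive a Handle and must Release it when done, which is what
	// the added iter.Close() and b.Release() calls in the tests ensure.
	if h := c.Get(1, 1, 0); h.Get() != nil {
		_ = h.Get() // the cached bytes
		h.Release()
	}
}

If a value is never added to the cache (for example, when a read or checksum error occurs), it has to be handed back with c.Free(v), which is what the new error paths in Reader.readBlock do.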