From 7e416b8c22d4ad51f68dd0a734812f19e21a3858 Mon Sep 17 00:00:00 2001 From: frrist Date: Mon, 3 May 2021 12:59:37 -0700 Subject: [PATCH] fix(arc): Per-CID locking. Sync Map to CID lock - fixes #64 --- arc_cache.go | 97 ++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 90 insertions(+), 7 deletions(-) diff --git a/arc_cache.go b/arc_cache.go index 1e497ab..d7698aa 100644 --- a/arc_cache.go +++ b/arc_cache.go @@ -2,6 +2,7 @@ package blockstore import ( "context" + "sync" lru "github.com/hashicorp/golang-lru" blocks "github.com/ipfs/go-block-format" @@ -17,7 +18,9 @@ type cacheSize int // size. This provides block access-time improvements, allowing // to short-cut many searches without querying the underlying datastore. type arccache struct { - cache *lru.TwoQueueCache + cache *lru.TwoQueueCache + arcLks sync.Map + blockstore Blockstore viewer Viewer @@ -33,7 +36,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, if err != nil { return nil, err } - c := &arccache{cache: cache, blockstore: bs} + c := &arccache{cache: cache, arcLks: sync.Map{}, blockstore: bs} c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter() c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter() if v, ok := bs.(Viewer); ok { @@ -43,7 +46,9 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, } func (b *arccache) DeleteBlock(k cid.Cid) error { - if has, _, ok := b.queryCache(k); ok && !has { + has, _, ok, release := b.queryCacheSync(k) + defer release() + if ok && !has { return nil } @@ -68,7 +73,9 @@ func (b *arccache) Has(k cid.Cid) (bool, error) { } func (b *arccache) GetSize(k cid.Cid) (int, error) { - if has, blockSize, ok := b.queryCache(k); ok { + has, blockSize, ok, release := b.queryCacheSync(k) + defer release() + if ok { if !has { // don't have it, return return -1, ErrNotFound @@ -119,7 +126,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { return nil, ErrNotFound } - if has, _, ok := b.queryCache(k); ok && !has { + has, _, ok, release := b.queryCacheSync(k) + defer release() + if ok && !has { return nil, ErrNotFound } @@ -133,7 +142,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { } func (b *arccache) Put(bl blocks.Block) error { - if has, _, ok := b.queryCache(bl.Cid()); ok && has { + has, _, ok, release := b.queryCacheSync(bl.Cid()) + defer release() + if ok && has { return nil } @@ -146,13 +157,22 @@ func (b *arccache) Put(bl blocks.Block) error { func (b *arccache) PutMany(bs []blocks.Block) error { var good []blocks.Block + var releases []func() for _, block := range bs { // call put on block if result is inconclusive or we are sure that // the block isn't in storage - if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) { + if has, _, ok, release := b.queryCacheSync(block.Cid()); !ok || (ok && !has) { good = append(good, block) + releases = append(releases, release) } } + + defer func() { + for _, release := range releases { + release() + } + }() + err := b.blockstore.PutMany(good) if err != nil { return err @@ -208,6 +228,69 @@ func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) { return false, -1, false } +// queryCacheSync checks if the CID is in the cache. If so, it returns: +// +// * exists (bool): whether the CID is known to exist or not. +// * size (int): the size if cached, or -1 if not cached. +// * ok (bool): whether present in the cache. +// * release (func): method to be called by caller that releases lock held on `k` +// +// When ok is false, the answer in inconclusive and the caller must ignore the +// other two return values. Querying the underying store is necessary. +// +// When ok is true, exists carries the correct answer, and size carries the +// size, if known, or -1 if not. +func (b *arccache) queryCacheSync(k cid.Cid) (exists bool, size int, ok bool, release func()) { + exists = false + size = -1 + ok = false + release = func() {} + + b.total.Inc() + if !k.Defined() { + log.Error("undefined cid in arccache") + // Return cache invalid so the call to blockstore happens + // in case of invalid key and correct error is created. + return + } + + h, ok := b.cache.Get(string(k.Hash())) + if ok { + b.hits.Inc() + switch h := h.(type) { + case cacheHave: + exists = bool(h) + size = -1 + ok = true + case cacheSize: + exists = true + size = int(h) + ok = true + } + } + + // check if a lock exists for content `k`. + v, hasLk := b.arcLks.Load(k) + if exists && hasLk { + // cache and lock hit. + lk := v.(*sync.Mutex) + lk.Lock() + release = func() { lk.Unlock() } + + } else if exists && !hasLk { + // cache hit and lock miss, create the lock, lock it, and add it to the lockMap + lk := new(sync.Mutex) + b.arcLks.Store(k, lk) + lk.Lock() + release = func() { lk.Unlock() } + } else if !exists && hasLk { + // cache miss and lock hit, remove lock from map + b.arcLks.Delete(k) + } + // else cache miss and lock miss, noop + return +} + func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return b.blockstore.AllKeysChan(ctx) }