diff --git a/arc_cache.go b/arc_cache.go index e2b930d..24e0bf2 100644 --- a/arc_cache.go +++ b/arc_cache.go @@ -2,6 +2,7 @@ package blockstore import ( "context" + "sync" lru "github.com/hashicorp/golang-lru" blocks "github.com/ipfs/go-block-format" @@ -16,7 +17,10 @@ type cacheSize int // block Cids. This provides block access-time improvements, allowing // to short-cut many searches without query-ing the underlying datastore. type arccache struct { - arc *lru.TwoQueueCache + arc *lru.TwoQueueCache + + arcLks sync.Map + blockstore Blockstore hits metrics.Counter @@ -28,28 +32,72 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, if err != nil { return nil, err } - c := &arccache{arc: arc, blockstore: bs} + c := &arccache{arc: arc, arcLks: sync.Map{}, blockstore: bs} c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter() c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter() return c, nil } -func (b *arccache) DeleteBlock(k cid.Cid) error { - if has, _, ok := b.hasCached(k); ok && !has { - return nil +// if ok == false has is inconclusive, calling release is a noop +// if ok == true then has response to question: is it contained and a lock will be held on content `k` until `release` is called. +// it is the callers responsibility to call `release` when they are done writing/reading data. +func (b *arccache) hasCachedSync(k cid.Cid) (has bool, size int, ok bool, release func()) { + // default return ops + has = false + size = -1 + ok = false + release = func() {} + + b.total.Inc() + if !k.Defined() { + log.Error("undefined cid in arccache") + // Return cache invalid so the call to blockstore happens + // in case of invalid key and correct error is created. + return } - b.arc.Remove(k) // Invalidate cache before deleting. - err := b.blockstore.DeleteBlock(k) - if err == nil { - b.cacheHave(k, false) + h, ok := b.arc.Get(string(k.Hash())) + if ok { + b.hits.Inc() + switch h := h.(type) { + case cacheHave: + has = bool(h) + size = -1 + ok = true + case cacheSize: + has = true + size = int(h) + ok = true + } } - return err + + // check if we have a lock for this content. + v, hasLk := b.arcLks.Load(k) + if has && hasLk { + // cache and lock hit. + lk := v.(*sync.Mutex) + lk.Lock() + release = func() { lk.Unlock() } + + return + } else if has && !hasLk { + // cache hit and lock miss, create the lock, lock it, and add it to the lockMap + lk := new(sync.Mutex) + b.arcLks.Store(k, lk) + + lk.Lock() + release = func() { lk.Unlock() } + } else if !has && hasLk { + // cache miss and lock hit, remove lock from map + b.arcLks.Delete(k) + } + // else cache miss and lock miss, noop + return } // if ok == false has is inconclusive -// if ok == true then has respons to question: is it contained +// if ok == true then has response to question: is it contained func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) { b.total.Inc() if !k.Defined() { @@ -72,6 +120,21 @@ func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) { return false, -1, false } +func (b *arccache) DeleteBlock(k cid.Cid) error { + has, _, ok, release := b.hasCachedSync(k) + defer release() + if ok && !has { + return nil + } + + b.arc.Remove(k) // Invalidate cache before deleting. + err := b.blockstore.DeleteBlock(k) + if err == nil { + b.cacheHave(k, false) + } + return err +} + func (b *arccache) Has(k cid.Cid) (bool, error) { if has, _, ok := b.hasCached(k); ok { return has, nil @@ -85,7 +148,9 @@ func (b *arccache) Has(k cid.Cid) (bool, error) { } func (b *arccache) GetSize(k cid.Cid) (int, error) { - if has, blockSize, ok := b.hasCached(k); ok { + has, blockSize, ok, release := b.hasCachedSync(k) + defer release() + if ok { if !has { // don't have it, return return -1, ErrNotFound @@ -111,7 +176,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { return nil, ErrNotFound } - if has, _, ok := b.hasCached(k); ok && !has { + has, _, ok, release := b.hasCachedSync(k) + defer release() + if ok && !has { return nil, ErrNotFound } @@ -125,7 +192,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { } func (b *arccache) Put(bl blocks.Block) error { - if has, _, ok := b.hasCached(bl.Cid()); ok && has { + has, _, ok, release := b.hasCachedSync(bl.Cid()) + defer release() + if ok && has { return nil } @@ -141,9 +210,11 @@ func (b *arccache) PutMany(bs []blocks.Block) error { for _, block := range bs { // call put on block if result is inconclusive or we are sure that // the block isn't in storage - if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) { + has, _, ok, release := b.hasCachedSync(block.Cid()) + if !ok || (ok && !has) { good = append(good, block) } + defer release() } err := b.blockstore.PutMany(good) if err != nil {