Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
fix(arc): Per-CID locking. Sync Map to CID lock
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Apr 6, 2021
1 parent 75d8d1a commit 5229cae
Showing 1 changed file with 86 additions and 15 deletions.
101 changes: 86 additions & 15 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockstore

import (
"context"
"sync"

lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
Expand All @@ -16,7 +17,10 @@ type cacheSize int
// block Cids. This provides block access-time improvements, allowing
// to short-cut many searches without query-ing the underlying datastore.
type arccache struct {
arc *lru.TwoQueueCache
arc *lru.TwoQueueCache

arcLks sync.Map

blockstore Blockstore

hits metrics.Counter
Expand All @@ -28,28 +32,72 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
if err != nil {
return nil, err
}
c := &arccache{arc: arc, blockstore: bs}
c := &arccache{arc: arc, arcLks: sync.Map{}, blockstore: bs}
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()

return c, nil
}

func (b *arccache) DeleteBlock(k cid.Cid) error {
if has, _, ok := b.hasCached(k); ok && !has {
return nil
// if ok == false has is inconclusive, calling release is a noop
// if ok == true then has response to question: is it contained and a lock will be held on content `k` until `release` is called.
// it is the callers responsibility to call `release` when they are done writing/reading data.
func (b *arccache) hasCachedSync(k cid.Cid) (has bool, size int, ok bool, release func()) {
// default return ops
has = false
size = -1
ok = false
release = func() {}

b.total.Inc()
if !k.Defined() {
log.Error("undefined cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return
}

b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
if err == nil {
b.cacheHave(k, false)
h, ok := b.arc.Get(string(k.Hash()))
if ok {
b.hits.Inc()
switch h := h.(type) {
case cacheHave:
has = bool(h)
size = -1
ok = true
case cacheSize:
has = true
size = int(h)
ok = true
}
}
return err

// check if we have a lock for this content.
v, hasLk := b.arcLks.Load(k)
if has && hasLk {
// cache and lock hit.
lk := v.(*sync.Mutex)
lk.Lock()
release = func() { lk.Unlock() }

return
} else if has && !hasLk {
// cache hit and lock miss, create the lock, lock it, and add it to the lockMap
lk := new(sync.Mutex)
b.arcLks.Store(k, lk)

lk.Lock()
release = func() { lk.Unlock() }
} else if !has && hasLk {
// cache miss and lock hit, remove lock from map
b.arcLks.Delete(k)
}
// else cache miss and lock miss, noop
return
}

// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
// if ok == true then has response to question: is it contained
func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) {
b.total.Inc()
if !k.Defined() {
Expand All @@ -72,6 +120,21 @@ func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) {
return false, -1, false
}

func (b *arccache) DeleteBlock(k cid.Cid) error {
has, _, ok, release := b.hasCachedSync(k)
defer release()
if ok && !has {
return nil
}

b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
if err == nil {
b.cacheHave(k, false)
}
return err
}

func (b *arccache) Has(k cid.Cid) (bool, error) {
if has, _, ok := b.hasCached(k); ok {
return has, nil
Expand All @@ -85,7 +148,9 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
}

func (b *arccache) GetSize(k cid.Cid) (int, error) {
if has, blockSize, ok := b.hasCached(k); ok {
has, blockSize, ok, release := b.hasCachedSync(k)
defer release()
if ok {
if !has {
// don't have it, return
return -1, ErrNotFound
Expand All @@ -111,7 +176,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
return nil, ErrNotFound
}

if has, _, ok := b.hasCached(k); ok && !has {
has, _, ok, release := b.hasCachedSync(k)
defer release()
if ok && !has {
return nil, ErrNotFound
}

Expand All @@ -125,7 +192,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
}

func (b *arccache) Put(bl blocks.Block) error {
if has, _, ok := b.hasCached(bl.Cid()); ok && has {
has, _, ok, release := b.hasCachedSync(bl.Cid())
defer release()
if ok && has {
return nil
}

Expand All @@ -141,9 +210,11 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
has, _, ok, release := b.hasCachedSync(block.Cid())
if !ok || (ok && !has) {
good = append(good, block)
}
defer release()
}
err := b.blockstore.PutMany(good)
if err != nil {
Expand Down

0 comments on commit 5229cae

Please sign in to comment.