Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
fix(arc): Per-CID locking. Map CID to lock
Browse files Browse the repository at this point in the history
- fixes #64
  • Loading branch information
frrist committed May 3, 2021
1 parent fb07d7b commit ae3d97c
Showing 1 changed file with 101 additions and 7 deletions.
108 changes: 101 additions & 7 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockstore

import (
"context"
"sync"

lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
Expand All @@ -17,7 +18,11 @@ type cacheSize int
// size. This provides block access-time improvements, allowing
// to short-cut many searches without querying the underlying datastore.
type arccache struct {
cache *lru.TwoQueueCache
cache *lru.TwoQueueCache

arcLks map[cid.Cid]*sync.Mutex
arcLksMu sync.RWMutex

blockstore Blockstore
viewer Viewer

Expand All @@ -33,7 +38,8 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
if err != nil {
return nil, err
}
c := &arccache{cache: cache, blockstore: bs}
arcLks := make(map[cid.Cid]*sync.Mutex)
c := &arccache{cache: cache, arcLks: arcLks, blockstore: bs}
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()
if v, ok := bs.(Viewer); ok {
Expand All @@ -43,7 +49,9 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
}

func (b *arccache) DeleteBlock(k cid.Cid) error {
if has, _, ok := b.queryCache(k); ok && !has {
has, _, ok, release := b.queryCacheSync(k)
defer release()
if ok && !has {
return nil
}

Expand All @@ -68,7 +76,9 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
}

func (b *arccache) GetSize(k cid.Cid) (int, error) {
if has, blockSize, ok := b.queryCache(k); ok {
has, blockSize, ok, release := b.queryCacheSync(k)
defer release()
if ok {
if !has {
// don't have it, return
return -1, ErrNotFound
Expand Down Expand Up @@ -119,7 +129,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
return nil, ErrNotFound
}

if has, _, ok := b.queryCache(k); ok && !has {
has, _, ok, release := b.queryCacheSync(k)
defer release()
if ok && !has {
return nil, ErrNotFound
}

Expand All @@ -133,7 +145,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
}

func (b *arccache) Put(bl blocks.Block) error {
if has, _, ok := b.queryCache(bl.Cid()); ok && has {
has, _, ok, release := b.queryCacheSync(bl.Cid())
defer release()
if ok && has {
return nil
}

Expand All @@ -146,13 +160,22 @@ func (b *arccache) Put(bl blocks.Block) error {

func (b *arccache) PutMany(bs []blocks.Block) error {
var good []blocks.Block
var releases []func()
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) {
if has, _, ok, release := b.queryCacheSync(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
releases = append(releases, release)
}
}

defer func() {
for _, release := range releases {
release()
}
}()

err := b.blockstore.PutMany(good)
if err != nil {
return err
Expand Down Expand Up @@ -208,6 +231,77 @@ func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) {
return false, -1, false
}

// queryCacheSync checks if the CID is in the cache and, when the cache reports
// that the block exists, acquires the per-CID mutex for `k`. It returns:
//
//  * exists (bool): whether the CID is known to exist or not.
//  * size (int): the size if cached, or -1 if not cached.
//  * ok (bool): whether present in the cache.
//  * release (func): must be called by the caller exactly once; it unlocks the
//    mutex held on `k`, or is a no-op when no mutex was taken.
//
// When ok is false, the answer is inconclusive and the caller must ignore
// exists and size. Querying the underlying store is necessary.
//
// When ok is true, exists carries the correct answer, and size carries the
// size, if known, or -1 if not.
//
// A mutex is only acquired when exists is true; on a cache miss or a cached
// negative entry no lock is held and release does nothing.
//
// NOTE(review): the cache is consulted before the mutex is acquired, so the
// returned exists/size may be stale by the time the lock is held. Likewise,
// dropping a map entry on a miss does not wait for goroutines that already
// hold or are waiting on that mutex. Both are inherited from the original
// design; closing them appears to need reference counting on the lock entries
// — confirm before relying on strict per-CID exclusion.
func (b *arccache) queryCacheSync(k cid.Cid) (exists bool, size int, ok bool, release func()) {
	exists, size, ok = false, -1, false
	release = func() {}

	b.total.Inc()
	if !k.Defined() {
		log.Error("undefined cid in arccache")
		// Return cache invalid so the call to blockstore happens
		// in case of invalid key and correct error is created.
		return exists, size, ok, release
	}

	if h, found := b.cache.Get(string(k.Hash())); found {
		b.hits.Inc()
		switch h := h.(type) {
		case cacheHave:
			exists, size, ok = bool(h), -1, true
		case cacheSize:
			exists, size, ok = true, int(h), true
		default:
			// Unknown entry type: report "inconclusive" rather than a
			// definite "does not exist", so the caller consults the store.
		}
	}

	// Fast path: look the lock up under the read lock so that lookups for
	// other CIDs are not serialized behind a goroutine blocked on one CID.
	b.arcLksMu.RLock()
	lk, hasLk := b.arcLks[k]
	b.arcLksMu.RUnlock()

	switch {
	case exists:
		if !hasLk {
			// Re-check under the write lock: another goroutine may have
			// installed a mutex for k since the read lock was dropped.
			// Creating a second one would let both callers "hold the lock"
			// for the same CID at once.
			b.arcLksMu.Lock()
			if lk, hasLk = b.arcLks[k]; !hasLk {
				lk = new(sync.Mutex)
				b.arcLks[k] = lk
			}
			b.arcLksMu.Unlock()
		}
		lk.Lock()
		release = lk.Unlock
	case hasLk:
		// Cache miss but a lock entry lingers: drop it so the map does not
		// grow without bound. Only remove the entry observed above; a newer
		// mutex installed concurrently must stay in the map.
		b.arcLksMu.Lock()
		if cur, present := b.arcLks[k]; present && cur == lk {
			delete(b.arcLks, k)
		}
		b.arcLksMu.Unlock()
	}
	// Otherwise: cache miss and lock miss, nothing to do.
	return exists, size, ok, release
}

// AllKeysChan forwards the request to the wrapped blockstore unchanged and
// hands back its key channel and error; the cache is not consulted.
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	keys, err := b.blockstore.AllKeysChan(ctx)
	return keys, err
}
Expand Down

0 comments on commit ae3d97c

Please sign in to comment.