Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
fix(arc): striped locking on last byte of CID
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Apr 8, 2021
1 parent b75f631 commit d94af12
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockstore

import (
"context"
"sync"

lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
Expand All @@ -16,7 +17,9 @@ type cacheSize int
// block Cids. This provides block access-time improvements, allowing
// to short-cut many searches without query-ing the underlying datastore.
type arccache struct {
arc *lru.TwoQueueCache
arc *lru.TwoQueueCache
lks [256]sync.Mutex

blockstore Blockstore

hits metrics.Counter
Expand All @@ -28,7 +31,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
if err != nil {
return nil, err
}
c := &arccache{arc: arc, blockstore: bs}
c := &arccache{arc: arc, lks: [256]sync.Mutex{}, blockstore: bs}
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()

Expand All @@ -40,6 +43,9 @@ func (b *arccache) DeleteBlock(k cid.Cid) error {
return nil
}

b.lks[k.Bytes()[len(k.Bytes())-1]].Lock()
defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock()

b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
if err == nil {
Expand Down Expand Up @@ -76,6 +82,10 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
if has, _, ok := b.hasCached(k); ok {
return has, nil
}

b.lks[k.Bytes()[len(k.Bytes())-1]].Lock()
defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock()

has, err := b.blockstore.Has(k)
if err != nil {
return false, err
Expand All @@ -96,6 +106,10 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
}
// we have it but don't know the size, ask the datastore.
}

b.lks[k.Bytes()[len(k.Bytes())-1]].Lock()
defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock()

blockSize, err := b.blockstore.GetSize(k)
if err == ErrNotFound {
b.cacheHave(k, false)
Expand All @@ -115,6 +129,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
return nil, ErrNotFound
}

b.lks[k.Bytes()[len(k.Bytes())-1]].Lock()
defer b.lks[k.Bytes()[len(k.Bytes())-1]].Unlock()

bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.cacheHave(k, false)
Expand All @@ -129,6 +146,9 @@ func (b *arccache) Put(bl blocks.Block) error {
return nil
}

b.lks[bl.Cid().Bytes()[len(bl.Cid().Bytes())-1]].Lock()
defer b.lks[bl.Cid().Bytes()[len(bl.Cid().Bytes())-1]].Unlock()

err := b.blockstore.Put(bl)
if err == nil {
b.cacheSize(bl.Cid(), len(bl.RawData()))
Expand All @@ -141,9 +161,13 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage

if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
b.lks[block.Cid().Bytes()[len(block.Cid().Bytes())-1]].Lock()
defer b.lks[block.Cid().Bytes()[len(block.Cid().Bytes())-1]].Unlock()
}

}
err := b.blockstore.PutMany(good)
if err != nil {
Expand Down

0 comments on commit d94af12

Please sign in to comment.