Skip to content

Commit

Permalink
blockstore: make it safe for concurrent use
Browse files Browse the repository at this point in the history
IPLD traversals would sporadically fail due to data races, since they do
concurrent blockstore Puts.

The added test reproduced that kind of error very reliably.

Fixes #121.


This commit was moved from ipld/go-car@307cc4c
  • Loading branch information
mvdan committed Jul 16, 2021
1 parent ff6bfc1 commit 81641ca
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
23 changes: 23 additions & 0 deletions ipld/car/v2/blockstore/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"sync"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
Expand All @@ -23,6 +24,14 @@ var _ blockstore.Blockstore = (*ReadOnly)(nil)

// ReadOnly provides a read-only Car Block Store.
type ReadOnly struct {
// mu allows ReadWrite to be safe for concurrent use.
// It's in ReadOnly so that read operations also grab read locks,
// given that ReadWrite embeds ReadOnly for methods like Get and Has.
//
// The main fields guarded by the mutex are the index and the underlying writers.
// For simplicity, the entirety of the blockstore methods grab the mutex.
mu sync.RWMutex

// The backing containing the CAR in v1 format.
backing io.ReaderAt
// The CAR v1 content index.
Expand Down Expand Up @@ -84,6 +93,9 @@ func (b *ReadOnly) DeleteBlock(_ cid.Cid) error {

// Has indicates if the store contains a block that corresponds to the given key.
func (b *ReadOnly) Has(key cid.Cid) (bool, error) {
b.mu.RLock()
defer b.mu.RUnlock()

offset, err := b.idx.Get(key)
if errors.Is(err, index.ErrNotFound) {
return false, nil
Expand All @@ -104,6 +116,9 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) {

// Get gets a block corresponding to the given key.
func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) {
b.mu.RLock()
defer b.mu.RUnlock()

offset, err := b.idx.Get(key)
if err != nil {
return nil, err
Expand All @@ -121,6 +136,9 @@ func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) {

// GetSize gets the size of an item corresponding to the given key.
func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
b.mu.RLock()
defer b.mu.RUnlock()

idx, err := b.idx.Get(key)
if err != nil {
return -1, err
Expand Down Expand Up @@ -152,6 +170,9 @@ func (b *ReadOnly) PutMany([]blocks.Block) error {

// AllKeysChan returns the list of keys in the CAR.
func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// We release the lock when the channel-sending goroutine stops.
b.mu.RLock()

// TODO we may use this walk for populating the index, and we need to be able to iterate keys in this way somewhere for index generation. In general though, when it's asked for all keys from a blockstore with an index, we should iterate through the index when possible rather than linear reads through the full car.
header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReader(b.backing, 0)))
if err != nil {
Expand All @@ -166,6 +187,8 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
ch := make(chan cid.Cid, 5)

go func() {
defer b.mu.RUnlock()

defer close(ch)

rdr := internalio.NewOffsetReader(b.backing, int64(offset))
Expand Down
9 changes: 7 additions & 2 deletions ipld/car/v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ func (b *ReadWrite) panicIfFinalized() {

// Put puts a given block to the underlying datastore
func (b *ReadWrite) Put(blk blocks.Block) error {
b.panicIfFinalized()

// PutMany already calls panicIfFinalized.
return b.PutMany([]blocks.Block{blk})
}

Expand All @@ -110,6 +109,9 @@ func (b *ReadWrite) Put(blk blocks.Block) error {
func (b *ReadWrite) PutMany(blks []blocks.Block) error {
b.panicIfFinalized()

b.mu.Lock()
defer b.mu.Unlock()

for _, bl := range blks {
n := uint64(b.carV1Writer.Position())
if err := util.LdWrite(b.carV1Writer, bl.Cid().Bytes(), bl.RawData()); err != nil {
Expand All @@ -126,6 +128,9 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
func (b *ReadWrite) Finalize() error {
b.panicIfFinalized()

b.mu.Lock()
defer b.mu.Unlock()

// TODO check if add index option is set and don't write the index then set index offset to zero.
// TODO see if folks need to continue reading from a finalized blockstore, if so return ReadOnly blockstore here.
b.header = b.header.WithCarV1Size(uint64(b.carV1Writer.Position()))
Expand Down
44 changes: 44 additions & 0 deletions ipld/car/v2/blockstore/readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package blockstore_test

import (
"context"
"fmt"
"io"
"math/rand"
"os"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -167,3 +169,45 @@ func TestBlockstorePutSameHashes(t *testing.T) {
err = wbs.Finalize()
require.NoError(t, err)
}

func TestBlockstoreConcurrentUse(t *testing.T) {
path := "testv2blockstore.car"
wbs, err := blockstore.NewReadWrite(path, nil)
if err != nil {
t.Fatal(err)
}
defer func() { os.Remove(path) }()

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
data := []byte(fmt.Sprintf("data-%d", i))

wg.Add(1)
go func() {
defer wg.Done()

c, err := cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: multihash.SHA2_256,
MhLength: -1,
}.Sum(data)
require.NoError(t, err)

block, err := blocks.NewBlockWithCid(data, c)
require.NoError(t, err)

has, err := wbs.Has(block.Cid())
require.NoError(t, err)
require.False(t, has)

err = wbs.Put(block)
require.NoError(t, err)

got, err := wbs.Get(block.Cid())
require.NoError(t, err)
require.Equal(t, data, got.RawData())
}()
}
wg.Wait()
}

0 comments on commit 81641ca

Please sign in to comment.