From 81641ca92cb9672f0d7778854819e833ff162542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Tue, 29 Jun 2021 12:47:54 +0100 Subject: [PATCH] blockstore: make it safe for concurrent use IPLD traversals would sporadically fail due to data races, since they do concurrent blockstore Puts. The added test reproduced that kind of error very reliably. Fixes #121. This commit was moved from ipld/go-car@307cc4cc87c5672eff4058259d12a3de33b36822 --- ipld/car/v2/blockstore/readonly.go | 23 +++++++++++++ ipld/car/v2/blockstore/readwrite.go | 9 +++-- ipld/car/v2/blockstore/readwrite_test.go | 44 ++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 2 deletions(-) diff --git a/ipld/car/v2/blockstore/readonly.go b/ipld/car/v2/blockstore/readonly.go index 3bdc57a76..2db1c9e08 100644 --- a/ipld/car/v2/blockstore/readonly.go +++ b/ipld/car/v2/blockstore/readonly.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "sync" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -23,6 +24,14 @@ var _ blockstore.Blockstore = (*ReadOnly)(nil) // ReadOnly provides a read-only Car Block Store. type ReadOnly struct { + // mu allows ReadWrite to be safe for concurrent use. + // It's in ReadOnly so that read operations also grab read locks, + // given that ReadWrite embeds ReadOnly for methods like Get and Has. + // + // The main fields guarded by the mutex are the index and the underlying writers. + // For simplicity, the entirety of the blockstore methods grab the mutex. + mu sync.RWMutex + // The backing containing the CAR in v1 format. backing io.ReaderAt // The CAR v1 content index. @@ -84,6 +93,9 @@ func (b *ReadOnly) DeleteBlock(_ cid.Cid) error { // Has indicates if the store contains a block that corresponds to the given key. func (b *ReadOnly) Has(key cid.Cid) (bool, error) { + b.mu.RLock() + defer b.mu.RUnlock() + offset, err := b.idx.Get(key) if errors.Is(err, index.ErrNotFound) { return false, nil @@ -104,6 +116,9 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) { // Get gets a block corresponding to the given key. func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) { + b.mu.RLock() + defer b.mu.RUnlock() + offset, err := b.idx.Get(key) if err != nil { return nil, err @@ -121,6 +136,9 @@ func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) { // GetSize gets the size of an item corresponding to the given key. func (b *ReadOnly) GetSize(key cid.Cid) (int, error) { + b.mu.RLock() + defer b.mu.RUnlock() + idx, err := b.idx.Get(key) if err != nil { return -1, err @@ -152,6 +170,9 @@ func (b *ReadOnly) PutMany([]blocks.Block) error { // AllKeysChan returns the list of keys in the CAR. func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + // We release the lock when the channel-sending goroutine stops. + b.mu.RLock() + // TODO we may use this walk for populating the index, and we need to be able to iterate keys in this way somewhere for index generation. In general though, when it's asked for all keys from a blockstore with an index, we should iterate through the index when possible rather than linear reads through the full car. header, err := carv1.ReadHeader(bufio.NewReader(internalio.NewOffsetReader(b.backing, 0))) if err != nil { @@ -166,6 +187,8 @@ func (b *ReadOnly) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { ch := make(chan cid.Cid, 5) go func() { + defer b.mu.RUnlock() + defer close(ch) rdr := internalio.NewOffsetReader(b.backing, int64(offset)) diff --git a/ipld/car/v2/blockstore/readwrite.go b/ipld/car/v2/blockstore/readwrite.go index f72d01e6c..fef65a6c8 100644 --- a/ipld/car/v2/blockstore/readwrite.go +++ b/ipld/car/v2/blockstore/readwrite.go @@ -100,8 +100,7 @@ func (b *ReadWrite) panicIfFinalized() { // Put puts a given block to the underlying datastore func (b *ReadWrite) Put(blk blocks.Block) error { - b.panicIfFinalized() - + // PutMany already calls panicIfFinalized. return b.PutMany([]blocks.Block{blk}) } @@ -110,6 +109,9 @@ func (b *ReadWrite) Put(blk blocks.Block) error { func (b *ReadWrite) PutMany(blks []blocks.Block) error { b.panicIfFinalized() + b.mu.Lock() + defer b.mu.Unlock() + for _, bl := range blks { n := uint64(b.carV1Writer.Position()) if err := util.LdWrite(b.carV1Writer, bl.Cid().Bytes(), bl.RawData()); err != nil { @@ -126,6 +128,9 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error { func (b *ReadWrite) Finalize() error { b.panicIfFinalized() + b.mu.Lock() + defer b.mu.Unlock() + // TODO check if add index option is set and don't write the index then set index offset to zero. // TODO see if folks need to continue reading from a finalized blockstore, if so return ReadOnly blockstore here. b.header = b.header.WithCarV1Size(uint64(b.carV1Writer.Position())) diff --git a/ipld/car/v2/blockstore/readwrite_test.go b/ipld/car/v2/blockstore/readwrite_test.go index ba08c9c5e..b4a138c93 100644 --- a/ipld/car/v2/blockstore/readwrite_test.go +++ b/ipld/car/v2/blockstore/readwrite_test.go @@ -2,9 +2,11 @@ package blockstore_test import ( "context" + "fmt" "io" "math/rand" "os" + "sync" "testing" "time" @@ -167,3 +169,45 @@ func TestBlockstorePutSameHashes(t *testing.T) { err = wbs.Finalize() require.NoError(t, err) } + +func TestBlockstoreConcurrentUse(t *testing.T) { + path := "testv2blockstore.car" + wbs, err := blockstore.NewReadWrite(path, nil) + if err != nil { + t.Fatal(err) + } + defer func() { os.Remove(path) }() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + data := []byte(fmt.Sprintf("data-%d", i)) + + wg.Add(1) + go func() { + defer wg.Done() + + c, err := cid.Prefix{ + Version: 1, + Codec: cid.Raw, + MhType: multihash.SHA2_256, + MhLength: -1, + }.Sum(data) + require.NoError(t, err) + + block, err := blocks.NewBlockWithCid(data, c) + require.NoError(t, err) + + has, err := wbs.Has(block.Cid()) + require.NoError(t, err) + require.False(t, has) + + err = wbs.Put(block) + require.NoError(t, err) + + got, err := wbs.Get(block.Cid()) + require.NoError(t, err) + require.Equal(t, data, got.RawData()) + }() + } + wg.Wait() +}