From 08f751c83f924b3b52ef5dac1b32edf06553772c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Mart=C3=AD?=
Date: Thu, 1 Jul 2021 13:11:49 +0100
Subject: [PATCH] blockstore: add option to deduplicate by CID

And a test that uses duplicate hashes as well as duplicate CIDs.

We reuse the same insertion index, since it's enough for this purpose.
There's no need to keep a separate map or set of CIDs.

While at it, make the index package not silently swallow errors, and
improve the tests to handle errors more consistently.

Fixes #123.
Fixes #125.
---
 v2/blockstore/readwrite.go      |  26 +++++++-
 v2/blockstore/readwrite_test.go | 103 +++++++++++++++++++-------------
 v2/index/insertionindex.go      |  30 +++++++++-
 3 files changed, 113 insertions(+), 46 deletions(-)

diff --git a/v2/blockstore/readwrite.go b/v2/blockstore/readwrite.go
index fef65a6c..99fd11d2 100644
--- a/v2/blockstore/readwrite.go
+++ b/v2/blockstore/readwrite.go
@@ -33,6 +33,8 @@ type ReadWrite struct {
 	ReadOnly
 	idx    *index.InsertionIndex
 	header carv2.Header
+
+	dedupCids bool
 }
 
 // TODO consider exposing interfaces
@@ -52,6 +54,15 @@ func WithIndexPadding(p uint64) Option {
 	}
 }
 
+// WithCidDeduplication makes Put calls ignore blocks if the blockstore already
+// has the exact same CID.
+// This can help avoid redundancy in a CARv1's list of CID-Block pairs.
+//
+// Note that this compares whole CIDs, not just multihashes.
+func WithCidDeduplication(b *ReadWrite) {
+	b.dedupCids = true
+}
+
 // NewReadWrite creates a new ReadWrite at the given path with a provided set of root CIDs as the car roots.
 func NewReadWrite(path string, roots []cid.Cid, opts ...Option) (*ReadWrite, error) {
 	// TODO support resumption if the path provided contains partially written blocks in v2 format.
@@ -113,11 +124,16 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
 	defer b.mu.Unlock()
 
 	for _, bl := range blks {
+		c := bl.Cid()
+		if b.dedupCids && b.idx.HasExactCID(c) {
+			continue
+		}
+
 		n := uint64(b.carV1Writer.Position())
-		if err := util.LdWrite(b.carV1Writer, bl.Cid().Bytes(), bl.RawData()); err != nil {
+		if err := util.LdWrite(b.carV1Writer, c.Bytes(), bl.RawData()); err != nil {
 			return err
 		}
-		b.idx.InsertNoReplace(bl.Cid(), n)
+		b.idx.InsertNoReplace(c, n)
 	}
 	return nil
 }
@@ -126,7 +142,11 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
 // for more efficient subsequent read.
 // After this call, this blockstore can no longer be used for read or write.
 func (b *ReadWrite) Finalize() error {
-	b.panicIfFinalized()
+	if b.header.CarV1Size != 0 {
+		// Allow duplicate Finalize calls, just like Close: don't panic.
+		// Still return an error, like ReadOnly.Close; the blockstore should be discarded after the first Finalize.
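+		// (b.header.CarV1Size stays zero until Finalize runs, so a
+		// non-zero value here means Finalize was already called.)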
+ return fmt.Errorf("called Finalize twice") + } b.mu.Lock() defer b.mu.Unlock() diff --git a/v2/blockstore/readwrite_test.go b/v2/blockstore/readwrite_test.go index b4a138c9..3b66b3c4 100644 --- a/v2/blockstore/readwrite_test.go +++ b/v2/blockstore/readwrite_test.go @@ -6,6 +6,7 @@ import ( "io" "math/rand" "os" + "path/filepath" "sync" "testing" "time" @@ -26,17 +27,14 @@ func TestBlockstore(t *testing.T) { f, err := os.Open("testdata/test.car") require.NoError(t, err) - defer f.Close() + t.Cleanup(func() { f.Close() }) r, err := carv1.NewCarReader(f) require.NoError(t, err) - path := "testv2blockstore.car" + + path := filepath.Join(t.TempDir(), "readwrite.car") ingester, err := blockstore.NewReadWrite(path, r.Header.Roots) - if err != nil { - t.Fatal(err) - } - defer func() { - os.Remove(path) - }() + require.NoError(t, err) + t.Cleanup(func() { ingester.Finalize() }) cids := make([]cid.Cid, 0) for { @@ -46,9 +44,8 @@ func TestBlockstore(t *testing.T) { } require.NoError(t, err) - if err := ingester.Put(b); err != nil { - t.Fatal(err) - } + err = ingester.Put(b) + require.NoError(t, err) cids = append(cids, b.Cid()) // try reading a random one: @@ -60,32 +57,24 @@ func TestBlockstore(t *testing.T) { for _, c := range cids { b, err := ingester.Get(c) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if !b.Cid().Equals(c) { t.Fatal("wrong item returned") } } - if err := ingester.Finalize(); err != nil { - t.Fatal(err) - } + err = ingester.Finalize() + require.NoError(t, err) carb, err := blockstore.OpenReadOnly(path, false) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + t.Cleanup(func() { carb.Close() }) allKeysCh, err := carb.AllKeysChan(ctx) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) numKeysCh := 0 for c := range allKeysCh { b, err := carb.Get(c) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if !b.Cid().Equals(c) { t.Fatal("wrong item returned") } @@ -97,9 +86,7 @@ func TestBlockstore(t *testing.T) { for _, c := range cids { b, err := carb.Get(c) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) if !b.Cid().Equals(c) { t.Fatal("wrong item returned") } @@ -107,12 +94,19 @@ func TestBlockstore(t *testing.T) { } func TestBlockstorePutSameHashes(t *testing.T) { - path := "testv2blockstore.car" - wbs, err := blockstore.NewReadWrite(path, nil) - if err != nil { - t.Fatal(err) - } - defer func() { os.Remove(path) }() + tdir := t.TempDir() + wbs, err := blockstore.NewReadWrite( + filepath.Join(tdir, "readwrite.car"), nil, + ) + require.NoError(t, err) + t.Cleanup(func() { wbs.Finalize() }) + + wbsd, err := blockstore.NewReadWrite( + filepath.Join(tdir, "readwrite-dedup.car"), nil, + blockstore.WithCidDeduplication, + ) + require.NoError(t, err) + t.Cleanup(func() { wbsd.Finalize() }) var blockList []blocks.Block @@ -131,16 +125,32 @@ func TestBlockstorePutSameHashes(t *testing.T) { blockList = append(blockList, block) } + // Two raw blocks, meaning we have two unique multihashes. + // However, we have multiple CIDs for each multihash. + // We also have two duplicate CIDs. 
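+	// With WithCidDeduplication enabled, only those two duplicate CIDs
+	// should be skipped: blocks sharing a multihash but using different
+	// codecs have distinct CIDs, so they are all kept.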
data1 := []byte("foo bar") appendBlock(data1, 0, cid.Raw) appendBlock(data1, 1, cid.Raw) appendBlock(data1, 1, cid.DagCBOR) + appendBlock(data1, 1, cid.DagCBOR) // duplicate CID data2 := []byte("foo bar baz") appendBlock(data2, 0, cid.Raw) appendBlock(data2, 1, cid.Raw) + appendBlock(data2, 1, cid.Raw) // duplicate CID appendBlock(data2, 1, cid.DagCBOR) + countBlocks := func(bs *blockstore.ReadWrite) int { + ch, err := bs.AllKeysChan(context.Background()) + require.NoError(t, err) + + n := 0 + for range ch { + n++ + } + return n + } + for i, block := range blockList { // Has should never error here. // The first block should be missing. @@ -166,17 +176,28 @@ func TestBlockstorePutSameHashes(t *testing.T) { require.Equal(t, block.RawData(), got.RawData()) } + require.Equal(t, len(blockList), countBlocks(wbs)) + err = wbs.Finalize() require.NoError(t, err) + + // Put the same list of blocks to the blockstore that + // deduplicates by CID. + // We should end up with two fewer blocks. + for _, block := range blockList { + err = wbsd.Put(block) + require.NoError(t, err) + } + require.Equal(t, len(blockList)-2, countBlocks(wbsd)) + + err = wbsd.Finalize() + require.NoError(t, err) } func TestBlockstoreConcurrentUse(t *testing.T) { - path := "testv2blockstore.car" - wbs, err := blockstore.NewReadWrite(path, nil) - if err != nil { - t.Fatal(err) - } - defer func() { os.Remove(path) }() + wbs, err := blockstore.NewReadWrite(filepath.Join(t.TempDir(), "readwrite.car"), nil) + require.NoError(t, err) + t.Cleanup(func() { wbs.Finalize() }) var wg sync.WaitGroup for i := 0; i < 100; i++ { diff --git a/v2/index/insertionindex.go b/v2/index/insertionindex.go index 10b83eba..78dace1d 100644 --- a/v2/index/insertionindex.go +++ b/v2/index/insertionindex.go @@ -36,7 +36,7 @@ func (r recordDigest) Less(than llrb.Item) bool { func mkRecord(r Record) recordDigest { d, err := multihash.Decode(r.Hash()) if err != nil { - return recordDigest{} + panic(err) } return recordDigest{d.Digest, r} @@ -45,7 +45,7 @@ func mkRecord(r Record) recordDigest { func mkRecordFromCid(c cid.Cid, at uint64) recordDigest { d, err := multihash.Decode(c.Hash()) if err != nil { - return recordDigest{} + panic(err) } return recordDigest{d.Digest, Record{Cid: c, Idx: at}} @@ -141,3 +141,29 @@ func (ii *InsertionIndex) Flatten() (Index, error) { } return si, nil } + +func (ii *InsertionIndex) HasExactCID(c cid.Cid) bool { + d, err := multihash.Decode(c.Hash()) + if err != nil { + panic(err) + } + entry := recordDigest{digest: d.Digest} + + found := false + iter := func(i llrb.Item) bool { + existing := i.(recordDigest) + if !bytes.Equal(existing.digest, entry.digest) { + // We've already looked at all entries with matching digests. + return false + } + if existing.Record.Cid == c { + // We found an exact match. + found = true + return false + } + // Continue looking in ascending order. + return true + } + ii.items.AscendGreaterOrEqual(entry, iter) + return found +}
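
Usage sketch (not part of the patch): how a caller might enable the new
option. This is a minimal, untested example under a few assumptions: the
go-car v2 import path github.com/ipld/go-car/v2/blockstore, a throwaway
output file name "dedup.car", and the same go-cid Prefix.Sum and
go-block-format NewBlockWithCid helpers the test above uses.

package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
	"github.com/ipld/go-car/v2/blockstore"
	"github.com/multiformats/go-multihash"
)

func main() {
	bs, err := blockstore.NewReadWrite("dedup.car", nil, blockstore.WithCidDeduplication)
	if err != nil {
		panic(err)
	}

	data := []byte("foo bar")
	for _, codec := range []uint64{cid.Raw, cid.DagCBOR} {
		// Build a CIDv1 for the same payload under each codec, mirroring
		// the test's appendBlock helper. Both blocks are kept, because
		// deduplication compares whole CIDs, not just multihashes.
		prefix := cid.Prefix{Version: 1, Codec: codec, MhType: multihash.SHA2_256, MhLength: -1}
		c, err := prefix.Sum(data)
		if err != nil {
			panic(err)
		}
		blk, err := blocks.NewBlockWithCid(data, c)
		if err != nil {
			panic(err)
		}
		// Put each block twice: the second Put is a no-op, since the
		// insertion index already holds that exact CID.
		if err := bs.Put(blk); err != nil {
			panic(err)
		}
		if err := bs.Put(blk); err != nil {
			panic(err)
		}
	}

	// The finalized CARv1 payload holds two blocks, not four.
	if err := bs.Finalize(); err != nil {
		panic(err)
	}
	fmt.Println("wrote dedup.car")
}

Reusing the insertion index for this check keeps HasExactCID at one
O(log n) seek plus a short scan over the records sharing the digest;
that is why, per the commit message, no separate map or set of CIDs is
needed.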