Skip to content

Commit

Permalink
blockstore: add option to deduplicate by CID
Browse files Browse the repository at this point in the history
And a test that uses duplicate hashes as well as duplicate CIDs.

We reuse the same insertion index, since it's enough for this purpose.
There's no need to keep a separate map or set of CIDs.

While at it, make the index package not silently swallow errors,
and improve the tests to handle errors more consistently.

Fixes #123.
Fixes #125.
  • Loading branch information
mvdan committed Jul 16, 2021
1 parent 2611339 commit 08f751c
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 46 deletions.
26 changes: 23 additions & 3 deletions v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type ReadWrite struct {
ReadOnly
idx *index.InsertionIndex
header carv2.Header

dedupCids bool
}

// TODO consider exposing interfaces
Expand All @@ -52,6 +54,15 @@ func WithIndexPadding(p uint64) Option {
}
}

// WithCidDeduplication configures Put and PutMany to silently skip any block
// whose exact CID is already present in the blockstore.
// This helps avoid writing redundant CID-Block pairs into a CARv1 payload.
//
// Note that deduplication compares entire CIDs, not just their multihashes.
func WithCidDeduplication(b *ReadWrite) {
	b.dedupCids = true
}

// NewReadWrite creates a new ReadWrite at the given path with a provided set of root CIDs as the car roots.
func NewReadWrite(path string, roots []cid.Cid, opts ...Option) (*ReadWrite, error) {
// TODO support resumption if the path provided contains partially written blocks in v2 format.
Expand Down Expand Up @@ -113,11 +124,16 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
defer b.mu.Unlock()

for _, bl := range blks {
c := bl.Cid()
if b.dedupCids && b.idx.HasExactCID(c) {
continue
}

n := uint64(b.carV1Writer.Position())
if err := util.LdWrite(b.carV1Writer, bl.Cid().Bytes(), bl.RawData()); err != nil {
if err := util.LdWrite(b.carV1Writer, c.Bytes(), bl.RawData()); err != nil {
return err
}
b.idx.InsertNoReplace(bl.Cid(), n)
b.idx.InsertNoReplace(c, n)
}
return nil
}
Expand All @@ -126,7 +142,11 @@ func (b *ReadWrite) PutMany(blks []blocks.Block) error {
// for more efficient subsequent read.
// After this call, this blockstore can no longer be used for read or write.
func (b *ReadWrite) Finalize() error {
b.panicIfFinalized()
if b.header.CarV1Size != 0 {
// Allow duplicate Finalize calls, just like Close.
// Still error, just like ReadOnly.Close; it should be discarded.
return fmt.Errorf("called Finalize twice")
}

b.mu.Lock()
defer b.mu.Unlock()
Expand Down
103 changes: 62 additions & 41 deletions v2/blockstore/readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
"time"
Expand All @@ -26,17 +27,14 @@ func TestBlockstore(t *testing.T) {

f, err := os.Open("testdata/test.car")
require.NoError(t, err)
defer f.Close()
t.Cleanup(func() { f.Close() })
r, err := carv1.NewCarReader(f)
require.NoError(t, err)
path := "testv2blockstore.car"

path := filepath.Join(t.TempDir(), "readwrite.car")
ingester, err := blockstore.NewReadWrite(path, r.Header.Roots)
if err != nil {
t.Fatal(err)
}
defer func() {
os.Remove(path)
}()
require.NoError(t, err)
t.Cleanup(func() { ingester.Finalize() })

cids := make([]cid.Cid, 0)
for {
Expand All @@ -46,9 +44,8 @@ func TestBlockstore(t *testing.T) {
}
require.NoError(t, err)

if err := ingester.Put(b); err != nil {
t.Fatal(err)
}
err = ingester.Put(b)
require.NoError(t, err)
cids = append(cids, b.Cid())

// try reading a random one:
Expand All @@ -60,32 +57,24 @@ func TestBlockstore(t *testing.T) {

for _, c := range cids {
b, err := ingester.Get(c)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
if !b.Cid().Equals(c) {
t.Fatal("wrong item returned")
}
}

if err := ingester.Finalize(); err != nil {
t.Fatal(err)
}
err = ingester.Finalize()
require.NoError(t, err)
carb, err := blockstore.OpenReadOnly(path, false)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
t.Cleanup(func() { carb.Close() })

allKeysCh, err := carb.AllKeysChan(ctx)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
numKeysCh := 0
for c := range allKeysCh {
b, err := carb.Get(c)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
if !b.Cid().Equals(c) {
t.Fatal("wrong item returned")
}
Expand All @@ -97,22 +86,27 @@ func TestBlockstore(t *testing.T) {

for _, c := range cids {
b, err := carb.Get(c)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
if !b.Cid().Equals(c) {
t.Fatal("wrong item returned")
}
}
}

func TestBlockstorePutSameHashes(t *testing.T) {
path := "testv2blockstore.car"
wbs, err := blockstore.NewReadWrite(path, nil)
if err != nil {
t.Fatal(err)
}
defer func() { os.Remove(path) }()
tdir := t.TempDir()
wbs, err := blockstore.NewReadWrite(
filepath.Join(tdir, "readwrite.car"), nil,
)
require.NoError(t, err)
t.Cleanup(func() { wbs.Finalize() })

wbsd, err := blockstore.NewReadWrite(
filepath.Join(tdir, "readwrite-dedup.car"), nil,
blockstore.WithCidDeduplication,
)
require.NoError(t, err)
t.Cleanup(func() { wbsd.Finalize() })

var blockList []blocks.Block

Expand All @@ -131,16 +125,32 @@ func TestBlockstorePutSameHashes(t *testing.T) {
blockList = append(blockList, block)
}

// Two raw blocks, meaning we have two unique multihashes.
// However, we have multiple CIDs for each multihash.
// We also have two duplicate CIDs.
data1 := []byte("foo bar")
appendBlock(data1, 0, cid.Raw)
appendBlock(data1, 1, cid.Raw)
appendBlock(data1, 1, cid.DagCBOR)
appendBlock(data1, 1, cid.DagCBOR) // duplicate CID

data2 := []byte("foo bar baz")
appendBlock(data2, 0, cid.Raw)
appendBlock(data2, 1, cid.Raw)
appendBlock(data2, 1, cid.Raw) // duplicate CID
appendBlock(data2, 1, cid.DagCBOR)

countBlocks := func(bs *blockstore.ReadWrite) int {
ch, err := bs.AllKeysChan(context.Background())
require.NoError(t, err)

n := 0
for range ch {
n++
}
return n
}

for i, block := range blockList {
// Has should never error here.
// The first block should be missing.
Expand All @@ -166,17 +176,28 @@ func TestBlockstorePutSameHashes(t *testing.T) {
require.Equal(t, block.RawData(), got.RawData())
}

require.Equal(t, len(blockList), countBlocks(wbs))

err = wbs.Finalize()
require.NoError(t, err)

// Put the same list of blocks to the blockstore that
// deduplicates by CID.
// We should end up with two fewer blocks.
for _, block := range blockList {
err = wbsd.Put(block)
require.NoError(t, err)
}
require.Equal(t, len(blockList)-2, countBlocks(wbsd))

err = wbsd.Finalize()
require.NoError(t, err)
}

func TestBlockstoreConcurrentUse(t *testing.T) {
path := "testv2blockstore.car"
wbs, err := blockstore.NewReadWrite(path, nil)
if err != nil {
t.Fatal(err)
}
defer func() { os.Remove(path) }()
wbs, err := blockstore.NewReadWrite(filepath.Join(t.TempDir(), "readwrite.car"), nil)
require.NoError(t, err)
t.Cleanup(func() { wbs.Finalize() })

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
Expand Down
30 changes: 28 additions & 2 deletions v2/index/insertionindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (r recordDigest) Less(than llrb.Item) bool {
func mkRecord(r Record) recordDigest {
d, err := multihash.Decode(r.Hash())
if err != nil {
return recordDigest{}
panic(err)
}

return recordDigest{d.Digest, r}
Expand All @@ -45,7 +45,7 @@ func mkRecord(r Record) recordDigest {
func mkRecordFromCid(c cid.Cid, at uint64) recordDigest {
d, err := multihash.Decode(c.Hash())
if err != nil {
return recordDigest{}
panic(err)
}

return recordDigest{d.Digest, Record{Cid: c, Idx: at}}
Expand Down Expand Up @@ -141,3 +141,29 @@ func (ii *InsertionIndex) Flatten() (Index, error) {
}
return si, nil
}

// HasExactCID reports whether the index holds an entry whose CID is
// byte-for-byte equal to c, not merely one sharing its multihash digest.
//
// Entries in the underlying tree are ordered by digest, so every candidate
// sharing c's digest forms a contiguous run; we scan that run and stop as
// soon as the digest changes or an exact CID match is found.
func (ii *InsertionIndex) HasExactCID(c cid.Cid) bool {
	decoded, err := multihash.Decode(c.Hash())
	if err != nil {
		panic(err)
	}
	probe := recordDigest{digest: decoded.Digest}

	match := false
	ii.items.AscendGreaterOrEqual(probe, func(item llrb.Item) bool {
		candidate := item.(recordDigest)
		if !bytes.Equal(candidate.digest, probe.digest) {
			// Past the run of entries sharing c's digest; stop scanning.
			return false
		}
		if candidate.Record.Cid == c {
			// Exact CID match found; stop scanning.
			match = true
			return false
		}
		// Same digest but a different CID; keep scanning the run.
		return true
	})
	return match
}

0 comments on commit 08f751c

Please sign in to comment.