From 02d233a2ff1162c05bce6263ec6a859eb43e442f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 7 Oct 2016 11:14:45 -0700 Subject: [PATCH] cid: integrate cid into bitswap and blockstores License: MIT Signed-off-by: Jeromy --- blocks/blocks.go | 33 ++++---- blocks/blocks_test.go | 15 ++-- blocks/blockstore/arc_cache.go | 44 +++++++---- blocks/blockstore/arc_cache_test.go | 52 +++++++++---- blocks/blockstore/blockstore.go | 67 ++++++++-------- blocks/blockstore/blockstore_test.go | 42 +++++----- blocks/blockstore/bloom_cache.go | 27 +++---- blocks/blockstore/bloom_cache_test.go | 10 +-- blocks/blockstore/util/remove.go | 3 +- blocks/set/set.go | 45 +++++------ blocks/set/set_test.go | 13 ++-- blockservice/blockservice.go | 37 +++------ blockservice/test/blocks_test.go | 17 ++-- core/commands/bitswap.go | 7 +- core/commands/block.go | 8 +- core/commands/dht.go | 3 +- core/commands/files/files.go | 7 +- core/commands/ls.go | 5 +- core/commands/pubsub.go | 2 +- core/commands/refs.go | 9 +-- core/commands/repo.go | 2 +- core/corerepo/gc.go | 5 +- core/coreunix/add_test.go | 5 +- exchange/bitswap/bitswap.go | 78 +++++++++---------- exchange/bitswap/bitswap_test.go | 42 +++++----- exchange/bitswap/decision/bench_test.go | 8 +- exchange/bitswap/decision/engine.go | 21 ++--- exchange/bitswap/decision/engine_test.go | 6 +- exchange/bitswap/decision/ledger.go | 17 ++-- .../bitswap/decision/peer_request_queue.go | 34 ++++---- .../decision/peer_request_queue_test.go | 25 +++--- exchange/bitswap/message/message.go | 47 ++++++----- exchange/bitswap/message/message_test.go | 63 ++++++++------- exchange/bitswap/network/interface.go | 9 ++- exchange/bitswap/network/ipfs_impl.go | 13 +--- .../bitswap/notifications/notifications.go | 19 ++--- .../notifications/notifications_test.go | 24 +++--- exchange/bitswap/stat.go | 5 +- exchange/bitswap/testnet/virtual.go | 11 +-- exchange/bitswap/wantlist/wantlist.go | 31 ++++---- exchange/bitswap/wantmanager.go | 22 +++--- exchange/bitswap/workers.go | 26 +++---- exchange/interface.go | 8 +- exchange/offline/offline.go | 11 +-- exchange/offline/offline_test.go | 17 ++-- exchange/reprovide/reprovide.go | 4 +- importer/chunk/rabin_test.go | 7 +- merkledag/merkledag.go | 34 +++----- mfs/file.go | 3 +- pin/gc/gc.go | 28 +++---- test/integration/bitswap_wo_routing_test.go | 4 +- thirdparty/ds-help/key.go | 4 + 52 files changed, 544 insertions(+), 535 deletions(-) diff --git a/blocks/blocks.go b/blocks/blocks.go index 4b26a22dd038..e7b2b1042791 100644 --- a/blocks/blocks.go +++ b/blocks/blocks.go @@ -6,8 +6,6 @@ import ( "errors" "fmt" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - mh "gx/ipfs/QmYDds3421prZgqKbLpEK7T9Aa2eVdQ7o3YarX1LVLdP2J/go-multihash" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" @@ -18,37 +16,39 @@ var ErrWrongHash = errors.New("data did not match given hash!") type Block interface { Multihash() mh.Multihash RawData() []byte - Key() key.Key + Cid() *cid.Cid String() string Loggable() map[string]interface{} } // Block is a singular block of data in ipfs type BasicBlock struct { - multihash mh.Multihash - data []byte + cid *cid.Cid + data []byte } // NewBlock creates a Block object from opaque data. It will hash the data. func NewBlock(data []byte) *BasicBlock { - return &BasicBlock{data: data, multihash: u.Hash(data)} + // TODO: fix assumptions + return &BasicBlock{data: data, cid: cid.NewCidV0(u.Hash(data))} } // NewBlockWithHash creates a new block when the hash of the data // is already known, this is used to save time in situations where // we are able to be confident that the data is correct -func NewBlockWithHash(data []byte, h mh.Multihash) (*BasicBlock, error) { +func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) { if u.Debug { - chk := u.Hash(data) - if string(chk) != string(h) { + // TODO: fix assumptions + chkc := cid.NewCidV0(u.Hash(data)) + if !chkc.Equals(c) { return nil, ErrWrongHash } } - return &BasicBlock{data: data, multihash: h}, nil + return &BasicBlock{data: data, cid: c}, nil } func (b *BasicBlock) Multihash() mh.Multihash { - return b.multihash + return b.cid.Hash() } func (b *BasicBlock) RawData() []byte { @@ -56,20 +56,15 @@ func (b *BasicBlock) RawData() []byte { } func (b *BasicBlock) Cid() *cid.Cid { - return cid.NewCidV0(b.multihash) -} - -// Key returns the block's Multihash as a Key value. -func (b *BasicBlock) Key() key.Key { - return key.Key(b.multihash) + return b.cid } func (b *BasicBlock) String() string { - return fmt.Sprintf("[Block %s]", b.Key()) + return fmt.Sprintf("[Block %s]", b.Cid()) } func (b *BasicBlock) Loggable() map[string]interface{} { return map[string]interface{}{ - "block": b.Key().String(), + "block": b.Cid().String(), } } diff --git a/blocks/blocks_test.go b/blocks/blocks_test.go index 4d5d5908f51b..4ce12866ec85 100644 --- a/blocks/blocks_test.go +++ b/blocks/blocks_test.go @@ -5,6 +5,7 @@ import ( "testing" mh "gx/ipfs/QmYDds3421prZgqKbLpEK7T9Aa2eVdQ7o3YarX1LVLdP2J/go-multihash" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" ) @@ -44,12 +45,12 @@ func TestHash(t *testing.T) { } } -func TestKey(t *testing.T) { +func TestCid(t *testing.T) { data := []byte("yet another data") block := NewBlock(data) - key := block.Key() + c := block.Cid() - if !bytes.Equal(block.Multihash(), key.ToMultihash()) { + if !bytes.Equal(block.Multihash(), c.Hash()) { t.Error("key contains wrong data") } } @@ -66,8 +67,10 @@ func TestManualHash(t *testing.T) { t.Fatal(err) } + c := cid.NewCidV0(hash) + u.Debug = false - block, err := NewBlockWithHash(data, hash) + block, err := NewBlockWithCid(data, c) if err != nil { t.Fatal(err) } @@ -77,7 +80,7 @@ func TestManualHash(t *testing.T) { } data[5] = byte((uint32(data[5]) + 5) % 256) // Transfrom hash to be different - block, err = NewBlockWithHash(data, hash) + block, err = NewBlockWithCid(data, c) if err != nil { t.Fatal(err) } @@ -88,7 +91,7 @@ func TestManualHash(t *testing.T) { u.Debug = true - block, err = NewBlockWithHash(data, hash) + block, err = NewBlockWithCid(data, c) if err != ErrWrongHash { t.Fatal(err) } diff --git a/blocks/blockstore/arc_cache.go b/blocks/blockstore/arc_cache.go index fd6ff7eb9714..42d798a4eb50 100644 --- a/blocks/blockstore/arc_cache.go +++ b/blocks/blockstore/arc_cache.go @@ -1,13 +1,13 @@ package blockstore import ( - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + "context" "github.com/ipfs/go-ipfs/blocks" - context "context" "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface" lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" ) @@ -31,7 +31,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, return c, nil } -func (b *arccache) DeleteBlock(k key.Key) error { +func (b *arccache) DeleteBlock(k *cid.Cid) error { if has, ok := b.hasCached(k); ok && !has { return ErrNotFound } @@ -40,7 +40,7 @@ func (b *arccache) DeleteBlock(k key.Key) error { err := b.blockstore.DeleteBlock(k) switch err { case nil, ds.ErrNotFound, ErrNotFound: - b.arc.Add(k, false) + b.addCache(k, false) return err default: return err @@ -49,15 +49,16 @@ func (b *arccache) DeleteBlock(k key.Key) error { // if ok == false has is inconclusive // if ok == true then has respons to question: is it contained -func (b *arccache) hasCached(k key.Key) (has bool, ok bool) { +func (b *arccache) hasCached(k *cid.Cid) (has bool, ok bool) { b.total.Inc() - if k == "" { + if k == nil { + log.Error("nil cid in arccache") // Return cache invalid so the call to blockstore happens // in case of invalid key and correct error is created. return false, false } - h, ok := b.arc.Get(k) + h, ok := b.arc.Get(k.KeyString()) if ok { b.hits.Inc() return h.(bool), true @@ -65,40 +66,45 @@ func (b *arccache) hasCached(k key.Key) (has bool, ok bool) { return false, false } -func (b *arccache) Has(k key.Key) (bool, error) { +func (b *arccache) Has(k *cid.Cid) (bool, error) { if has, ok := b.hasCached(k); ok { return has, nil } res, err := b.blockstore.Has(k) if err == nil { - b.arc.Add(k, res) + b.addCache(k, res) } return res, err } -func (b *arccache) Get(k key.Key) (blocks.Block, error) { +func (b *arccache) Get(k *cid.Cid) (blocks.Block, error) { + if k == nil { + log.Error("nil cid in arc cache") + return nil, ErrNotFound + } + if has, ok := b.hasCached(k); ok && !has { return nil, ErrNotFound } bl, err := b.blockstore.Get(k) if bl == nil && err == ErrNotFound { - b.arc.Add(k, false) + b.addCache(k, false) } else if bl != nil { - b.arc.Add(k, true) + b.addCache(k, true) } return bl, err } func (b *arccache) Put(bl blocks.Block) error { - if has, ok := b.hasCached(bl.Key()); ok && has { + if has, ok := b.hasCached(bl.Cid()); ok && has { return nil } err := b.blockstore.Put(bl) if err == nil { - b.arc.Add(bl.Key(), true) + b.addCache(bl.Cid(), true) } return err } @@ -108,7 +114,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error { for _, block := range bs { // call put on block if result is inconclusive or we are sure that // the block isn't in storage - if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) { + if has, ok := b.hasCached(block.Cid()); !ok || (ok && !has) { good = append(good, block) } } @@ -117,12 +123,16 @@ func (b *arccache) PutMany(bs []blocks.Block) error { return err } for _, block := range good { - b.arc.Add(block.Key(), true) + b.addCache(block.Cid(), true) } return nil } -func (b *arccache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { +func (b *arccache) addCache(c *cid.Cid, has bool) { + b.arc.Add(c.KeyString(), has) +} + +func (b *arccache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { return b.blockstore.AllKeysChan(ctx) } diff --git a/blocks/blockstore/arc_cache_test.go b/blocks/blockstore/arc_cache_test.go index 4bf9307a8a3e..384cca270f7e 100644 --- a/blocks/blockstore/arc_cache_test.go +++ b/blocks/blockstore/arc_cache_test.go @@ -1,12 +1,12 @@ package blockstore import ( + "context" "testing" "github.com/ipfs/go-ipfs/blocks" - "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - context "context" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync" ) @@ -60,7 +60,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) { writeHitTheDatastore = true }) - arc.DeleteBlock(exampleBlock.Key()) + arc.DeleteBlock(exampleBlock.Cid()) arc.Put(exampleBlock) if !writeHitTheDatastore { t.Fail() @@ -78,9 +78,9 @@ func TestElideDuplicateWrite(t *testing.T) { func TestHasRequestTriggersCache(t *testing.T) { arc, _, cd := createStores(t) - arc.Has(exampleBlock.Key()) + arc.Has(exampleBlock.Cid()) trap("has hit datastore", cd, t) - if has, err := arc.Has(exampleBlock.Key()); has || err != nil { + if has, err := arc.Has(exampleBlock.Cid()); has || err != nil { t.Fatal("has was true but there is no such block") } @@ -92,7 +92,7 @@ func TestHasRequestTriggersCache(t *testing.T) { trap("has hit datastore", cd, t) - if has, err := arc.Has(exampleBlock.Key()); !has || err != nil { + if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil { t.Fatal("has returned invalid result") } } @@ -100,13 +100,13 @@ func TestHasRequestTriggersCache(t *testing.T) { func TestGetFillsCache(t *testing.T) { arc, _, cd := createStores(t) - if bl, err := arc.Get(exampleBlock.Key()); bl != nil || err == nil { + if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err == nil { t.Fatal("block was found or there was no error") } trap("has hit datastore", cd, t) - if has, err := arc.Has(exampleBlock.Key()); has || err != nil { + if has, err := arc.Has(exampleBlock.Cid()); has || err != nil { t.Fatal("has was true but there is no such block") } @@ -118,7 +118,7 @@ func TestGetFillsCache(t *testing.T) { trap("has hit datastore", cd, t) - if has, err := arc.Has(exampleBlock.Key()); !has || err != nil { + if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil { t.Fatal("has returned invalid result") } } @@ -126,15 +126,15 @@ func TestGetFillsCache(t *testing.T) { func TestGetAndDeleteFalseShortCircuit(t *testing.T) { arc, _, cd := createStores(t) - arc.Has(exampleBlock.Key()) + arc.Has(exampleBlock.Cid()) trap("get hit datastore", cd, t) - if bl, err := arc.Get(exampleBlock.Key()); bl != nil || err != ErrNotFound { + if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err != ErrNotFound { t.Fatal("get returned invalid result") } - if arc.DeleteBlock(exampleBlock.Key()) != ErrNotFound { + if arc.DeleteBlock(exampleBlock.Cid()) != ErrNotFound { t.Fatal("expected ErrNotFound error") } } @@ -148,7 +148,7 @@ func TestArcCreationFailure(t *testing.T) { func TestInvalidKey(t *testing.T) { arc, _, _ := createStores(t) - bl, err := arc.Get(key.Key("")) + bl, err := arc.Get(nil) if bl != nil { t.Fatal("blocks should be nil") @@ -163,10 +163,28 @@ func TestHasAfterSucessfulGetIsCached(t *testing.T) { bs.Put(exampleBlock) - arc.Get(exampleBlock.Key()) + arc.Get(exampleBlock.Cid()) trap("has hit datastore", cd, t) - arc.Has(exampleBlock.Key()) + arc.Has(exampleBlock.Cid()) +} + +func TestDifferentKeyObjectsWork(t *testing.T) { + arc, bs, cd := createStores(t) + + bs.Put(exampleBlock) + + arc.Get(exampleBlock.Cid()) + + trap("has hit datastore", cd, t) + cidstr := exampleBlock.Cid().String() + + ncid, err := cid.Decode(cidstr) + if err != nil { + t.Fatal(err) + } + + arc.Has(ncid) } func TestPutManyCaches(t *testing.T) { @@ -174,9 +192,9 @@ func TestPutManyCaches(t *testing.T) { arc.PutMany([]blocks.Block{exampleBlock}) trap("has hit datastore", cd, t) - arc.Has(exampleBlock.Key()) + arc.Has(exampleBlock.Cid()) untrap(cd) - arc.DeleteBlock(exampleBlock.Key()) + arc.DeleteBlock(exampleBlock.Cid()) arc.Put(exampleBlock) trap("PunMany has hit datastore", cd, t) diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 162a21da035c..7d4d10de17b1 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -3,15 +3,16 @@ package blockstore import ( + "context" "errors" "sync" "sync/atomic" - context "context" blocks "github.com/ipfs/go-ipfs/blocks" + dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help" + logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - mh "gx/ipfs/QmYDds3421prZgqKbLpEK7T9Aa2eVdQ7o3YarX1LVLdP2J/go-multihash" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" dsns "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/namespace" dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query" @@ -29,13 +30,13 @@ var ErrNotFound = errors.New("blockstore: block not found") // Blockstore wraps a Datastore type Blockstore interface { - DeleteBlock(key.Key) error - Has(key.Key) (bool, error) - Get(key.Key) (blocks.Block, error) + DeleteBlock(*cid.Cid) error + Has(*cid.Cid) (bool, error) + Get(*cid.Cid) (blocks.Block, error) Put(blocks.Block) error PutMany([]blocks.Block) error - AllKeysChan(ctx context.Context) (<-chan key.Key, error) + AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) } type GCBlockstore interface { @@ -80,12 +81,13 @@ func (bs *blockstore) HashOnRead(enabled bool) { bs.rehash = enabled } -func (bs *blockstore) Get(k key.Key) (blocks.Block, error) { - if k == "" { +func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) { + if k == nil { + log.Error("nil cid in blockstore") return nil, ErrNotFound } - maybeData, err := bs.datastore.Get(k.DsKey()) + maybeData, err := bs.datastore.Get(dshelp.NewKeyFromBinary(k.KeyString())) if err == ds.ErrNotFound { return nil, ErrNotFound } @@ -99,18 +101,18 @@ func (bs *blockstore) Get(k key.Key) (blocks.Block, error) { if bs.rehash { rb := blocks.NewBlock(bdata) - if rb.Key() != k { + if !rb.Cid().Equals(k) { return nil, ErrHashMismatch } else { return rb, nil } } else { - return blocks.NewBlockWithHash(bdata, mh.Multihash(k)) + return blocks.NewBlockWithCid(bdata, k) } } func (bs *blockstore) Put(block blocks.Block) error { - k := block.Key().DsKey() + k := dshelp.NewKeyFromBinary(block.Cid().KeyString()) // Has is cheaper than Put, so see if we already have it exists, err := bs.datastore.Has(k) @@ -126,7 +128,7 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error { return err } for _, b := range blocks { - k := b.Key().DsKey() + k := dshelp.NewKeyFromBinary(b.Cid().KeyString()) exists, err := bs.datastore.Has(k) if err == nil && exists { continue @@ -140,19 +142,19 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error { return t.Commit() } -func (bs *blockstore) Has(k key.Key) (bool, error) { - return bs.datastore.Has(k.DsKey()) +func (bs *blockstore) Has(k *cid.Cid) (bool, error) { + return bs.datastore.Has(dshelp.NewKeyFromBinary(k.KeyString())) } -func (s *blockstore) DeleteBlock(k key.Key) error { - return s.datastore.Delete(k.DsKey()) +func (s *blockstore) DeleteBlock(k *cid.Cid) error { + return s.datastore.Delete(dshelp.NewKeyFromBinary(k.KeyString())) } // AllKeysChan runs a query for keys from the blockstore. // this is very simplistic, in the future, take dsq.Query as a param? // // AllKeysChan respects context -func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { +func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { // KeysOnly, because that would be _a lot_ of data. q := dsq.Query{KeysOnly: true} @@ -164,39 +166,38 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { } // this function is here to compartmentalize - get := func() (key.Key, bool) { + get := func() (*cid.Cid, bool) { select { case <-ctx.Done(): - return "", false + return nil, false case e, more := <-res.Next(): if !more { - return "", false + return nil, false } if e.Error != nil { log.Debug("blockstore.AllKeysChan got err:", e.Error) - return "", false + return nil, false } // need to convert to key.Key using key.KeyFromDsKey. - k, err := key.KeyFromDsKey(ds.NewKey(e.Key)) + kb, err := dshelp.BinaryFromDsKey(ds.NewKey(e.Key)) // TODO: calling NewKey isnt free if err != nil { log.Warningf("error parsing key from DsKey: ", err) - return "", true + return nil, true } - log.Debug("blockstore: query got key", k) - // key must be a multihash. else ignore it. - _, err = mh.Cast([]byte(k)) + c, err := cid.Cast(kb) if err != nil { - log.Warningf("key from datastore was not a multihash: ", err) - return "", true + log.Warning("error parsing cid from decoded DsKey: ", err) + return nil, true } + log.Debug("blockstore: query got key", c) - return k, true + return c, true } } - output := make(chan key.Key, dsq.KeysOnlyBufSize) + output := make(chan *cid.Cid, dsq.KeysOnlyBufSize) go func() { defer func() { res.Process().Close() // ensure exit (signals early exit, too) @@ -208,7 +209,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { if !ok { return } - if k == "" { + if k == nil { continue } diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go index 75385dd2a802..98882d479fd3 100644 --- a/blocks/blockstore/blockstore_test.go +++ b/blocks/blockstore/blockstore_test.go @@ -2,22 +2,24 @@ package blockstore import ( "bytes" + "context" "fmt" "testing" - context "context" + blocks "github.com/ipfs/go-ipfs/blocks" + dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help" + + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query" ds_sync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync" - - blocks "github.com/ipfs/go-ipfs/blocks" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" ) func TestGetWhenKeyNotPresent(t *testing.T) { bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) - bl, err := bs.Get(key.Key("not present")) + c := cid.NewCidV0(u.Hash([]byte("stuff"))) + bl, err := bs.Get(c) if bl != nil { t.Error("nil block expected") @@ -27,9 +29,9 @@ func TestGetWhenKeyNotPresent(t *testing.T) { } } -func TestGetWhenKeyIsEmptyString(t *testing.T) { +func TestGetWhenKeyIsNil(t *testing.T) { bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) - _, err := bs.Get(key.Key("")) + _, err := bs.Get(nil) if err != ErrNotFound { t.Fail() } @@ -44,7 +46,7 @@ func TestPutThenGetBlock(t *testing.T) { t.Fatal(err) } - blockFromBlockstore, err := bs.Get(block.Key()) + blockFromBlockstore, err := bs.Get(block.Cid()) if err != nil { t.Fatal(err) } @@ -62,7 +64,7 @@ func TestHashOnRead(t *testing.T) { bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) bl := blocks.NewBlock([]byte("some data")) - blBad, err := blocks.NewBlockWithHash([]byte("some other data"), bl.Key().ToMultihash()) + blBad, err := blocks.NewBlockWithCid([]byte("some other data"), bl.Cid()) if err != nil { t.Fatal("debug is off, still got an error") } @@ -71,35 +73,35 @@ func TestHashOnRead(t *testing.T) { bs.Put(bl2) bs.HashOnRead(true) - if _, err := bs.Get(bl.Key()); err != ErrHashMismatch { + if _, err := bs.Get(bl.Cid()); err != ErrHashMismatch { t.Fatalf("expected '%v' got '%v'\n", ErrHashMismatch, err) } - if b, err := bs.Get(bl2.Key()); err != nil || b.String() != bl2.String() { + if b, err := bs.Get(bl2.Cid()); err != nil || b.String() != bl2.String() { t.Fatal("got wrong blocks") } } -func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []key.Key) { +func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []*cid.Cid) { if d == nil { d = ds.NewMapDatastore() } bs := NewBlockstore(ds_sync.MutexWrap(d)) - keys := make([]key.Key, N) + keys := make([]*cid.Cid, N) for i := 0; i < N; i++ { block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) err := bs.Put(block) if err != nil { t.Fatal(err) } - keys[i] = block.Key() + keys[i] = block.Cid() } return bs, keys } -func collect(ch <-chan key.Key) []key.Key { - var keys []key.Key +func collect(ch <-chan *cid.Cid) []*cid.Cid { + var keys []*cid.Cid for k := range ch { keys = append(keys, k) } @@ -188,18 +190,18 @@ func TestValueTypeMismatch(t *testing.T) { block := blocks.NewBlock([]byte("some data")) datastore := ds.NewMapDatastore() - k := BlockPrefix.Child(block.Key().DsKey()) + k := BlockPrefix.Child(dshelp.NewKeyFromBinary(block.Cid().KeyString())) datastore.Put(k, "data that isn't a block!") blockstore := NewBlockstore(ds_sync.MutexWrap(datastore)) - _, err := blockstore.Get(block.Key()) + _, err := blockstore.Get(block.Cid()) if err != ValueTypeMismatch { t.Fatal(err) } } -func expectMatches(t *testing.T, expect, actual []key.Key) { +func expectMatches(t *testing.T, expect, actual []*cid.Cid) { if len(expect) != len(actual) { t.Errorf("expect and actual differ: %d != %d", len(expect), len(actual)) @@ -207,7 +209,7 @@ func expectMatches(t *testing.T, expect, actual []key.Key) { for _, ek := range expect { found := false for _, ak := range actual { - if ek == ak { + if ek.Equals(ak) { found = true } } diff --git a/blocks/blockstore/bloom_cache.go b/blocks/blockstore/bloom_cache.go index 6abd4886c24e..e78a4211c95d 100644 --- a/blocks/blockstore/bloom_cache.go +++ b/blocks/blockstore/bloom_cache.go @@ -1,14 +1,14 @@ package blockstore import ( + "context" "sync/atomic" "time" "github.com/ipfs/go-ipfs/blocks" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - context "context" "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" bloom "gx/ipfs/QmeiMCBkYHxkDkDfnDadzz4YxY5ruL5Pj499essE4vRsGM/bbloom" ) @@ -84,7 +84,7 @@ func (b *bloomcache) Rebuild(ctx context.Context) { select { case key, ok := <-ch: if ok { - b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better + b.bloom.AddTS(key.Bytes()) // Use binary key, the more compact the better } else { finish = true } @@ -97,7 +97,7 @@ func (b *bloomcache) Rebuild(ctx context.Context) { atomic.StoreInt32(&b.active, 1) } -func (b *bloomcache) DeleteBlock(k key.Key) error { +func (b *bloomcache) DeleteBlock(k *cid.Cid) error { if has, ok := b.hasCached(k); ok && !has { return ErrNotFound } @@ -107,15 +107,16 @@ func (b *bloomcache) DeleteBlock(k key.Key) error { // if ok == false has is inconclusive // if ok == true then has respons to question: is it contained -func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) { +func (b *bloomcache) hasCached(k *cid.Cid) (has bool, ok bool) { b.total.Inc() - if k == "" { + if k == nil { + log.Error("nil cid in bloom cache") // Return cache invalid so call to blockstore // in case of invalid key is forwarded deeper return false, false } if b.BloomActive() { - blr := b.bloom.HasTS([]byte(k)) + blr := b.bloom.HasTS(k.Bytes()) if blr == false { // not contained in bloom is only conclusive answer bloom gives b.hits.Inc() return false, true @@ -124,7 +125,7 @@ func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) { return false, false } -func (b *bloomcache) Has(k key.Key) (bool, error) { +func (b *bloomcache) Has(k *cid.Cid) (bool, error) { if has, ok := b.hasCached(k); ok { return has, nil } @@ -132,7 +133,7 @@ func (b *bloomcache) Has(k key.Key) (bool, error) { return b.blockstore.Has(k) } -func (b *bloomcache) Get(k key.Key) (blocks.Block, error) { +func (b *bloomcache) Get(k *cid.Cid) (blocks.Block, error) { if has, ok := b.hasCached(k); ok && !has { return nil, ErrNotFound } @@ -141,13 +142,13 @@ func (b *bloomcache) Get(k key.Key) (blocks.Block, error) { } func (b *bloomcache) Put(bl blocks.Block) error { - if has, ok := b.hasCached(bl.Key()); ok && has { + if has, ok := b.hasCached(bl.Cid()); ok && has { return nil } err := b.blockstore.Put(bl) if err == nil { - b.bloom.AddTS([]byte(bl.Key())) + b.bloom.AddTS(bl.Cid().Bytes()) } return err } @@ -162,12 +163,12 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error { return err } for _, bl := range bs { - b.bloom.AddTS([]byte(bl.Key())) + b.bloom.AddTS(bl.Cid().Bytes()) } return nil } -func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { +func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { return b.blockstore.AllKeysChan(ctx) } diff --git a/blocks/blockstore/bloom_cache_test.go b/blocks/blockstore/bloom_cache_test.go index 248308874d64..8bdf567f07a8 100644 --- a/blocks/blockstore/bloom_cache_test.go +++ b/blocks/blockstore/bloom_cache_test.go @@ -44,7 +44,7 @@ func TestPutManyAddsToBloom(t *testing.T) { block2 := blocks.NewBlock([]byte("bar")) cachedbs.PutMany([]blocks.Block{block1}) - has, err := cachedbs.Has(block1.Key()) + has, err := cachedbs.Has(block1.Cid()) if err != nil { t.Fatal(err) } @@ -52,7 +52,7 @@ func TestPutManyAddsToBloom(t *testing.T) { t.Fatal("added block is reported missing") } - has, err = cachedbs.Has(block2.Key()) + has, err = cachedbs.Has(block2.Cid()) if err != nil { t.Fatal(err) } @@ -93,7 +93,7 @@ func TestHasIsBloomCached(t *testing.T) { }) for i := 0; i < 1000; i++ { - cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Key()) + cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Cid()) } if float64(cacheFails)/float64(1000) > float64(0.05) { @@ -112,11 +112,11 @@ func TestHasIsBloomCached(t *testing.T) { t.Fatalf("expected datastore hit: %d", cacheFails) } - if has, err := cachedbs.Has(block.Key()); !has || err != nil { + if has, err := cachedbs.Has(block.Cid()); !has || err != nil { t.Fatal("has gave wrong response") } - bl, err := cachedbs.Get(block.Key()) + bl, err := cachedbs.Get(block.Cid()) if bl.String() != block.String() { t.Fatal("block data doesn't match") } diff --git a/blocks/blockstore/util/remove.go b/blocks/blockstore/util/remove.go index 4b5f86d14ca2..3afc92d456d1 100644 --- a/blocks/blockstore/util/remove.go +++ b/blocks/blockstore/util/remove.go @@ -6,7 +6,6 @@ import ( bs "github.com/ipfs/go-ipfs/blocks/blockstore" "github.com/ipfs/go-ipfs/pin" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" ) @@ -38,7 +37,7 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, c stillOkay := FilterPinned(pins, out, cids) for _, c := range stillOkay { - err := blocks.DeleteBlock(key.Key(c.Hash())) + err := blocks.DeleteBlock(c) if err != nil && opts.Force && (err == bs.ErrNotFound || err == ds.ErrNotFound) { // ignore non-existent blocks } else if err != nil { diff --git a/blocks/set/set.go b/blocks/set/set.go index 5a949513673c..f15df587bf85 100644 --- a/blocks/set/set.go +++ b/blocks/set/set.go @@ -4,62 +4,57 @@ package set import ( "github.com/ipfs/go-ipfs/blocks/bloom" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) var log = logging.Logger("blockset") // BlockSet represents a mutable set of keyed blocks type BlockSet interface { - AddBlock(key.Key) - RemoveBlock(key.Key) - HasKey(key.Key) bool + AddBlock(*cid.Cid) + RemoveBlock(*cid.Cid) + HasKey(*cid.Cid) bool GetBloomFilter() bloom.Filter - GetKeys() []key.Key + GetKeys() []*cid.Cid } -func SimpleSetFromKeys(keys []key.Key) BlockSet { - sbs := &simpleBlockSet{blocks: make(map[key.Key]struct{})} +func SimpleSetFromKeys(keys []*cid.Cid) BlockSet { + sbs := &simpleBlockSet{blocks: cid.NewSet()} for _, k := range keys { - sbs.blocks[k] = struct{}{} + sbs.AddBlock(k) } return sbs } func NewSimpleBlockSet() BlockSet { - return &simpleBlockSet{blocks: make(map[key.Key]struct{})} + return &simpleBlockSet{blocks: cid.NewSet()} } type simpleBlockSet struct { - blocks map[key.Key]struct{} + blocks *cid.Set } -func (b *simpleBlockSet) AddBlock(k key.Key) { - b.blocks[k] = struct{}{} +func (b *simpleBlockSet) AddBlock(k *cid.Cid) { + b.blocks.Add(k) } -func (b *simpleBlockSet) RemoveBlock(k key.Key) { - delete(b.blocks, k) +func (b *simpleBlockSet) RemoveBlock(k *cid.Cid) { + b.blocks.Remove(k) } -func (b *simpleBlockSet) HasKey(k key.Key) bool { - _, has := b.blocks[k] - return has +func (b *simpleBlockSet) HasKey(k *cid.Cid) bool { + return b.blocks.Has(k) } func (b *simpleBlockSet) GetBloomFilter() bloom.Filter { f := bloom.BasicFilter() - for k := range b.blocks { - f.Add([]byte(k)) + for _, k := range b.blocks.Keys() { + f.Add(k.Bytes()) } return f } -func (b *simpleBlockSet) GetKeys() []key.Key { - var out []key.Key - for k := range b.blocks { - out = append(out, k) - } - return out +func (b *simpleBlockSet) GetKeys() []*cid.Cid { + return b.blocks.Keys() } diff --git a/blocks/set/set_test.go b/blocks/set/set_test.go index cd81d2a34427..f28f91744a96 100644 --- a/blocks/set/set_test.go +++ b/blocks/set/set_test.go @@ -4,7 +4,8 @@ import ( "testing" bu "github.com/ipfs/go-ipfs/blocks/blocksutil" - k "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) const ( @@ -13,15 +14,15 @@ const ( tReAdd ) -func exampleKeys() []k.Key { - res := make([]k.Key, 1<<8) +func exampleKeys() []*cid.Cid { + res := make([]*cid.Cid, 1<<8) gen := bu.NewBlockGenerator() for i := uint64(0); i < 1<<8; i++ { - res[i] = gen.Next().Key() + res[i] = gen.Next().Cid() } return res } -func checkSet(set BlockSet, keySlice []k.Key, t *testing.T) { +func checkSet(set BlockSet, keySlice []*cid.Cid, t *testing.T) { for i, key := range keySlice { if i&tReAdd == 0 { if set.HasKey(key) == false { @@ -69,7 +70,7 @@ func TestSetWorks(t *testing.T) { bloom := set.GetBloomFilter() for _, key := range addedKeys { - if bloom.Find([]byte(key)) == false { + if bloom.Find(key.Bytes()) == false { t.Error("bloom doesn't contain expected key") } } diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index dd274d27e348..3fb47aa0ba90 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -10,7 +10,6 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" "github.com/ipfs/go-ipfs/blocks/blockstore" exchange "github.com/ipfs/go-ipfs/exchange" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" context "context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" @@ -30,12 +29,6 @@ type BlockService struct { Exchange exchange.Interface } -// an Object is simply a typed block -type Object interface { - Cid() *cid.Cid - blocks.Block -} - // NewBlockService creates a BlockService with given datastore instance. func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService { if rem == nil { @@ -50,14 +43,14 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService { // AddBlock adds a particular block to the service, Putting it into the datastore. // TODO pass a context into this if the remote.HasBlock is going to remain here. -func (s *BlockService) AddObject(o Object) (*cid.Cid, error) { +func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) { // TODO: while this is a great optimization, we should think about the // possibility of streaming writes directly to disk. If we can pass this object // all the way down to the datastore without having to 'buffer' its data, // we could implement a `WriteTo` method on it that could do a streaming write // of the content, saving us (probably) considerable memory. c := o.Cid() - has, err := s.Blockstore.Has(key.Key(c.Hash())) + has, err := s.Blockstore.Has(c) if err != nil { return nil, err } @@ -78,13 +71,10 @@ func (s *BlockService) AddObject(o Object) (*cid.Cid, error) { return c, nil } -func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) { +func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { var toput []blocks.Block - var toputcids []*cid.Cid for _, b := range bs { - c := b.Cid() - - has, err := s.Blockstore.Has(key.Key(c.Hash())) + has, err := s.Blockstore.Has(b.Cid()) if err != nil { return nil, err } @@ -94,7 +84,6 @@ func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) { } toput = append(toput, b) - toputcids = append(toputcids, c) } err := s.Blockstore.PutMany(toput) @@ -108,8 +97,7 @@ func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) { return nil, fmt.Errorf("blockservice is closed (%s)", err) } - c := o.(Object).Cid() // cast is safe, we created these - ks = append(ks, c) + ks = append(ks, o.Cid()) } return ks, nil } @@ -119,7 +107,7 @@ func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) { func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", c) - block, err := s.Blockstore.Get(key.Key(c.Hash())) + block, err := s.Blockstore.Get(c) if err == nil { return block, nil } @@ -128,7 +116,7 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, // TODO be careful checking ErrNotFound. If the underlying // implementation changes, this will break. log.Debug("Blockservice: Searching bitswap") - blk, err := s.Exchange.GetBlock(ctx, key.Key(c.Hash())) + blk, err := s.Exchange.GetBlock(ctx, c) if err != nil { if err == blockstore.ErrNotFound { return nil, ErrNotFound @@ -153,12 +141,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc out := make(chan blocks.Block, 0) go func() { defer close(out) - var misses []key.Key + var misses []*cid.Cid for _, c := range ks { - k := key.Key(c.Hash()) - hit, err := s.Blockstore.Get(k) + hit, err := s.Blockstore.Get(c) if err != nil { - misses = append(misses, k) + misses = append(misses, c) continue } log.Debug("Blockservice: Got data in datastore") @@ -191,8 +178,8 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc } // DeleteBlock deletes a block in the blockservice from the datastore -func (s *BlockService) DeleteObject(o Object) error { - return s.Blockstore.DeleteBlock(o.Key()) +func (s *BlockService) DeleteBlock(o blocks.Block) error { + return s.Blockstore.DeleteBlock(o.Cid()) } func (s *BlockService) Close() error { diff --git a/blockservice/test/blocks_test.go b/blockservice/test/blocks_test.go index 9b86d5a61bd4..00520776341e 100644 --- a/blockservice/test/blocks_test.go +++ b/blockservice/test/blocks_test.go @@ -2,6 +2,7 @@ package bstest import ( "bytes" + "context" "fmt" "testing" "time" @@ -10,9 +11,7 @@ import ( blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" . "github.com/ipfs/go-ipfs/blockservice" offline "github.com/ipfs/go-ipfs/exchange/offline" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - "context" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" @@ -44,11 +43,11 @@ func TestBlocks(t *testing.T) { t.Error("Block Multihash and data multihash not equal") } - if o.Key() != key.Key(h) { + if !o.Cid().Equals(cid.NewCidV0(h)) { t.Error("Block key and data multihash key not equal") } - k, err := bs.AddObject(o) + k, err := bs.AddBlock(o) if err != nil { t.Error("failed to add block to BlockService", err) return @@ -66,7 +65,7 @@ func TestBlocks(t *testing.T) { return } - if o.Key() != b2.Key() { + if !o.Cid().Equals(b2.Cid()) { t.Error("Block keys not equal.") } @@ -93,7 +92,7 @@ func TestGetBlocksSequential(t *testing.T) { var cids []*cid.Cid for _, o := range objs { cids = append(cids, o.Cid()) - servs[0].AddObject(o) + servs[0].AddBlock(o) } t.Log("one instance at a time, get blocks concurrently") @@ -102,12 +101,12 @@ func TestGetBlocksSequential(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*50) defer cancel() out := servs[i].GetBlocks(ctx, cids) - gotten := make(map[key.Key]blocks.Block) + gotten := make(map[string]blocks.Block) for blk := range out { - if _, ok := gotten[blk.Key()]; ok { + if _, ok := gotten[blk.Cid().KeyString()]; ok { t.Fatal("Got duplicate block!") } - gotten[blk.Key()] = blk + gotten[blk.Cid().KeyString()] = blk } if len(gotten) != len(objs) { t.Fatalf("Didnt get enough blocks back: %d/%d", len(gotten), len(objs)) diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index d13a7bab89b4..18b4e12eb4e5 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -8,7 +8,6 @@ import ( cmds "github.com/ipfs/go-ipfs/commands" bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" @@ -54,7 +53,7 @@ var unwantCmd = &cmds.Command{ return } - var ks []key.Key + var ks []*cid.Cid for _, arg := range req.Arguments() { c, err := cid.Decode(arg) if err != nil { @@ -62,7 +61,7 @@ var unwantCmd = &cmds.Command{ return } - ks = append(ks, key.Key(c.Hash())) + ks = append(ks, c) } bs.CancelWants(ks) @@ -164,7 +163,7 @@ var bitswapStatCmd = &cmds.Command{ fmt.Fprintf(buf, "\tdup data received: %s\n", humanize.Bytes(out.DupDataReceived)) fmt.Fprintf(buf, "\twantlist [%d keys]\n", len(out.Wantlist)) for _, k := range out.Wantlist { - fmt.Fprintf(buf, "\t\t%s\n", k.B58String()) + fmt.Fprintf(buf, "\t\t%s\n", k.String()) } fmt.Fprintf(buf, "\tpartners [%d]\n", len(out.Peers)) for _, p := range out.Peers { diff --git a/core/commands/block.go b/core/commands/block.go index 895bab71aa8e..eb63d63646e0 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -66,7 +66,7 @@ on raw ipfs blocks. It outputs the following to stdout: } res.SetOutput(&BlockStat{ - Key: b.Key().B58String(), + Key: b.Cid().String(), Size: len(b.RawData()), }) }, @@ -140,9 +140,9 @@ It reads from stdin, and is a base58 encoded multihash. } b := blocks.NewBlock(data) - log.Debugf("BlockPut key: '%q'", b.Key()) + log.Debugf("BlockPut key: '%q'", b.Cid()) - k, err := n.Blocks.AddObject(b) + k, err := n.Blocks.AddBlock(b) if err != nil { res.SetError(err, cmds.ErrNormal) return @@ -182,7 +182,7 @@ func getBlockForKey(req cmds.Request, skey string) (blocks.Block, error) { return nil, err } - log.Debugf("ipfs block: got block with key: %q", b.Key()) + log.Debugf("ipfs block: got block with key: %s", b.Cid()) return b, nil } diff --git a/core/commands/dht.go b/core/commands/dht.go index d9311faa559b..8bbdfadb64ee 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -17,7 +17,6 @@ import ( routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing" notif "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing/notifications" pstore "gx/ipfs/QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo/go-libp2p-peerstore" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" @@ -276,7 +275,7 @@ var provideRefDhtCmd = &cmds.Command{ return } - has, err := n.Blockstore.Has(key.Key(c.Hash())) + has, err := n.Blockstore.Has(c) if err != nil { res.SetError(err, cmds.ErrNormal) return diff --git a/core/commands/files/files.go b/core/commands/files/files.go index 72a3dc8a8304..4809f3d7d460 100644 --- a/core/commands/files/files.go +++ b/core/commands/files/files.go @@ -609,7 +609,12 @@ stat' on the file or any of its ancestors. return } - defer wfd.Close() + defer func() { + err := wfd.Close() + if err != nil { + res.SetError(err, cmds.ErrNormal) + } + }() if trunc { if err := wfd.Truncate(0); err != nil { diff --git a/core/commands/ls.go b/core/commands/ls.go index bf95c8cdc1c5..de765d1b5e4d 100644 --- a/core/commands/ls.go +++ b/core/commands/ls.go @@ -12,7 +12,8 @@ import ( path "github.com/ipfs/go-ipfs/path" unixfs "github.com/ipfs/go-ipfs/unixfs" unixfspb "github.com/ipfs/go-ipfs/unixfs/pb" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) type LsLink struct { @@ -90,7 +91,7 @@ The JSON output contains type information. for j, link := range dagnode.Links { var linkNode *merkledag.Node t := unixfspb.Data_DataType(-1) - linkKey := key.Key(link.Hash) + linkKey := cid.NewCidV0(link.Hash) if ok, err := node.Blockstore.Has(linkKey); ok && err == nil { b, err := node.Blockstore.Get(linkKey) if err != nil { diff --git a/core/commands/pubsub.go b/core/commands/pubsub.go index 4f7c715f4129..5eb7794aff88 100644 --- a/core/commands/pubsub.go +++ b/core/commands/pubsub.go @@ -106,7 +106,7 @@ To use, the daemon must be run with '--enable-pubsub-experiment'. if discover { go func() { blk := blocks.NewBlock([]byte("floodsub:" + topic)) - cid, err := n.Blocks.AddObject(blk) + cid, err := n.Blocks.AddBlock(blk) if err != nil { log.Error("pubsub discovery: ", err) return diff --git a/core/commands/refs.go b/core/commands/refs.go index 467bf9fad391..38c1e37348d9 100644 --- a/core/commands/refs.go +++ b/core/commands/refs.go @@ -2,6 +2,7 @@ package commands import ( "bytes" + "context" "errors" "io" "strings" @@ -10,16 +11,14 @@ import ( "github.com/ipfs/go-ipfs/core" dag "github.com/ipfs/go-ipfs/merkledag" path "github.com/ipfs/go-ipfs/path" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - context "context" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" ) // KeyList is a general type for outputting lists of keys type KeyList struct { - Keys []key.Key + Keys []*cid.Cid } // KeyListTextMarshaler outputs a KeyList as plaintext, one key per line @@ -27,7 +26,7 @@ func KeyListTextMarshaler(res cmds.Response) (io.Reader, error) { output := res.Output().(*KeyList) buf := new(bytes.Buffer) for _, key := range output.Keys { - buf.WriteString(key.B58String() + "\n") + buf.WriteString(key.String() + "\n") } return buf, nil } @@ -160,7 +159,7 @@ Displays the hashes of all local objects. defer close(out) for k := range allKeys { - out <- &RefWrapper{Ref: k.B58String()} + out <- &RefWrapper{Ref: k.String()} } }() }, diff --git a/core/commands/repo.go b/core/commands/repo.go index cb81482c7bc0..8767f924661c 100644 --- a/core/commands/repo.go +++ b/core/commands/repo.go @@ -95,7 +95,7 @@ order to reclaim hard disk space. buf := new(bytes.Buffer) if quiet { - buf = bytes.NewBufferString(string(obj.Key) + "\n") + buf = bytes.NewBufferString(obj.Key.String() + "\n") } else { buf = bytes.NewBufferString(fmt.Sprintf("removed %s\n", obj.Key)) } diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index 148515b754de..7c5f0e87e780 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -1,6 +1,7 @@ package corerepo import ( + "context" "errors" "time" @@ -8,9 +9,7 @@ import ( mfs "github.com/ipfs/go-ipfs/mfs" gc "github.com/ipfs/go-ipfs/pin/gc" repo "github.com/ipfs/go-ipfs/repo" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - context "context" humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" @@ -21,7 +20,7 @@ var log = logging.Logger("corerepo") var ErrMaxStorageExceeded = errors.New("Maximum storage limit exceeded. Maybe unpin some files?") type KeyRemoved struct { - Key key.Key + Key *cid.Cid } type GC struct { diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index f2319bc2db2e..c1c0ec2e1692 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -14,7 +14,6 @@ import ( "github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/repo/config" "github.com/ipfs/go-ipfs/thirdparty/testutil" - "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" "context" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" @@ -94,7 +93,7 @@ func TestAddGCLive(t *testing.T) { t.Fatal("add shouldnt complete yet") } - var gcout <-chan key.Key + var gcout <-chan *cid.Cid gcstarted := make(chan struct{}) go func() { defer close(gcstarted) @@ -139,7 +138,7 @@ func TestAddGCLive(t *testing.T) { } for k := range gcout { - if _, ok := addedHashes[k.B58String()]; ok { + if _, ok := addedHashes[k.String()]; ok { t.Fatal("gc'ed a hash we just added") } } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f832e0787338..600ba076dc90 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -3,13 +3,12 @@ package bitswap import ( + "context" "errors" "math" "sync" "time" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - blocks "github.com/ipfs/go-ipfs/blocks" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" exchange "github.com/ipfs/go-ipfs/exchange" @@ -19,12 +18,12 @@ import ( notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" flags "github.com/ipfs/go-ipfs/flags" "github.com/ipfs/go-ipfs/thirdparty/delay" - loggables "gx/ipfs/QmTMy4hVSY28DdwJ9kBz6y7q6MuioFzPcpM3Ma3aPjo1i3/go-libp2p-loggables" - context "context" process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" + loggables "gx/ipfs/QmTMy4hVSY28DdwJ9kBz6y7q6MuioFzPcpM3Ma3aPjo1i3/go-libp2p-loggables" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -90,8 +89,8 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, network: network, findKeys: make(chan *blockRequest, sizeBatchRequestChan), process: px, - newBlocks: make(chan key.Key, HasBlockBufferSize), - provideKeys: make(chan key.Key, provideKeysBufferSize), + newBlocks: make(chan *cid.Cid, HasBlockBufferSize), + provideKeys: make(chan *cid.Cid, provideKeysBufferSize), wm: NewWantManager(ctx, network), } go bs.wm.Run() @@ -137,9 +136,9 @@ type Bitswap struct { process process.Process - newBlocks chan key.Key + newBlocks chan *cid.Cid - provideKeys chan key.Key + provideKeys chan *cid.Cid counterLk sync.Mutex blocksRecvd int @@ -148,14 +147,15 @@ type Bitswap struct { } type blockRequest struct { - Key key.Key + Cid *cid.Cid Ctx context.Context } // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. -func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, error) { - if k == "" { +func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { + if k == nil { + log.Error("nil cid in GetBlock") return nil, blockstore.ErrNotFound } @@ -165,18 +165,17 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, er // functions called by this one. Otherwise those functions won't return // when this context's cancel func is executed. This is difficult to // enforce. May this comment keep you safe. - ctx, cancelFunc := context.WithCancel(parent) ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest")) - log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k) - defer log.Event(ctx, "Bitswap.GetBlockRequest.End", &k) + log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) + defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k) defer func() { cancelFunc() }() - promise, err := bs.GetBlocks(ctx, []key.Key{k}) + promise, err := bs.GetBlocks(ctx, []*cid.Cid{k}) if err != nil { return nil, err } @@ -197,10 +196,10 @@ func (bs *Bitswap) GetBlock(parent context.Context, k key.Key) (blocks.Block, er } } -func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key { - var out []key.Key +func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid { + var out []*cid.Cid for _, e := range bs.engine.WantlistForPeer(p) { - out = append(out, e.Key) + out = append(out, e.Cid) } return out } @@ -216,7 +215,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt { // NB: Your request remains open until the context expires. To conserve // resources, provide a context with a reasonably short deadline (ie. not one // that lasts throughout the lifetime of the server) -func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks.Block, error) { +func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks.Block, error) { if len(keys) == 0 { out := make(chan blocks.Block) close(out) @@ -231,7 +230,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks promise := bs.notifications.Subscribe(ctx, keys...) for _, k := range keys { - log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k) + log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) } bs.wm.WantBlocks(ctx, keys) @@ -240,13 +239,13 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks // be able to provide for all keys. This currently holds true in most // every situation. Later, this assumption may not hold as true. req := &blockRequest{ - Key: keys[0], + Cid: keys[0], Ctx: ctx, } - remaining := make(map[key.Key]struct{}) + remaining := cid.NewSet() for _, k := range keys { - remaining[k] = struct{}{} + remaining.Add(k) } out := make(chan blocks.Block) @@ -255,11 +254,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks defer cancel() defer close(out) defer func() { - var toCancel []key.Key - for k, _ := range remaining { - toCancel = append(toCancel, k) - } - bs.CancelWants(toCancel) + // can't just defer this call on its own, arguments are resolved *when* the defer is created + bs.CancelWants(remaining.Keys()) }() for { select { @@ -268,7 +264,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks return } - delete(remaining, blk.Key()) + remaining.Remove(blk.Cid()) select { case out <- blk: case <-ctx.Done(): @@ -289,8 +285,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks } // CancelWant removes a given key from the wantlist -func (bs *Bitswap) CancelWants(keys []key.Key) { - bs.wm.CancelWants(keys) +func (bs *Bitswap) CancelWants(cids []*cid.Cid) { + bs.wm.CancelWants(cids) } // HasBlock announces the existance of a block to this bitswap service. The @@ -318,7 +314,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { bs.engine.AddBlock(blk) select { - case bs.newBlocks <- blk.Key(): + case bs.newBlocks <- blk.Cid(): // send block off to be reprovided case <-bs.process.Closing(): return bs.process.Close() @@ -340,13 +336,13 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg } // quickly send out cancels, reduces chances of duplicate block receives - var keys []key.Key + var keys []*cid.Cid for _, block := range iblocks { - if _, found := bs.wm.wl.Contains(block.Key()); !found { + if _, found := bs.wm.wl.Contains(block.Cid()); !found { log.Infof("received un-asked-for %s from %s", block, p) continue } - keys = append(keys, block.Key()) + keys = append(keys, block.Cid()) } bs.wm.CancelWants(keys) @@ -360,8 +356,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg return // ignore error, is either logged previously, or ErrAlreadyHaveBlock } - k := b.Key() - log.Event(ctx, "Bitswap.GetBlockRequest.End", &k) + k := b.Cid() + log.Event(ctx, "Bitswap.GetBlockRequest.End", k) log.Debugf("got block %s from %s", b, p) if err := bs.HasBlock(b); err != nil { @@ -378,7 +374,7 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) error { bs.counterLk.Lock() defer bs.counterLk.Unlock() bs.blocksRecvd++ - has, err := bs.blockstore.Has(b.Key()) + has, err := bs.blockstore.Has(b.Cid()) if err != nil { log.Infof("blockstore.Has error: %s", err) return err @@ -415,10 +411,10 @@ func (bs *Bitswap) Close() error { return bs.process.Close() } -func (bs *Bitswap) GetWantlist() []key.Key { - var out []key.Key +func (bs *Bitswap) GetWantlist() []*cid.Cid { + var out []*cid.Cid for _, e := range bs.wm.wl.Entries() { - out = append(out, e.Key) + out = append(out, e.Cid) } return out } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index e15e92df0489..2ec9ef5a1da9 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -2,21 +2,21 @@ package bitswap import ( "bytes" + "context" "sync" "testing" "time" - context "context" - detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race" - travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis" - blocks "github.com/ipfs/go-ipfs/blocks" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" mockrouting "github.com/ipfs/go-ipfs/routing/mock" delay "github.com/ipfs/go-ipfs/thirdparty/delay" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + travis "github.com/ipfs/go-ipfs/thirdparty/testutil/ci/travis" + + detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" p2ptestutil "gx/ipfs/QmcRa2qn6iCmap9bjp8jAwkvYAq13AUfxdY3rrYiaJbLum/go-libp2p/p2p/test/util" ) @@ -38,7 +38,7 @@ func TestClose(t *testing.T) { bitswap := sesgen.Next() bitswap.Exchange.Close() - bitswap.Exchange.GetBlock(context.Background(), block.Key()) + bitswap.Exchange.GetBlock(context.Background(), block.Cid()) } func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this @@ -57,7 +57,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) defer cancel() - _, err := solo.Exchange.GetBlock(ctx, block.Key()) + _, err := solo.Exchange.GetBlock(ctx, block.Cid()) if err != context.DeadlineExceeded { t.Fatal("Expected DeadlineExceeded error") @@ -84,7 +84,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key()) + received, err := wantsBlock.Exchange.GetBlock(ctx, block.Cid()) if err != nil { t.Log(err) t.Fatal("Expected to succeed") @@ -176,10 +176,10 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } } - var blkeys []key.Key + var blkeys []*cid.Cid first := instances[0] for _, b := range blocks { - blkeys = append(blkeys, b.Key()) + blkeys = append(blkeys, b.Cid()) first.Exchange.HasBlock(b) } @@ -216,7 +216,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { for _, inst := range instances { for _, b := range blocks { - if _, err := inst.Blockstore().Get(b.Key()); err != nil { + if _, err := inst.Blockstore().Get(b.Cid()); err != nil { t.Fatal(err) } } @@ -224,8 +224,8 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } func getOrFail(bitswap Instance, b blocks.Block, t *testing.T, wg *sync.WaitGroup) { - if _, err := bitswap.Blockstore().Get(b.Key()); err != nil { - _, err := bitswap.Exchange.GetBlock(context.Background(), b.Key()) + if _, err := bitswap.Blockstore().Get(b.Cid()); err != nil { + _, err := bitswap.Exchange.GetBlock(context.Background(), b.Cid()) if err != nil { t.Fatal(err) } @@ -260,7 +260,7 @@ func TestSendToWantingPeer(t *testing.T) { // peerA requests and waits for block alpha ctx, cancel := context.WithTimeout(context.Background(), waitTime) defer cancel() - alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []key.Key{alpha.Key()}) + alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []*cid.Cid{alpha.Cid()}) if err != nil { t.Fatal(err) } @@ -277,7 +277,7 @@ func TestSendToWantingPeer(t *testing.T) { t.Fatal("context timed out and broke promise channel!") } - if blkrecvd.Key() != alpha.Key() { + if !blkrecvd.Cid().Equals(alpha.Cid()) { t.Fatal("Wrong block!") } @@ -292,7 +292,7 @@ func TestEmptyKey(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - _, err := bs.GetBlock(ctx, key.Key("")) + _, err := bs.GetBlock(ctx, nil) if err != blockstore.ErrNotFound { t.Error("empty str key should return ErrNotFound") } @@ -315,7 +315,7 @@ func TestBasicBitswap(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key()) + blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) if err != nil { t.Fatal(err) } @@ -341,7 +341,7 @@ func TestDoubleGet(t *testing.T) { blocks := bg.Blocks(1) ctx1, cancel1 := context.WithCancel(context.Background()) - blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()}) + blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []*cid.Cid{blocks[0].Cid()}) if err != nil { t.Fatal(err) } @@ -349,7 +349,7 @@ func TestDoubleGet(t *testing.T) { ctx2, cancel2 := context.WithCancel(context.Background()) defer cancel2() - blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []key.Key{blocks[0].Key()}) + blkch2, err := instances[1].Exchange.GetBlocks(ctx2, []*cid.Cid{blocks[0].Cid()}) if err != nil { t.Fatal(err) } @@ -396,9 +396,9 @@ func TestWantlistCleanup(t *testing.T) { bswap := instances.Exchange blocks := bg.Blocks(20) - var keys []key.Key + var keys []*cid.Cid for _, b := range blocks { - keys = append(keys, b.Key()) + keys = append(keys, b.Cid()) } ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50) diff --git a/exchange/bitswap/decision/bench_test.go b/exchange/bitswap/decision/bench_test.go index 8a8fd3db13e6..cc429278c3a0 100644 --- a/exchange/bitswap/decision/bench_test.go +++ b/exchange/bitswap/decision/bench_test.go @@ -1,12 +1,14 @@ package decision import ( + "fmt" "math" "testing" "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" "github.com/ipfs/go-ipfs/thirdparty/testutil" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" + u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -21,6 +23,8 @@ func BenchmarkTaskQueuePush(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - q.Push(&wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)]) + c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i)))) + + q.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32}, peers[i%len(peers)]) } } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 3eddeff86071..d494554d0538 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -169,8 +169,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { // with a task in hand, we're ready to prepare the envelope... - block, err := e.bs.Get(nextTask.Entry.Key) + block, err := e.bs.Get(nextTask.Entry.Cid) if err != nil { + log.Errorf("tried to execute a task and errored fetching block: %s", err) // If we don't have the block, don't hold that against the peer // make sure to update that the task has been 'completed' nextTask.Done() @@ -233,13 +234,13 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, entry := range m.Wantlist() { if entry.Cancel { - log.Debugf("%s cancel %s", p, entry.Key) - l.CancelWant(entry.Key) - e.peerRequestQueue.Remove(entry.Key, p) + log.Debugf("%s cancel %s", p, entry.Cid) + l.CancelWant(entry.Cid) + e.peerRequestQueue.Remove(entry.Cid, p) } else { - log.Debugf("wants %s - %d", entry.Key, entry.Priority) - l.Wants(entry.Key, entry.Priority) - if exists, err := e.bs.Has(entry.Key); err == nil && exists { + log.Debugf("wants %s - %d", entry.Cid, entry.Priority) + l.Wants(entry.Cid, entry.Priority) + if exists, err := e.bs.Has(entry.Cid); err == nil && exists { e.peerRequestQueue.Push(entry.Entry, p) newWorkExists = true } @@ -258,7 +259,7 @@ func (e *Engine) addBlock(block blocks.Block) { for _, l := range e.ledgerMap { l.lk.Lock() - if entry, ok := l.WantListContains(block.Key()); ok { + if entry, ok := l.WantListContains(block.Cid()); ok { e.peerRequestQueue.Push(entry, l.Partner) work = true } @@ -287,8 +288,8 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error { l := e.findOrCreate(p) for _, block := range m.Blocks() { l.SentBytes(len(block.RawData())) - l.wantList.Remove(block.Key()) - e.peerRequestQueue.Remove(block.Key(), p) + l.wantList.Remove(block.Cid()) + e.peerRequestQueue.Remove(block.Cid(), p) } return nil diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index 91dbc8fcd42d..d2d4fa0caf6e 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -167,7 +167,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) { add := message.New(false) for i, letter := range keys { block := blocks.NewBlock([]byte(letter)) - add.AddEntry(block.Key(), math.MaxInt32-i) + add.AddEntry(block.Cid(), math.MaxInt32-i) } e.MessageReceived(partner, add) } @@ -176,7 +176,7 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) { cancels := message.New(false) for _, k := range keys { block := blocks.NewBlock([]byte(k)) - cancels.Cancel(block.Key()) + cancels.Cancel(block.Cid()) } e.MessageReceived(partner, cancels) } @@ -187,7 +187,7 @@ func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error { envelope := <-next received := envelope.Block expected := blocks.NewBlock([]byte(k)) - if received.Key() != expected.Key() { + if !received.Cid().Equals(expected.Cid()) { return errors.New(fmt.Sprintln("received", string(received.RawData()), "expected", string(expected.RawData()))) } } diff --git a/exchange/bitswap/decision/ledger.go b/exchange/bitswap/decision/ledger.go index b5217cf2b50b..b4b46ef113b1 100644 --- a/exchange/bitswap/decision/ledger.go +++ b/exchange/bitswap/decision/ledger.go @@ -5,19 +5,16 @@ import ( "time" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) -// keySet is just a convenient alias for maps of keys, where we only care -// access/lookups. -type keySet map[key.Key]struct{} - func newLedger(p peer.ID) *ledger { return &ledger{ wantList: wl.New(), Partner: p, - sentToPeer: make(map[key.Key]time.Time), + sentToPeer: make(map[string]time.Time), } } @@ -44,7 +41,7 @@ type ledger struct { // sentToPeer is a set of keys to ensure we dont send duplicate blocks // to a given peer - sentToPeer map[key.Key]time.Time + sentToPeer map[string]time.Time lk sync.Mutex } @@ -78,16 +75,16 @@ func (l *ledger) ReceivedBytes(n int) { l.Accounting.BytesRecv += uint64(n) } -func (l *ledger) Wants(k key.Key, priority int) { +func (l *ledger) Wants(k *cid.Cid, priority int) { log.Debugf("peer %s wants %s", l.Partner, k) l.wantList.Add(k, priority) } -func (l *ledger) CancelWant(k key.Key) { +func (l *ledger) CancelWant(k *cid.Cid) { l.wantList.Remove(k) } -func (l *ledger) WantListContains(k key.Key) (*wl.Entry, bool) { +func (l *ledger) WantListContains(k *cid.Cid) (*wl.Entry, bool) { return l.wantList.Contains(k) } diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index 732f0d4d466b..742bcd6ffeed 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -6,7 +6,8 @@ import ( wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" pq "github.com/ipfs/go-ipfs/thirdparty/pq" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -14,7 +15,7 @@ type peerRequestQueue interface { // Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty. Pop() *peerRequestTask Push(entry *wantlist.Entry, to peer.ID) - Remove(k key.Key, p peer.ID) + Remove(k *cid.Cid, p peer.ID) // NB: cannot expose simply expose taskQueue.Len because trashed elements // may exist. These trashed elements should not contribute to the count. @@ -57,12 +58,11 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) { partner.activelk.Lock() defer partner.activelk.Unlock() - _, ok = partner.activeBlocks[entry.Key] - if ok { + if partner.activeBlocks.Has(entry.Cid) { return } - if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok { + if task, ok := tl.taskMap[taskKey(to, entry.Cid)]; ok { task.Entry.Priority = entry.Priority partner.taskQueue.Update(task.index) return @@ -74,7 +74,7 @@ func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) { created: time.Now(), Done: func() { tl.lock.Lock() - partner.TaskDone(entry.Key) + partner.TaskDone(entry.Cid) tl.pQueue.Update(partner.Index()) tl.lock.Unlock() }, @@ -104,7 +104,7 @@ func (tl *prq) Pop() *peerRequestTask { continue // discarding tasks that have been removed } - partner.StartTask(out.Entry.Key) + partner.StartTask(out.Entry.Cid) partner.requests-- break // and return |out| } @@ -114,7 +114,7 @@ func (tl *prq) Pop() *peerRequestTask { } // Remove removes a task from the queue -func (tl *prq) Remove(k key.Key, p peer.ID) { +func (tl *prq) Remove(k *cid.Cid, p peer.ID) { tl.lock.Lock() t, ok := tl.taskMap[taskKey(p, k)] if ok { @@ -181,7 +181,7 @@ type peerRequestTask struct { // Key uniquely identifies a task. func (t *peerRequestTask) Key() string { - return taskKey(t.Target, t.Entry.Key) + return taskKey(t.Target, t.Entry.Cid) } // Index implements pq.Elem @@ -195,8 +195,8 @@ func (t *peerRequestTask) SetIndex(i int) { } // taskKey returns a key that uniquely identifies a task. -func taskKey(p peer.ID, k key.Key) string { - return string(p) + string(k) +func taskKey(p peer.ID, k *cid.Cid) string { + return string(p) + k.KeyString() } // FIFO is a basic task comparator that returns tasks in the order created. @@ -226,7 +226,7 @@ type activePartner struct { activelk sync.Mutex active int - activeBlocks map[key.Key]struct{} + activeBlocks *cid.Set // requests is the number of blocks this peer is currently requesting // request need not be locked around as it will only be modified under @@ -245,7 +245,7 @@ type activePartner struct { func newActivePartner() *activePartner { return &activePartner{ taskQueue: pq.New(wrapCmp(V1)), - activeBlocks: make(map[key.Key]struct{}), + activeBlocks: cid.NewSet(), } } @@ -281,17 +281,17 @@ func partnerCompare(a, b pq.Elem) bool { } // StartTask signals that a task was started for this partner -func (p *activePartner) StartTask(k key.Key) { +func (p *activePartner) StartTask(k *cid.Cid) { p.activelk.Lock() - p.activeBlocks[k] = struct{}{} + p.activeBlocks.Add(k) p.active++ p.activelk.Unlock() } // TaskDone signals that a task was completed for this partner -func (p *activePartner) TaskDone(k key.Key) { +func (p *activePartner) TaskDone(k *cid.Cid) { p.activelk.Lock() - delete(p.activeBlocks, k) + p.activeBlocks.Remove(k) p.active-- if p.active < 0 { panic("more tasks finished than started!") diff --git a/exchange/bitswap/decision/peer_request_queue_test.go b/exchange/bitswap/decision/peer_request_queue_test.go index 22a5f164d889..6a82d3f20eb3 100644 --- a/exchange/bitswap/decision/peer_request_queue_test.go +++ b/exchange/bitswap/decision/peer_request_queue_test.go @@ -1,6 +1,7 @@ package decision import ( + "fmt" "math" "math/rand" "sort" @@ -9,7 +10,8 @@ import ( "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" "github.com/ipfs/go-ipfs/thirdparty/testutil" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" + u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" ) func TestPushPop(t *testing.T) { @@ -41,10 +43,13 @@ func TestPushPop(t *testing.T) { for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters letter := alphabet[index] t.Log(partner.String()) - prq.Push(&wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner) + + c := cid.NewCidV0(u.Hash([]byte(letter))) + prq.Push(&wantlist.Entry{Cid: c, Priority: math.MaxInt32 - index}, partner) } for _, consonant := range consonants { - prq.Remove(key.Key(consonant), partner) + c := cid.NewCidV0(u.Hash([]byte(consonant))) + prq.Remove(c, partner) } prq.fullThaw() @@ -56,12 +61,13 @@ func TestPushPop(t *testing.T) { break } - out = append(out, string(received.Entry.Key)) + out = append(out, received.Entry.Cid.String()) } // Entries popped should already be in correct order for i, expected := range vowels { - if out[i] != expected { + exp := cid.NewCidV0(u.Hash([]byte(expected))).String() + if out[i] != exp { t.Fatal("received", out[i], "expected", expected) } } @@ -78,10 +84,11 @@ func TestPeerRepeats(t *testing.T) { // Have each push some blocks for i := 0; i < 5; i++ { - prq.Push(&wantlist.Entry{Key: key.Key(i)}, a) - prq.Push(&wantlist.Entry{Key: key.Key(i)}, b) - prq.Push(&wantlist.Entry{Key: key.Key(i)}, c) - prq.Push(&wantlist.Entry{Key: key.Key(i)}, d) + elcid := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i)))) + prq.Push(&wantlist.Entry{Cid: elcid}, a) + prq.Push(&wantlist.Entry{Cid: elcid}, b) + prq.Push(&wantlist.Entry{Cid: elcid}, c) + prq.Push(&wantlist.Entry{Cid: elcid}, d) } // now, pop off four entries, there should be one from each diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 2c1947cfee99..5dc7be1bd063 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -1,16 +1,17 @@ package message import ( + "fmt" "io" blocks "github.com/ipfs/go-ipfs/blocks" pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb" wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - inet "gx/ipfs/QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX/go-libp2p-net" ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io" proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" + inet "gx/ipfs/QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX/go-libp2p-net" ) // TODO move message.go into the bitswap package @@ -25,9 +26,9 @@ type BitSwapMessage interface { Blocks() []blocks.Block // AddEntry adds an entry to the Wantlist. - AddEntry(key key.Key, priority int) + AddEntry(key *cid.Cid, priority int) - Cancel(key key.Key) + Cancel(key *cid.Cid) Empty() bool @@ -47,8 +48,8 @@ type Exportable interface { type impl struct { full bool - wantlist map[key.Key]Entry - blocks map[key.Key]blocks.Block + wantlist map[string]Entry + blocks map[string]blocks.Block } func New(full bool) BitSwapMessage { @@ -57,8 +58,8 @@ func New(full bool) BitSwapMessage { func newMsg(full bool) *impl { return &impl{ - blocks: make(map[key.Key]blocks.Block), - wantlist: make(map[key.Key]Entry), + blocks: make(map[string]blocks.Block), + wantlist: make(map[string]Entry), full: full, } } @@ -68,16 +69,20 @@ type Entry struct { Cancel bool } -func newMessageFromProto(pbm pb.Message) BitSwapMessage { +func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) { m := newMsg(pbm.GetWantlist().GetFull()) for _, e := range pbm.GetWantlist().GetEntries() { - m.addEntry(key.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel()) + c, err := cid.Cast([]byte(e.GetBlock())) + if err != nil { + return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err) + } + m.addEntry(c, int(e.GetPriority()), e.GetCancel()) } for _, d := range pbm.GetBlocks() { b := blocks.NewBlock(d) m.AddBlock(b) } - return m + return m, nil } func (m *impl) Full() bool { @@ -104,16 +109,17 @@ func (m *impl) Blocks() []blocks.Block { return bs } -func (m *impl) Cancel(k key.Key) { - delete(m.wantlist, k) +func (m *impl) Cancel(k *cid.Cid) { + delete(m.wantlist, k.KeyString()) m.addEntry(k, 0, true) } -func (m *impl) AddEntry(k key.Key, priority int) { +func (m *impl) AddEntry(k *cid.Cid, priority int) { m.addEntry(k, priority, false) } -func (m *impl) addEntry(k key.Key, priority int, cancel bool) { +func (m *impl) addEntry(c *cid.Cid, priority int, cancel bool) { + k := c.KeyString() e, exists := m.wantlist[k] if exists { e.Priority = priority @@ -121,7 +127,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) { } else { m.wantlist[k] = Entry{ Entry: &wantlist.Entry{ - Key: k, + Cid: c, Priority: priority, }, Cancel: cancel, @@ -130,7 +136,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) { } func (m *impl) AddBlock(b blocks.Block) { - m.blocks[b.Key()] = b + m.blocks[b.Cid().KeyString()] = b } func FromNet(r io.Reader) (BitSwapMessage, error) { @@ -144,8 +150,7 @@ func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) { return nil, err } - m := newMessageFromProto(*pb) - return m, nil + return newMessageFromProto(*pb) } func (m *impl) ToProto() *pb.Message { @@ -153,7 +158,7 @@ func (m *impl) ToProto() *pb.Message { pbm.Wantlist = new(pb.Message_Wantlist) for _, e := range m.wantlist { pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{ - Block: proto.String(string(e.Key)), + Block: proto.String(e.Cid.KeyString()), Priority: proto.Int32(int32(e.Priority)), Cancel: proto.Bool(e.Cancel), }) @@ -176,7 +181,7 @@ func (m *impl) ToNet(w io.Writer) error { func (m *impl) Loggable() map[string]interface{} { var blocks []string for _, v := range m.blocks { - blocks = append(blocks, v.Key().B58String()) + blocks = append(blocks, v.Cid().String()) } return map[string]interface{}{ "blocks": blocks, diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index 56609c4348b7..d516093b5fdb 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -8,13 +8,18 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" pb "github.com/ipfs/go-ipfs/exchange/bitswap/message/pb" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" + u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" ) +func mkFakeCid(s string) *cid.Cid { + return cid.NewCidV0(u.Hash([]byte(s))) +} + func TestAppendWanted(t *testing.T) { - const str = "foo" + str := mkFakeCid("foo") m := New(true) - m.AddEntry(key.Key(str), 1) + m.AddEntry(str, 1) if !wantlistContains(m.ToProto().GetWantlist(), str) { t.Fail() @@ -23,16 +28,20 @@ func TestAppendWanted(t *testing.T) { } func TestNewMessageFromProto(t *testing.T) { - const str = "a_key" + str := mkFakeCid("a_key") protoMessage := new(pb.Message) protoMessage.Wantlist = new(pb.Message_Wantlist) protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{ - {Block: proto.String(str)}, + {Block: proto.String(str.KeyString())}, } if !wantlistContains(protoMessage.Wantlist, str) { t.Fail() } - m := newMessageFromProto(*protoMessage) + m, err := newMessageFromProto(*protoMessage) + if err != nil { + t.Fatal(err) + } + if !wantlistContains(m.ToProto().GetWantlist(), str) { t.Fail() } @@ -60,10 +69,10 @@ func TestAppendBlock(t *testing.T) { } func TestWantlist(t *testing.T) { - keystrs := []string{"foo", "bar", "baz", "bat"} + keystrs := []*cid.Cid{mkFakeCid("foo"), mkFakeCid("bar"), mkFakeCid("baz"), mkFakeCid("bat")} m := New(true) for _, s := range keystrs { - m.AddEntry(key.Key(s), 1) + m.AddEntry(s, 1) } exported := m.Wantlist() @@ -71,22 +80,22 @@ func TestWantlist(t *testing.T) { present := false for _, s := range keystrs { - if s == string(k.Key) { + if s.Equals(k.Cid) { present = true } } if !present { - t.Logf("%v isn't in original list", k.Key) + t.Logf("%v isn't in original list", k.Cid) t.Fail() } } } func TestCopyProtoByValue(t *testing.T) { - const str = "foo" + str := mkFakeCid("foo") m := New(true) protoBeforeAppend := m.ToProto() - m.AddEntry(key.Key(str), 1) + m.AddEntry(str, 1) if wantlistContains(protoBeforeAppend.GetWantlist(), str) { t.Fail() } @@ -94,11 +103,11 @@ func TestCopyProtoByValue(t *testing.T) { func TestToNetFromNetPreservesWantList(t *testing.T) { original := New(true) - original.AddEntry(key.Key("M"), 1) - original.AddEntry(key.Key("B"), 1) - original.AddEntry(key.Key("D"), 1) - original.AddEntry(key.Key("T"), 1) - original.AddEntry(key.Key("F"), 1) + original.AddEntry(mkFakeCid("M"), 1) + original.AddEntry(mkFakeCid("B"), 1) + original.AddEntry(mkFakeCid("D"), 1) + original.AddEntry(mkFakeCid("T"), 1) + original.AddEntry(mkFakeCid("F"), 1) buf := new(bytes.Buffer) if err := original.ToNet(buf); err != nil { @@ -110,13 +119,13 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { t.Fatal(err) } - keys := make(map[key.Key]bool) + keys := make(map[string]bool) for _, k := range copied.Wantlist() { - keys[k.Key] = true + keys[k.Cid.KeyString()] = true } for _, k := range original.Wantlist() { - if _, ok := keys[k.Key]; !ok { + if _, ok := keys[k.Cid.KeyString()]; !ok { t.Fatalf("Key Missing: \"%v\"", k) } } @@ -140,21 +149,21 @@ func TestToAndFromNetMessage(t *testing.T) { t.Fatal(err) } - keys := make(map[key.Key]bool) + keys := make(map[string]bool) for _, b := range m2.Blocks() { - keys[b.Key()] = true + keys[b.Cid().KeyString()] = true } for _, b := range original.Blocks() { - if _, ok := keys[b.Key()]; !ok { + if _, ok := keys[b.Cid().KeyString()]; !ok { t.Fail() } } } -func wantlistContains(wantlist *pb.Message_Wantlist, x string) bool { +func wantlistContains(wantlist *pb.Message_Wantlist, c *cid.Cid) bool { for _, e := range wantlist.GetEntries() { - if e.GetBlock() == x { + if e.GetBlock() == c.KeyString() { return true } } @@ -174,8 +183,8 @@ func TestDuplicates(t *testing.T) { b := blocks.NewBlock([]byte("foo")) msg := New(true) - msg.AddEntry(b.Key(), 1) - msg.AddEntry(b.Key(), 1) + msg.AddEntry(b.Cid(), 1) + msg.AddEntry(b.Cid(), 1) if len(msg.Wantlist()) != 1 { t.Fatal("Duplicate in BitSwapMessage") } diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 72cd80a67369..e7aa86cb6fc5 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -1,10 +1,11 @@ package network import ( - context "context" + "context" + bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -52,8 +53,8 @@ type Receiver interface { type Routing interface { // FindProvidersAsync returns a channel of providers for the given key - FindProvidersAsync(context.Context, key.Key, int) <-chan peer.ID + FindProvidersAsync(context.Context, *cid.Cid, int) <-chan peer.ID // Provide provides the key to the network - Provide(context.Context, key.Key) error + Provide(context.Context, *cid.Cid) error } diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index af18965cc8b0..45312130f888 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -10,7 +10,6 @@ import ( ma "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr" routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing" pstore "gx/ipfs/QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo/go-libp2p-peerstore" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" host "gx/ipfs/QmdML3R42PRSwnt46jSuEts9bHSqLctVYEjJqMR3UYV8ki/go-libp2p-host" @@ -130,7 +129,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error { } // FindProvidersAsync returns a channel of providers for the given key -func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID { +func (bsnet *impl) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { // Since routing queries are expensive, give bitswap the peers to which we // have open connections. Note that this may cause issues if bitswap starts @@ -147,12 +146,9 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) < out <- id } - // TEMPORARY SHIM UNTIL CID GETS PROPAGATED - c := cid.NewCidV0(k.ToMultihash()) - go func() { defer close(out) - providers := bsnet.routing.FindProvidersAsync(ctx, c, max) + providers := bsnet.routing.FindProvidersAsync(ctx, k, max) for info := range providers { if info.ID == bsnet.host.ID() { continue // ignore self as provider @@ -169,9 +165,8 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k key.Key, max int) < } // Provide provides the key to the network -func (bsnet *impl) Provide(ctx context.Context, k key.Key) error { - c := cid.NewCidV0(k.ToMultihash()) - return bsnet.routing.Provide(ctx, c) +func (bsnet *impl) Provide(ctx context.Context, k *cid.Cid) error { + return bsnet.routing.Provide(ctx, k) } // handleNewStream receives a new stream from the network. diff --git a/exchange/bitswap/notifications/notifications.go b/exchange/bitswap/notifications/notifications.go index bb0fb59d1d7a..41c38ad48dae 100644 --- a/exchange/bitswap/notifications/notifications.go +++ b/exchange/bitswap/notifications/notifications.go @@ -1,17 +1,19 @@ package notifications import ( - context "context" - pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub" + "context" + blocks "github.com/ipfs/go-ipfs/blocks" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + + pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) const bufferSize = 16 type PubSub interface { Publish(block blocks.Block) - Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block + Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block Shutdown() } @@ -24,8 +26,7 @@ type impl struct { } func (ps *impl) Publish(block blocks.Block) { - topic := string(block.Key()) - ps.wrapped.Pub(block, topic) + ps.wrapped.Pub(block, block.Cid().KeyString()) } func (ps *impl) Shutdown() { @@ -35,7 +36,7 @@ func (ps *impl) Shutdown() { // Subscribe returns a channel of blocks for the given |keys|. |blockChannel| // is closed if the |ctx| times out or is cancelled, or after sending len(keys) // blocks. -func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block { +func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block { blocksCh := make(chan blocks.Block, len(keys)) valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking @@ -71,10 +72,10 @@ func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Bl return blocksCh } -func toStrings(keys []key.Key) []string { +func toStrings(keys []*cid.Cid) []string { strs := make([]string, 0) for _, key := range keys { - strs = append(strs, string(key)) + strs = append(strs, key.KeyString()) } return strs } diff --git a/exchange/bitswap/notifications/notifications_test.go b/exchange/bitswap/notifications/notifications_test.go index e5881564981b..343ddb34c266 100644 --- a/exchange/bitswap/notifications/notifications_test.go +++ b/exchange/bitswap/notifications/notifications_test.go @@ -2,13 +2,13 @@ package notifications import ( "bytes" + "context" "testing" "time" - context "context" blocks "github.com/ipfs/go-ipfs/blocks" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) func TestDuplicates(t *testing.T) { @@ -17,7 +17,7 @@ func TestDuplicates(t *testing.T) { n := New() defer n.Shutdown() - ch := n.Subscribe(context.Background(), b1.Key(), b2.Key()) + ch := n.Subscribe(context.Background(), b1.Cid(), b2.Cid()) n.Publish(b1) blockRecvd, ok := <-ch @@ -41,7 +41,7 @@ func TestPublishSubscribe(t *testing.T) { n := New() defer n.Shutdown() - ch := n.Subscribe(context.Background(), blockSent.Key()) + ch := n.Subscribe(context.Background(), blockSent.Cid()) n.Publish(blockSent) blockRecvd, ok := <-ch @@ -59,7 +59,7 @@ func TestSubscribeMany(t *testing.T) { n := New() defer n.Shutdown() - ch := n.Subscribe(context.Background(), e1.Key(), e2.Key()) + ch := n.Subscribe(context.Background(), e1.Cid(), e2.Cid()) n.Publish(e1) r1, ok := <-ch @@ -83,8 +83,8 @@ func TestDuplicateSubscribe(t *testing.T) { n := New() defer n.Shutdown() - ch1 := n.Subscribe(context.Background(), e1.Key()) - ch2 := n.Subscribe(context.Background(), e1.Key()) + ch1 := n.Subscribe(context.Background(), e1.Cid()) + ch2 := n.Subscribe(context.Background(), e1.Cid()) n.Publish(e1) r1, ok := <-ch1 @@ -118,7 +118,7 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) { n := New() defer n.Shutdown() block := blocks.NewBlock([]byte("A Missed Connection")) - blockChannel := n.Subscribe(fastExpiringCtx, block.Key()) + blockChannel := n.Subscribe(fastExpiringCtx, block.Cid()) assertBlockChannelNil(t, blockChannel) } @@ -132,10 +132,10 @@ func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) { t.Log("generate a large number of blocks. exceed default buffer") bs := g.Blocks(1000) - ks := func() []key.Key { - var keys []key.Key + ks := func() []*cid.Cid { + var keys []*cid.Cid for _, b := range bs { - keys = append(keys, b.Key()) + keys = append(keys, b.Cid()) } return keys }() @@ -162,7 +162,7 @@ func assertBlocksEqual(t *testing.T, a, b blocks.Block) { if !bytes.Equal(a.RawData(), b.RawData()) { t.Fatal("blocks aren't equal") } - if a.Key() != b.Key() { + if a.Cid() != b.Cid() { t.Fatal("block keys aren't equal") } } diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go index e3518a0d7889..3f8ddc28ebd7 100644 --- a/exchange/bitswap/stat.go +++ b/exchange/bitswap/stat.go @@ -1,13 +1,14 @@ package bitswap import ( - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" "sort" + + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) type Stat struct { ProvideBufLen int - Wantlist []key.Key + Wantlist []*cid.Cid Peers []string BlocksReceived int DupBlksReceived int diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index b9b02917800a..b9d7c5a50779 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -10,7 +10,6 @@ import ( delay "github.com/ipfs/go-ipfs/thirdparty/delay" testutil "github.com/ipfs/go-ipfs/thirdparty/testutil" routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -92,18 +91,17 @@ func (nc *networkClient) SendMessage( } // FindProvidersAsync returns a channel of providers for the given key -func (nc *networkClient) FindProvidersAsync(ctx context.Context, k key.Key, max int) <-chan peer.ID { +func (nc *networkClient) FindProvidersAsync(ctx context.Context, k *cid.Cid, max int) <-chan peer.ID { // NB: this function duplicates the PeerInfo -> ID transformation in the // bitswap network adapter. Not to worry. This network client will be // deprecated once the ipfsnet.Mock is added. The code below is only // temporary. - c := cid.NewCidV0(k.ToMultihash()) out := make(chan peer.ID) go func() { defer close(out) - providers := nc.routing.FindProvidersAsync(ctx, c, max) + providers := nc.routing.FindProvidersAsync(ctx, k, max) for info := range providers { select { case <-ctx.Done(): @@ -139,9 +137,8 @@ func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet. } // Provide provides the key to the network -func (nc *networkClient) Provide(ctx context.Context, k key.Key) error { - c := cid.NewCidV0(k.ToMultihash()) - return nc.routing.Provide(ctx, c) +func (nc *networkClient) Provide(ctx context.Context, k *cid.Cid) error { + return nc.routing.Provide(ctx, k) } func (nc *networkClient) SetDelegate(r bsnet.Receiver) { diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go index 1f514e9db546..bf89c4db997f 100644 --- a/exchange/bitswap/wantlist/wantlist.go +++ b/exchange/bitswap/wantlist/wantlist.go @@ -6,7 +6,7 @@ import ( "sort" "sync" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) type ThreadSafe struct { @@ -16,11 +16,11 @@ type ThreadSafe struct { // not threadsafe type Wantlist struct { - set map[key.Key]*Entry + set map[string]*Entry } type Entry struct { - Key key.Key + Cid *cid.Cid Priority int RefCnt int @@ -40,11 +40,11 @@ func NewThreadSafe() *ThreadSafe { func New() *Wantlist { return &Wantlist{ - set: make(map[key.Key]*Entry), + set: make(map[string]*Entry), } } -func (w *ThreadSafe) Add(k key.Key, priority int) bool { +func (w *ThreadSafe) Add(k *cid.Cid, priority int) bool { w.lk.Lock() defer w.lk.Unlock() return w.Wantlist.Add(k, priority) @@ -56,13 +56,13 @@ func (w *ThreadSafe) AddEntry(e *Entry) bool { return w.Wantlist.AddEntry(e) } -func (w *ThreadSafe) Remove(k key.Key) bool { +func (w *ThreadSafe) Remove(k *cid.Cid) bool { w.lk.Lock() defer w.lk.Unlock() return w.Wantlist.Remove(k) } -func (w *ThreadSafe) Contains(k key.Key) (*Entry, bool) { +func (w *ThreadSafe) Contains(k *cid.Cid) (*Entry, bool) { w.lk.RLock() defer w.lk.RUnlock() return w.Wantlist.Contains(k) @@ -90,14 +90,15 @@ func (w *Wantlist) Len() int { return len(w.set) } -func (w *Wantlist) Add(k key.Key, priority int) bool { +func (w *Wantlist) Add(c *cid.Cid, priority int) bool { + k := c.KeyString() if e, ok := w.set[k]; ok { e.RefCnt++ return false } w.set[k] = &Entry{ - Key: k, + Cid: c, Priority: priority, RefCnt: 1, } @@ -106,15 +107,17 @@ func (w *Wantlist) Add(k key.Key, priority int) bool { } func (w *Wantlist) AddEntry(e *Entry) bool { - if ex, ok := w.set[e.Key]; ok { + k := e.Cid.KeyString() + if ex, ok := w.set[k]; ok { ex.RefCnt++ return false } - w.set[e.Key] = e + w.set[k] = e return true } -func (w *Wantlist) Remove(k key.Key) bool { +func (w *Wantlist) Remove(c *cid.Cid) bool { + k := c.KeyString() e, ok := w.set[k] if !ok { return false @@ -128,8 +131,8 @@ func (w *Wantlist) Remove(k key.Key) bool { return false } -func (w *Wantlist) Contains(k key.Key) (*Entry, bool) { - e, ok := w.set[k] +func (w *Wantlist) Contains(k *cid.Cid) (*Entry, bool) { + e, ok := w.set[k.KeyString()] return e, ok } diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 79f8df790292..eca8739d856f 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -1,15 +1,15 @@ package bitswap import ( + "context" "sync" "time" - context "context" engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network" wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -51,7 +51,7 @@ type msgPair struct { type cancellation struct { who peer.ID - blk key.Key + blk *cid.Cid } type msgQueue struct { @@ -69,23 +69,23 @@ type msgQueue struct { done chan struct{} } -func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) { +func (pm *WantManager) WantBlocks(ctx context.Context, ks []*cid.Cid) { log.Infof("want blocks: %s", ks) pm.addEntries(ctx, ks, false) } -func (pm *WantManager) CancelWants(ks []key.Key) { +func (pm *WantManager) CancelWants(ks []*cid.Cid) { log.Infof("cancel wants: %s", ks) pm.addEntries(context.TODO(), ks, true) } -func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) { +func (pm *WantManager) addEntries(ctx context.Context, ks []*cid.Cid, cancel bool) { var entries []*bsmsg.Entry for i, k := range ks { entries = append(entries, &bsmsg.Entry{ Cancel: cancel, Entry: &wantlist.Entry{ - Key: k, + Cid: k, Priority: kMaxPriority - i, RefCnt: 1, }, @@ -130,7 +130,7 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { // new peer, we will want to give them our full wantlist fullwantlist := bsmsg.New(true) for _, e := range pm.wl.Entries() { - fullwantlist.AddEntry(e.Key, e.Priority) + fullwantlist.AddEntry(e.Cid, e.Priority) } mq.out = fullwantlist mq.work <- struct{}{} @@ -246,7 +246,7 @@ func (pm *WantManager) Run() { var filtered []*bsmsg.Entry for _, e := range entries { if e.Cancel { - if pm.wl.Remove(e.Key) { + if pm.wl.Remove(e.Cid) { filtered = append(filtered, e) } } else { @@ -323,9 +323,9 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { // one passed in for _, e := range entries { if e.Cancel { - mq.out.Cancel(e.Key) + mq.out.Cancel(e.Cid) } else { - mq.out.AddEntry(e.Key, e.Priority) + mq.out.AddEntry(e.Cid, e.Priority) } } } diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 6254500b8841..d7216ae66855 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -1,15 +1,15 @@ package bitswap import ( + "context" "math/rand" "sync" "time" - context "context" process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer" ) @@ -77,7 +77,7 @@ func (bs *Bitswap) provideWorker(px process.Process) { limit := make(chan struct{}, provideWorkerMax) - limitedGoProvide := func(k key.Key, wid int) { + limitedGoProvide := func(k *cid.Cid, wid int) { defer func() { // replace token when done <-limit @@ -85,7 +85,7 @@ func (bs *Bitswap) provideWorker(px process.Process) { ev := logging.LoggableMap{"ID": wid} ctx := procctx.OnClosingContext(px) // derive ctx from px - defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() + defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, k).Done() ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx defer cancel() @@ -121,9 +121,9 @@ func (bs *Bitswap) provideWorker(px process.Process) { func (bs *Bitswap) provideCollector(ctx context.Context) { defer close(bs.provideKeys) - var toProvide []key.Key - var nextKey key.Key - var keysOut chan key.Key + var toProvide []*cid.Cid + var nextKey *cid.Cid + var keysOut chan *cid.Cid for { select { @@ -181,7 +181,7 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { // for new providers for blocks. i := rand.Intn(len(entries)) bs.findKeys <- &blockRequest{ - Key: entries[i].Key, + Cid: entries[i].Cid, Ctx: ctx, } case <-parent.Done(): @@ -192,23 +192,23 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) { func (bs *Bitswap) providerQueryManager(ctx context.Context) { var activeLk sync.Mutex - kset := key.NewKeySet() + kset := cid.NewSet() for { select { case e := <-bs.findKeys: activeLk.Lock() - if kset.Has(e.Key) { + if kset.Has(e.Cid) { activeLk.Unlock() continue } - kset.Add(e.Key) + kset.Add(e.Cid) activeLk.Unlock() go func(e *blockRequest) { child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout) defer cancel() - providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest) + providers := bs.network.FindProvidersAsync(child, e.Cid, maxProvidersPerRequest) wg := &sync.WaitGroup{} for p := range providers { wg.Add(1) @@ -222,7 +222,7 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) { } wg.Wait() activeLk.Lock() - kset.Remove(e.Key) + kset.Remove(e.Cid) activeLk.Unlock() }(e) diff --git a/exchange/interface.go b/exchange/interface.go index f2edc569b0a5..cc2ec7a43f5c 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -2,21 +2,21 @@ package exchange import ( + "context" "io" blocks "github.com/ipfs/go-ipfs/blocks" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - context "context" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) // Any type that implements exchange.Interface may be used as an IPFS block // exchange protocol. type Interface interface { // type Exchanger interface // GetBlock returns the block associated with a given key. - GetBlock(context.Context, key.Key) (blocks.Block, error) + GetBlock(context.Context, *cid.Cid) (blocks.Block, error) - GetBlocks(context.Context, []key.Key) (<-chan blocks.Block, error) + GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error) // TODO Should callers be concerned with whether the block was made // available on the network? diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index b483e1825acb..d54979628e5d 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -3,12 +3,13 @@ package offline import ( + "context" + blocks "github.com/ipfs/go-ipfs/blocks" "github.com/ipfs/go-ipfs/blocks/blockstore" exchange "github.com/ipfs/go-ipfs/exchange" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - context "context" + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) func Exchange(bs blockstore.Blockstore) exchange.Interface { @@ -24,7 +25,7 @@ type offlineExchange struct { // GetBlock returns nil to signal that a block could not be retrieved for the // given key. // NB: This function may return before the timeout expires. -func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (blocks.Block, error) { +func (e *offlineExchange) GetBlock(_ context.Context, k *cid.Cid) (blocks.Block, error) { return e.bs.Get(k) } @@ -40,11 +41,11 @@ func (_ *offlineExchange) Close() error { return nil } -func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan blocks.Block, error) { +func (e *offlineExchange) GetBlocks(ctx context.Context, ks []*cid.Cid) (<-chan blocks.Block, error) { out := make(chan blocks.Block, 0) go func() { defer close(out) - var misses []key.Key + var misses []*cid.Cid for _, k := range ks { hit, err := e.bs.Get(k) if err != nil { diff --git a/exchange/offline/offline_test.go b/exchange/offline/offline_test.go index 9cbd713336b5..80fb6bbed645 100644 --- a/exchange/offline/offline_test.go +++ b/exchange/offline/offline_test.go @@ -1,20 +1,23 @@ package offline import ( + "context" "testing" - context "context" blocks "github.com/ipfs/go-ipfs/blocks" "github.com/ipfs/go-ipfs/blocks/blockstore" "github.com/ipfs/go-ipfs/blocks/blocksutil" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" + + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" + u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" ds_sync "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync" ) func TestBlockReturnsErr(t *testing.T) { off := Exchange(bstore()) - _, err := off.GetBlock(context.Background(), key.Key("foo")) + c := cid.NewCidV0(u.Hash([]byte("foo"))) + _, err := off.GetBlock(context.Background(), c) if err != nil { return // as desired } @@ -31,7 +34,7 @@ func TestHasBlockReturnsNil(t *testing.T) { t.Fail() } - if _, err := store.Get(block.Key()); err != nil { + if _, err := store.Get(block.Cid()); err != nil { t.Fatal(err) } } @@ -49,11 +52,11 @@ func TestGetBlocks(t *testing.T) { } } - request := func() []key.Key { - var ks []key.Key + request := func() []*cid.Cid { + var ks []*cid.Cid for _, b := range expected { - ks = append(ks, b.Key()) + ks = append(ks, b.Cid()) } return ks }() diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index f372b40ca344..66c1ecf6c579 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -9,7 +9,6 @@ import ( backoff "gx/ipfs/QmPJUtEJsm5YLUWhF6imvyCH8KZXRJa9Wup7FDMwTy5Ufz/backoff" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" routing "gx/ipfs/QmXKuGUzLcgoQvp8M6ZEJzupWUNmx8NoqXEbYLMDjL4rjj/go-libp2p-routing" - cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) var log = logging.Logger("reprovider") @@ -53,8 +52,7 @@ func (rp *Reprovider) Reprovide(ctx context.Context) error { if err != nil { return fmt.Errorf("Failed to get key chan from blockstore: %s", err) } - for k := range keychan { - c := cid.NewCidV0(k.ToMultihash()) + for c := range keychan { op := func() error { err := rp.rsys.Provide(ctx, c) if err != nil { diff --git a/importer/chunk/rabin_test.go b/importer/chunk/rabin_test.go index a6e08f2680fd..366d44fb82a1 100644 --- a/importer/chunk/rabin_test.go +++ b/importer/chunk/rabin_test.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "github.com/ipfs/go-ipfs/blocks" - "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" "io" "testing" @@ -39,10 +38,10 @@ func TestRabinChunking(t *testing.T) { } } -func chunkData(t *testing.T, data []byte) map[key.Key]blocks.Block { +func chunkData(t *testing.T, data []byte) map[string]blocks.Block { r := NewRabin(bytes.NewReader(data), 1024*256) - blkmap := make(map[key.Key]blocks.Block) + blkmap := make(map[string]blocks.Block) for { blk, err := r.NextBytes() @@ -54,7 +53,7 @@ func chunkData(t *testing.T, data []byte) map[key.Key]blocks.Block { } b := blocks.NewBlock(blk) - blkmap[b.Key()] = b + blkmap[b.Cid().KeyString()] = b } return blkmap diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index bd415f49f58d..df155dd26385 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -2,14 +2,14 @@ package merkledag import ( + "context" "fmt" "strings" "sync" + blocks "github.com/ipfs/go-ipfs/blocks" bserv "github.com/ipfs/go-ipfs/blockservice" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - "context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) @@ -49,7 +49,7 @@ func (n *dagService) Add(nd *Node) (*cid.Cid, error) { return nil, fmt.Errorf("dagService is nil") } - return n.Blocks.AddObject(nd) + return n.Blocks.AddBlock(nd) } func (n *dagService) Batch() *Batch { @@ -94,7 +94,7 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) { } func (n *dagService) Remove(nd *Node) error { - return n.Blocks.DeleteObject(nd) + return n.Blocks.DeleteBlock(nd) } // FetchGraph fetches all nodes that are children of the given node @@ -119,27 +119,11 @@ type NodeOption struct { Err error } -// TODO: this is a mid-term hack to get around the fact that blocks don't -// have full CIDs and potentially (though we don't know of any such scenario) -// may have the same block with multiple different encodings. -// We have discussed the possiblity of using CIDs as datastore keys -// in the future. This would be a much larger changeset than i want to make -// right now. -func cidsToKeyMapping(cids []*cid.Cid) map[key.Key]*cid.Cid { - mapping := make(map[key.Key]*cid.Cid) - for _, c := range cids { - mapping[key.Key(c.Hash())] = c - } - return mapping -} - func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption { out := make(chan *NodeOption, len(keys)) blocks := ds.Blocks.GetBlocks(ctx, keys) var count int - mapping := cidsToKeyMapping(keys) - go func() { defer close(out) for { @@ -152,7 +136,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *Node return } - c := mapping[b.Key()] + c := b.Cid() var nd *Node switch c.Type() { @@ -333,7 +317,7 @@ func (np *nodePromise) Get(ctx context.Context) (*Node, error) { type Batch struct { ds *dagService - objects []bserv.Object + blocks []blocks.Block size int MaxSize int } @@ -344,7 +328,7 @@ func (t *Batch) Add(nd *Node) (*cid.Cid, error) { return nil, err } - t.objects = append(t.objects, nd) + t.blocks = append(t.blocks, nd) t.size += len(d) if t.size > t.MaxSize { return nd.Cid(), t.Commit() @@ -353,8 +337,8 @@ func (t *Batch) Add(nd *Node) (*cid.Cid, error) { } func (t *Batch) Commit() error { - _, err := t.ds.Blocks.AddObjects(t.objects) - t.objects = nil + _, err := t.ds.Blocks.AddBlocks(t.blocks) + t.blocks = nil t.size = 0 return err } diff --git a/mfs/file.go b/mfs/file.go index e532fb088a67..bbd7b48c212d 100644 --- a/mfs/file.go +++ b/mfs/file.go @@ -1,6 +1,7 @@ package mfs import ( + "context" "fmt" "sync" @@ -8,8 +9,6 @@ import ( dag "github.com/ipfs/go-ipfs/merkledag" ft "github.com/ipfs/go-ipfs/unixfs" mod "github.com/ipfs/go-ipfs/unixfs/mod" - - context "context" ) type File struct { diff --git a/pin/gc/gc.go b/pin/gc/gc.go index 7bfde538c104..b8cca45e7f56 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -1,14 +1,14 @@ package gc import ( + "context" + bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bserv "github.com/ipfs/go-ipfs/blockservice" offline "github.com/ipfs/go-ipfs/exchange/offline" dag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" - key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" - context "context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" ) @@ -24,7 +24,7 @@ var log = logging.Logger("gc") // // The routine then iterates over every block in the blockstore and // deletes any block that is not found in the marked set. -func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) { +func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan *cid.Cid, error) { unlocker := bs.GCLock() bsrv := bserv.New(bs, offline.Exchange(bs)) @@ -40,7 +40,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRo return nil, err } - output := make(chan key.Key) + output := make(chan *cid.Cid) go func() { defer close(output) defer unlocker.Unlock() @@ -71,24 +71,16 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRo return output, nil } -func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error { +func Descendants(ctx context.Context, ds dag.DAGService, set *cid.Set, roots []*cid.Cid, bestEffort bool) error { for _, c := range roots { - set.Add(key.Key(c.Hash())) + set.Add(c) nd, err := ds.Get(ctx, c) if err != nil { return err } // EnumerateChildren recursively walks the dag and adds the keys to the given set - err = dag.EnumerateChildren(ctx, ds, nd, func(c *cid.Cid) bool { - k := key.Key(c.Hash()) - seen := set.Has(k) - if seen { - return false - } - set.Add(k) - return true - }, bestEffort) + err = dag.EnumerateChildren(ctx, ds, nd, set.Visit, bestEffort) if err != nil { return err } @@ -97,10 +89,10 @@ func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots [ return nil } -func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffortRoots []*cid.Cid) (key.KeySet, error) { +func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffortRoots []*cid.Cid) (*cid.Set, error) { // KeySet currently implemented in memory, in the future, may be bloom filter or // disk backed to conserve memory. - gcs := key.NewKeySet() + gcs := cid.NewSet() err := Descendants(ctx, ds, gcs, pn.RecursiveKeys(), false) if err != nil { return nil, err @@ -112,7 +104,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffor } for _, k := range pn.DirectKeys() { - gcs.Add(key.Key(k.Hash())) + gcs.Add(k) } err = Descendants(ctx, ds, gcs, pn.InternalPins(), false) diff --git a/test/integration/bitswap_wo_routing_test.go b/test/integration/bitswap_wo_routing_test.go index b47d38f0642a..d6a6ffdfd538 100644 --- a/test/integration/bitswap_wo_routing_test.go +++ b/test/integration/bitswap_wo_routing_test.go @@ -76,7 +76,7 @@ func TestBitswapWithoutRouting(t *testing.T) { } else if !bytes.Equal(b.RawData(), block0.RawData()) { t.Error("byte comparison fail") } else { - log.Debug("got block: %s", b.Key()) + log.Debug("got block: %s", b.Cid()) } } @@ -93,7 +93,7 @@ func TestBitswapWithoutRouting(t *testing.T) { } else if !bytes.Equal(b.RawData(), block1.RawData()) { t.Error("byte comparison fail") } else { - log.Debug("got block: %s", b.Key()) + log.Debug("got block: %s", b.Cid()) } } } diff --git a/thirdparty/ds-help/key.go b/thirdparty/ds-help/key.go index db680add20fb..417f49605714 100644 --- a/thirdparty/ds-help/key.go +++ b/thirdparty/ds-help/key.go @@ -9,3 +9,7 @@ import ( func NewKeyFromBinary(s string) ds.Key { return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s))) } + +func BinaryFromDsKey(k ds.Key) ([]byte, error) { + return base32.RawStdEncoding.DecodeString(k.String()[1:]) +}