Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DAGService.GetLinks() method and use it in the GC and elsewhere. #3255

Merged
merged 3 commits into from
Oct 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,8 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer
provided := cid.NewSet()
for _, c := range cids {
kset := cid.NewSet()
node, err := dserv.Get(ctx, c)
if err != nil {
return err
}

err = dag.EnumerateChildrenAsync(ctx, dserv, node, kset.Visit)
err := dag.EnumerateChildrenAsync(ctx, dserv, c, kset.Visit)
if err != nil {
return err
}
Expand Down
7 changes: 1 addition & 6 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,7 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
nd, err := n.DAG.Get(ctx, k)
if err != nil {
return nil, err
}

err = dag.EnumerateChildren(n.Context(), n.DAG, nd, set.Visit, false)
err := dag.EnumerateChildren(n.Context(), n.DAG, k, set.Visit, false)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil {
return err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
if err != nil {
return err
}
Expand All @@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo
if err != nil {
return nil, err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots)
if err != nil {
return nil, err
}
Expand Down
8 changes: 2 additions & 6 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) {
gcstarted := make(chan struct{})
go func() {
defer close(gcstarted)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.Pinning, nil)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil)
if err != nil {
log.Error("GC ERROR:", err)
errs <- err
Expand Down Expand Up @@ -156,13 +156,9 @@ func TestAddGCLive(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
root, err := node.DAG.Get(ctx, last)
if err != nil {
t.Fatal(err)
}

set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG, root, set.Visit, false)
err = dag.EnumerateChildren(ctx, node.DAG, last, set.Visit, false)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,7 @@ func (bs *Bitswap) GetWantlist() []key.Key {
}
return out
}

func (bs *Bitswap) IsOnline() bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesnt look like we use this anywhere. Am i missing something or is this just dead code from a different approach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used. I added a IsOnline() method to exchange to be able to tell if the exchange is online or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah. I missed it on my previous pass through

return true
}
2 changes: 2 additions & 0 deletions exchange/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ type Interface interface { // type Exchanger interface
// available on the network?
HasBlock(blocks.Block) error

IsOnline() bool

io.Closer
}
4 changes: 4 additions & 0 deletions exchange/offline/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan b
}()
return out, nil
}

func (e *offlineExchange) IsOnline() bool {
return false
}
63 changes: 47 additions & 16 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"

"context"
Expand All @@ -28,10 +29,20 @@ type DAGService interface {
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption

Batch() *Batch

LinkService
}

type LinkService interface {
// Return all links for a node, may be more effect than
// calling Get in DAGService
GetLinks(context.Context, *cid.Cid) ([]*Link, error)

GetOfflineLinkService() LinkService
}

func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
func NewDAGService(bs *bserv.BlockService) *dagService {
return &dagService{Blocks: bs}
}

// dagService is an IPFS Merkle DAG service.
Expand Down Expand Up @@ -93,13 +104,30 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) {
return res, nil
}

func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) {
node, err := n.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links, nil
}

func (n *dagService) GetOfflineLinkService() LinkService {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be moved to the main DAGService interface. There are many situations where we would want to use this to get an offline dagservice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the return type.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but the actual implementation is returning an offline DAGService, which is something that we would want to have available.

I'm thinking that when we need an offline linkservice, we could call dag.GetOffline() (which would return a DAGService) and then simply pass that into the function requiring the offline linkservice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it this way to enforce type safety. Otherwise we could be passed a LinkService and get a DAGService. If you really don't like that I can change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type safety is good, lets move forward with this

if n.Blocks.Exchange.IsOnline() {
bsrv := bserv.New(n.Blocks.Blockstore, offline.Exchange(n.Blocks.Blockstore))
return NewDAGService(bsrv)
} else {
return n
}
}

func (n *dagService) Remove(nd *Node) error {
return n.Blocks.DeleteObject(nd)
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit)
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
}

// FindLinks searches this nodes links for the given key,
Expand Down Expand Up @@ -366,19 +394,17 @@ func legacyCidFromLink(lnk *Link) *cid.Cid {
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool, bestEffort bool) error {
for _, lnk := range root.Links {
func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error {
links, err := ds.GetLinks(ctx, root)
if bestEffort && err == ErrNotFound {
return nil
} else if err != nil {
return err
}
for _, lnk := range links {
c := legacyCidFromLink(lnk)
if visit(c) {
child, err := ds.Get(ctx, c)
if err != nil {
if bestEffort && err == ErrNotFound {
continue
} else {
return err
}
}
err = EnumerateChildren(ctx, ds, child, visit, bestEffort)
err = EnumerateChildren(ctx, ds, c, visit, bestEffort)
if err != nil {
return err
}
Expand All @@ -387,7 +413,7 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit fun
return nil
}

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool) error {
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
toprocess := make(chan []*cid.Cid, 8)
nodes := make(chan *NodeOption, 8)

Expand All @@ -397,6 +423,11 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, visi

go fetchNodes(ctx, ds, toprocess, nodes)

root, err := ds.Get(ctx, c)
if err != nil {
return err
}

nodes <- &NodeOption{Node: root}
live := 1

Expand Down
8 changes: 4 additions & 4 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestFetchGraph(t *testing.T) {
t.Fatal(err)
}

err = FetchGraph(context.TODO(), root, dservs[1])
err = FetchGraph(context.TODO(), root.Cid(), dservs[1])
if err != nil {
t.Fatal(err)
}
Expand All @@ -241,7 +241,7 @@ func TestFetchGraph(t *testing.T) {

offline_ds := NewDAGService(bs)

err = EnumerateChildren(context.Background(), offline_ds, root, func(_ *cid.Cid) bool { return true }, false)
err = EnumerateChildren(context.Background(), offline_ds, root.Cid(), func(_ *cid.Cid) bool { return true }, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -258,7 +258,7 @@ func TestEnumerateChildren(t *testing.T) {
}

set := cid.NewSet()
err = EnumerateChildren(context.Background(), ds, root, set.Visit, false)
err = EnumerateChildren(context.Background(), ds, root.Cid(), set.Visit, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -269,7 +269,7 @@ func TestEnumerateChildren(t *testing.T) {
for _, lnk := range n.Links {
c := cid.NewCidV0(lnk.Hash)
if !set.Has(c) {
t.Fatal("missing key in set!")
t.Fatal("missing key in set! ", lnk.Hash.B58String())
}
child, err := ds.Get(context.Background(), c)
if err != nil {
Expand Down
25 changes: 9 additions & 16 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package gc

import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
Expand All @@ -24,13 +22,12 @@ var log = logging.Logger("gc")
//
// The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
unlocker := bs.GCLock()

bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv)
ls = ls.GetOfflineLinkService()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, i see why this was on the LinkService and not the DAGService. Hrm... thats tricky.


gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots)
gcs, err := ColoredSet(ctx, pn, ls, bestEffortRoots)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -71,16 +68,12 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRo
return output, nil
}

func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error {
func Descendants(ctx context.Context, ls dag.LinkService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error {
for _, c := range roots {
set.Add(key.Key(c.Hash()))
nd, err := ds.Get(ctx, c)
if err != nil {
return err
}

// EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, nd, func(c *cid.Cid) bool {
err := dag.EnumerateChildren(ctx, ls, c, func(c *cid.Cid) bool {
k := key.Key(c.Hash())
seen := set.Has(k)
if seen {
Expand All @@ -97,16 +90,16 @@ func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots [
return nil
}

func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffortRoots []*cid.Cid) (key.KeySet, error) {
func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffortRoots []*cid.Cid) (key.KeySet, error) {
// KeySet currently implemented in memory, in the future, may be bloom filter or
// disk backed to conserve memory.
gcs := key.NewKeySet()
err := Descendants(ctx, ds, gcs, pn.RecursiveKeys(), false)
err := Descendants(ctx, ls, gcs, pn.RecursiveKeys(), false)
if err != nil {
return nil, err
}

err = Descendants(ctx, ds, gcs, bestEffortRoots, true)
err = Descendants(ctx, ls, gcs, bestEffortRoots, true)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +108,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffor
gcs.Add(key.Key(k.Hash()))
}

err = Descendants(ctx, ds, gcs, pn.InternalPins(), false)
err = Descendants(ctx, ls, gcs, pn.InternalPins(), false)
if err != nil {
return nil, err
}
Expand Down
28 changes: 11 additions & 17 deletions pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error {
}

// fetch entire graph
err := mdag.FetchGraph(ctx, node, p.dserv)
err := mdag.FetchGraph(ctx, c, p.dserv)
if err != nil {
return err
}
Expand Down Expand Up @@ -279,12 +279,7 @@ func (p *pinner) isPinnedWithType(c *cid.Cid, mode PinMode) (string, bool, error

// Default is Indirect
for _, rc := range p.recursePin.Keys() {
rnd, err := p.dserv.Get(context.Background(), rc)
if err != nil {
return "", false, err
}

has, err := hasChild(p.dserv, rnd, k)
has, err := hasChild(p.dserv, rc, k)
if err != nil {
return "", false, err
}
Expand Down Expand Up @@ -317,11 +312,11 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) {
// Now walk all recursive pins to check for indirect pins
var checkChildren func(*cid.Cid, *cid.Cid) error
checkChildren = func(rk, parentKey *cid.Cid) error {
parent, err := p.dserv.Get(context.Background(), parentKey)
links, err := p.dserv.GetLinks(context.Background(), parentKey)
if err != nil {
return err
}
for _, lnk := range parent.Links {
for _, lnk := range links {
c := cid.NewCidV0(lnk.Hash)

if toCheck.Has(c) {
Expand Down Expand Up @@ -521,19 +516,18 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) {
}
}

func hasChild(ds mdag.DAGService, root *mdag.Node, child key.Key) (bool, error) {
for _, lnk := range root.Links {
func hasChild(ds mdag.LinkService, root *cid.Cid, child key.Key) (bool, error) {
links, err := ds.GetLinks(context.Background(), root)
if err != nil {
return false, err
}
for _, lnk := range links {
c := cid.NewCidV0(lnk.Hash)
if key.Key(c.Hash()) == child {
return true, nil
}

nd, err := ds.Get(context.Background(), c)
if err != nil {
return false, err
}

has, err := hasChild(ds, nd, child)
has, err := hasChild(ds, c, child)
if err != nil {
return false, err
}
Expand Down
5 changes: 5 additions & 0 deletions pin/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ func TestPinRecursiveFail(t *testing.T) {
t.Fatal(err)
}

_, err = dserv.Add(a)
if err != nil {
t.Fatal(err)
}

// this one is time based... but shouldnt cause any issues
mctx, _ = context.WithTimeout(ctx, time.Second)
err = p.Pin(mctx, a, true)
Expand Down