From 3899194cb0aec37617576c4c612f2301479aeb83 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Fri, 19 Aug 2016 15:52:27 -0400 Subject: [PATCH] Add DAGService.GetLinks() method and use it in the GC and elsewhere. This method will use the (also new) LinkService if it is available to retrieving just the links for a MerkleDAG without necessary having to retrieve the underlying block. For now the main benefit is that the pinner will not break when a block becomes invalid due to a change in the backing file. This is possible because the metadata for a block (that includes the Links) is stored separately and thus always available even if the backing file changes. License: MIT Signed-off-by: Kevin Atkinson --- core/commands/pin.go | 5 ++--- core/core.go | 17 ++++++++-------- core/corerepo/gc.go | 4 ++-- core/coreunix/add_test.go | 4 ++-- merkledag/merkledag.go | 39 ++++++++++++++++++++++++++++++------- merkledag/merkledag_test.go | 4 ++-- pin/gc/gc.go | 7 ++++--- pin/pin.go | 16 +++++++-------- 8 files changed, 61 insertions(+), 35 deletions(-) diff --git a/core/commands/pin.go b/core/commands/pin.go index dba542abf1b..71f4a4706e0 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -328,12 +328,11 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string if typeStr == "indirect" || typeStr == "all" { set := cid.NewSet() for _, k := range n.Pinning.RecursiveKeys() { - nd, err := n.DAG.Get(ctx, k) + links, err := n.DAG.GetLinks(ctx, k) if err != nil { return nil, err } - - err = dag.EnumerateChildren(n.Context(), n.DAG, nd, set.Visit, false) + err = dag.EnumerateChildren(n.Context(), n.DAG, links, set.Visit, false) if err != nil { return nil, err } diff --git a/core/core.go b/core/core.go index b4ee113c93e..e99f582e577 100644 --- a/core/core.go +++ b/core/core.go @@ -94,14 +94,15 @@ type IpfsNode struct { PrivateKey ic.PrivKey // the local node's private Key // Services - Peerstore pstore.Peerstore // storage for other Peer instances - Blockstore bstore.GCBlockstore // the block store (lower level) - Blocks *bserv.BlockService // the block service, get/add blocks. - DAG merkledag.DAGService // the merkle dag service, get/add objects. - Resolver *path.Resolver // the path resolution system - Reporter metrics.Reporter - Discovery discovery.Service - FilesRoot *mfs.Root + Peerstore pstore.Peerstore // storage for other Peer instances + Blockstore bstore.GCBlockstore // the block store (lower level) + Blocks *bserv.BlockService // the block service, get/add blocks. + DAG merkledag.DAGService // the merkle dag service, get/add objects. + LinkService merkledag.LinkService + Resolver *path.Resolver // the path resolution system + Reporter metrics.Reporter + Discovery discovery.Service + FilesRoot *mfs.Root // Online PeerHost p2phost.Host // the network host (server+client) diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index 148515b754d..dd4927d58db 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { if err != nil { return err } - rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) if err != nil { return err } @@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo if err != nil { return nil, err } - rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) if err != nil { return nil, err } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index f2319bc2db2..135ee8dbf8e 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) { gcstarted := make(chan struct{}) go func() { defer close(gcstarted) - gcchan, err := gc.GC(context.Background(), node.Blockstore, node.Pinning, nil) + gcchan, err := gc.GC(context.Background(), node.Blockstore, node.LinkService, node.Pinning, nil) if err != nil { log.Error("GC ERROR:", err) errs <- err @@ -162,7 +162,7 @@ func TestAddGCLive(t *testing.T) { } set := cid.NewSet() - err = dag.EnumerateChildren(ctx, node.DAG, root, set.Visit, false) + err = dag.EnumerateChildren(ctx, node.DAG, root.Links, set.Visit, false) if err != nil { t.Fatal(err) } diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index bd415f49f58..f32104d8684 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -23,6 +23,10 @@ type DAGService interface { Get(context.Context, *cid.Cid) (*Node, error) Remove(*Node) error + // Return all links for a node, may be more effect than + // calling Get + GetLinks(context.Context, *cid.Cid) ([]*Link, error) + // GetDAG returns, in order, all the single leve child // nodes of the passed in node. GetMany(context.Context, []*cid.Cid) <-chan *NodeOption @@ -30,8 +34,14 @@ type DAGService interface { Batch() *Batch } -func NewDAGService(bs *bserv.BlockService) DAGService { - return &dagService{bs} +// A LinkService returns the links for a node if they are available +// locally without having to retrieve the block from the datastore. +type LinkService interface { + Get(*cid.Cid) ([]*Link, error) +} + +func NewDAGService(bs *bserv.BlockService) *dagService { + return &dagService{Blocks: bs} } // dagService is an IPFS Merkle DAG service. @@ -40,7 +50,8 @@ func NewDAGService(bs *bserv.BlockService) DAGService { // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high type dagService struct { - Blocks *bserv.BlockService + Blocks *bserv.BlockService + LinkService LinkService } // Add adds a node to the dagService, storing the block in the BlockService @@ -93,6 +104,20 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) { return res, nil } +func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) { + if n.LinkService != nil { + links, err := n.LinkService.Get(c) + if err == nil { + return links, nil + } + } + node, err := n.Get(ctx, c) + if err != nil { + return nil, err + } + return node.Links, nil +} + func (n *dagService) Remove(nd *Node) error { return n.Blocks.DeleteObject(nd) } @@ -366,11 +391,11 @@ func legacyCidFromLink(lnk *Link) *cid.Cid { // EnumerateChildren will walk the dag below the given root node and add all // unseen children to the passed in set. // TODO: parallelize to avoid disk latency perf hits? -func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool, bestEffort bool) error { - for _, lnk := range root.Links { +func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error { + for _, lnk := range links { c := legacyCidFromLink(lnk) if visit(c) { - child, err := ds.Get(ctx, c) + children, err := ds.GetLinks(ctx, c) if err != nil { if bestEffort && err == ErrNotFound { continue @@ -378,7 +403,7 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit fun return err } } - err = EnumerateChildren(ctx, ds, child, visit, bestEffort) + err = EnumerateChildren(ctx, ds, children, visit, bestEffort) if err != nil { return err } diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 339c056090a..f58bc56bd56 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -241,7 +241,7 @@ func TestFetchGraph(t *testing.T) { offline_ds := NewDAGService(bs) - err = EnumerateChildren(context.Background(), offline_ds, root, func(_ *cid.Cid) bool { return true }, false) + err = EnumerateChildren(context.Background(), offline_ds, root.Links, func(_ *cid.Cid) bool { return true }, false) if err != nil { t.Fatal(err) } @@ -258,7 +258,7 @@ func TestEnumerateChildren(t *testing.T) { } set := cid.NewSet() - err = EnumerateChildren(context.Background(), ds, root, set.Visit, false) + err = EnumerateChildren(context.Background(), ds, root.Links, set.Visit, false) if err != nil { t.Fatal(err) } diff --git a/pin/gc/gc.go b/pin/gc/gc.go index 7bfde538c10..ef57cf6ad53 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -24,11 +24,12 @@ var log = logging.Logger("gc") // // The routine then iterates over every block in the blockstore and // deletes any block that is not found in the marked set. -func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) { +func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) { unlocker := bs.GCLock() bsrv := bserv.New(bs, offline.Exchange(bs)) ds := dag.NewDAGService(bsrv) + ds.LinkService = ls gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots) if err != nil { @@ -74,13 +75,13 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRo func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error { for _, c := range roots { set.Add(key.Key(c.Hash())) - nd, err := ds.Get(ctx, c) + links, err := ds.GetLinks(ctx, c) if err != nil { return err } // EnumerateChildren recursively walks the dag and adds the keys to the given set - err = dag.EnumerateChildren(ctx, ds, nd, func(c *cid.Cid) bool { + err = dag.EnumerateChildren(ctx, ds, links, func(c *cid.Cid) bool { k := key.Key(c.Hash()) seen := set.Has(k) if seen { diff --git a/pin/pin.go b/pin/pin.go index 6edd66abc29..ab949ec40ed 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -279,12 +279,12 @@ func (p *pinner) isPinnedWithType(c *cid.Cid, mode PinMode) (string, bool, error // Default is Indirect for _, rc := range p.recursePin.Keys() { - rnd, err := p.dserv.Get(context.Background(), rc) + links, err := p.dserv.GetLinks(context.Background(), rc) if err != nil { return "", false, err } - has, err := hasChild(p.dserv, rnd, k) + has, err := hasChild(p.dserv, links, k) if err != nil { return "", false, err } @@ -317,11 +317,11 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { // Now walk all recursive pins to check for indirect pins var checkChildren func(*cid.Cid, *cid.Cid) error checkChildren = func(rk, parentKey *cid.Cid) error { - parent, err := p.dserv.Get(context.Background(), parentKey) + links, err := p.dserv.GetLinks(context.Background(), parentKey) if err != nil { return err } - for _, lnk := range parent.Links { + for _, lnk := range links { c := cid.NewCidV0(lnk.Hash) if toCheck.Has(c) { @@ -521,19 +521,19 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) { } } -func hasChild(ds mdag.DAGService, root *mdag.Node, child key.Key) (bool, error) { - for _, lnk := range root.Links { +func hasChild(ds mdag.DAGService, links []*mdag.Link, child key.Key) (bool, error) { + for _, lnk := range links { c := cid.NewCidV0(lnk.Hash) if key.Key(c.Hash()) == child { return true, nil } - nd, err := ds.Get(context.Background(), c) + children, err := ds.GetLinks(context.Background(), c) if err != nil { return false, err } - has, err := hasChild(ds, nd, child) + has, err := hasChild(ds, children, child) if err != nil { return false, err }