Skip to content

Commit

Permalink
Add DAGService.GetLinks() method and use it in the GC and elsewhere.
Browse files Browse the repository at this point in the history
This method will use the (also new) LinkService if it is available to
retrieving just the links for a MerkleDAG without necessary having to
retrieve the underlying block.

For now the main benefit is that the pinner will not break when a block
becomes invalid due to a change in the backing file.  This is possible
because the metadata for a block (that includes the Links) is stored
separately and thus always available even if the backing file changes.

License: MIT
Signed-off-by: Kevin Atkinson <k@kevina.org>
  • Loading branch information
kevina committed Oct 6, 2016
1 parent 67a1b3e commit 3899194
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 35 deletions.
5 changes: 2 additions & 3 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,11 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
nd, err := n.DAG.Get(ctx, k)
links, err := n.DAG.GetLinks(ctx, k)
if err != nil {
return nil, err
}

err = dag.EnumerateChildren(n.Context(), n.DAG, nd, set.Visit, false)
err = dag.EnumerateChildren(n.Context(), n.DAG, links, set.Visit, false)
if err != nil {
return nil, err
}
Expand Down
17 changes: 9 additions & 8 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ type IpfsNode struct {
PrivateKey ic.PrivKey // the local node's private Key

// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
FilesRoot *mfs.Root
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
LinkService merkledag.LinkService
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
FilesRoot *mfs.Root

// Online
PeerHost p2phost.Host // the network host (server+client)
Expand Down
4 changes: 2 additions & 2 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil {
return err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots)
if err != nil {
return err
}
Expand All @@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo
if err != nil {
return nil, err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) {
gcstarted := make(chan struct{})
go func() {
defer close(gcstarted)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.Pinning, nil)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.LinkService, node.Pinning, nil)
if err != nil {
log.Error("GC ERROR:", err)
errs <- err
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestAddGCLive(t *testing.T) {
}

set := cid.NewSet()
err = dag.EnumerateChildren(ctx, node.DAG, root, set.Visit, false)
err = dag.EnumerateChildren(ctx, node.DAG, root.Links, set.Visit, false)
if err != nil {
t.Fatal(err)
}
Expand Down
39 changes: 32 additions & 7 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@ type DAGService interface {
Get(context.Context, *cid.Cid) (*Node, error)
Remove(*Node) error

// Return all links for a node, may be more effect than
// calling Get
GetLinks(context.Context, *cid.Cid) ([]*Link, error)

// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption

Batch() *Batch
}

func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
// A LinkService returns the links for a node if they are available
// locally without having to retrieve the block from the datastore.
type LinkService interface {
Get(*cid.Cid) ([]*Link, error)
}

func NewDAGService(bs *bserv.BlockService) *dagService {
return &dagService{Blocks: bs}
}

// dagService is an IPFS Merkle DAG service.
Expand All @@ -40,7 +50,8 @@ func NewDAGService(bs *bserv.BlockService) DAGService {
// TODO: should cache Nodes that are in memory, and be
// able to free some of them when vm pressure is high
type dagService struct {
Blocks *bserv.BlockService
Blocks *bserv.BlockService
LinkService LinkService
}

// Add adds a node to the dagService, storing the block in the BlockService
Expand Down Expand Up @@ -93,6 +104,20 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) {
return res, nil
}

func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) {
if n.LinkService != nil {
links, err := n.LinkService.Get(c)
if err == nil {
return links, nil
}
}
node, err := n.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links, nil
}

func (n *dagService) Remove(nd *Node) error {
return n.Blocks.DeleteObject(nd)
}
Expand Down Expand Up @@ -366,19 +391,19 @@ func legacyCidFromLink(lnk *Link) *cid.Cid {
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool, bestEffort bool) error {
for _, lnk := range root.Links {
func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error {
for _, lnk := range links {
c := legacyCidFromLink(lnk)
if visit(c) {
child, err := ds.Get(ctx, c)
children, err := ds.GetLinks(ctx, c)
if err != nil {
if bestEffort && err == ErrNotFound {
continue
} else {
return err
}
}
err = EnumerateChildren(ctx, ds, child, visit, bestEffort)
err = EnumerateChildren(ctx, ds, children, visit, bestEffort)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func TestFetchGraph(t *testing.T) {

offline_ds := NewDAGService(bs)

err = EnumerateChildren(context.Background(), offline_ds, root, func(_ *cid.Cid) bool { return true }, false)
err = EnumerateChildren(context.Background(), offline_ds, root.Links, func(_ *cid.Cid) bool { return true }, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -258,7 +258,7 @@ func TestEnumerateChildren(t *testing.T) {
}

set := cid.NewSet()
err = EnumerateChildren(context.Background(), ds, root, set.Visit, false)
err = EnumerateChildren(context.Background(), ds, root.Links, set.Visit, false)
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 4 additions & 3 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ var log = logging.Logger("gc")
//
// The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) {
unlocker := bs.GCLock()

bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv)
ds.LinkService = ls

gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots)
if err != nil {
Expand Down Expand Up @@ -74,13 +75,13 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRo
func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error {
for _, c := range roots {
set.Add(key.Key(c.Hash()))
nd, err := ds.Get(ctx, c)
links, err := ds.GetLinks(ctx, c)
if err != nil {
return err
}

// EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, nd, func(c *cid.Cid) bool {
err = dag.EnumerateChildren(ctx, ds, links, func(c *cid.Cid) bool {
k := key.Key(c.Hash())
seen := set.Has(k)
if seen {
Expand Down
16 changes: 8 additions & 8 deletions pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ func (p *pinner) isPinnedWithType(c *cid.Cid, mode PinMode) (string, bool, error

// Default is Indirect
for _, rc := range p.recursePin.Keys() {
rnd, err := p.dserv.Get(context.Background(), rc)
links, err := p.dserv.GetLinks(context.Background(), rc)
if err != nil {
return "", false, err
}

has, err := hasChild(p.dserv, rnd, k)
has, err := hasChild(p.dserv, links, k)
if err != nil {
return "", false, err
}
Expand Down Expand Up @@ -317,11 +317,11 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) {
// Now walk all recursive pins to check for indirect pins
var checkChildren func(*cid.Cid, *cid.Cid) error
checkChildren = func(rk, parentKey *cid.Cid) error {
parent, err := p.dserv.Get(context.Background(), parentKey)
links, err := p.dserv.GetLinks(context.Background(), parentKey)
if err != nil {
return err
}
for _, lnk := range parent.Links {
for _, lnk := range links {
c := cid.NewCidV0(lnk.Hash)

if toCheck.Has(c) {
Expand Down Expand Up @@ -521,19 +521,19 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) {
}
}

func hasChild(ds mdag.DAGService, root *mdag.Node, child key.Key) (bool, error) {
for _, lnk := range root.Links {
func hasChild(ds mdag.DAGService, links []*mdag.Link, child key.Key) (bool, error) {
for _, lnk := range links {
c := cid.NewCidV0(lnk.Hash)
if key.Key(c.Hash()) == child {
return true, nil
}

nd, err := ds.Get(context.Background(), c)
children, err := ds.GetLinks(context.Background(), c)
if err != nil {
return false, err
}

has, err := hasChild(ds, nd, child)
has, err := hasChild(ds, children, child)
if err != nil {
return false, err
}
Expand Down

0 comments on commit 3899194

Please sign in to comment.