diff --git a/pin/set.go b/pin/set.go index ec08971e9a6..9ed1559907b 100644 --- a/pin/set.go +++ b/pin/set.go @@ -2,15 +2,14 @@ package pin import ( "bytes" + "context" "crypto/rand" "encoding/binary" "errors" "fmt" "hash/fnv" "sort" - "unsafe" - "context" "github.com/ipfs/go-ipfs/merkledag" "github.com/ipfs/go-ipfs/pin/internal/pb" "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" @@ -19,8 +18,11 @@ import ( ) const ( + // defaultFanout specifies the default number of fan-out links per layer defaultFanout = 256 - maxItems = 8192 + + // maxItems is the maximum number of items that will fit in a single bucket + maxItems = 8192 ) func randomSeed() (uint32, error) { @@ -40,36 +42,12 @@ func hash(seed uint32, c *cid.Cid) uint32 { return h.Sum32() } -type itemIterator func() (c *cid.Cid, data []byte, ok bool) +type itemIterator func() (c *cid.Cid, ok bool) type keyObserver func(*cid.Cid) -// refcount is the marshaled format of refcounts. It may change -// between versions; this is valid for version 1. Changing it may -// become desirable if there are many links with refcount > 255. -// -// There are two guarantees that need to be preserved, if this is -// changed: -// -// - the marshaled format is of fixed size, matching -// unsafe.Sizeof(refcount(0)) -// - methods of refcount handle endianness, and may -// in later versions need encoding/binary. -type refcount uint8 - -func (r refcount) Bytes() []byte { - return []byte{byte(r)} -} - -// readRefcount returns the idx'th refcount in []byte, which is -// assumed to be a sequence of refcount.Bytes results. -func (r *refcount) ReadFromIdx(buf []byte, idx int) { - *r = refcount(buf[idx]) -} - type sortByHash struct { links []*merkledag.Link - data []byte } func (s sortByHash) Len() int { @@ -82,13 +60,6 @@ func (s sortByHash) Less(a, b int) bool { func (s sortByHash) Swap(a, b int) { s.links[a], s.links[b] = s.links[b], s.links[a] - if len(s.data) != 0 { - const n = int(unsafe.Sizeof(refcount(0))) - tmp := make([]byte, n) - copy(tmp, s.data[a*n:a*n+n]) - copy(s.data[a*n:a*n+n], s.data[b*n:b*n+n]) - copy(s.data[b*n:b*n+n], tmp) - } } func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.Node, error) { @@ -96,13 +67,15 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint if err != nil { return nil, err } - n := &merkledag.Node{ - Links: make([]*merkledag.Link, 0, defaultFanout+maxItems), - } + + n := &merkledag.Node{Links: make([]*merkledag.Link, 0, defaultFanout+maxItems)} for i := 0; i < defaultFanout; i++ { n.Links = append(n.Links, &merkledag.Link{Hash: emptyKey.Hash()}) } + + // add emptyKey to our set of internal pinset objects internalKeys(emptyKey) + hdr := &pb.Set{ Version: proto.Uint32(1), Fanout: proto.Uint32(defaultFanout), @@ -111,97 +84,106 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint if err := writeHdr(n, hdr); err != nil { return nil, err } - hdrLen := len(n.Data()) if estimatedLen < maxItems { // it'll probably fit for i := 0; i < maxItems; i++ { - k, data, ok := iter() + k, ok := iter() if !ok { // all done break } n.Links = append(n.Links, &merkledag.Link{Hash: k.Hash()}) - n.SetData(append(n.Data(), data...)) } // sort by hash, also swap item Data s := sortByHash{ links: n.Links[defaultFanout:], - data: n.Data()[hdrLen:], } sort.Stable(s) } - // wasteful but simple - type item struct { - c *cid.Cid - data []byte - } - hashed := make(map[uint32][]item) + hashed := make([][]*cid.Cid, defaultFanout) for { - k, data, ok := iter() + // This loop essentially enumerates every single item in the set + // and maps them all into a set of buckets. Each bucket will be recursively + // turned into its own sub-set, and so on down the chain. Each sub-set + // gets added to the dagservice, and put into its place in a set nodes + // links array. + // + // Previously, the bucket was selected by taking an int32 from the hash of + // the input key + seed. This was erroneous as we would later be assigning + // the created sub-sets into an array of length 256 by the modulus of the + // int32 hash value with 256. This resulted in overwriting existing sub-sets + // and losing pins. The fix (a few lines down from this comment), is to + // map the hash value down to the 8 bit keyspace here while creating the + // buckets. This way, we avoid any overlapping later on. + k, ok := iter() if !ok { break } - h := hash(seed, k) - hashed[h] = append(hashed[h], item{k, data}) + h := hash(seed, k) % defaultFanout + hashed[h] = append(hashed[h], k) } + for h, items := range hashed { - childIter := func() (c *cid.Cid, data []byte, ok bool) { - if len(items) == 0 { - return nil, nil, false - } - first := items[0] - items = items[1:] - return first.c, first.data, true + if len(items) == 0 { + // recursion base case + continue } + + childIter := getCidListIterator(items) + + // recursively create a pinset from the items for this bucket index child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys) if err != nil { return nil, err } + size, err := child.Size() if err != nil { return nil, err } + childKey, err := dag.Add(child) if err != nil { return nil, err } + internalKeys(childKey) - l := &merkledag.Link{ - Name: "", + + // overwrite the 'empty key' in the existing links array + n.Links[h] = &merkledag.Link{ Hash: childKey.Hash(), Size: size, } - n.Links[int(h%defaultFanout)] = l } return n, nil } -func readHdr(n *merkledag.Node) (*pb.Set, []byte, error) { +func readHdr(n *merkledag.Node) (*pb.Set, error) { hdrLenRaw, consumed := binary.Uvarint(n.Data()) if consumed <= 0 { - return nil, nil, errors.New("invalid Set header length") + return nil, errors.New("invalid Set header length") } - buf := n.Data()[consumed:] - if hdrLenRaw > uint64(len(buf)) { - return nil, nil, errors.New("impossibly large Set header length") + + pbdata := n.Data()[consumed:] + if hdrLenRaw > uint64(len(pbdata)) { + return nil, errors.New("impossibly large Set header length") } // as hdrLenRaw was <= an int, we now know it fits in an int hdrLen := int(hdrLenRaw) var hdr pb.Set - if err := proto.Unmarshal(buf[:hdrLen], &hdr); err != nil { - return nil, nil, err + if err := proto.Unmarshal(pbdata[:hdrLen], &hdr); err != nil { + return nil, err } - buf = buf[hdrLen:] if v := hdr.GetVersion(); v != 1 { - return nil, nil, fmt.Errorf("unsupported Set version: %d", v) + return nil, fmt.Errorf("unsupported Set version: %d", v) } if uint64(hdr.GetFanout()) > uint64(len(n.Links)) { - return nil, nil, errors.New("impossibly large Fanout") + return nil, errors.New("impossibly large Fanout") } - return &hdr, buf, nil + return &hdr, nil } func writeHdr(n *merkledag.Node, hdr *pb.Set) error { @@ -209,24 +191,31 @@ func writeHdr(n *merkledag.Node, hdr *pb.Set) error { if err != nil { return err } - n.SetData(make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData))) - written := binary.PutUvarint(n.Data(), uint64(len(hdrData))) - n.SetData(n.Data()[:written]) - n.SetData(append(n.Data(), hdrData...)) + + // make enough space for the length prefix and the marshalled header data + data := make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData)) + + // write the uvarint length of the header data + uvarlen := binary.PutUvarint(data, uint64(len(hdrData))) + + // append the actual protobuf data *after* the length value we wrote + data = append(data[:uvarlen], hdrData...) + + n.SetData(data) return nil } -type walkerFunc func(buf []byte, idx int, link *merkledag.Link) error +type walkerFunc func(idx int, link *merkledag.Link) error func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.Node, fn walkerFunc, children keyObserver) error { - hdr, buf, err := readHdr(n) + hdr, err := readHdr(n) if err != nil { return err } // readHdr guarantees fanout is a safe value fanout := hdr.GetFanout() for i, l := range n.Links[fanout:] { - if err := fn(buf, i, l); err != nil { + if err := fn(i, l); err != nil { return err } } @@ -262,7 +251,7 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node } var res []*cid.Cid - walk := func(buf []byte, idx int, link *merkledag.Link) error { + walk := func(idx int, link *merkledag.Link) error { res = append(res, cid.NewCidV0(link.Hash)) return nil } @@ -272,40 +261,21 @@ func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node return res, nil } -func loadMultiset(ctx context.Context, dag merkledag.DAGService, root *merkledag.Node, name string, internalKeys keyObserver) (map[key.Key]uint64, error) { - l, err := root.GetNodeLink(name) - if err != nil { - return nil, fmt.Errorf("Failed to get link %s: %v", name, err) - } - c := cid.NewCidV0(l.Hash) - internalKeys(c) - n, err := l.GetNode(ctx, dag) - if err != nil { - return nil, fmt.Errorf("Failed to get node from link %s: %v", name, err) - } - - refcounts := make(map[key.Key]uint64) - walk := func(buf []byte, idx int, link *merkledag.Link) error { - var r refcount - r.ReadFromIdx(buf, idx) - refcounts[key.Key(link.Hash)] += uint64(r) - return nil - } - if err := walkItems(ctx, dag, n, walk, internalKeys); err != nil { - return nil, err - } - return refcounts, nil -} - -func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.Node, error) { - iter := func() (c *cid.Cid, data []byte, ok bool) { +func getCidListIterator(cids []*cid.Cid) itemIterator { + return func() (c *cid.Cid, ok bool) { if len(cids) == 0 { - return nil, nil, false + return nil, false } + first := cids[0] cids = cids[1:] - return first, nil, true + return first, true } +} + +func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.Node, error) { + iter := getCidListIterator(cids) + n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys) if err != nil { return nil, err diff --git a/pin/set_test.go b/pin/set_test.go index f48fc9d1753..8c60633b48d 100644 --- a/pin/set_test.go +++ b/pin/set_test.go @@ -1,13 +1,66 @@ package pin -import "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" +import ( + "context" + "fmt" + "os" + "testing" -func ignoreKeys(key.Key) {} + dag "github.com/ipfs/go-ipfs/merkledag" + mdtest "github.com/ipfs/go-ipfs/merkledag/test" -func copyMap(m map[key.Key]uint16) map[key.Key]uint64 { - c := make(map[key.Key]uint64, len(m)) - for k, v := range m { - c[k] = uint64(v) + cid "gx/ipfs/QmakyCk6Vnn16WEKjbkxieZmM2YLTzkFWizbmGowoYPjro/go-cid" +) + +func ignoreCids(_ *cid.Cid) {} + +func TestSet(t *testing.T) { + ds := mdtest.Mock() + limit := 10000 // 10000 reproduces the pinloss issue fairly reliably + + if os.Getenv("STRESS_IT_OUT_YO") != "" { + limit = 10000000 + } + var inputs []*cid.Cid + for i := 0; i < limit; i++ { + c, err := ds.Add(dag.NodeWithData([]byte(fmt.Sprint(i)))) + if err != nil { + t.Fatal(err) + } + + inputs = append(inputs, c) + } + + out, err := storeSet(context.Background(), ds, inputs, ignoreCids) + if err != nil { + t.Fatal(err) + } + + // weird wrapper node because loadSet expects us to pass an + // object pointing to multiple named sets + setroot := &dag.Node{} + err = setroot.AddNodeLinkClean("foo", out) + if err != nil { + t.Fatal(err) + } + + outset, err := loadSet(context.Background(), ds, setroot, "foo", ignoreCids) + if err != nil { + t.Fatal(err) + } + + if len(outset) != limit { + t.Fatal("got wrong number", len(outset), limit) + } + + seen := cid.NewSet() + for _, c := range outset { + seen.Add(c) + } + + for _, c := range inputs { + if !seen.Has(c) { + t.Fatalf("expected to have %s, didnt find it") + } } - return c }