From 207f4818cb3399f85d3a32d0147c25a7972925aa Mon Sep 17 00:00:00 2001 From: b5 Date: Fri, 30 Oct 2020 14:47:10 -0400 Subject: [PATCH] perf(CARStream): push blocks in one large CAR archive stream --- dsync/dsync.go | 56 +++++++++++-- dsync/http.go | 85 ++++++++++++++++++-- dsync/http_test.go | 9 ++- dsync/p2p.go | 23 ++++-- dsync/push.go | 27 +++++++ dsync/session.go | 21 +++++ dsync/stream.go | 182 +++++++++++++++++++++++++++++++++++++++++++ dsync/stream_test.go | 99 +++++++++++++++++++++++ go.mod | 1 + 9 files changed, 479 insertions(+), 24 deletions(-) create mode 100644 dsync/stream.go create mode 100644 dsync/stream_test.go diff --git a/dsync/dsync.go b/dsync/dsync.go index eae5ffc..c50c9d7 100644 --- a/dsync/dsync.go +++ b/dsync/dsync.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" "net/http" "strings" @@ -30,11 +31,16 @@ import ( path "github.com/ipfs/interface-go-ipfs-core/path" host "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" + protocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/qri-io/dag" ) var log = golog.Logger("dsync") +func init() { + golog.SetLogLevel("dsync", "debug") +} + const ( // default to parallelism of 3. So far 4 was enough to blow up a std k8s pod running IPFS :( defaultPushParallelism = 1 @@ -47,6 +53,16 @@ const ( maxRetries = 80 ) +var ( + // ErrRemoveNotSupported is the error value returned by remotes that don't + // support delete operations + ErrRemoveNotSupported = fmt.Errorf("remove is not supported") + // ErrUnknownProtocolVersion is the error for when the version of the remote + // protocol is unknown, usually because the handshake with the the remote + // hasn't happened yet + ErrUnknownProtocolVersion = fmt.Errorf("unknown protocol version") +) + // DagSyncable is a source that can be synced to & from. dsync requests automate // calls to this interface with higher-order functions like Push and Pull // @@ -58,25 +74,24 @@ type DagSyncable interface { // The remote will return a delta manifest of blocks the remote needs // and a session id that must be sent with each block NewReceiveSession(info *dag.Info, pinOnComplete bool, meta map[string]string) (sid string, diff *dag.Manifest, err error) + // ProtocolVersion indicates the version of dsync the remote speaks, only + // available after a handshake is established. Calling this method before a + // handshake must return ErrUnknownProtocolVersion + ProtocolVersion() (protocol.ID, error) + // ReceiveBlock places a block on the remote ReceiveBlock(sid, hash string, data []byte) ReceiveResponse - // GetDagInfo asks the remote for info specified by a the root identifier // string of a DAG GetDagInfo(ctx context.Context, cidStr string, meta map[string]string) (info *dag.Info, err error) // GetBlock gets a block of data from the remote GetBlock(ctx context.Context, hash string) (rawdata []byte, err error) - // RemoveCID asks the remote to remove a cid. Supporting deletes are optional. // DagSyncables that don't implement DeleteCID must return // ErrDeleteNotSupported RemoveCID(ctx context.Context, cidStr string, meta map[string]string) (err error) } -// ErrRemoveNotSupported is the error value returned by remotes that don't -// support delete operations -var ErrRemoveNotSupported = fmt.Errorf("remove is not supported") - // Hook is a function that a dsync instance will call at specified points in the // sync lifecycle type Hook func(ctx context.Context, info dag.Info, meta map[string]string) error @@ -263,6 +278,11 @@ func New(localNodes ipld.NodeGetter, blockStore coreiface.BlockAPI, opts ...func return ds, nil } +// ProtocolVersion reports the current procotol version for dsync +func (ds *Dsync) ProtocolVersion() (protocol.ID, error) { + return DsyncProtocolID, nil +} + // StartRemote makes dsync available for remote requests, starting an HTTP // server if a listening address is specified. // StartRemote returns immediately. Stop remote service by cancelling @@ -387,13 +407,12 @@ func (ds *Dsync) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse { return ReceiveResponse{ Hash: hash, Status: StatusErrored, - Err: fmt.Errorf("sid '%s' not found", sid), + Err: fmt.Errorf("sid %q not found", sid), } } // ReceiveBlock accepts a block from the sender, placing it in the local blockstore res := sess.ReceiveBlock(hash, bytes.NewReader(data)) - log.Debugf("received block: %s", res.Hash) // check if transfer has completed, if so finalize it, but only once if res.Status == StatusOk && sess.IsFinalizedOnce() { @@ -409,6 +428,27 @@ func (ds *Dsync) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse { return res } +// ReceiveBlocks ingests blocks being pushed into the local store +func (ds *Dsync) ReceiveBlocks(ctx context.Context, sid string, r io.Reader) error { + sess, ok := ds.sessionPool[sid] + if !ok { + log.Debugf("couldn't find session. sid=%q", sid) + return fmt.Errorf("sid %q not found", sid) + } + + if err := sess.ReceiveBlocks(ctx, r); err != nil { + log.Debugf("error receiving blocks. err=%q", err) + return err + } + + if err := ds.finalizeReceive(sess); err != nil { + log.Debugf("error finalizing receive. err=%q", err) + return err + } + + return nil +} + // TODO (b5): needs to be called if someone tries to sync a DAG that requires // no blocks for an early termination, ensuring that we cache a dag.Info in // that case as well diff --git a/dsync/http.go b/dsync/http.go index 91abba1..0f74410 100644 --- a/dsync/http.go +++ b/dsync/http.go @@ -9,17 +9,28 @@ import ( "net/http" "net/url" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + protocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/qri-io/dag" ) +const ( + httpProtcolIDHeader = "dsync-version" +) + // HTTPClient is the request side of doing dsync over HTTP type HTTPClient struct { - URL string + URL string + remProtocolID protocol.ID } -// HTTPClient exists to satisfy the DaySyncable interface on the client side -// of a transfer -var _ DagSyncable = (*HTTPClient)(nil) +var ( + // HTTPClient exists to satisfy the DaySyncable interface on the client side + // of a transfer + _ DagSyncable = (*HTTPClient)(nil) + _ DagStreamable = (*HTTPClient)(nil) +) // NewReceiveSession initiates a session for pushing blocks to a remote. // It sends a Manifest to a remote source over HTTP @@ -61,12 +72,69 @@ func (rem *HTTPClient) NewReceiveSession(info *dag.Info, pinOnComplete bool, met } sid = res.Header.Get("sid") + + protocolIDHeaderStr := res.Header.Get(httpProtcolIDHeader) + if protocolIDHeaderStr == "" { + // protocol ID header only exists in version 0.2.0 and up, when header isn't + // present assume version 0.1.1, the latest version before header was set + // 0.1.1 is wire-compatible with all lower versions of dsync + rem.remProtocolID = protocol.ID("/dsync/0.1.1") + } else { + rem.remProtocolID = protocol.ID(protocolIDHeaderStr) + } + diff = &dag.Manifest{} err = json.NewDecoder(res.Body).Decode(diff) return } +// ProtocolVersion indicates the version of dsync the remote speaks, only +// available after a handshake is established +func (rem *HTTPClient) ProtocolVersion() (protocol.ID, error) { + if string(rem.remProtocolID) == "" { + return "", ErrUnknownProtocolVersion + } + return rem.remProtocolID, nil +} + +// PutBlocks streams a manifest of blocks to the remote in one HTTP call +func (rem *HTTPClient) PutBlocks(ctx context.Context, sid string, ng ipld.NodeGetter, mfst *dag.Manifest, progCh chan cid.Cid) error { + r, err := NewManifestCARReader(ctx, ng, mfst, progCh) + if err != nil { + log.Debugf("err creating CARReader err=%q ", err) + return err + } + + req, err := http.NewRequest("PATCH", fmt.Sprintf("%s?sid=%s", rem.URL, sid), r) + if err != nil { + log.Debugf("err creating PATCH HTTP request err=%q ", err) + return err + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + log.Debugf("err doing HTTP request. err=%q", err) + return err + } + + if res.StatusCode != http.StatusOK { + var msg string + if data, err := ioutil.ReadAll(res.Body); err == nil { + msg = string(data) + } + log.Debugf("error response from remote. err=%q", msg) + return fmt.Errorf("remote response: %d %s", res.StatusCode, msg) + } + + return nil +} + +// FetchBlocks streams a manifest of requested blocks +func (rem *HTTPClient) FetchBlocks(ctx context.Context, sid string, mfst *dag.Manifest, progCh chan cid.Cid) error { + return fmt.Errorf("not implemented") +} + // ReceiveBlock asks a remote to receive a block over HTTP func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveResponse { url := fmt.Sprintf("%s?sid=%s&hash=%s", rem.URL, sid, hash) @@ -243,10 +311,10 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc { return } + w.Header().Set(httpProtcolIDHeader, string(DsyncProtocolID)) w.Header().Set("sid", sid) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(diff) - case "PUT": data, err := ioutil.ReadAll(r.Body) if err != nil { @@ -266,6 +334,13 @@ func HTTPRemoteHandler(ds *Dsync) http.HandlerFunc { } else { w.WriteHeader(http.StatusOK) } + case "PATCH": + if err := ds.ReceiveBlocks(r.Context(), r.FormValue("sid"), r.Body); err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + w.WriteHeader(http.StatusOK) case "GET": mfstID := r.FormValue("manifest") blockID := r.FormValue("block") diff --git a/dsync/http_test.go b/dsync/http_test.go index 187989e..e03edfb 100644 --- a/dsync/http_test.go +++ b/dsync/http_test.go @@ -26,8 +26,8 @@ func TestSyncHTTP(t *testing.T) { t.Fatal(err) } - // yooooooooooooooooooooo - f := files.NewReaderFile(ioutil.NopCloser(strings.NewReader("y" + strings.Repeat("o", 350)))) + // yooooooooooooooooooooo... + f := files.NewReaderFile(ioutil.NopCloser(strings.NewReader("y" + strings.Repeat("o", 3500000)))) path, err := a.Unixfs().Add(ctx, f) if err != nil { t.Fatal(err) @@ -67,13 +67,14 @@ func TestSyncHTTP(t *testing.T) { cli := &HTTPClient{URL: s.URL + "/dsync"} + fmt.Printf("pushing %#v\n", info.Manifest.Nodes) push, err := NewPush(aGetter, info, cli, false) if err != nil { t.Fatal(err) } if err := push.Do(ctx); err != nil { - t.Error(err) + t.Fatal(err) } // b should now be able to generate a manifest @@ -85,7 +86,7 @@ func TestSyncHTTP(t *testing.T) { <-onCompleteCalled if err := cli.RemoveCID(ctx, info.RootCID().String(), nil); err != nil { - t.Error(err) + t.Fatal(err) } <-removeCheckCalled diff --git a/dsync/p2p.go b/dsync/p2p.go index f8797f4..ae88b7a 100644 --- a/dsync/p2p.go +++ b/dsync/p2p.go @@ -15,10 +15,8 @@ import ( ) const ( - // DsyncProtocolID is the dsyc p2p Protocol Identifier - DsyncProtocolID = protocol.ID("/dsync") - // DsyncServiceTag tags the type & version of the dsync service - DsyncServiceTag = "dsync/0.1.1-dev" + // DsyncProtocolID is the dsyc p2p Protocol Identifier & version tag + DsyncProtocolID = protocol.ID("/dsync/0.2.0") // default value to give qri peer connections in connmanager, one hunnit dsyncSupportValue = 100 ) @@ -69,6 +67,15 @@ func (c *p2pClient) NewReceiveSession(info *dag.Info, pinOnComplete bool, meta m return sid, diff, err } +// ProtocolVersion indicates the version of dsync the remote speaks, only +// available after a handshake is established +func (c *p2pClient) ProtocolVersion() (protocol.ID, error) { + if string(c.remoteProtocolID) == "" { + return "", ErrUnknownProtocolVersion + } + return c.remoteProtocolID, nil +} + // ReceiveBlock places a block on the remote func (c *p2pClient) ReceiveBlock(sid, cidStr string, data []byte) ReceiveResponse { msg := p2putil.NewMessage(c.host.ID(), mtReceiveBlock, data).WithHeaders( @@ -162,9 +169,10 @@ func (c *p2pClient) RemoveCID(ctx context.Context, cidStr string, meta map[strin // p2pHandler implements dsync as a libp2p protocol handler type p2pHandler struct { - dsync *Dsync - host host.Host - handlers map[p2putil.MsgType]p2putil.HandlerFunc + dsync *Dsync + host host.Host + handlers map[p2putil.MsgType]p2putil.HandlerFunc + remoteProtocolID protocol.ID } // newp2pHandler creates a p2p remote stream handler from a dsync.Remote @@ -191,6 +199,7 @@ func (c *p2pHandler) sendMessage(ctx context.Context, msg p2putil.Message, pid p if err != nil { return p2putil.Message{}, fmt.Errorf("error opening stream: %s", err.Error()) } + c.remoteProtocolID = s.Protocol() defer s.Close() // now that we have a confirmed working connection diff --git a/dsync/push.go b/dsync/push.go index 5accea3..976146c 100644 --- a/dsync/push.go +++ b/dsync/push.go @@ -124,6 +124,33 @@ func (snd *Push) do(ctx context.Context) (err error) { return nil } + protoID, err := snd.remote.ProtocolVersion() + if err != nil { + return err + } + + if protocolSupportsDagStreaming(protoID) { + progCh := make(chan cid.Cid) + + go func() { + for id := range progCh { + // this is the only place we should modify progress after creation + idStr := id.String() + log.Debugf("sent block %s", idStr) + for i, hash := range snd.info.Manifest.Nodes { + if idStr == hash { + snd.prog[i] = 100 + } + } + go snd.completionChanged() + } + }() + + if str, ok := snd.remote.(DagStreamable); ok { + return str.PutBlocks(ctx, snd.sid, snd.lng, snd.diff, progCh) + } + } + // create senders sends := make([]sender, snd.parallelism) for i := 0; i < snd.parallelism; i++ { diff --git a/dsync/session.go b/dsync/session.go index 65559fc..ac7d421 100644 --- a/dsync/session.go +++ b/dsync/session.go @@ -7,6 +7,7 @@ import ( "math/rand" "sync" + "github.com/ipfs/go-cid" ipld "github.com/ipfs/go-ipld-format" coreiface "github.com/ipfs/interface-go-ipfs-core" "github.com/qri-io/dag" @@ -93,6 +94,26 @@ func (s *session) ReceiveBlock(hash string, data io.Reader) ReceiveResponse { } } +func (s *session) ReceiveBlocks(ctx context.Context, r io.Reader) error { + progCh := make(chan cid.Cid) + + go func() { + for id := range progCh { + idStr := id.String() + // this should be the only place that modifies progress + for i, h := range s.info.Manifest.Nodes { + if idStr == h { + s.prog[i] = 100 + } + } + go s.completionChanged() + } + }() + + _, err := AddAllFromCARReader(ctx, s.bapi, r, progCh) + return err +} + // Complete returns if this receive session is finished or not func (s *session) Complete() bool { return s.prog.Complete() diff --git a/dsync/stream.go b/dsync/stream.go new file mode 100644 index 0000000..b37f57c --- /dev/null +++ b/dsync/stream.go @@ -0,0 +1,182 @@ +package dsync + +import ( + "bytes" + "context" + "fmt" + "io" + "strconv" + "strings" + + cid "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + coreiface "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipld/go-car" + carutil "github.com/ipld/go-car/util" + protocol "github.com/libp2p/go-libp2p-core/protocol" + "github.com/qri-io/dag" +) + +// DagStreamable is an interface for sending and fetching all blocks in a given +// manifest in one trip +type DagStreamable interface { + PutBlocks(ctx context.Context, sid string, ng ipld.NodeGetter, mfst *dag.Manifest, progCh chan cid.Cid) error + FetchBlocks(ctx context.Context, sid string, mfst *dag.Manifest, progCh chan cid.Cid) error +} + +func protocolSupportsDagStreaming(pid protocol.ID) bool { + versions := strings.Split(strings.TrimPrefix(string(pid), "/dsync/"), ".") + if len(versions) != 3 { + log.Debugf("unexpected version string in protocol.ID pid=%q versions=%v", pid, versions) + return false + } + + major, err := strconv.Atoi(versions[0]) + if err != nil { + log.Debugf("error parsing major version number in protocol.ID pid=%q versions=%v", pid, versions) + return false + } + + minor, err := strconv.Atoi(versions[1]) + if err != nil { + log.Debugf("error parsing minor version number in protocol.ID pid=%q versions=%v", pid, versions) + return false + } + + // anything above 0.2 is considered to support Dag Streaming + return major >= 0 && minor >= 2 +} + +// NewManifestCARReader creates a Content-addressed ARchive on the fly from a manifest +// and a node getter. It fetches blocks in order from the list of cids in the +// manifest and writes them to a buffer as the reader is consumed +// The roots specified in the archive header match the manifest RootCID method +// If an incomplete manifest graph is passed to NewManifestCARReader, the resulting +// archive will not be a complete graph. This is permitted by the spec, and +// used by dsync to create an archive of only-missing-blocks +// for more on CAR files, see: https://github.com/ipld/specs/blob/master/block-layer/content-addressable-archives.md +// If supplied a non-nil channel progress channel, the stream will send as +// each CID is buffered to the read stream +func NewManifestCARReader(ctx context.Context, ng ipld.NodeGetter, mfst *dag.Manifest, progCh chan cid.Cid) (io.Reader, error) { + + cids := make([]cid.Cid, 0, len(mfst.Nodes)) + for _, cidStr := range mfst.Nodes { + id, err := cid.Decode(cidStr) + if err != nil { + return nil, err + } + id, err = cid.Cast(id.Bytes()) + if err != nil { + return nil, err + } + cids = append(cids, id) + } + + buf := &bytes.Buffer{} + header := &car.CarHeader{ + Roots: []cid.Cid{mfst.RootCID()}, + Version: 1, + } + err := car.WriteHeader(header, buf) + if err != nil { + return nil, err + } + + str := &mfstCarReader{ + ctx: ctx, + cids: cids, + ng: ng, + buf: buf, + progCh: progCh, + } + return str, nil +} + +type mfstCarReader struct { + i int + ctx context.Context + cids []cid.Cid + ng ipld.NodeGetter + buf *bytes.Buffer + progCh chan cid.Cid +} + +func (str *mfstCarReader) Read(p []byte) (int, error) { + for { + // check for remaining bytes after last block is read + if str.i == len(str.cids) && str.buf.Len() > 0 { + return str.buf.Read(p) + } + + // break loop on sufficent buffer length + if str.buf.Len() > len(p) { + break + } + + if err := str.readBlock(); err != nil { + return 0, err + } + } + + return io.ReadFull(str.buf, p) +} + +// readBlock extends the buffer by one block +func (str *mfstCarReader) readBlock() error { + if str.i == len(str.cids) { + return io.EOF + } + nd, err := str.ng.Get(str.ctx, str.cids[str.i]) + if err != nil { + fmt.Printf("error getting block: %s\n", err) + return err + } + + str.i++ + if err = carutil.LdWrite(str.buf, nd.Cid().Bytes(), nd.RawData()); err != nil { + return err + } + + if str.progCh != nil { + go func() { str.progCh <- nd.Cid() }() + } + + return nil +} + +// AddAllFromCARReader consumers a CAR reader stream, placing all blocks in the +// given blockstore +func AddAllFromCARReader(ctx context.Context, bapi coreiface.BlockAPI, r io.Reader, progCh chan cid.Cid) (int, error) { + rdr, err := car.NewCarReader(r) + if err != nil { + return 0, err + } + + added := 0 + buf := &bytes.Buffer{} + for { + blk, err := rdr.Next() + if err == io.EOF { + break + } else if err != nil { + return added, err + } + + if _, err := buf.Write(blk.RawData()); err != nil { + return added, err + } + if _, err = bapi.Put(ctx, buf); err != nil { + return added, err + } + + buf.Reset() + added++ + + log.Debugf("wrote block %s", blk.Cid()) + if progCh != nil { + go func() { progCh <- blk.Cid() }() + } + } + + return added, nil +} diff --git a/dsync/stream_test.go b/dsync/stream_test.go new file mode 100644 index 0000000..21f46df --- /dev/null +++ b/dsync/stream_test.go @@ -0,0 +1,99 @@ +package dsync + +import ( + "context" + "io/ioutil" + "strings" + "testing" + + "github.com/ipfs/go-cid" + files "github.com/ipfs/go-ipfs-files" + protocol "github.com/libp2p/go-libp2p-core/protocol" + "github.com/qri-io/dag" +) + +func TestCarStream(t *testing.T) { + ctx := context.Background() + _, a, err := makeAPI(ctx) + if err != nil { + t.Fatal(err) + } + + _, b, err := makeAPI(ctx) + if err != nil { + t.Fatal(err) + } + + // yooooooooooooooooooooo... + f := files.NewReaderFile(ioutil.NopCloser(strings.NewReader("y" + strings.Repeat("o", 350000)))) + path, err := a.Unixfs().Add(ctx, f) + if err != nil { + t.Fatal(err) + } + + aGetter := &dag.NodeGetter{Dag: a.Dag()} + + mfst, err := dag.NewManifest(ctx, aGetter, path.Cid()) + if err != nil { + t.Fatal(err) + } + + var sentOnChan []cid.Cid + progCh := make(chan cid.Cid) + done := make(chan struct{}) + go func() { + for { + cid := <-progCh + sentOnChan = append(sentOnChan, cid) + if len(sentOnChan) == len(mfst.Nodes) { + done <- struct{}{} + } + } + }() + + r, err := NewManifestCARReader(ctx, aGetter, mfst, progCh) + if err != nil { + t.Fatal(err) + } + + added, err := AddAllFromCARReader(ctx, b.Block(), r, nil) + if err != nil { + t.Fatal(err) + } + + if len(mfst.Nodes) != added { + t.Errorf("scanned blocks mismatch. wanted: %d got: %d", len(mfst.Nodes), added) + } + + <-done + // TODO (b5) - refactore manifest.Nodes to be of type []cid.Cid & compare + // slices to be exact here + if len(mfst.Nodes) != len(sentOnChan) { + t.Errorf("progress cids channel mismatch. wanted: %d got: %d", len(mfst.Nodes), len(sentOnChan)) + } +} + +func TestProtocolSupportsDagStreaming(t *testing.T) { + cases := []struct { + pid protocol.ID + expect bool + }{ + {"/dsync/0.2.0", true}, + {"/dsync/0.2.1", true}, + {"/dsync/1.28.10", true}, + + {"/dsync/0.1.0", false}, + {"/dsync/", false}, + {"/dsync/bad.number.10", false}, + {"/dsync/1.huh?.10", false}, + } + + for _, c := range cases { + t.Run(string(c.pid), func(t *testing.T) { + got := protocolSupportsDagStreaming(c.pid) + if c.expect != got { + t.Errorf("support for %q mismatch. want: %t got: %t", c.pid, c.expect, got) + } + }) + } +} diff --git a/go.mod b/go.mod index 5eb7a9a..b933ad9 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-log v1.0.4 github.com/ipfs/interface-go-ipfs-core v0.3.0 + github.com/ipld/go-car v0.1.0 github.com/libp2p/go-libp2p v0.11.0 github.com/libp2p/go-libp2p-core v0.6.1 github.com/libp2p/go-libp2p-peerstore v0.2.6