diff --git a/unixfs/archive/tar/writer.go b/unixfs/archive/tar/writer.go index f710d4063c4..17c43e7170a 100644 --- a/unixfs/archive/tar/writer.go +++ b/unixfs/archive/tar/writer.go @@ -64,7 +64,7 @@ func (w *Writer) writeFile(nd *mdag.ProtoNode, pb *upb.Data, fpath string) error return err } - dagr := uio.NewDataFileReader(w.ctx, nd, pb, w.Dag) + dagr := uio.NewPBFileReader(w.ctx, nd, pb, w.Dag) if _, err := dagr.WriteTo(w.TarW); err != nil { return err } diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index da1fd65c82f..f893de80246 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -1,12 +1,10 @@ package io import ( - "bytes" "context" "errors" "fmt" "io" - "os" mdag "github.com/ipfs/go-ipfs/merkledag" ft "github.com/ipfs/go-ipfs/unixfs" @@ -27,36 +25,6 @@ type DagReader interface { Offset() int64 } -// DagReader provides a way to easily read the data contained in a dag. -type pbDagReader struct { - serv mdag.DAGService - - // the node being read - node *mdag.ProtoNode - - // cached protobuf structure from node.Data - pbdata *ftpb.Data - - // the current data buffer to be read from - // will either be a bytes.Reader or a child DagReader - buf ReadSeekCloser - - // NodeGetters for each of 'nodes' child links - promises []mdag.NodeGetter - - // the index of the child link currently being read from - linkPosition int - - // current offset for the read head within the 'file' - offset int64 - - // Our context - ctx context.Context - - // context cancel for children - cancel func() -} - type ReadSeekCloser interface { io.Reader io.Seeker @@ -83,7 +51,7 @@ func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (DagRe // Dont allow reading directories return nil, ErrIsDir case ftpb.Data_File, ftpb.Data_Raw: - return NewDataFileReader(ctx, n, pb, serv), nil + return NewPBFileReader(ctx, n, pb, serv), nil case ftpb.Data_Metadata: if len(n.Links()) == 0 { return nil, errors.New("incorrectly formatted metadata object") @@ -107,219 +75,3 @@ func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (DagRe return nil, fmt.Errorf("unrecognized node type") } } - -func NewDataFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader { - fctx, cancel := context.WithCancel(ctx) - promises := mdag.GetDAG(fctx, serv, n) - return &pbDagReader{ - node: n, - serv: serv, - buf: NewRSNCFromBytes(pb.GetData()), - promises: promises, - ctx: fctx, - cancel: cancel, - pbdata: pb, - } -} - -// precalcNextBuf follows the next link in line and loads it from the -// DAGService, setting the next buffer to read from -func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error { - dr.buf.Close() // Just to make sure - if dr.linkPosition >= len(dr.promises) { - return io.EOF - } - - nxt, err := dr.promises[dr.linkPosition].Get(ctx) - if err != nil { - return err - } - dr.linkPosition++ - - switch nxt := nxt.(type) { - case *mdag.ProtoNode: - pb := new(ftpb.Data) - err = proto.Unmarshal(nxt.Data(), pb) - if err != nil { - return fmt.Errorf("incorrectly formatted protobuf: %s", err) - } - - switch pb.GetType() { - case ftpb.Data_Directory: - // A directory should not exist within a file - return ft.ErrInvalidDirLocation - case ftpb.Data_File: - dr.buf = NewDataFileReader(dr.ctx, nxt, pb, dr.serv) - return nil - case ftpb.Data_Raw: - dr.buf = NewRSNCFromBytes(pb.GetData()) - return nil - case ftpb.Data_Metadata: - return errors.New("shouldnt have had metadata object inside file") - case ftpb.Data_Symlink: - return errors.New("shouldnt have had symlink inside file") - default: - return ft.ErrUnrecognizedType - } - case *mdag.RawNode: - dr.buf = NewRSNCFromBytes(nxt.RawData()) - return nil - default: - return errors.New("unrecognized node type in pbDagReader") - } -} - -// Size return the total length of the data from the DAG structured file. -func (dr *pbDagReader) Size() uint64 { - return dr.pbdata.GetFilesize() -} - -// Read reads data from the DAG structured file -func (dr *pbDagReader) Read(b []byte) (int, error) { - return dr.CtxReadFull(dr.ctx, b) -} - -// CtxReadFull reads data from the DAG structured file -func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) { - // If no cached buffer, load one - total := 0 - for { - // Attempt to fill bytes from cached buffer - n, err := dr.buf.Read(b[total:]) - total += n - dr.offset += int64(n) - if err != nil { - // EOF is expected - if err != io.EOF { - return total, err - } - } - - // If weve read enough bytes, return - if total == len(b) { - return total, nil - } - - // Otherwise, load up the next block - err = dr.precalcNextBuf(ctx) - if err != nil { - return total, err - } - } -} - -func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) { - // If no cached buffer, load one - total := int64(0) - for { - // Attempt to write bytes from cached buffer - n, err := dr.buf.WriteTo(w) - total += n - dr.offset += n - if err != nil { - if err != io.EOF { - return total, err - } - } - - // Otherwise, load up the next block - err = dr.precalcNextBuf(dr.ctx) - if err != nil { - if err == io.EOF { - return total, nil - } - return total, err - } - } -} - -func (dr *pbDagReader) Close() error { - dr.cancel() - return nil -} - -func (dr *pbDagReader) Offset() int64 { - return dr.offset -} - -// Seek implements io.Seeker, and will seek to a given offset in the file -// interface matches standard unix seek -// TODO: check if we can do relative seeks, to reduce the amount of dagreader -// recreations that need to happen. -func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) { - switch whence { - case os.SEEK_SET: - if offset < 0 { - return -1, errors.New("Invalid offset") - } - - // Grab cached protobuf object (solely to make code look cleaner) - pb := dr.pbdata - - // left represents the number of bytes remaining to seek to (from beginning) - left := offset - if int64(len(pb.Data)) >= offset { - // Close current buf to close potential child dagreader - dr.buf.Close() - dr.buf = NewRSNCFromBytes(pb.GetData()[offset:]) - - // start reading links from the beginning - dr.linkPosition = 0 - dr.offset = offset - return offset, nil - } else { - // skip past root block data - left -= int64(len(pb.Data)) - } - - // iterate through links and find where we need to be - for i := 0; i < len(pb.Blocksizes); i++ { - if pb.Blocksizes[i] > uint64(left) { - dr.linkPosition = i - break - } else { - left -= int64(pb.Blocksizes[i]) - } - } - - // start sub-block request - err := dr.precalcNextBuf(dr.ctx) - if err != nil { - return 0, err - } - - // set proper offset within child readseeker - n, err := dr.buf.Seek(left, os.SEEK_SET) - if err != nil { - return -1, err - } - - // sanity - left -= n - if left != 0 { - return -1, errors.New("failed to seek properly") - } - dr.offset = offset - return offset, nil - case os.SEEK_CUR: - // TODO: be smarter here - noffset := dr.offset + offset - return dr.Seek(noffset, os.SEEK_SET) - case os.SEEK_END: - noffset := int64(dr.pbdata.GetFilesize()) - offset - return dr.Seek(noffset, os.SEEK_SET) - default: - return 0, errors.New("invalid whence") - } -} - -// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser -type readSeekNopCloser struct { - *bytes.Reader -} - -func NewRSNCFromBytes(b []byte) ReadSeekCloser { - return &readSeekNopCloser{bytes.NewReader(b)} -} - -func (r *readSeekNopCloser) Close() error { return nil } diff --git a/unixfs/io/pbdagreader.go b/unixfs/io/pbdagreader.go new file mode 100644 index 00000000000..34f622bd628 --- /dev/null +++ b/unixfs/io/pbdagreader.go @@ -0,0 +1,262 @@ +package io + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + + mdag "github.com/ipfs/go-ipfs/merkledag" + ft "github.com/ipfs/go-ipfs/unixfs" + ftpb "github.com/ipfs/go-ipfs/unixfs/pb" + + proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto" +) + +// DagReader provides a way to easily read the data contained in a dag. +type pbDagReader struct { + serv mdag.DAGService + + // the node being read + node *mdag.ProtoNode + + // cached protobuf structure from node.Data + pbdata *ftpb.Data + + // the current data buffer to be read from + // will either be a bytes.Reader or a child DagReader + buf ReadSeekCloser + + // NodeGetters for each of 'nodes' child links + promises []mdag.NodeGetter + + // the index of the child link currently being read from + linkPosition int + + // current offset for the read head within the 'file' + offset int64 + + // Our context + ctx context.Context + + // context cancel for children + cancel func() +} + +func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader { + fctx, cancel := context.WithCancel(ctx) + promises := mdag.GetDAG(fctx, serv, n) + return &pbDagReader{ + node: n, + serv: serv, + buf: NewRSNCFromBytes(pb.GetData()), + promises: promises, + ctx: fctx, + cancel: cancel, + pbdata: pb, + } +} + +// precalcNextBuf follows the next link in line and loads it from the +// DAGService, setting the next buffer to read from +func (dr *pbDagReader) precalcNextBuf(ctx context.Context) error { + dr.buf.Close() // Just to make sure + if dr.linkPosition >= len(dr.promises) { + return io.EOF + } + + nxt, err := dr.promises[dr.linkPosition].Get(ctx) + if err != nil { + return err + } + dr.linkPosition++ + + switch nxt := nxt.(type) { + case *mdag.ProtoNode: + pb := new(ftpb.Data) + err = proto.Unmarshal(nxt.Data(), pb) + if err != nil { + return fmt.Errorf("incorrectly formatted protobuf: %s", err) + } + + switch pb.GetType() { + case ftpb.Data_Directory: + // A directory should not exist within a file + return ft.ErrInvalidDirLocation + case ftpb.Data_File: + dr.buf = NewPBFileReader(dr.ctx, nxt, pb, dr.serv) + return nil + case ftpb.Data_Raw: + dr.buf = NewRSNCFromBytes(pb.GetData()) + return nil + case ftpb.Data_Metadata: + return errors.New("shouldnt have had metadata object inside file") + case ftpb.Data_Symlink: + return errors.New("shouldnt have had symlink inside file") + default: + return ft.ErrUnrecognizedType + } + case *mdag.RawNode: + dr.buf = NewRSNCFromBytes(nxt.RawData()) + return nil + default: + return errors.New("unrecognized node type in pbDagReader") + } +} + +// Size return the total length of the data from the DAG structured file. +func (dr *pbDagReader) Size() uint64 { + return dr.pbdata.GetFilesize() +} + +// Read reads data from the DAG structured file +func (dr *pbDagReader) Read(b []byte) (int, error) { + return dr.CtxReadFull(dr.ctx, b) +} + +// CtxReadFull reads data from the DAG structured file +func (dr *pbDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) { + // If no cached buffer, load one + total := 0 + for { + // Attempt to fill bytes from cached buffer + n, err := dr.buf.Read(b[total:]) + total += n + dr.offset += int64(n) + if err != nil { + // EOF is expected + if err != io.EOF { + return total, err + } + } + + // If weve read enough bytes, return + if total == len(b) { + return total, nil + } + + // Otherwise, load up the next block + err = dr.precalcNextBuf(ctx) + if err != nil { + return total, err + } + } +} + +func (dr *pbDagReader) WriteTo(w io.Writer) (int64, error) { + // If no cached buffer, load one + total := int64(0) + for { + // Attempt to write bytes from cached buffer + n, err := dr.buf.WriteTo(w) + total += n + dr.offset += n + if err != nil { + if err != io.EOF { + return total, err + } + } + + // Otherwise, load up the next block + err = dr.precalcNextBuf(dr.ctx) + if err != nil { + if err == io.EOF { + return total, nil + } + return total, err + } + } +} + +func (dr *pbDagReader) Close() error { + dr.cancel() + return nil +} + +func (dr *pbDagReader) Offset() int64 { + return dr.offset +} + +// Seek implements io.Seeker, and will seek to a given offset in the file +// interface matches standard unix seek +// TODO: check if we can do relative seeks, to reduce the amount of dagreader +// recreations that need to happen. +func (dr *pbDagReader) Seek(offset int64, whence int) (int64, error) { + switch whence { + case os.SEEK_SET: + if offset < 0 { + return -1, errors.New("Invalid offset") + } + + // Grab cached protobuf object (solely to make code look cleaner) + pb := dr.pbdata + + // left represents the number of bytes remaining to seek to (from beginning) + left := offset + if int64(len(pb.Data)) >= offset { + // Close current buf to close potential child dagreader + dr.buf.Close() + dr.buf = NewRSNCFromBytes(pb.GetData()[offset:]) + + // start reading links from the beginning + dr.linkPosition = 0 + dr.offset = offset + return offset, nil + } else { + // skip past root block data + left -= int64(len(pb.Data)) + } + + // iterate through links and find where we need to be + for i := 0; i < len(pb.Blocksizes); i++ { + if pb.Blocksizes[i] > uint64(left) { + dr.linkPosition = i + break + } else { + left -= int64(pb.Blocksizes[i]) + } + } + + // start sub-block request + err := dr.precalcNextBuf(dr.ctx) + if err != nil { + return 0, err + } + + // set proper offset within child readseeker + n, err := dr.buf.Seek(left, os.SEEK_SET) + if err != nil { + return -1, err + } + + // sanity + left -= n + if left != 0 { + return -1, errors.New("failed to seek properly") + } + dr.offset = offset + return offset, nil + case os.SEEK_CUR: + // TODO: be smarter here + noffset := dr.offset + offset + return dr.Seek(noffset, os.SEEK_SET) + case os.SEEK_END: + noffset := int64(dr.pbdata.GetFilesize()) - offset + return dr.Seek(noffset, os.SEEK_SET) + default: + return 0, errors.New("invalid whence") + } +} + +// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser +type readSeekNopCloser struct { + *bytes.Reader +} + +func NewRSNCFromBytes(b []byte) ReadSeekCloser { + return &readSeekNopCloser{bytes.NewReader(b)} +} + +func (r *readSeekNopCloser) Close() error { return nil }