Skip to content

Commit

Permalink
Implement index generation using only io.Reader
Browse files Browse the repository at this point in the history
Implement the ability to generate an index from a CARv1 payload given only
an `io.Reader`, where the previous implementation required an
`io.ReadSeeker`. The rationale is to be minimal in what we expect in the
API, since index generation from a CARv1 payload never needs to rewind
the reader and only moves forward in the stream.

Consolidate the utility IO functions that convert between types into one place.
Implement constructor functions that only instantiate wrappers when the
passed argument does not satisfy a required interface.

Fixes:
- #146

Relates to:
- #145
  • Loading branch information
masih committed Jul 15, 2021
1 parent a7291a8 commit a315bb0
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 34 deletions.
50 changes: 21 additions & 29 deletions v2/index_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os"
"sync"

internalio "github.com/ipld/go-car/v2/internal/io"

"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multicodec"

Expand All @@ -15,20 +17,11 @@ import (
"github.com/ipld/go-car/v2/internal/carv1"
)

// readSeekerPlusByte wraps an io.ReadSeeker and adds a ReadByte method, so the
// wrapped value can also be used where an io.ByteReader is required.
type readSeekerPlusByte struct {
	io.ReadSeeker
}

// ReadByte reads a single byte from the wrapped io.ReadSeeker using
// io.ReadFull and returns it together with any error from that read.
func (rs readSeekerPlusByte) ReadByte() (byte, error) {
	var single [1]byte
	buf := single[:]
	_, err := io.ReadFull(rs, buf)
	return buf[0], err
}

// GenerateIndex generates index for a given car in v1 format.
// The index can be stored using index.Save into a file or serialized using index.WriteTo.
func GenerateIndex(v1 io.ReadSeeker) (index.Index, error) {
header, err := carv1.ReadHeader(v1)
func GenerateIndex(v1r io.Reader) (index.Index, error) {
reader := internalio.ToByteReadSeeker(v1r)
header, err := carv1.ReadHeader(reader)
if err != nil {
return nil, fmt.Errorf("error reading car header: %w", err)
}
Expand All @@ -37,28 +30,26 @@ func GenerateIndex(v1 io.ReadSeeker) (index.Index, error) {
return nil, fmt.Errorf("expected version to be 1, got %v", header.Version)
}

offset, err := carv1.HeaderSize(header)
if err != nil {
return nil, err
}

idx, err := index.New(multicodec.CarIndexSorted)
if err != nil {
return nil, err
}
records := make([]index.Record, 0)

// Seek to the first frame.
// Record the start of each frame, which we need for the index records.
frameOffset := int64(0)
if frameOffset, err = v1.Seek(int64(offset), io.SeekStart); err != nil {
// Record the start of each frame, with the first frame starting from the current position in the
// reader, i.e. right after the header, since we have only read the header so far.
var frameOffset int64

// The Seek call below is equivalent to getting the reader.offset directly.
// We get it through Seek to only depend on APIs of a typical io.Seeker.
// This would also reduce refactoring in case the utility reader is moved.
if frameOffset, err = reader.Seek(0, io.SeekCurrent); err != nil {
return nil, err
}

for {
// Grab the length of the frame.
// Note that ReadUvarint wants a ByteReader.
length, err := varint.ReadUvarint(readSeekerPlusByte{v1})
// Read the frame's length.
frameLen, err := varint.ReadUvarint(reader)
if err != nil {
if err == io.EOF {
break
Expand All @@ -69,20 +60,21 @@ func GenerateIndex(v1 io.ReadSeeker) (index.Index, error) {
// Null padding; treat zero-length frames as an EOF.
// They don't contain a CID nor block, so they're not useful.
// TODO: Amend the CARv1 spec to explicitly allow this.
if length == 0 {
if frameLen == 0 {
break
}

// Grab the CID.
n, c, err := cid.CidFromReader(v1)
// Read the CID.
cidLen, c, err := cid.CidFromReader(reader)
if err != nil {
return nil, err
}
records = append(records, index.Record{Cid: c, Idx: uint64(frameOffset)})

// Seek to the next frame by skipping the block.
// The frame length includes the CID, so subtract it.
if frameOffset, err = v1.Seek(int64(length)-int64(n), io.SeekCurrent); err != nil {
remainingFrameLen := int64(frameLen) - int64(cidLen)
if frameOffset, err = reader.Seek(remainingFrameLen, io.SeekCurrent); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -134,7 +126,7 @@ func ReadOrGenerateIndex(rs io.ReadSeeker) (index.Index, error) {
if err != nil {
return nil, err
}
// Seek to the begining, since reading the version changes the reader's offset.
// Seek to the beginning, since reading the version changes the reader's offset.
if _, err := rs.Seek(0, io.SeekStart); err != nil {
return nil, err
}
Expand Down
81 changes: 76 additions & 5 deletions v2/internal/io/converter.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,90 @@
package io

import (
	"errors"
	"io"
	"io/ioutil"
)

// Compile-time assertions that pointers to the wrapper types declared in this
// package satisfy the standard interfaces they are meant to provide.
var (
	_ io.ByteReader = (*readerPlusByte)(nil)
	_ io.ByteReader = (*readSeekerPlusByte)(nil)
	_ io.ByteReader = (*discardingReadSeekerPlusByte)(nil)
	_ io.ReadSeeker = (*discardingReadSeekerPlusByte)(nil)
)

type (
	// readerPlusByte wraps an io.Reader; its ReadByte method lets the wrapped
	// reader be used as an io.ByteReader.
	readerPlusByte struct {
		io.Reader
	}

	// readSeekerPlusByte wraps an io.ReadSeeker; its ReadByte method lets the
	// wrapped value be used as an io.ByteReader while keeping Read and Seek.
	readSeekerPlusByte struct {
		io.ReadSeeker
	}

	// discardingReadSeekerPlusByte wraps a plain io.Reader and offers Seek by
	// reading and discarding bytes, in addition to ReadByte.
	discardingReadSeekerPlusByte struct {
		io.Reader
		// offset is the running total of bytes read through the Read method.
		offset int64
	}

	// ByteReadSeeker groups io.ReadSeeker and io.ByteReader into one interface.
	ByteReadSeeker interface {
		io.ReadSeeker
		io.ByteReader
	}
)

// ToByteReader returns r as an io.ByteReader.
//
// If r already implements io.ByteReader it is returned unchanged, so no
// wrapper is allocated. Otherwise r is wrapped in a readerPlusByte, which adds
// a ReadByte method on top of r.
func ToByteReader(r io.Reader) io.ByteReader {
	if br, ok := r.(io.ByteReader); ok {
		return br
	}
	// Return a pointer, matching the (*readerPlusByte) interface assertion and
	// the pointer wrappers returned by ToByteReadSeeker.
	return &readerPlusByte{r}
}

// ToByteReadSeeker returns r as a ByteReadSeeker, wrapping it only as much as
// needed:
//   - if r already implements ByteReadSeeker, it is returned unchanged;
//   - if r implements io.ReadSeeker, it is wrapped in a readSeekerPlusByte;
//   - otherwise it is wrapped in a discardingReadSeekerPlusByte.
func ToByteReadSeeker(r io.Reader) ByteReadSeeker {
	switch v := r.(type) {
	case ByteReadSeeker:
		return v
	case io.ReadSeeker:
		return &readSeekerPlusByte{v}
	default:
		return &discardingReadSeekerPlusByte{Reader: r}
	}
}

// ReadByte implements io.ByteReader by delegating to readByte with the
// receiver as the reader.
func (r readerPlusByte) ReadByte() (byte, error) {
	b, err := readByte(r)
	return b, err
}

// ReadByte implements io.ByteReader by delegating to readByte with the
// receiver as the reader.
func (rs readSeekerPlusByte) ReadByte() (byte, error) {
	b, err := readByte(rs)
	return b, err
}

type readerPlusByte struct {
io.Reader
// ReadByte implements io.ByteReader by delegating to readByte.
//
// The receiver itself, not the embedded io.Reader, is passed on so that the
// read goes through this type's Read method and the offset is updated.
func (d *discardingReadSeekerPlusByte) ReadByte() (byte, error) {
	b, err := readByte(d)
	return b, err
}

// Read reads from the wrapped io.Reader into p and adds the number of bytes
// read to the tracked offset, including when the read also returns an error.
func (d *discardingReadSeekerPlusByte) Read(p []byte) (read int, err error) {
	n, readErr := d.Reader.Read(p)
	d.offset += int64(n)
	return n, readErr
}

// Seek moves forward in the stream by reading and discarding bytes, and
// returns the tracked offset after the move along with any error from the
// discarding copy.
//
// Supported whence values:
//   - io.SeekStart: offset is an absolute position; the difference between it
//     and the current offset is discarded. A target before the current offset
//     panics, since the wrapped reader cannot rewind.
//   - io.SeekCurrent: offset bytes are discarded. A negative offset returns an
//     error and leaves the position unchanged.
//
// Any other whence value panics.
func (drsb *discardingReadSeekerPlusByte) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		n := offset - drsb.offset
		if n < 0 {
			panic("unsupported rewind via whence: io.SeekStart")
		}
		// Copy from drsb rather than drsb.Reader so that Read keeps the offset up to date.
		_, err := io.CopyN(ioutil.Discard, drsb, n)
		return drsb.offset, err
	case io.SeekCurrent:
		// io.CopyN with a negative count copies nothing and reports no error,
		// which would make a rewind look like it succeeded. The offset may be
		// derived from stream contents, so report it as an error.
		if offset < 0 {
			return drsb.offset, errors.New("unsupported rewind via whence: io.SeekCurrent")
		}
		_, err := io.CopyN(ioutil.Discard, drsb, offset)
		return drsb.offset, err
	default:
		panic("unsupported whence: io.SeekEnd")
	}
}

func (r readerPlusByte) ReadByte() (byte, error) {
func readByte(r io.Reader) (byte, error) {
var p [1]byte
_, err := io.ReadFull(r, p[:])
return p[0], err
Expand Down

0 comments on commit a315bb0

Please sign in to comment.