Skip to content

Commit

Permalink
fix(dsync): PROTOCOL_ERROR responses retry instead of failing
Browse files Browse the repository at this point in the history
  • Loading branch information
b5 committed Oct 29, 2020
1 parent 93d90d4 commit e26737d
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 16 deletions.
6 changes: 3 additions & 3 deletions dsync/dsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ var log = golog.Logger("dsync")

const (
// default to parallelism of 3. So far 4 was enough to blow up a std k8s pod running IPFS :(
defaultPushParallelism = 3
defaultPushParallelism = 1
// default to parallelism of 3
// TODO (b5): tune this figure
defaultPullParallelism = 3
defaultPullParallelism = 1
// total number of retries to attempt before send is considered faulty
// TODO (b5): this number should be retries *per object*, and a much lower
// number, like 5.
maxRetries = 25
maxRetries = 80
)

// DagSyncable is a source that can be synced to & from. dsync requests automate
Expand Down
7 changes: 5 additions & 2 deletions dsync/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveRespon
url := fmt.Sprintf("%s?sid=%s&hash=%s", rem.URL, sid, hash)
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(data))
if err != nil {
log.Debugf("http client create request error=%s", err)
return ReceiveResponse{
Hash: hash,
Status: StatusErrored,
Expand All @@ -82,10 +83,11 @@ func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveRespon

res, err := http.DefaultClient.Do(req)
if err != nil {
log.Debugf("http client perform request error=%s", err)
return ReceiveResponse{
Hash: hash,
Status: StatusErrored,
Err: err,
Status: StatusRetry,
Err: fmt.Errorf("performing HTTP PUT: %w", err),
}
}

Expand All @@ -100,6 +102,7 @@ func (rem *HTTPClient) ReceiveBlock(sid, hash string, data []byte) ReceiveRespon
Err: fmt.Errorf("remote error: %d %s", res.StatusCode, msg),
}
}

return ReceiveResponse{
Hash: hash,
Status: StatusOk,
Expand Down
17 changes: 10 additions & 7 deletions dsync/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (snd *Push) SetMeta(meta map[string]string) {

// Do executes the push, blocking until complete
func (snd *Push) Do(ctx context.Context) (err error) {
log.Debugf("initiating push")
// how this process works:
// * Do sends a dag.Info to the remote node
// * The remote returns a session id for the push, and manifest of blocks to send
Expand All @@ -107,6 +108,7 @@ func (snd *Push) Do(ctx context.Context) (err error) {
// responses
snd.sid, snd.diff, err = snd.remote.NewReceiveSession(snd.info, snd.pinOnComplete, snd.meta)
if err != nil {
log.Debugf("error creating receive session: %s", err)
return err
}
log.Debugf("push has receive session: %s", snd.sid)
Expand All @@ -128,14 +130,13 @@ func (snd *Push) do(ctx context.Context) (err error) {
sends[i] = sender{
id: i,
sid: snd.sid,
ctx: ctx,
blocksCh: snd.blocksCh,
responses: snd.responses,
lng: snd.lng,
remote: snd.remote,
stopCh: make(chan bool),
}
go sends[i].start()
go sends[i].start(ctx)
}

errCh := make(chan error)
Expand All @@ -146,7 +147,6 @@ func (snd *Push) do(ctx context.Context) (err error) {
// never block, so all responses are handled in their own goroutine
for res := range snd.responses {
go func(r ReceiveResponse) {
log.Debugf("push block response: %s. cid: %s", r.Status, r.Hash)
switch r.Status {
case StatusOk:
// this is the only place we should modify progress after creation
Expand All @@ -161,11 +161,13 @@ func (snd *Push) do(ctx context.Context) (err error) {
return
}
case StatusErrored:
log.Debugf("error pushing block. hash=%q error=%q", r.Hash, r.Err)
errCh <- r.Err
for _, s := range sends {
s.stop()
}
case StatusRetry:
log.Debugf("retrying push block. hash=%q error=%q", r.Hash, r.Err)
snd.retries <- r.Hash
}
}(res)
Expand Down Expand Up @@ -212,15 +214,14 @@ func (snd *Push) completionChanged() {
type sender struct {
id int
sid string
ctx context.Context
lng ipld.NodeGetter
remote DagSyncable
blocksCh chan string
responses chan ReceiveResponse
stopCh chan bool
}

func (s sender) start() {
func (s sender) start(ctx context.Context) {
for {
select {
case hash := <-s.blocksCh:
Expand All @@ -232,13 +233,14 @@ func (s sender) start() {
go func() {
id, err := cid.Parse(hash)
if err != nil {
log.Debugf("error parsing sent block: %s", err)
s.responses <- ReceiveResponse{
Hash: hash,
Status: StatusErrored,
Err: err,
}
}
node, err := s.lng.Get(s.ctx, id)
node, err := s.lng.Get(ctx, id)
if err != nil {
s.responses <- ReceiveResponse{
Hash: hash,
Expand All @@ -252,7 +254,8 @@ func (s sender) start() {

case <-s.stopCh:
return
case <-s.ctx.Done():
case <-ctx.Done():
log.Debugf("sender context done. err=%q", ctx.Err())
return
}
}
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ go 1.13

require (
github.com/google/go-cmp v0.4.0
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ipfs v0.6.0
github.com/ipfs/go-ipfs-config v0.8.0
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log v1.0.4
github.com/ipfs/interface-go-ipfs-core v0.3.0
github.com/libp2p/go-libp2p v0.9.6
github.com/libp2p/go-libp2p-core v0.5.7
github.com/libp2p/go-libp2p v0.11.0
github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-quic-transport v0.8.2 // indirect
github.com/multiformats/go-multicodec v0.1.6
github.com/multiformats/go-multihash v0.0.13
github.com/multiformats/go-multihash v0.0.14
github.com/spf13/cobra v1.0.0
github.com/ugorji/go/codec v1.1.7
)
Loading

0 comments on commit e26737d

Please sign in to comment.