diff --git a/decision/blockstoremanager.go b/decision/blockstoremanager.go
index e97bbdda..8d880a6c 100644
--- a/decision/blockstoremanager.go
+++ b/decision/blockstoremanager.go
@@ -2,6 +2,7 @@ package decision
 
 import (
 	"context"
+	"fmt"
 	"sync"
 
 	blocks "github.com/ipfs/go-block-format"
@@ -50,25 +51,29 @@ func (bsm *blockstoreManager) worker() {
 	}
 }
 
-func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) {
+func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
 	select {
 	case <-ctx.Done():
+		return ctx.Err()
 	case <-bsm.px.Closing():
+		return fmt.Errorf("shutting down")
 	case bsm.jobs <- job:
+		return nil
 	}
 }
 
-func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) map[cid.Cid]int {
+func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (map[cid.Cid]int, error) {
 	res := make(map[cid.Cid]int)
 	if len(ks) == 0 {
-		return res
+		return res, nil
 	}
 
 	var lk sync.Mutex
-	bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
+	return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
 		size, err := bsm.bs.GetSize(c)
 		if err != nil {
 			if err != bstore.ErrNotFound {
+				// Note: this isn't a fatal error. We shouldn't abort the request
 				log.Errorf("blockstore.GetSize(%s) error: %s", c, err)
 			}
 		} else {
@@ -77,21 +82,20 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) m
 			lk.Unlock()
 		}
 	})
-
-	return res
 }
 
-func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[cid.Cid]blocks.Block {
+func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
 	res := make(map[cid.Cid]blocks.Block)
 	if len(ks) == 0 {
-		return res
+		return res, nil
 	}
 
 	var lk sync.Mutex
-	bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
+	return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
 		blk, err := bsm.bs.Get(c)
 		if err != nil {
 			if err != bstore.ErrNotFound {
+				// Note: this isn't a fatal error. We shouldn't abort the request
 				log.Errorf("blockstore.Get(%s) error: %s", c, err)
 			}
 		} else {
@@ -100,19 +104,23 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[c
 			lk.Unlock()
 		}
 	})
-
-	return res
 }
 
-func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) {
+func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) error {
+	var err error
 	wg := sync.WaitGroup{}
 	for _, k := range ks {
 		c := k
 		wg.Add(1)
-		bsm.addJob(ctx, func() {
+		err = bsm.addJob(ctx, func() {
 			jobFn(c)
 			wg.Done()
 		})
+		if err != nil {
+			wg.Done()
+			break
+		}
 	}
 	wg.Wait()
+	return err
 }
diff --git a/decision/blockstoremanager_test.go b/decision/blockstoremanager_test.go
index a5fee74e..c57c4892 100644
--- a/decision/blockstoremanager_test.go
+++ b/decision/blockstoremanager_test.go
@@ -3,7 +3,6 @@ package decision
 import (
 	"context"
 	"crypto/rand"
-	"errors"
 	"sync"
 	"testing"
 	"time"
@@ -30,7 +29,10 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
 	bsm.start(process.WithTeardown(func() error { return nil }))
 
 	cids := testutil.GenerateCids(4)
-	sizes := bsm.getBlockSizes(ctx, cids)
+	sizes, err := bsm.getBlockSizes(ctx, cids)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if len(sizes) != 0 {
 		t.Fatal("Wrong response length")
 	}
@@ -41,7 +43,10 @@
 		}
 	}
 
-	blks := bsm.getBlocks(ctx, cids)
+	blks, err := bsm.getBlocks(ctx, cids)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if len(blks) != 0 {
 		t.Fatal("Wrong response length")
 	}
@@ -82,7 +87,10 @@ func TestBlockstoreManager(t *testing.T) {
 		cids = append(cids, b.Cid())
 	}
 
-	sizes := bsm.getBlockSizes(ctx, cids)
+	sizes, err := bsm.getBlockSizes(ctx, cids)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if len(sizes) != len(blks)-1 {
 		t.Fatal("Wrong response length")
 	}
@@ -106,7 +114,10 @@
 		}
 	}
 
-	fetched := bsm.getBlocks(ctx, cids)
+	fetched, err := bsm.getBlocks(ctx, cids)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if len(fetched) != len(blks)-1 {
 		t.Fatal("Wrong response length")
 	}
@@ -160,17 +171,16 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
 		go func(t *testing.T) {
 			defer wg.Done()
 
-			sizes := bsm.getBlockSizes(ctx, ks)
+			sizes, err := bsm.getBlockSizes(ctx, ks)
+			if err != nil {
+				t.Error(err)
+			}
 			if len(sizes) != len(blks) {
-				err = errors.New("Wrong response length")
+				t.Error("Wrong response length")
 			}
 		}(t)
 	}
 	wg.Wait()
-
-	if err != nil {
-		t.Fatal(err)
-	}
 }
 
 func TestBlockstoreManagerClose(t *testing.T) {
@@ -184,7 +194,7 @@
 	px := process.WithTeardown(func() error { return nil })
 	bsm.start(px)
-	blks := testutil.GenerateBlocksOfSize(3, 1024)
+	blks := testutil.GenerateBlocksOfSize(10, 1024)
 	var ks []cid.Cid
 	for _, b := range blks {
 		ks = append(ks, b.Cid())
 	}
@@ -199,34 +209,29 @@
 
 	time.Sleep(5 * time.Millisecond)
 
-	fnCallDone := make(chan struct{})
-	go func() {
-		bsm.getBlockSizes(ctx, ks)
-		fnCallDone <- struct{}{}
-	}()
-
-	select {
-	case <-fnCallDone:
-		t.Fatal("call to BlockstoreManager should be cancelled")
-	case <-px.Closed():
+	before := time.Now()
+	_, err = bsm.getBlockSizes(ctx, ks)
+	if err == nil {
+		t.Error("expected an error")
+	}
+	// would expect to wait delayTime*10 if we didn't cancel.
+	if time.Since(before) > delayTime*2 {
+		t.Error("expected a fast timeout")
 	}
 }
 
 func TestBlockstoreManagerCtxDone(t *testing.T) {
 	delayTime := 20 * time.Millisecond
 
-	ctx := context.Background()
-	ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
-	defer cancel()
 	bsdelay := delay.Fixed(delayTime)
 	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
 	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))
 
-	bsm := newBlockstoreManager(ctx, bstore, 3)
+	bsm := newBlockstoreManager(context.Background(), bstore, 3)
 	proc := process.WithTeardown(func() error { return nil })
 	bsm.start(proc)
 
-	blks := testutil.GenerateBlocksOfSize(3, 1024)
+	blks := testutil.GenerateBlocksOfSize(10, 1024)
 	var ks []cid.Cid
 	for _, b := range blks {
 		ks = append(ks, b.Cid())
@@ -237,15 +242,17 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	fnCallDone := make(chan struct{})
-	go func() {
-		bsm.getBlockSizes(ctx, ks)
-		fnCallDone <- struct{}{}
-	}()
+	ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
+	defer cancel()
+
+	before := time.Now()
+	_, err = bsm.getBlockSizes(ctx, ks)
+	if err == nil {
+		t.Error("expected an error")
+	}
 
-	select {
-	case <-fnCallDone:
-		t.Fatal("call to BlockstoreManager should be cancelled")
-	case <-ctx.Done():
+	// would expect to wait delayTime*10 if we didn't cancel.
+	if time.Since(before) > delayTime*2 {
+		t.Error("expected a fast timeout")
 	}
 }
diff --git a/decision/engine.go b/decision/engine.go
index 3154b5e5..7a58bb3f 100644
--- a/decision/engine.go
+++ b/decision/engine.go
@@ -367,7 +367,11 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
 		for _, t := range nextTask.Tasks {
 			blockCids.Add(t.Identifier.(cid.Cid))
 		}
-		blks := e.bsm.getBlocks(ctx, blockCids.Keys())
+		blks, err := e.bsm.getBlocks(ctx, blockCids.Keys())
+		if err != nil {
+			// we're dropping the envelope but that's not an issue in practice.
+			return nil, err
+		}
 
 		msg := bsmsg.New(true)
 		for _, b := range blks {
@@ -437,7 +441,11 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
 			wantKs.Add(entry.Cid)
 		}
 	}
-	blockSizes := e.bsm.getBlockSizes(ctx, wantKs.Keys())
+	blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
+	if err != nil {
+		log.Info("aborting message processing", err)
+		return
+	}
 
 	l := e.findOrCreate(p)
 	l.lk.Lock()
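Usage note (not part of the patch): with this change, `getBlockSizes` and `getBlocks` report why they returned early — the caller's context was cancelled or the manager's process is shutting down — so callers can distinguish an aborted call from a genuinely empty result, and the returned map may still hold partial results when the error is non-nil. A minimal sketch of a caller inside the `decision` package, assuming a started `blockstoreManager` (`exampleCaller` is a hypothetical name, not part of the codebase):

	// Sketch only, not part of the patch: consuming the new
	// error-returning API from within the decision package.
	func exampleCaller(ctx context.Context, bsm *blockstoreManager, ks []cid.Cid) {
		sizes, err := bsm.getBlockSizes(ctx, ks)
		if err != nil {
			// ctx was cancelled or the manager is shutting down;
			// sizes may still hold partial results.
			log.Infof("getBlockSizes aborted: %s", err)
			return
		}
		for c, size := range sizes {
			log.Debugf("block %s: %d bytes", c, size)
		}
	}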