diff --git a/blockservice/worker/worker.go b/blockservice/worker/worker.go index 77097bef65b..ee45d32ad55 100644 --- a/blockservice/worker/worker.go +++ b/blockservice/worker/worker.go @@ -117,11 +117,10 @@ func (w *Worker) start(c Config) { } }) - // reads from |workerChan| until process closes - w.process.Go(func(proc process.Process) { + // reads from |workerChan| until w.process closes + limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers) + limiter.Go(func(proc process.Process) { ctx := waitable.Context(proc) // shut down in-progress HasBlock when time to die - limiter := ratelimit.NewRateLimiter(process.Background(), c.NumWorkers) - defer limiter.Close() for { select { case <-proc.Closing(): diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 39fd0297d44..696c903e93a 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -385,20 +385,23 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote go func() { // rate limiting just in case. at most 10 addrs at once. limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10) - - // permute addrs so we try different sets first each time. - for _, i := range rand.Perm(len(remoteAddrs)) { - select { - case <-foundConn: // if one of them succeeded already - break - default: + limiter.Go(func(worker process.Process) { + // permute addrs so we try different sets first each time. + for _, i := range rand.Perm(len(remoteAddrs)) { + select { + case <-foundConn: // if one of them succeeded already + break + case <-worker.Closing(): // our context was cancelled + break + default: + } + + workerAddr := remoteAddrs[i] // shadow variable to avoid race + limiter.LimitedGo(func(worker process.Process) { + dialSingleAddr(workerAddr) + }) } - - workerAddr := remoteAddrs[i] // shadow variable to avoid race - limiter.Go(func(worker process.Process) { - dialSingleAddr(workerAddr) - }) - } + }) }() // wair fot the results. diff --git a/thirdparty/notifier/notifier.go b/thirdparty/notifier/notifier.go index c3721138180..aff69409356 100644 --- a/thirdparty/notifier/notifier.go +++ b/thirdparty/notifier/notifier.go @@ -120,18 +120,27 @@ func (n *Notifier) StopNotify(e Notifiee) { // hooks into your object that block you accidentally. func (n *Notifier) NotifyAll(notify func(Notifiee)) { n.mu.Lock() - if n.nots != nil { // so that zero-value is ready to be used. - for notifiee := range n.nots { + defer n.mu.Unlock() + + if n.nots == nil { // so that zero-value is ready to be used. + return + } - if n.lim == nil { // no rate limit - go notify(notifiee) - } else { - notifiee := notifiee // rebind for data races - n.lim.LimitedGo(func(worker process.Process) { - notify(notifiee) - }) - } + // no rate limiting. + if n.lim == nil { + for notifiee := range n.nots { + go notify(notifiee) } + return } - n.mu.Unlock() + + // with rate limiting. + n.lim.Go(func(worker process.Process) { + for notifiee := range n.nots { + notifiee := notifiee // rebind for loop data races + n.lim.LimitedGo(func(worker process.Process) { + notify(notifiee) + }) + } + }) }