From 911512e8d8e56969e83b9407490798cd5567887f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 23 Aug 2021 17:58:36 +0100 Subject: [PATCH 1/3] rename DialSync.DialLock to Dial --- dial_sync.go | 4 ++-- dial_sync_test.go | 24 ++++++++++++------------ swarm_dial.go | 2 +- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index 03ba9cc0..83167f63 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -108,9 +108,9 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { return actd, nil } -// DialLock initiates a dial to the given peer if there are none in progress +// Dial initiates a dial to the given peer if there are none in progress // then waits for the dial to that peer to complete. -func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { +func (ds *DialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) { ad, err := ds.getActiveDial(p) if err != nil { return nil, err diff --git a/dial_sync_test.go b/dial_sync_test.go index 441c8b7d..47aa68f7 100644 --- a/dial_sync_test.go +++ b/dial_sync_test.go @@ -38,14 +38,14 @@ func TestBasicDialSync(t *testing.T) { finished := make(chan struct{}, 2) go func() { - if _, err := dsync.DialLock(context.Background(), p); err != nil { + if _, err := dsync.Dial(context.Background(), p); err != nil { t.Error(err) } finished <- struct{}{} }() go func() { - if _, err := dsync.DialLock(context.Background(), p); err != nil { + if _, err := dsync.Dial(context.Background(), p); err != nil { t.Error(err) } finished <- struct{}{} @@ -74,7 +74,7 @@ func TestDialSyncCancel(t *testing.T) { finished := make(chan struct{}) go func() { - _, err := dsync.DialLock(ctx1, p) + _, err := dsync.Dial(ctx1, p) if err != ctx1.Err() { t.Error("should have gotten context error") } @@ -90,7 +90,7 @@ func TestDialSyncCancel(t *testing.T) { // Add a second dialwait in so two actors are waiting on the same dial go func() { - _, err := dsync.DialLock(context.Background(), p) + _, err := dsync.Dial(context.Background(), p) if err != nil { t.Error(err) } @@ -123,7 +123,7 @@ func TestDialSyncAllCancel(t *testing.T) { finished := make(chan struct{}) go func() { - if _, err := dsync.DialLock(ctx, p); err != ctx.Err() { + if _, err := dsync.Dial(ctx, p); err != ctx.Err() { t.Error("should have gotten context error") } finished <- struct{}{} @@ -131,7 +131,7 @@ func TestDialSyncAllCancel(t *testing.T) { // Add a second dialwait in so two actors are waiting on the same dial go func() { - if _, err := dsync.DialLock(ctx, p); err != ctx.Err() { + if _, err := dsync.Dial(ctx, p); err != ctx.Err() { t.Error("should have gotten context error") } finished <- struct{}{} @@ -155,7 +155,7 @@ func TestDialSyncAllCancel(t *testing.T) { // should be able to successfully dial that peer again done() - if _, err := dsync.DialLock(context.Background(), p); err != nil { + if _, err := dsync.Dial(context.Background(), p); err != nil { t.Fatal(err) } } @@ -188,12 +188,12 @@ func TestFailFirst(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - _, err := ds.DialLock(ctx, p) + _, err := ds.Dial(ctx, p) if err == nil { t.Fatal("expected gophers to have eaten the modem") } - c, err := ds.DialLock(ctx, p) + c, err := ds.Dial(ctx, p) if err != nil { t.Fatal(err) } @@ -223,7 +223,7 @@ func TestStressActiveDial(t *testing.T) { makeDials := func() { for i := 0; i < 10000; i++ { - ds.DialLock(context.Background(), pid) + ds.Dial(context.Background(), pid) } wg.Done() } @@ -245,13 +245,13 @@ func TestDialSelf(t *testing.T) { defer s.Close() // this should fail - _, err := s.dsync.DialLock(ctx, self) + _, err := s.dsync.Dial(ctx, self) if err != ErrDialToSelf { t.Fatal("expected error from self dial") } // do it twice to make sure we get a new active dial object that fails again - _, err = s.dsync.DialLock(ctx, self) + _, err = s.dsync.Dial(ctx, self) if err != ErrDialToSelf { t.Fatal("expected error from self dial") } diff --git a/swarm_dial.go b/swarm_dial.go index 83a0468f..9c9e48cc 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -259,7 +259,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx)) defer cancel() - conn, err = s.dsync.DialLock(ctx, p) + conn, err = s.dsync.Dial(ctx, p) if err == nil { return conn, nil } From 62d5d6f0b523db38b1c327cc5fb3cfb0b81d5aa6 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 23 Aug 2021 18:00:01 +0100 Subject: [PATCH 2/3] only check for self dials once --- dial_sync.go | 11 +++-------- dial_sync_test.go | 35 ++++------------------------------- dial_test.go | 2 +- swarm_dial.go | 9 ++------- 4 files changed, 10 insertions(+), 47 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index 83167f63..2271221d 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -8,8 +8,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -// DialWorerFunc is used by DialSync to spawn a new dial worker -type dialWorkerFunc func(peer.ID, <-chan dialRequest) error +// dialWorkerFunc is used by DialSync to spawn a new dial worker +type dialWorkerFunc func(peer.ID, <-chan dialRequest) // newDialSync constructs a new DialSync func newDialSync(worker dialWorkerFunc) *DialSync { @@ -93,12 +93,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { reqch: make(chan dialRequest), ds: ds, } - - if err := ds.dialWorker(p, actd.reqch); err != nil { - cancel() - return nil, err - } - + ds.dialWorker(p, actd.reqch) ds.dials[p] = actd } diff --git a/dial_sync_test.go b/dial_sync_test.go index 47aa68f7..0d9c6ca4 100644 --- a/dial_sync_test.go +++ b/dial_sync_test.go @@ -15,7 +15,7 @@ func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{} dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) - f := func(p peer.ID, reqch <-chan dialRequest) error { + f := func(p peer.ID, reqch <-chan dialRequest) { defer cancel() dfcalls <- struct{}{} go func() { @@ -24,7 +24,6 @@ func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{} req.resch <- dialResponse{conn: new(Conn)} } }() - return nil } var once sync.Once @@ -162,7 +161,7 @@ func TestDialSyncAllCancel(t *testing.T) { func TestFailFirst(t *testing.T) { var count int32 - f := func(p peer.ID, reqch <-chan dialRequest) error { + f := func(p peer.ID, reqch <-chan dialRequest) { go func() { for { req, ok := <-reqch @@ -178,18 +177,15 @@ func TestFailFirst(t *testing.T) { atomic.AddInt32(&count, 1) } }() - return nil } ds := newDialSync(f) - p := peer.ID("testing") ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - _, err := ds.Dial(ctx, p) - if err == nil { + if _, err := ds.Dial(ctx, p); err == nil { t.Fatal("expected gophers to have eaten the modem") } @@ -197,14 +193,13 @@ func TestFailFirst(t *testing.T) { if err != nil { t.Fatal(err) } - if c == nil { t.Fatal("should have gotten a 'real' conn back") } } func TestStressActiveDial(t *testing.T) { - ds := newDialSync(func(p peer.ID, reqch <-chan dialRequest) error { + ds := newDialSync(func(p peer.ID, reqch <-chan dialRequest) { go func() { for { req, ok := <-reqch @@ -214,7 +209,6 @@ func TestStressActiveDial(t *testing.T) { req.resch <- dialResponse{} } }() - return nil }) wg := sync.WaitGroup{} @@ -235,24 +229,3 @@ func TestStressActiveDial(t *testing.T) { wg.Wait() } - -func TestDialSelf(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - self := peer.ID("ABC") - s := NewSwarm(ctx, self, nil, nil) - defer s.Close() - - // this should fail - _, err := s.dsync.Dial(ctx, self) - if err != ErrDialToSelf { - t.Fatal("expected error from self dial") - } - - // do it twice to make sure we get a new active dial object that fails again - _, err = s.dsync.Dial(ctx, self) - if err != ErrDialToSelf { - t.Fatal("expected error from self dial") - } -} diff --git a/dial_test.go b/dial_test.go index 6258d0ed..3bbc5e2b 100644 --- a/dial_test.go +++ b/dial_test.go @@ -672,7 +672,7 @@ func TestDialSimultaneousJoin(t *testing.T) { } } -func TestDialSelf2(t *testing.T) { +func TestDialSelf(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/swarm_dial.go b/swarm_dial.go index 9c9e48cc..9f76c975 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -294,14 +294,9 @@ type dialResponse struct { err error } -// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) startDialWorker(p peer.ID, reqch <-chan dialRequest) error { - if p == s.local { - return ErrDialToSelf - } - +// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials to a single peer +func (s *Swarm) startDialWorker(p peer.ID, reqch <-chan dialRequest) { go s.dialWorkerLoop(p, reqch) - return nil } func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) { From 1bf84f799ac69fc7d96ee665a2d9b8d2e04b5807 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 24 Aug 2021 12:03:46 +0100 Subject: [PATCH 3/3] simplify starting of the dialWorkerLoop --- dial_sync.go | 2 +- swarm.go | 2 +- swarm_dial.go | 6 +----- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index 2271221d..6b3f2afe 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -93,7 +93,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { reqch: make(chan dialRequest), ds: ds, } - ds.dialWorker(p, actd.reqch) + go ds.dialWorker(p, actd.reqch) ds.dials[p] = actd } diff --git a/swarm.go b/swarm.go index 35eb4156..c34497f7 100644 --- a/swarm.go +++ b/swarm.go @@ -122,7 +122,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc } } - s.dsync = newDialSync(s.startDialWorker) + s.dsync = newDialSync(s.dialWorkerLoop) s.limiter = newDialLimiter(s.dialAddr) s.proc = goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) diff --git a/swarm_dial.go b/swarm_dial.go index 9f76c975..21b5b84e 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -294,11 +294,7 @@ type dialResponse struct { err error } -// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials to a single peer -func (s *Swarm) startDialWorker(p peer.ID, reqch <-chan dialRequest) { - go s.dialWorkerLoop(p, reqch) -} - +// dialWorkerLoop synchronizes and executes concurrent dials to a single peer func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) { defer s.limiter.clearAllPeerDials(p)