Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Fix simultaneous dials #250

Merged
merged 29 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8930f29
deps: update go-libp2p-core
vyzo Mar 30, 2021
9d50a8c
implement dial worker for synchronizing simultaneous dials
vyzo Mar 30, 2021
201a8d1
adjust next dial delays
vyzo Mar 30, 2021
2e742fb
fix dial_sync tests
vyzo Mar 30, 2021
5900641
clear address dial when they fail because of backoff
vyzo Mar 30, 2021
de528f1
nuke incref, it's useless
vyzo Mar 31, 2021
2ee7bf0
make dialWorker return an error for self dials and responsible for sp…
vyzo Mar 31, 2021
0510dcb
don't use a goroutine for the actual dial
vyzo Mar 31, 2021
96723d1
bump go version to 1.15
vyzo Mar 31, 2021
bbd0a01
batch dials together, rework address ranking
vyzo Mar 31, 2021
a6c2838
tune down batch dial delays
vyzo Mar 31, 2021
7ccf58e
use a timer instead of time.After
vyzo Mar 31, 2021
0fc0ade
kill dial jump delays
vyzo Mar 31, 2021
12a0cdb
add TestDialExistingConnection
vyzo Mar 31, 2021
e7b6af6
do a last ditch check for acceptable connections before dispatching a…
vyzo Mar 31, 2021
580a818
merge dial contexts where possible
vyzo Mar 31, 2021
699b4d1
add TestDialSimultaneousJoin test
vyzo Mar 31, 2021
b67b736
don't add backoff if we have successfully connected
vyzo Mar 31, 2021
0538806
fix TestConnectednessCorrect
vyzo Mar 31, 2021
acc35e8
don't store the active dial if it errors while starting the worker
vyzo Mar 31, 2021
27f6c39
add TestSelfDial
vyzo Mar 31, 2021
1624828
make DialRequest and DialResponse private
vyzo Apr 1, 2021
13d3556
add comment about the necessity of removing the address tracking when…
vyzo Apr 1, 2021
4a69fa2
remove dial batching
vyzo Apr 1, 2021
43b0382
add new TestDialSelf
vyzo Apr 1, 2021
17bc04b
make DialWorkerFunc, NewDialSync private
vyzo Apr 1, 2021
be3e940
rename dialWorker to startDialWorker
vyzo Apr 1, 2021
df0ab8b
make addr utility funcs standalone and not exported
vyzo Apr 1, 2021
084fffe
make IsFdConsumingAddr a standalone utility func
vyzo Apr 1, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 48 additions & 66 deletions dial_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,95 +5,82 @@ import (
"errors"
"sync"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)

// TODO: change this text when we fix the bug
var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW")

// DialFunc is the type of function expected by DialSync.
type DialFunc func(context.Context, peer.ID) (*Conn, error)
type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest)

// NewDialSync constructs a new DialSync.
func NewDialSync(dfn DialFunc) *DialSync {
func NewDialSync(worker DialWorkerFunc) *DialSync {
return &DialSync{
dials: make(map[peer.ID]*activeDial),
dialFunc: dfn,
dials: make(map[peer.ID]*activeDial),
dialWorker: worker,
}
}

// DialSync is a dial synchronization helper that ensures that at most one dial
// to any given peer is active at any given time.
type DialSync struct {
dials map[peer.ID]*activeDial
dialsLk sync.Mutex
dialFunc DialFunc
dials map[peer.ID]*activeDial
dialsLk sync.Mutex
dialWorker DialWorkerFunc
}

type activeDial struct {
id peer.ID
refCnt int
refCntLk sync.Mutex
cancel func()
id peer.ID
refCnt int

err error
conn *Conn
waitch chan struct{}
ctx context.Context
cancel func()

ds *DialSync
}
reqch chan DialRequest

func (ad *activeDial) wait(ctx context.Context) (*Conn, error) {
defer ad.decref()
select {
case <-ad.waitch:
return ad.conn, ad.err
case <-ctx.Done():
return nil, ctx.Err()
}
ds *DialSync
}

func (ad *activeDial) incref() {
ad.refCntLk.Lock()
defer ad.refCntLk.Unlock()
ad.refCnt++
}

func (ad *activeDial) decref() {
ad.refCntLk.Lock()
ad.ds.dialsLk.Lock()
vyzo marked this conversation as resolved.
Show resolved Hide resolved
ad.refCnt--
maybeZero := (ad.refCnt <= 0)
ad.refCntLk.Unlock()

// make sure to always take locks in correct order.
if maybeZero {
ad.ds.dialsLk.Lock()
ad.refCntLk.Lock()
// check again after lock swap drop to make sure nobody else called incref
// in between locks
if ad.refCnt <= 0 {
ad.cancel()
delete(ad.ds.dials, ad.id)
}
ad.refCntLk.Unlock()
ad.ds.dialsLk.Unlock()
if ad.refCnt == 0 {
ad.cancel()
close(ad.reqch)
delete(ad.ds.dials, ad.id)
}
ad.ds.dialsLk.Unlock()
}

func (ad *activeDial) start(ctx context.Context) {
ad.conn, ad.err = ad.ds.dialFunc(ctx, ad.id)

// This isn't the user's context so we should fix the error.
switch ad.err {
case context.Canceled:
// The dial was canceled with `CancelDial`.
ad.err = errDialCanceled
case context.DeadlineExceeded:
// We hit an internal timeout, not a context timeout.
ad.err = ErrDialTimeout
func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) {
dialCtx := ad.ctx

if forceDirect, reason := network.GetForceDirectDial(ctx); forceDirect {
dialCtx = network.WithForceDirectDial(dialCtx, reason)
}
if simConnect, reason := network.GetSimultaneousConnect(ctx); simConnect {
dialCtx = network.WithSimultaneousConnect(dialCtx, reason)
}
marten-seemann marked this conversation as resolved.
Show resolved Hide resolved

resch := make(chan DialResponse, 1)
select {
case ad.reqch <- DialRequest{Ctx: dialCtx, Resch: resch}:
case <-ctx.Done():
return nil, ctx.Err()
}

select {
case res := <-resch:
return res.Conn, res.Err
case <-ctx.Done():
return nil, ctx.Err()
}
close(ad.waitch)
ad.cancel()
}

func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
Expand All @@ -109,13 +96,14 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
adctx, cancel := context.WithCancel(context.Background())
actd = &activeDial{
id: p,
ctx: adctx,
cancel: cancel,
waitch: make(chan struct{}),
reqch: make(chan DialRequest),
ds: ds,
}
ds.dials[p] = actd

go actd.start(adctx)
go ds.dialWorker(adctx, p, actd.reqch)
}

// increase ref count before dropping dialsLk
Expand All @@ -127,14 +115,8 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
// DialLock initiates a dial to the given peer if there is none in progress,
// then waits for the dial to that peer to complete.
func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) {
return ds.getActiveDial(p).wait(ctx)
}
ad := ds.getActiveDial(p)
defer ad.decref()

// CancelDial cancels all in-progress dials to the given peer.
func (ds *DialSync) CancelDial(p peer.ID) {
ds.dialsLk.Lock()
defer ds.dialsLk.Unlock()
if ad, ok := ds.dials[p]; ok {
ad.cancel()
}
return ad.dial(ctx, p)
}
70 changes: 54 additions & 16 deletions dial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,33 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

func getMockDialFunc() (DialFunc, func(), context.Context, <-chan struct{}) {
func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{}) {
dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care
dialctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
f := func(ctx context.Context, p peer.ID) (*Conn, error) {
f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) {
dfcalls <- struct{}{}
defer cancel()
select {
case <-ch:
return new(Conn), nil
case <-ctx.Done():
return nil, ctx.Err()
}
go func() {
defer cancel()
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

select {
case <-ch:
req.Resch <- DialResponse{Conn: new(Conn)}
case <-ctx.Done():
req.Resch <- DialResponse{Err: ctx.Err()}
return
}
case <-ctx.Done():
return
}
}
}()
}

o := new(sync.Once)
Expand Down Expand Up @@ -174,12 +188,25 @@ func TestDialSyncAllCancel(t *testing.T) {

func TestFailFirst(t *testing.T) {
var count int
f := func(ctx context.Context, p peer.ID) (*Conn, error) {
if count > 0 {
return new(Conn), nil
f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

if count > 0 {
req.Resch <- DialResponse{Conn: new(Conn)}
} else {
req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")}
}
count++

case <-ctx.Done():
return
}
}
count++
return nil, fmt.Errorf("gophers ate the modem")
}

ds := NewDialSync(f)
Expand All @@ -205,8 +232,19 @@ func TestFailFirst(t *testing.T) {
}

func TestStressActiveDial(t *testing.T) {
ds := NewDialSync(func(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, nil
ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

req.Resch <- DialResponse{}
case <-ctx.Done():
return
}
}
})

wg := sync.WaitGroup{}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-addr-util v0.0.2
github.com/libp2p/go-conn-security-multistream v0.2.1
github.com/libp2p/go-libp2p-core v0.8.3
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-quic-transport v0.10.0
Expand Down
Loading