From 4453d7a5e04f9b7c32b8e122d3fbf09685db201f Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 17 Feb 2021 09:36:57 +0200 Subject: [PATCH] Implement support for simultaneous open (#25) * implement support for simultaneous open Co-authored-by: aarshkshah1992 --- p2p/net/upgrader/listener_test.go | 20 ++++++++++++++++++-- p2p/net/upgrader/upgrader.go | 12 ++++++------ p2p/net/upgrader/upgrader_test.go | 2 +- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/p2p/net/upgrader/listener_test.go b/p2p/net/upgrader/listener_test.go index 08205887c9..611186b4df 100644 --- a/p2p/net/upgrader/listener_test.go +++ b/p2p/net/upgrader/listener_test.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "io" + "net" "sync" "testing" "time" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/sec" "github.com/libp2p/go-libp2p-core/sec/insecure" "github.com/libp2p/go-libp2p-core/transport" @@ -24,6 +26,20 @@ func init() { transport.AcceptTimeout = 1 * time.Hour } +type MuxAdapter struct { + tpt sec.SecureTransport +} + +func (mux *MuxAdapter) SecureInbound(ctx context.Context, insecure net.Conn) (sec.SecureConn, bool, error) { + sconn, err := mux.tpt.SecureInbound(ctx, insecure) + return sconn, true, err +} + +func (mux *MuxAdapter) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) { + sconn, err := mux.tpt.SecureOutbound(ctx, insecure, p) + return sconn, false, err +} + func createListener(t *testing.T, upgrader *st.Upgrader) transport.Listener { t.Helper() require := require.New(t) @@ -119,7 +135,7 @@ func TestFailedUpgradeOnListen(t *testing.T) { require := require.New(t) upgrader := &st.Upgrader{ - Secure: insecure.New(peer.ID("1")), + Secure: &MuxAdapter{tpt: insecure.New(peer.ID("1"))}, Muxer: &errorMuxer{}, } @@ -215,7 +231,7 @@ func TestConcurrentAccept(t *testing.T) { num = 3 * st.AcceptQueueLength blockingMuxer = newBlockingMuxer() upgrader = &st.Upgrader{ - Secure: insecure.New(peer.ID("1")), + Secure: &MuxAdapter{tpt: insecure.New(peer.ID("1"))}, Muxer: blockingMuxer, } ) diff --git a/p2p/net/upgrader/upgrader.go b/p2p/net/upgrader/upgrader.go index 0c75c58125..55427e17a4 100644 --- a/p2p/net/upgrader/upgrader.go +++ b/p2p/net/upgrader/upgrader.go @@ -29,7 +29,7 @@ var AcceptQueueLength = 16 // to a full transport connection (secure and multiplexed). type Upgrader struct { PSK ipnet.PSK - Secure sec.SecureTransport + Secure sec.SecureMuxer Muxer mux.Multiplexer ConnGater connmgr.ConnectionGater } @@ -80,7 +80,7 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma return nil, ipnet.ErrNotInPrivateNetwork } - sconn, err := u.setupSecurity(ctx, conn, p) + sconn, server, err := u.setupSecurity(ctx, conn, p) if err != nil { conn.Close() return nil, fmt.Errorf("failed to negotiate security protocol: %s", err) @@ -96,7 +96,7 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir) } - smconn, err := u.setupMuxer(ctx, sconn, p) + smconn, err := u.setupMuxer(ctx, sconn, server) if err != nil { sconn.Close() return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err) @@ -111,14 +111,14 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma return tc, nil } -func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (sec.SecureConn, error) { +func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (sec.SecureConn, bool, error) { if p == "" { return u.Secure.SecureInbound(ctx, conn) } return u.Secure.SecureOutbound(ctx, conn, p) } -func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, p peer.ID) (mux.MuxedConn, error) { +func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, server bool) (mux.MuxedConn, error) { // TODO: The muxer should take a context. done := make(chan struct{}) @@ -126,7 +126,7 @@ func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, p peer.ID) (mu var err error go func() { defer close(done) - smconn, err = u.Muxer.NewConn(conn, p == "") + smconn, err = u.Muxer.NewConn(conn, server) }() select { diff --git a/p2p/net/upgrader/upgrader_test.go b/p2p/net/upgrader/upgrader_test.go index f1960a6273..edcac27327 100644 --- a/p2p/net/upgrader/upgrader_test.go +++ b/p2p/net/upgrader/upgrader_test.go @@ -22,7 +22,7 @@ import ( var ( defaultUpgrader = &st.Upgrader{ - Secure: insecure.New(peer.ID("1")), + Secure: &MuxAdapter{tpt: insecure.New(peer.ID("1"))}, Muxer: &negotiatingMuxer{}, } )