Skip to content

Commit

Permalink
Implement support for simultaneous open (#25)
Browse files Browse the repository at this point in the history
* implement support for simultaneous open

Co-authored-by: aarshkshah1992 <aarshkshah1992@gmail.com>
  • Loading branch information
vyzo and aarshkshah1992 committed Feb 17, 2021
1 parent ebab543 commit 4453d7a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
20 changes: 18 additions & 2 deletions p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"fmt"
"io"
"net"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p-core/sec/insecure"
"github.com/libp2p/go-libp2p-core/transport"

Expand All @@ -24,6 +26,20 @@ func init() {
transport.AcceptTimeout = 1 * time.Hour
}

type MuxAdapter struct {
tpt sec.SecureTransport
}

func (mux *MuxAdapter) SecureInbound(ctx context.Context, insecure net.Conn) (sec.SecureConn, bool, error) {
sconn, err := mux.tpt.SecureInbound(ctx, insecure)
return sconn, true, err
}

func (mux *MuxAdapter) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) {
sconn, err := mux.tpt.SecureOutbound(ctx, insecure, p)
return sconn, false, err
}

func createListener(t *testing.T, upgrader *st.Upgrader) transport.Listener {
t.Helper()
require := require.New(t)
Expand Down Expand Up @@ -119,7 +135,7 @@ func TestFailedUpgradeOnListen(t *testing.T) {
require := require.New(t)

upgrader := &st.Upgrader{
Secure: insecure.New(peer.ID("1")),
Secure: &MuxAdapter{tpt: insecure.New(peer.ID("1"))},
Muxer: &errorMuxer{},
}

Expand Down Expand Up @@ -215,7 +231,7 @@ func TestConcurrentAccept(t *testing.T) {
num = 3 * st.AcceptQueueLength
blockingMuxer = newBlockingMuxer()
upgrader = &st.Upgrader{
Secure: insecure.New(peer.ID("1")),
Secure: &MuxAdapter{tpt: insecure.New(peer.ID("1"))},
Muxer: blockingMuxer,
}
)
Expand Down
12 changes: 6 additions & 6 deletions p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var AcceptQueueLength = 16
// to a full transport connection (secure and multiplexed).
type Upgrader struct {
PSK ipnet.PSK
Secure sec.SecureTransport
Secure sec.SecureMuxer
Muxer mux.Multiplexer
ConnGater connmgr.ConnectionGater
}
Expand Down Expand Up @@ -80,7 +80,7 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
return nil, ipnet.ErrNotInPrivateNetwork
}

sconn, err := u.setupSecurity(ctx, conn, p)
sconn, server, err := u.setupSecurity(ctx, conn, p)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to negotiate security protocol: %s", err)
Expand All @@ -96,7 +96,7 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir)
}

smconn, err := u.setupMuxer(ctx, sconn, p)
smconn, err := u.setupMuxer(ctx, sconn, server)
if err != nil {
sconn.Close()
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
Expand All @@ -111,22 +111,22 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
return tc, nil
}

func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (sec.SecureConn, error) {
func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (sec.SecureConn, bool, error) {
if p == "" {
return u.Secure.SecureInbound(ctx, conn)
}
return u.Secure.SecureOutbound(ctx, conn, p)
}

func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, p peer.ID) (mux.MuxedConn, error) {
func (u *Upgrader) setupMuxer(ctx context.Context, conn net.Conn, server bool) (mux.MuxedConn, error) {
// TODO: The muxer should take a context.
done := make(chan struct{})

var smconn mux.MuxedConn
var err error
go func() {
defer close(done)
smconn, err = u.Muxer.NewConn(conn, p == "")
smconn, err = u.Muxer.NewConn(conn, server)
}()

select {
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/upgrader/upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

var (
defaultUpgrader = &st.Upgrader{
Secure: insecure.New(peer.ID("1")),
Secure: &MuxAdapter{tpt: insecure.New(peer.ID("1"))},
Muxer: &negotiatingMuxer{},
}
)
Expand Down

0 comments on commit 4453d7a

Please sign in to comment.