Skip to content
This repository has been archived by the owner on Sep 10, 2022. It is now read-only.

call the connection gater when accepting connections and after crypto handshake #55

Merged
merged 10 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions gater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package stream_test

import (
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
)

type testGater struct {
sync.Mutex

blockAccept, blockSecured bool
}

var _ connmgr.ConnectionGater = (*testGater)(nil)

func (t *testGater) BlockAccept(block bool) {
t.Lock()
defer t.Unlock()

t.blockAccept = block
}

func (t *testGater) BlockSecured(block bool) {
t.Lock()
defer t.Unlock()

t.blockSecured = block
}

func (t *testGater) InterceptPeerDial(p peer.ID) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAccept(multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockAccept
}

func (t *testGater) InterceptSecured(direction network.Direction, id peer.ID, multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockSecured
}

func (t *testGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
panic("not implemented")
}
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ go 1.12
require (
github.com/ipfs/go-log v1.0.4
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/libp2p/go-libp2p-core v0.5.3
github.com/libp2p/go-libp2p-core v0.5.5
github.com/libp2p/go-libp2p-mplex v0.2.3
github.com/libp2p/go-libp2p-pnet v0.2.0
github.com/libp2p/go-maddr-filter v0.0.5
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-net v0.1.5
github.com/stretchr/testify v1.4.0
Expand Down
10 changes: 4 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,22 @@ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS
github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI=
github.com/libp2p/go-libp2p-core v0.3.1/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII=
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.5.3 h1:b9W3w7AZR2n/YJhG8d0qPFGhGhCWKIvPuJgp4hhc4MM=
github.com/libp2p/go-libp2p-core v0.5.3/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.5 h1:/yiFUZDoBWqvpWeHHJ1iA8SOs5obT1/+UdNfckwD57M=
github.com/libp2p/go-libp2p-core v0.5.5/go.mod h1:vj3awlOr9+GMZJFH9s4mpt9RHHgGqeHCopzbYKZdRjM=
github.com/libp2p/go-libp2p-mplex v0.2.3 h1:2zijwaJvpdesST2MXpI5w9wWFRgYtMcpRX7rrw0jmOo=
github.com/libp2p/go-libp2p-mplex v0.2.3/go.mod h1:CK3p2+9qH9x+7ER/gWWDYJ3QW5ZxWDkm+dVvjfuG3ek=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-testing v0.1.1 h1:U03z3HnGI7Ni8Xx6ONVZvUFOAzWYmolWf5W5jAOPNmU=
github.com/libp2p/go-libp2p-testing v0.1.1/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0=
github.com/libp2p/go-maddr-filter v0.0.5 h1:CW3AgbMO6vUvT4kf87y4N+0P8KUl2aqLYhrGyDUbLSg=
github.com/libp2p/go-maddr-filter v0.0.5/go.mod h1:Jk+36PMfIqCJhAnaASRH83bdAvfDRp/w6ENFaC9bG+M=
github.com/libp2p/go-mplex v0.1.2 h1:qOg1s+WdGLlpkrczDqmhYzyk3vCfsQ8+RxRTQjOZWwI=
github.com/libp2p/go-mplex v0.1.2/go.mod h1:Xgz2RDCi3co0LeZfgjm4OgUF15+sVR8SRcu3SFXI1lk=
github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA=
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg=
github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/libp2p/go-openssl v0.0.5 h1:pQkejVhF0xp08D4CQUcw8t+BFJeXowja6RVcb5p++EA=
github.com/libp2p/go-openssl v0.0.5/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
Expand All @@ -108,7 +108,6 @@ github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc=
github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
github.com/multiformats/go-multiaddr v0.2.1 h1:SgG/cw5vqyB5QQe5FPe2TqggU9WtrA9X4nZw7LlVqOI=
Expand Down Expand Up @@ -172,7 +171,6 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down
11 changes: 11 additions & 0 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func (l *listener) handleIncoming() {
return
}

// gate the connection if applicable
if l.upgrader.ConnGater != nil && !l.upgrader.ConnGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())

if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by gater; err: %s", err)
}
continue
}

// The go routine below calls Release when the context is
// canceled so there's no need to wait on it here.
l.threshold.Wait()
Expand Down
135 changes: 46 additions & 89 deletions listener_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package stream_test

import (
"context"
"errors"
"io"
"net"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec/insecure"
"github.com/libp2p/go-libp2p-core/transport"

mplex "github.com/libp2p/go-libp2p-mplex"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"

Expand All @@ -23,94 +18,10 @@ import (
"github.com/stretchr/testify/require"
)

// negotiatingMuxer sets up a new mplex connection
// It makes sure that this happens at the same time for client and server.
type negotiatingMuxer struct{}

func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
var err error
// run a fake muxer negotiation
if isServer {
_, err = c.Write([]byte("setup"))
} else {
_, err = c.Read(make([]byte, 5))
}
if err != nil {
return nil, err
}
return mplex.DefaultTransport.NewConn(c, isServer)
}

// blockingMuxer blocks the muxer negotiation until the contain chan is closed
type blockingMuxer struct {
unblock chan struct{}
}

var _ mux.Multiplexer = &blockingMuxer{}

func newBlockingMuxer() *blockingMuxer {
return &blockingMuxer{unblock: make(chan struct{})}
}

func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
<-m.unblock
return (&negotiatingMuxer{}).NewConn(c, isServer)
}

func (m *blockingMuxer) Unblock() {
close(m.unblock)
}

// errorMuxer is a muxer that errors while setting up
type errorMuxer struct{}

var _ mux.Multiplexer = &errorMuxer{}

func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
return nil, errors.New("mux error")
}

var (
defaultUpgrader = &st.Upgrader{
Secure: insecure.New(peer.ID(1)),
Muxer: &negotiatingMuxer{},
}
)

func init() {
transport.AcceptTimeout = 1 * time.Hour
}

func testConn(t *testing.T, clientConn, serverConn transport.CapableConn) {
t.Helper()
require := require.New(t)

cstr, err := clientConn.OpenStream()
require.NoError(err)

_, err = cstr.Write([]byte("foobar"))
require.NoError(err)

sstr, err := serverConn.AcceptStream()
require.NoError(err)

b := make([]byte, 6)
_, err = sstr.Read(b)
require.NoError(err)
require.Equal([]byte("foobar"), b)
}

func dial(t *testing.T, upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
t.Helper()

macon, err := manet.Dial(raddr)
if err != nil {
return nil, err
}

return upgrader.UpgradeOutbound(context.Background(), nil, macon, p)
}

func createListener(t *testing.T, upgrader *st.Upgrader) transport.Listener {
t.Helper()
require := require.New(t)
Expand Down Expand Up @@ -385,3 +296,49 @@ func TestAcceptQueueBacklogged(t *testing.T) {

require.Eventually(func() bool { return len(errCh) == st.AcceptQueueLength+1 }, 2*time.Second, 100*time.Millisecond)
}

func TestListenerConnectionGater(t *testing.T) {
require := require.New(t)

testGater := &testGater{}
upgrader := *defaultUpgrader
upgrader.ConnGater = testGater

ln := createListener(t, &upgrader)
defer ln.Close()

// no gating.
conn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()

// rejecting after handshake.
testGater.BlockSecured(true)
testGater.BlockAccept(false)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)

// rejecting on accept will trigger first.
testGater.BlockSecured(true)
testGater.BlockAccept(true)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)

// rejecting only on acceptance.
testGater.BlockSecured(false)
testGater.BlockAccept(true)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)

// back to normal
testGater.BlockSecured(false)
testGater.BlockAccept(false)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()
}
38 changes: 22 additions & 16 deletions upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import (
"context"
"errors"
"fmt"
"github.com/libp2p/go-libp2p-core/network"
"net"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
ipnet "github.com/libp2p/go-libp2p-core/pnet"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p-core/transport"
"github.com/libp2p/go-libp2p-pnet"

filter "github.com/libp2p/go-maddr-filter"
manet "github.com/multiformats/go-multiaddr-net"
)

Expand All @@ -27,10 +28,10 @@ var AcceptQueueLength = 16
// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
type Upgrader struct {
PSK ipnet.PSK
Secure sec.SecureTransport
Muxer mux.Multiplexer
Filters *filter.Filters
PSK ipnet.PSK
Secure sec.SecureTransport
Muxer mux.Multiplexer
ConnGater connmgr.ConnectionGater
}

// UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
Expand All @@ -55,22 +56,16 @@ func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, m
if p == "" {
return nil, ErrNilPeer
}
return u.upgrade(ctx, t, maconn, p)
return u.upgrade(ctx, t, maconn, p, network.DirOutbound)
}

// UpgradeInbound upgrades the given inbound multiaddr-net connection into a
// full libp2p-transport connection.
func (u *Upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn) (transport.CapableConn, error) {
return u.upgrade(ctx, t, maconn, "")
return u.upgrade(ctx, t, maconn, "", network.DirInbound)
}

func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.CapableConn, error) {
if u.Filters != nil && u.Filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
return nil, fmt.Errorf("blocked connection from %s", maconn.RemoteMultiaddr())
}

func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID, dir network.Direction) (transport.CapableConn, error) {
var conn net.Conn = maconn
if u.PSK != nil {
pconn, err := pnet.NewProtectedConn(u.PSK, conn)
Expand All @@ -91,18 +86,29 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
return nil, fmt.Errorf("failed to negotiate security protocol: %s", err)
}

// call the connection gater, if one is registered.
if u.ConnGater != nil && !u.ConnGater.InterceptSecured(dir, sconn.RemotePeer(), maconn) {
if err := maconn.Close(); err != nil {
log.Errorf("failed to close connection with peer %s and addr %s; err: %s",
p.Pretty(), maconn.RemoteMultiaddr(), err)
}
return nil, fmt.Errorf("gater rejected connection with peer %s and addr %s with direction %d",
sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir)
}

smconn, err := u.setupMuxer(ctx, sconn, p)
if err != nil {
sconn.Close()
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
}

return &transportConn{
tc := &transportConn{
MuxedConn: smconn,
ConnMultiaddrs: maconn,
ConnSecurity: sconn,
transport: t,
}, nil
}
return tc, nil
}

func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (sec.SecureConn, error) {
Expand Down
Loading