Skip to content

Commit

Permalink
add WebRTC Direct transport implementation (#2337)
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Sep 12, 2023
1 parent c2f0e13 commit 31ffac2
Show file tree
Hide file tree
Showing 32 changed files with 4,349 additions and 73 deletions.
19 changes: 17 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ require (
github.com/multiformats/go-multistream v0.5.0
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/pion/datachannel v1.5.5
github.com/pion/ice/v2 v2.3.6
github.com/pion/logging v0.2.2
github.com/pion/stun v0.6.0
github.com/pion/webrtc/v3 v3.2.9
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.4.0
github.com/quic-go/quic-go v0.38.1
Expand All @@ -53,6 +58,7 @@ require (
github.com/stretchr/testify v1.8.4
go.uber.org/fx v1.20.0
go.uber.org/goleak v1.2.0
go.uber.org/zap v1.25.0
golang.org/x/crypto v0.12.0
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63
golang.org/x/sync v0.3.0
Expand Down Expand Up @@ -94,9 +100,19 @@ require (
github.com/miekg/dns v1.1.55 // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/pion/dtls/v2 v2.2.7 // indirect
github.com/pion/interceptor v0.1.17 // indirect
github.com/pion/mdns v0.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.10 // indirect
github.com/pion/rtp v1.7.13 // indirect
github.com/pion/sctp v1.8.7 // indirect
github.com/pion/sdp/v3 v3.0.6 // indirect
github.com/pion/srtp/v2 v2.0.15 // indirect
github.com/pion/transport/v2 v2.2.1 // indirect
github.com/pion/turn/v2 v2.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand All @@ -108,7 +124,6 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.25.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/text v0.12.0 // indirect
Expand Down
85 changes: 85 additions & 0 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion p2p/metricshelper/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package metricshelper

import ma "github.com/multiformats/go-multiaddr"

var transports = [...]int{ma.P_CIRCUIT, ma.P_WEBRTC, ma.P_WEBTRANSPORT, ma.P_QUIC, ma.P_QUIC_V1, ma.P_WSS, ma.P_WS, ma.P_TCP}
var transports = [...]int{ma.P_CIRCUIT, ma.P_WEBRTC, ma.P_WEBRTC_DIRECT, ma.P_WEBTRANSPORT, ma.P_QUIC, ma.P_QUIC_V1, ma.P_WSS, ma.P_WS, ma.P_TCP}

func GetTransport(a ma.Multiaddr) string {
for _, t := range transports {
Expand Down
8 changes: 6 additions & 2 deletions p2p/test/transport/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func TestReadWriteDeadlines(t *testing.T) {
buf := make([]byte, 1)
_, err = s.Read(buf)
require.Error(t, err)
require.True(t, err.(net.Error).Timeout())
var nerr net.Error
require.ErrorAs(t, err, &nerr)
require.True(t, nerr.Timeout())
require.Less(t, time.Since(start), 1*time.Second)
})

Expand Down Expand Up @@ -80,7 +82,9 @@ func TestReadWriteDeadlines(t *testing.T) {
_, err = s.Write(sendBuf)
}
require.Error(t, err)
require.True(t, err.(net.Error).Timeout())
var nerr net.Error
require.ErrorAs(t, err, &nerr)
require.True(t, nerr.Timeout())
require.Less(t, time.Since(start), 1*time.Second)
})
}
Expand Down
37 changes: 31 additions & 6 deletions p2p/test/transport/gating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transport_integration

import (
"context"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -88,6 +89,8 @@ func TestInterceptSecuredOutgoing(t *testing.T) {

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, ConnGater: connGater})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{})
defer h1.Close()
defer h2.Close()
require.Len(t, h2.Addrs(), 1)
require.Len(t, h2.Addrs(), 1)

Expand All @@ -97,7 +100,7 @@ func TestInterceptSecuredOutgoing(t *testing.T) {
connGater.EXPECT().InterceptPeerDial(h2.ID()).Return(true),
connGater.EXPECT().InterceptAddrDial(h2.ID(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(network.DirOutbound, h2.ID(), gomock.Any()).Do(func(_ network.Direction, _ peer.ID, addrs network.ConnMultiaddrs) {
// remove the certhash component from WebTransport addresses
// remove the certhash component from WebTransport and WebRTC addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]).String(), addrs.RemoteMultiaddr().String())
}),
)
Expand All @@ -120,6 +123,8 @@ func TestInterceptUpgradedOutgoing(t *testing.T) {

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, ConnGater: connGater})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{})
defer h1.Close()
defer h2.Close()
require.Len(t, h2.Addrs(), 1)
require.Len(t, h2.Addrs(), 1)

Expand Down Expand Up @@ -154,19 +159,35 @@ func TestInterceptAccept(t *testing.T) {

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{ConnGater: connGater})
defer h1.Close()
defer h2.Close()
require.Len(t, h2.Addrs(), 1)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// The basic host dials the first connection.
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
})
if strings.Contains(tc.Name, "WebRTC") {
// In WebRTC, retransmissions of the STUN packet might cause us to create multiple connections,
// if the first connection attempt is rejected.
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
}).AnyTimes()
} else {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
})
}

h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), time.Hour)
_, err := h1.NewStream(ctx, h2.ID(), protocol.TestingID)
require.Error(t, err)
require.NotErrorIs(t, err, context.DeadlineExceeded)
if _, err := h2.Addrs()[0].ValueForProtocol(ma.P_WEBRTC_DIRECT); err != nil {
// WebRTC rejects connection attempt before an error can be sent to the client.
// This means that the connection attempt will time out.
require.NotErrorIs(t, err, context.DeadlineExceeded)
}
})
}
}
Expand All @@ -183,6 +204,8 @@ func TestInterceptSecuredIncoming(t *testing.T) {

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{ConnGater: connGater})
defer h1.Close()
defer h2.Close()
require.Len(t, h2.Addrs(), 1)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -214,6 +237,8 @@ func TestInterceptUpgradedIncoming(t *testing.T) {

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{ConnGater: connGater})
defer h1.Close()
defer h2.Close()
require.Len(t, h2.Addrs(), 1)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down
6 changes: 3 additions & 3 deletions p2p/test/transport/rcmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"testing"
"time"

gomock "github.com/golang/mock/gomock"
"github.com/golang/mock/gomock"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
mocknetwork "github.com/libp2p/go-libp2p/core/network/mocks"
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestResourceManagerIsUsed(t *testing.T) {
}

expectFd := true
if strings.Contains(tc.Name, "QUIC") || strings.Contains(tc.Name, "WebTransport") {
if strings.Contains(tc.Name, "QUIC") || strings.Contains(tc.Name, "WebTransport") || strings.Contains(tc.Name, "WebRTC") {
expectFd = false
}

Expand Down Expand Up @@ -86,7 +86,7 @@ func TestResourceManagerIsUsed(t *testing.T) {
}
return nil
})
connScope.EXPECT().Done()
connScope.EXPECT().Done().MinTimes(1)

var allStreamsDone sync.WaitGroup

Expand Down
101 changes: 55 additions & 46 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"

"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -134,6 +137,21 @@ var transportsToTest = []TransportTestCase{
return h
},
},
{
Name: "WebRTC",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.Transport(libp2pwebrtc.New))
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/webrtc-direct"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
}

func TestPing(t *testing.T) {
Expand Down Expand Up @@ -229,7 +247,7 @@ func TestLotsOfDataManyStreams(t *testing.T) {
// 64k buffer
const bufSize = 64 << 10
sendBuf := [bufSize]byte{}
const totalStreams = 512
const totalStreams = 500
const parallel = 8
// Total sends are > 20MiB
require.Greater(t, len(sendBuf)*totalStreams, 20<<20)
Expand Down Expand Up @@ -296,6 +314,9 @@ func TestManyStreams(t *testing.T) {
const streamCount = 128
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebRTC") {
t.Skip("Pion doesn't correctly handle large queues of streams.")
}
h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoRcmgr: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, NoRcmgr: true})
defer h1.Close()
Expand Down Expand Up @@ -361,6 +382,9 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
const streamCount = 1024
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebRTC") {
t.Skip("This test potentially exhausts the uint16 WebRTC stream ID space.")
}
listenerLimits := rcmgr.PartialLimitConfig{
PeerDefault: rcmgr.ResourceLimits{
Streams: 32,
Expand Down Expand Up @@ -455,6 +479,8 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
time.Sleep(50 * time.Millisecond)
continue
}
t.Logf("opening stream failed: %v", err)
return
}
err = func(s network.Stream) error {
defer s.Close()
Expand Down Expand Up @@ -596,8 +622,8 @@ func TestStreamReadDeadline(t *testing.T) {
_, err = s.Read([]byte{0})
require.Error(t, err)
require.Contains(t, err.Error(), "deadline")
nerr, ok := err.(net.Error)
require.True(t, ok, "expected a net.Error")
var nerr net.Error
require.True(t, errors.As(err, &nerr), "expected a net.Error")
require.True(t, nerr.Timeout(), "expected net.Error.Timeout() == true")
// now test that the stream is still usable
s.SetReadDeadline(time.Time{})
Expand Down Expand Up @@ -628,58 +654,41 @@ func TestDiscoverPeerIDFromSecurityNegotiation(t *testing.T) {
return "", inputErr
}

// runs a test to verify we can extract the peer ID from a target with just its address
runTest := func(t *testing.T, h host.Host) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Use a bogus peer ID so that when we connect to the target we get an error telling
// us the targets real peer ID
bogusPeerId, err := peer.Decode("QmadAdJ3f63JyNs65X7HHzqDwV53ynvCcKtNFvdNaz3nhk")
if err != nil {
t.Fatal("the hard coded bogus peerID is invalid")
}
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
h1 := tc.HostGenerator(t, TransportTestCaseOpts{})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
defer h1.Close()
defer h2.Close()

ai := &peer.AddrInfo{
ID: bogusPeerId,
Addrs: []multiaddr.Multiaddr{h.Addrs()[0]},
}
// runs a test to verify we can extract the peer ID from a target with just its address
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testHost, err := libp2p.New()
if err != nil {
t.Fatal(err)
}
// Use a bogus peer ID so that when we connect to the target we get an error telling
// us the targets real peer ID
bogusPeerId, err := peer.Decode("QmadAdJ3f63JyNs65X7HHzqDwV53ynvCcKtNFvdNaz3nhk")
require.NoError(t, err, "the hard coded bogus peerID is invalid")

ai := &peer.AddrInfo{
ID: bogusPeerId,
Addrs: []multiaddr.Multiaddr{h1.Addrs()[0]},
}

// Try connecting with the bogus peer ID
err = h2.Connect(ctx, *ai)
require.Error(t, err, "somehow we successfully connected to a bogus peerID!")

// Try connecting with the bogus peer ID
if err := testHost.Connect(ctx, *ai); err != nil {
// Extract the actual peer ID from the error
newPeerId, err := extractPeerIDFromError(err)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
ai.ID = newPeerId

// Make sure the new ID is what we expected
if ai.ID != h.ID() {
t.Fatalf("peerID mismatch: expected %s, got %s", h.ID(), ai.ID)
}
require.Equal(t, h1.ID(), ai.ID)

// and just to double-check try connecting again to make sure it works
if err := testHost.Connect(ctx, *ai); err != nil {
t.Fatal(err)
}
} else {
t.Fatal("somehow we successfully connected to a bogus peerID!")
}
}

for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
h := tc.HostGenerator(t, TransportTestCaseOpts{})
defer h.Close()

runTest(t, h)
require.NoError(t, h2.Connect(ctx, *ai))
})
}
}
Loading

0 comments on commit 31ffac2

Please sign in to comment.