From 4a6e72637b1fbac6a49d566156ce2d32024958d9 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 12 Sep 2023 12:47:27 +0700 Subject: [PATCH] webrtc: don't close stream, but limit to 512 streams per direction --- p2p/test/transport/transport_test.go | 2 +- p2p/transport/webrtc/connection.go | 15 +++++++++++++++ p2p/transport/webrtc/stream.go | 4 +++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/p2p/test/transport/transport_test.go b/p2p/test/transport/transport_test.go index 75bf306d13..a7e98a0d85 100644 --- a/p2p/test/transport/transport_test.go +++ b/p2p/test/transport/transport_test.go @@ -247,7 +247,7 @@ func TestLotsOfDataManyStreams(t *testing.T) { // 64k buffer const bufSize = 64 << 10 sendBuf := [bufSize]byte{} - const totalStreams = 512 + const totalStreams = 500 const parallel = 8 // Total sends are > 20MiB require.Greater(t, len(sendBuf)*totalStreams, 20<<20) diff --git a/p2p/transport/webrtc/connection.go b/p2p/transport/webrtc/connection.go index 550c6a8cdc..fd31f8351a 100644 --- a/p2p/transport/webrtc/connection.go +++ b/p2p/transport/webrtc/connection.go @@ -27,6 +27,8 @@ var _ tpt.CapableConn = &connection{} const maxAcceptQueueLen = 10 +const maxDataChannelID = 1 << 10 + type errConnectionTimeout struct{} var _ net.Error = &errConnectionTimeout{} @@ -108,6 +110,12 @@ func newConnection( if c.IsClosed() { return } + // Limit the number of streams, since we're not able to actually properly close them. + // See https://github.com/libp2p/specs/issues/575 for details. + if *dc.ID() > maxDataChannelID { + c.Close() + return + } dc.OnOpen(func() { rwc, err := dc.Detach() if err != nil { @@ -166,6 +174,13 @@ func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error if id > math.MaxUint16 { return nil, errors.New("exhausted stream ID space") } + // Limit the number of streams, since we're not able to actually properly close them. + // See https://github.com/libp2p/specs/issues/575 for details. + if id > maxDataChannelID { + c.Close() + return c.OpenStream(ctx) + } + streamID := uint16(id) dc, err := c.pc.CreateDataChannel("", &webrtc.DataChannelInit{ID: &streamID}) if err != nil { diff --git a/p2p/transport/webrtc/stream.go b/p2p/transport/webrtc/stream.go index b86128dc31..7e873f5634 100644 --- a/p2p/transport/webrtc/stream.go +++ b/p2p/transport/webrtc/stream.go @@ -192,7 +192,9 @@ func (s *stream) maybeDeclareStreamDone() { (s.receiveState == receiveStateReset || s.receiveState == receiveStateDataRead) && len(s.controlMsgQueue) == 0 { _ = s.SetReadDeadline(time.Now().Add(-1 * time.Hour)) // pion ignores zero times - _ = s.dataChannel.Close() + // TODO: we should be closing the underlying datachannel, but this resets the stream + // See https://github.com/libp2p/specs/issues/575 for details. + // _ = s.dataChannel.Close() // TODO: write for the spawned reader to return s.onDone() }