diff --git a/go.mod b/go.mod index c1a174b8..c0b0ac74 100644 --- a/go.mod +++ b/go.mod @@ -6,11 +6,11 @@ require ( github.com/ipfs/go-log/v2 v2.4.0 github.com/libp2p/go-addr-util v0.1.0 github.com/libp2p/go-conn-security-multistream v0.3.0 - github.com/libp2p/go-libp2p-core v0.11.0 - github.com/libp2p/go-libp2p-peerstore v0.4.0 + github.com/libp2p/go-libp2p-core v0.13.0 + github.com/libp2p/go-libp2p-peerstore v0.6.0 github.com/libp2p/go-libp2p-quic-transport v0.13.0 github.com/libp2p/go-libp2p-testing v0.5.0 - github.com/libp2p/go-libp2p-transport-upgrader v0.5.0 + github.com/libp2p/go-libp2p-transport-upgrader v0.6.0 github.com/libp2p/go-libp2p-yamux v0.5.0 github.com/libp2p/go-maddr-filter v0.1.0 github.com/libp2p/go-stream-muxer-multistream v0.3.0 diff --git a/go.sum b/go.sum index d784ccbb..326329cb 100644 --- a/go.sum +++ b/go.sum @@ -217,12 +217,10 @@ github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupV github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= +github.com/ipfs/go-log v1.0.4 h1:6nLQdX4W8P9yZZFH7mO+X/PzjN8Laozm/lMJ6esdgzY= github.com/ipfs/go-log v1.0.4/go.mod h1:oDCg2FkjogeFOhqqb+N39l2RpTNPL6F/StPkB3kPgcs= -github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= -github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= -github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g= github.com/ipfs/go-log/v2 v2.4.0 h1:iR/2o9PGWanVJrBgIH5Ff8mPGOwpqLaPIAFqSnsdlzk= github.com/ipfs/go-log/v2 v2.4.0/go.mod h1:nPZnh7Cj7lwS3LpRU5Mwr2ol1c2gXIEXuF6aywqrtmo= @@ -279,14 +277,14 @@ github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0= github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y= github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= -github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM= github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= -github.com/libp2p/go-libp2p-core v0.11.0 h1:75jAgdA+IChNa+/mZXogfmrGkgwxkVvxmIC7pV+F6sI= -github.com/libp2p/go-libp2p-core v0.11.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= +github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= +github.com/libp2p/go-libp2p-core v0.13.0 h1:IFG/s8dN6JN2OTrXX9eq2wNU/Zlz2KLdwZUp5FplgXI= +github.com/libp2p/go-libp2p-core v0.13.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc= github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g= -github.com/libp2p/go-libp2p-peerstore v0.4.0 h1:DOhRJLnM9Dc9lIXi3rPDZBf789LXy1BrzwIs7Tj0cKA= -github.com/libp2p/go-libp2p-peerstore v0.4.0/go.mod h1:rDJUFyzEWPpXpEwywkcTYYzDHlwza8riYMaUzaN6hX0= +github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A= +github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-quic-transport v0.13.0 h1:MTVojS4AnGD/rng6rF/HXEqwMHL27rHUEf3DaqSdnUw= @@ -297,8 +295,9 @@ github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OM github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A= github.com/libp2p/go-libp2p-tls v0.3.0 h1:8BgvUJiOTcj0Gp6XvEicF0rL5aUtRg/UzEdeZDmDlC8= github.com/libp2p/go-libp2p-tls v0.3.0/go.mod h1:fwF5X6PWGxm6IDRwF3V8AVCCj/hOd5oFlg+wo2FxJDY= -github.com/libp2p/go-libp2p-transport-upgrader v0.5.0 h1:7SDl3O2+AYOgfE40Mis83ClpfGNkNA6m4FwhbOHs+iI= github.com/libp2p/go-libp2p-transport-upgrader v0.5.0/go.mod h1:Rc+XODlB3yce7dvFV4q/RmyJGsFcCZRkeZMu/Zdg0mo= +github.com/libp2p/go-libp2p-transport-upgrader v0.6.0 h1:GfMCU+2aGGEm1zW3UcOz6wYSn8tXQalFfVfcww99i5A= +github.com/libp2p/go-libp2p-transport-upgrader v0.6.0/go.mod h1:1e07y1ZSZdHo9HPbuU8IztM1Cj+DR5twgycb4pnRzRo= github.com/libp2p/go-libp2p-yamux v0.5.0 h1:ZzmUhbQE+X7NuYUT2naxN31JyebZfRmpZVhKtRP13ys= github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLwTGf0L4DFq9g6po= github.com/libp2p/go-maddr-filter v0.1.0 h1:4ACqZKw8AqiuJfwFGq1CYDFugfXTOos+qQ3DETkhtCE= diff --git a/swarm.go b/swarm.go index e5077ac0..b1f51a35 100644 --- a/swarm.go +++ b/swarm.go @@ -212,7 +212,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, ) // create the Stat object, initializing with the underlying connection Stat if available - var stat network.Stat + var stat network.ConnStats if cs, ok := tc.(network.ConnStat); ok { stat = cs.Stat() } diff --git a/swarm_conn.go b/swarm_conn.go index 3d5f8f37..388e9e76 100644 --- a/swarm_conn.go +++ b/swarm_conn.go @@ -39,7 +39,7 @@ type Conn struct { m map[*Stream]struct{} } - stat network.Stat + stat network.ConnStats } func (c *Conn) ID() string { @@ -90,6 +90,7 @@ func (c *Conn) doClose() { func (c *Conn) removeStream(s *Stream) { c.streams.Lock() + c.stat.NumStreams-- delete(c.streams.m, s) c.streams.Unlock() } @@ -171,7 +172,9 @@ func (c *Conn) RemotePublicKey() ic.PubKey { } // Stat returns metadata pertaining to this connection -func (c *Conn) Stat() network.Stat { +func (c *Conn) Stat() network.ConnStats { + c.streams.Lock() + defer c.streams.Unlock() return c.stat } @@ -201,16 +204,16 @@ func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, er } // Wrap and register the stream. - stat := network.Stat{ - Direction: dir, - Opened: time.Now(), - } s := &Stream{ stream: ts, conn: c, - stat: stat, - id: atomic.AddUint64(&c.swarm.nextStreamID, 1), + stat: network.Stats{ + Direction: dir, + Opened: time.Now(), + }, + id: atomic.AddUint64(&c.swarm.nextStreamID, 1), } + c.stat.NumStreams++ c.streams.m[s] = struct{}{} // Released once the stream disconnect notifications have finished diff --git a/swarm_net_test.go b/swarm_net_test.go index 1f1d0454..3096574d 100644 --- a/swarm_net_test.go +++ b/swarm_net_test.go @@ -130,9 +130,9 @@ func TestNetworkOpenStream(t *testing.T) { t.Fatal(err) } - numStreams := 0 + var numStreams int for _, conn := range nets[0].ConnsToPeer(nets[1].LocalPeer()) { - numStreams += len(conn.GetStreams()) + numStreams += conn.Stat().NumStreams } if numStreams != 1 { diff --git a/swarm_stream.go b/swarm_stream.go index a5f0738a..b0063b1b 100644 --- a/swarm_stream.go +++ b/swarm_stream.go @@ -28,7 +28,7 @@ type Stream struct { protocol atomic.Value - stat network.Stat + stat network.Stats } func (s *Stream) ID() string { @@ -151,6 +151,6 @@ func (s *Stream) SetWriteDeadline(t time.Time) error { } // Stat returns metadata information for this stream. -func (s *Stream) Stat() network.Stat { +func (s *Stream) Stat() network.Stats { return s.stat } diff --git a/swarm_test.go b/swarm_test.go index 043e5735..0dac49d1 100644 --- a/swarm_test.go +++ b/swarm_test.go @@ -424,3 +424,44 @@ func TestPreventDialListenAddr(t *testing.T) { t.Fatal("expected dial to fail: %w", err) } } + +func TestStreamCount(t *testing.T) { + s1 := GenSwarm(t) + s2 := GenSwarm(t) + connectSwarms(t, context.Background(), []*swarm.Swarm{s2, s1}) + + countStreams := func() (n int) { + var num int + for _, c := range s1.ConnsToPeer(s2.LocalPeer()) { + n += c.Stat().NumStreams + num += len(c.GetStreams()) + } + require.Equal(t, n, num, "inconsistent stream count") + return + } + + streams := make(chan network.Stream, 20) + streamAccepted := make(chan struct{}, 1) + s1.SetStreamHandler(func(str network.Stream) { + streams <- str + streamAccepted <- struct{}{} + }) + + for i := 0; i < 10; i++ { + str, err := s2.NewStream(context.Background(), s1.LocalPeer()) + require.NoError(t, err) + str.Write([]byte("foobar")) + <-streamAccepted + } + require.Eventually(t, func() bool { return len(streams) == 10 }, 5*time.Second, 10*time.Millisecond) + require.Equal(t, countStreams(), 10) + (<-streams).Reset() + (<-streams).Close() + require.Equal(t, countStreams(), 8) + + str, err := s1.NewStream(context.Background(), s2.LocalPeer()) + require.NoError(t, err) + require.Equal(t, countStreams(), 9) + str.Close() + require.Equal(t, countStreams(), 8) +}