Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

count the number of streams on a connection for the stats #298

Merged
merged 1 commit into from
Dec 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ require (
github.com/ipfs/go-log/v2 v2.4.0
github.com/libp2p/go-addr-util v0.1.0
github.com/libp2p/go-conn-security-multistream v0.3.0
github.com/libp2p/go-libp2p-core v0.11.0
github.com/libp2p/go-libp2p-peerstore v0.4.0
github.com/libp2p/go-libp2p-core v0.13.0
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-quic-transport v0.13.0
github.com/libp2p/go-libp2p-testing v0.5.0
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0
github.com/libp2p/go-libp2p-yamux v0.5.0
github.com/libp2p/go-maddr-filter v0.1.0
github.com/libp2p/go-stream-muxer-multistream v0.3.0
Expand Down
17 changes: 8 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,10 @@ github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupV
github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-log v1.0.4 h1:6nLQdX4W8P9yZZFH7mO+X/PzjN8Laozm/lMJ6esdgzY=
github.com/ipfs/go-log v1.0.4/go.mod h1:oDCg2FkjogeFOhqqb+N39l2RpTNPL6F/StPkB3kPgcs=
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g=
github.com/ipfs/go-log/v2 v2.4.0 h1:iR/2o9PGWanVJrBgIH5Ff8mPGOwpqLaPIAFqSnsdlzk=
github.com/ipfs/go-log/v2 v2.4.0/go.mod h1:nPZnh7Cj7lwS3LpRU5Mwr2ol1c2gXIEXuF6aywqrtmo=
Expand Down Expand Up @@ -279,14 +277,14 @@ github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.6/go.mod h1:dgHr0l0hIKfWpGpqAMbpo19pen9wJfdCGv51mTmdpmM=
github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.11.0 h1:75jAgdA+IChNa+/mZXogfmrGkgwxkVvxmIC7pV+F6sI=
github.com/libp2p/go-libp2p-core v0.11.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.12.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.13.0 h1:IFG/s8dN6JN2OTrXX9eq2wNU/Zlz2KLdwZUp5FplgXI=
github.com/libp2p/go-libp2p-core v0.13.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-peerstore v0.4.0 h1:DOhRJLnM9Dc9lIXi3rPDZBf789LXy1BrzwIs7Tj0cKA=
github.com/libp2p/go-libp2p-peerstore v0.4.0/go.mod h1:rDJUFyzEWPpXpEwywkcTYYzDHlwza8riYMaUzaN6hX0=
github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A=
github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-quic-transport v0.13.0 h1:MTVojS4AnGD/rng6rF/HXEqwMHL27rHUEf3DaqSdnUw=
Expand All @@ -297,8 +295,9 @@ github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OM
github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A=
github.com/libp2p/go-libp2p-tls v0.3.0 h1:8BgvUJiOTcj0Gp6XvEicF0rL5aUtRg/UzEdeZDmDlC8=
github.com/libp2p/go-libp2p-tls v0.3.0/go.mod h1:fwF5X6PWGxm6IDRwF3V8AVCCj/hOd5oFlg+wo2FxJDY=
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0 h1:7SDl3O2+AYOgfE40Mis83ClpfGNkNA6m4FwhbOHs+iI=
github.com/libp2p/go-libp2p-transport-upgrader v0.5.0/go.mod h1:Rc+XODlB3yce7dvFV4q/RmyJGsFcCZRkeZMu/Zdg0mo=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0 h1:GfMCU+2aGGEm1zW3UcOz6wYSn8tXQalFfVfcww99i5A=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.0/go.mod h1:1e07y1ZSZdHo9HPbuU8IztM1Cj+DR5twgycb4pnRzRo=
github.com/libp2p/go-libp2p-yamux v0.5.0 h1:ZzmUhbQE+X7NuYUT2naxN31JyebZfRmpZVhKtRP13ys=
github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLwTGf0L4DFq9g6po=
github.com/libp2p/go-maddr-filter v0.1.0 h1:4ACqZKw8AqiuJfwFGq1CYDFugfXTOos+qQ3DETkhtCE=
Expand Down
2 changes: 1 addition & 1 deletion swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
)

// create the Stat object, initializing with the underlying connection Stat if available
var stat network.Stat
var stat network.ConnStats
if cs, ok := tc.(network.ConnStat); ok {
stat = cs.Stat()
}
Expand Down
19 changes: 11 additions & 8 deletions swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Conn struct {
m map[*Stream]struct{}
}

stat network.Stat
stat network.ConnStats
}

func (c *Conn) ID() string {
Expand Down Expand Up @@ -90,6 +90,7 @@ func (c *Conn) doClose() {

func (c *Conn) removeStream(s *Stream) {
c.streams.Lock()
c.stat.NumStreams--
delete(c.streams.m, s)
c.streams.Unlock()
}
Expand Down Expand Up @@ -171,7 +172,9 @@ func (c *Conn) RemotePublicKey() ic.PubKey {
}

// Stat returns metadata pertaining to this connection
func (c *Conn) Stat() network.Stat {
func (c *Conn) Stat() network.ConnStats {
c.streams.Lock()
defer c.streams.Unlock()
return c.stat
}

Expand Down Expand Up @@ -201,16 +204,16 @@ func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, er
}

// Wrap and register the stream.
stat := network.Stat{
Direction: dir,
Opened: time.Now(),
}
s := &Stream{
stream: ts,
conn: c,
stat: stat,
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
stat: network.Stats{
Direction: dir,
Opened: time.Now(),
},
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
}
c.stat.NumStreams++
c.streams.m[s] = struct{}{}

// Released once the stream disconnect notifications have finished
Expand Down
4 changes: 2 additions & 2 deletions swarm_net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func TestNetworkOpenStream(t *testing.T) {
t.Fatal(err)
}

numStreams := 0
var numStreams int
for _, conn := range nets[0].ConnsToPeer(nets[1].LocalPeer()) {
numStreams += len(conn.GetStreams())
numStreams += conn.Stat().NumStreams
}

if numStreams != 1 {
Expand Down
4 changes: 2 additions & 2 deletions swarm_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Stream struct {

protocol atomic.Value

stat network.Stat
stat network.Stats
}

func (s *Stream) ID() string {
Expand Down Expand Up @@ -151,6 +151,6 @@ func (s *Stream) SetWriteDeadline(t time.Time) error {
}

// Stat returns metadata information for this stream.
func (s *Stream) Stat() network.Stat {
func (s *Stream) Stat() network.Stats {
return s.stat
}
41 changes: 41 additions & 0 deletions swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,44 @@ func TestPreventDialListenAddr(t *testing.T) {
t.Fatal("expected dial to fail: %w", err)
}
}

func TestStreamCount(t *testing.T) {
s1 := GenSwarm(t)
s2 := GenSwarm(t)
connectSwarms(t, context.Background(), []*swarm.Swarm{s2, s1})

countStreams := func() (n int) {
var num int
for _, c := range s1.ConnsToPeer(s2.LocalPeer()) {
n += c.Stat().NumStreams
num += len(c.GetStreams())
}
require.Equal(t, n, num, "inconsistent stream count")
return
}

streams := make(chan network.Stream, 20)
streamAccepted := make(chan struct{}, 1)
s1.SetStreamHandler(func(str network.Stream) {
streams <- str
streamAccepted <- struct{}{}
})

for i := 0; i < 10; i++ {
str, err := s2.NewStream(context.Background(), s1.LocalPeer())
require.NoError(t, err)
str.Write([]byte("foobar"))
<-streamAccepted
}
require.Eventually(t, func() bool { return len(streams) == 10 }, 5*time.Second, 10*time.Millisecond)
require.Equal(t, countStreams(), 10)
(<-streams).Reset()
(<-streams).Close()
require.Equal(t, countStreams(), 8)

str, err := s1.NewStream(context.Background(), s2.LocalPeer())
require.NoError(t, err)
require.Equal(t, countStreams(), 9)
str.Close()
require.Equal(t, countStreams(), 8)
}