From 47a6225cdb63e27bb0d905b068dea1ceba2ee556 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 14 Dec 2021 11:23:51 +0400 Subject: [PATCH 1/2] remove goprocess from mock net and peernet --- p2p/net/mock/interface.go | 3 ++- p2p/net/mock/mock.go | 18 +++++-------- p2p/net/mock/mock_net.go | 39 ++++++++++++++------------- p2p/net/mock/mock_notif_test.go | 8 +++--- p2p/net/mock/mock_peernet.go | 20 ++------------ p2p/net/mock/mock_test.go | 24 ++++++++++------- p2p/protocol/identify/id_test.go | 9 +++---- p2p/protocol/identify/obsaddr_test.go | 31 +++++++-------------- 8 files changed, 62 insertions(+), 90 deletions(-) diff --git a/p2p/net/mock/interface.go b/p2p/net/mock/interface.go index 16dcfcfe56..68dee445e4 100644 --- a/p2p/net/mock/interface.go +++ b/p2p/net/mock/interface.go @@ -20,7 +20,6 @@ import ( ) type Mocknet interface { - // GenPeer generates a peer and its network.Network in the Mocknet GenPeer() (host.Host, error) @@ -63,6 +62,8 @@ type Mocknet interface { DisconnectNets(network.Network, network.Network) error LinkAll() error ConnectAllButSelf() error + + io.Closer } // LinkOptions are used to change aspects of the links. diff --git a/p2p/net/mock/mock.go b/p2p/net/mock/mock.go index 3dc840329d..86806ead51 100644 --- a/p2p/net/mock/mock.go +++ b/p2p/net/mock/mock.go @@ -1,16 +1,14 @@ package mocknet import ( - "context" - logging "github.com/ipfs/go-log/v2" ) var log = logging.Logger("mocknet") // WithNPeers constructs a Mocknet with N peers. -func WithNPeers(ctx context.Context, n int) (Mocknet, error) { - m := New(ctx) +func WithNPeers(n int) (Mocknet, error) { + m := New() for i := 0; i < n; i++ { if _, err := m.GenPeer(); err != nil { return nil, err @@ -22,8 +20,8 @@ func WithNPeers(ctx context.Context, n int) (Mocknet, error) { // FullMeshLinked constructs a Mocknet with full mesh of Links. // This means that all the peers **can** connect to each other // (not that they already are connected. you can use m.ConnectAll()) -func FullMeshLinked(ctx context.Context, n int) (Mocknet, error) { - m, err := WithNPeers(ctx, n) +func FullMeshLinked(n int) (Mocknet, error) { + m, err := WithNPeers(n) if err != nil { return nil, err } @@ -31,21 +29,19 @@ func FullMeshLinked(ctx context.Context, n int) (Mocknet, error) { if err := m.LinkAll(); err != nil { return nil, err } - return m, nil } // FullMeshConnected constructs a Mocknet with full mesh of Connections. // This means that all the peers have dialed and are ready to talk to // each other. -func FullMeshConnected(ctx context.Context, n int) (Mocknet, error) { - m, err := FullMeshLinked(ctx, n) +func FullMeshConnected(n int) (Mocknet, error) { + m, err := FullMeshLinked(n) if err != nil { return nil, err } - err = m.ConnectAllButSelf() - if err != nil { + if err := m.ConnectAllButSelf(); err != nil { return nil, err } return m, nil diff --git a/p2p/net/mock/mock_net.go b/p2p/net/mock/mock_net.go index a2f817e554..ffc8e50999 100644 --- a/p2p/net/mock/mock_net.go +++ b/p2p/net/mock/mock_net.go @@ -15,9 +15,6 @@ import ( bhost "github.com/libp2p/go-libp2p/p2p/host/basic" - "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" - p2putil "github.com/libp2p/go-libp2p-netutil" "github.com/libp2p/go-libp2p-peerstore/pstoremem" ma "github.com/multiformats/go-multiaddr" @@ -30,7 +27,7 @@ var blackholeIP6 = net.ParseIP("100::") // mocknet implements mocknet.Mocknet type mocknet struct { nets map[peer.ID]*peernet - hosts map[peer.ID]*bhost.BasicHost + hosts map[peer.ID]host.Host // links make it possible to connect two peers. // think of links as the physical medium. @@ -40,22 +37,30 @@ type mocknet struct { linkDefaults LinkOptions - proc goprocess.Process // for Context closing - ctx context.Context + ctxCancel context.CancelFunc + ctx context.Context sync.Mutex } -func New(ctx context.Context) Mocknet { - proc := goprocessctx.WithContext(ctx) - ctx = goprocessctx.WithProcessClosing(ctx, proc) - - return &mocknet{ +func New() Mocknet { + mn := &mocknet{ nets: map[peer.ID]*peernet{}, - hosts: map[peer.ID]*bhost.BasicHost{}, + hosts: map[peer.ID]host.Host{}, links: map[peer.ID]map[peer.ID]map[*link]struct{}{}, - proc: proc, - ctx: ctx, } + mn.ctx, mn.ctxCancel = context.WithCancel(context.Background()) + return mn +} + +func (mn *mocknet) Close() error { + mn.ctxCancel() + for _, h := range mn.hosts { + h.Close() + } + for _, n := range mn.nets { + n.Close() + } + return nil } func (mn *mocknet) GenPeer() (host.Host, error) { @@ -104,7 +109,7 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) { } func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host.Host, error) { - n, err := newPeernet(mn.ctx, mn, p, ps) + n, err := newPeernet(mn, p, ps) if err != nil { return nil, err } @@ -119,10 +124,6 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host return nil, err } - // Ensure we close the hoset when we close the mock network. - // Otherwise, tests leak memory. - mn.proc.AddChild(goprocess.WithTeardown(h.Close)) - mn.Lock() mn.nets[n.peer] = n mn.hosts[n.peer] = h diff --git a/p2p/net/mock/mock_notif_test.go b/p2p/net/mock/mock_notif_test.go index 226300d4fa..661e1d51e8 100644 --- a/p2p/net/mock/mock_notif_test.go +++ b/p2p/net/mock/mock_notif_test.go @@ -13,15 +13,13 @@ import ( func TestNotifications(t *testing.T) { const swarmSize = 5 - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + const timeout = 10 * time.Second - mn, err := FullMeshLinked(ctx, swarmSize) + mn, err := FullMeshLinked(swarmSize) if err != nil { t.Fatal(err) } - - timeout := 10 * time.Second + defer mn.Close() // signup notifs nets := mn.Nets() diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index 894b584942..7d0d4f04d1 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -11,9 +11,6 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" - ma "github.com/multiformats/go-multiaddr" ) @@ -36,12 +33,11 @@ type peernet struct { notifmu sync.Mutex notifs map[network.Notifiee]struct{} - proc goprocess.Process sync.RWMutex } // newPeernet constructs a new peernet -func newPeernet(ctx context.Context, m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) { +func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) { n := &peernet{ mocknet: m, peer: p, @@ -53,12 +49,10 @@ func newPeernet(ctx context.Context, m *mocknet, p peer.ID, ps peerstore.Peersto notifs: make(map[network.Notifiee]struct{}), } - n.proc = goprocessctx.WithContextAndTeardown(ctx, n.teardown) return n, nil } -func (pn *peernet) teardown() error { - +func (pn *peernet) Close() error { // close the connections for _, c := range pn.allConns() { c.Close() @@ -79,11 +73,6 @@ func (pn *peernet) allConns() []*conn { return cs } -// Close calls the ContextCloser func -func (pn *peernet) Close() error { - return pn.proc.Close() -} - func (pn *peernet) Peerstore() peerstore.Peerstore { return pn.ps } @@ -226,11 +215,6 @@ func (pn *peernet) removeConn(c *conn) { delete(cs, c) } -// Process returns the network's Process -func (pn *peernet) Process() goprocess.Process { - return pn.proc -} - // LocalPeer the network's LocalPeer func (pn *peernet) LocalPeer() peer.ID { return pn.peer diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go index f74e9886ab..ce22c15cc2 100644 --- a/p2p/net/mock/mock_test.go +++ b/p2p/net/mock/mock_test.go @@ -24,8 +24,8 @@ func TestNetworkSetup(t *testing.T) { id1 := tnet.RandIdentityOrFatal(t) id2 := tnet.RandIdentityOrFatal(t) id3 := tnet.RandIdentityOrFatal(t) - mn := New(ctx) - // peers := []peer.ID{p1, p2, p3} + mn := New() + defer mn.Close() // add peers to mock net @@ -267,10 +267,11 @@ func TestNetworkSetup(t *testing.T) { func TestStreams(t *testing.T) { ctx := context.Background() - mn, err := FullMeshConnected(context.Background(), 3) + mn, err := FullMeshConnected(3) if err != nil { t.Fatal(err) } + defer mn.Close() handler := func(s network.Stream) { b := make([]byte, 4) @@ -362,10 +363,11 @@ func TestStreamsStress(t *testing.T) { nnodes = 30 } - mn, err := FullMeshConnected(context.Background(), nnodes) + mn, err := FullMeshConnected(nnodes) if err != nil { t.Fatal(err) } + defer mn.Close() errs := make(chan error) @@ -411,7 +413,8 @@ func TestStreamsStress(t *testing.T) { } func TestAdding(t *testing.T) { - mn := New(context.Background()) + mn := New() + defer mn.Close() var peers []peer.ID for i := 0; i < 3; i++ { @@ -534,10 +537,11 @@ func within(t1 time.Duration, t2 time.Duration, tolerance time.Duration) bool { } func TestLimitedStreams(t *testing.T) { - mn, err := FullMeshConnected(context.Background(), 2) + mn, err := FullMeshConnected(2) if err != nil { t.Fatal(err) } + defer mn.Close() var wg sync.WaitGroup messages := 4 @@ -598,22 +602,22 @@ func TestFuzzManyPeers(t *testing.T) { peerCount = 100 } for i := 0; i < peerCount; i++ { - ctx, cancel := context.WithCancel(context.Background()) - _, err := FullMeshConnected(ctx, 2) - cancel() + mn, err := FullMeshConnected(2) if err != nil { t.Fatal(err) } + mn.Close() } } func TestStreamsWithLatency(t *testing.T) { latency := time.Millisecond * 500 - mn, err := WithNPeers(context.Background(), 2) + mn, err := WithNPeers(2) if err != nil { t.Fatal(err) } + defer mn.Close() // configure the Mocknet with some latency and link/connect its peers mn.SetLinkDefaults(LinkOptions{Latency: latency}) diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index 415dc92cb6..6cf140f679 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -300,9 +300,8 @@ func TestProtoMatching(t *testing.T) { func TestLocalhostAddrFiltering(t *testing.T) { t.Skip("need to fix this test") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mn := mocknet.New(ctx) + mn := mocknet.New() + defer mn.Close() id1 := coretest.RandPeerIDFatal(t) ps1, err := pstoremem.NewPeerstore() if err != nil { @@ -346,11 +345,11 @@ func TestLocalhostAddrFiltering(t *testing.T) { if err != nil { t.Fatal(err) } - p1.Connect(ctx, peer.AddrInfo{ + p1.Connect(context.Background(), peer.AddrInfo{ ID: id2, Addrs: p2addrs[0:1], }) - p3.Connect(ctx, peer.AddrInfo{ + p3.Connect(context.Background(), peer.AddrInfo{ ID: id2, Addrs: p2addrs[1:], }) diff --git a/p2p/protocol/identify/obsaddr_test.go b/p2p/protocol/identify/obsaddr_test.go index 2ce0400923..8b42ed38be 100644 --- a/p2p/protocol/identify/obsaddr_test.go +++ b/p2p/protocol/identify/obsaddr_test.go @@ -1,7 +1,6 @@ package identify_test import ( - "context" "testing" "time" @@ -82,14 +81,18 @@ func (h *harness) observeInbound(observed ma.Multiaddr, observer peer.ID) networ return c } -func newHarness(ctx context.Context, t *testing.T) harness { - mn := mocknet.New(ctx) +func newHarness(t *testing.T) harness { + mn := mocknet.New() sk, err := p2putil.RandTestBogusPrivateKey() require.NoError(t, err) h, err := mn.AddPeer(sk, ma.StringCast("/ip4/127.0.0.1/tcp/10086")) require.NoError(t, err) oas, err := identify.NewObservedAddrManager(h) require.NoError(t, err) + t.Cleanup(func() { + mn.Close() + oas.Close() + }) return harness{ oas: oas, mocknet: mn, @@ -131,12 +134,7 @@ func TestObsAddrSet(t *testing.T) { b4 := ma.StringCast("/ip4/1.2.3.9/tcp/1237") b5 := ma.StringCast("/ip4/1.2.3.10/tcp/1237") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - harness := newHarness(ctx, t) - defer harness.oas.Close() - + harness := newHarness(t) if !addrsMatch(harness.oas.Addrs(), nil) { t.Error("addrs should be empty") } @@ -234,10 +232,7 @@ func TestObsAddrSet(t *testing.T) { } func TestObservedAddrFiltering(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - harness := newHarness(ctx, t) - defer harness.oas.Close() + harness := newHarness(t) require.Empty(t, harness.oas.Addrs()) // IP4/TCP @@ -336,10 +331,7 @@ func TestObservedAddrFiltering(t *testing.T) { } func TestEmitNATDeviceTypeSymmetric(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - harness := newHarness(ctx, t) - defer harness.oas.Close() + harness := newHarness(t) require.Empty(t, harness.oas.Addrs()) emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful) require.NoError(t, err) @@ -383,10 +375,7 @@ func TestEmitNATDeviceTypeSymmetric(t *testing.T) { } func TestEmitNATDeviceTypeCone(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - harness := newHarness(ctx, t) - defer harness.oas.Close() + harness := newHarness(t) require.Empty(t, harness.oas.Addrs()) emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful) require.NoError(t, err) From e7ea19f358d3198cbded61bfa65b873a0e9bf244 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 14 Dec 2021 12:19:47 +0400 Subject: [PATCH 2/2] remove goprocess from mock conn and link --- examples/pubsub/chat/go.sum | 1 - go.mod | 1 - p2p/net/mock/mock_conn.go | 18 ++++++++---------- p2p/net/mock/mock_link.go | 7 ++----- p2p/net/mock/mock_peernet.go | 3 --- 5 files changed, 10 insertions(+), 20 deletions(-) diff --git a/examples/pubsub/chat/go.sum b/examples/pubsub/chat/go.sum index cea247241b..360c46adcd 100644 --- a/examples/pubsub/chat/go.sum +++ b/examples/pubsub/chat/go.sum @@ -350,7 +350,6 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= -github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= diff --git a/go.mod b/go.mod index 9b662e953a..0720c2fe38 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/ipfs/go-datastore v0.5.0 github.com/ipfs/go-ipfs-util v0.0.2 github.com/ipfs/go-log/v2 v2.4.0 - github.com/jbenet/goprocess v0.1.4 github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/koron/go-ssdp v0.0.2 // indirect github.com/libp2p/go-addr-util v0.1.0 diff --git a/p2p/net/mock/mock_conn.go b/p2p/net/mock/mock_conn.go index 2d549c3c62..d16908c7ea 100644 --- a/p2p/net/mock/mock_conn.go +++ b/p2p/net/mock/mock_conn.go @@ -7,7 +7,6 @@ import ( "sync" "sync/atomic" - process "github.com/jbenet/goprocess" ic "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -40,13 +39,13 @@ type conn struct { streams list.List stat network.ConnStats - pairProc, connProc process.Process + closeOnce sync.Once sync.RWMutex } -func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction) *conn { - c := &conn{net: ln, link: l, pairProc: p} +func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn { + c := &conn{net: ln, link: l} c.local = ln.peer c.remote = rn.peer c.stat.Direction = dir @@ -65,7 +64,6 @@ func newConn(p process.Process, ln, rn *peernet, l *link, dir network.Direction) c.localPrivKey = ln.ps.PrivKey(ln.peer) c.remotePubKey = rn.ps.PubKey(rn.peer) - c.connProc = process.WithParent(c.pairProc) return c } @@ -74,11 +72,11 @@ func (c *conn) ID() string { } func (c *conn) Close() error { - return c.pairProc.Close() -} - -func (c *conn) setup() { - c.connProc.SetTeardown(c.teardown) + c.closeOnce.Do(func() { + go c.rconn.Close() + c.teardown() + }) + return nil } func (c *conn) teardown() error { diff --git a/p2p/net/mock/mock_link.go b/p2p/net/mock/mock_link.go index 2c7743be2a..fe65398e1c 100644 --- a/p2p/net/mock/mock_link.go +++ b/p2p/net/mock/mock_link.go @@ -6,8 +6,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - - process "github.com/jbenet/goprocess" ) // link implements mocknet.Link @@ -33,13 +31,12 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { l.RLock() defer l.RUnlock() - parent := process.WithTeardown(func() error { return nil }) target := l.nets[0] if target == dialer { target = l.nets[1] } - dc := newConn(parent, dialer, target, l, network.DirOutbound) - tc := newConn(parent, target, dialer, l, network.DirInbound) + dc := newConn(dialer, target, l, network.DirOutbound) + tc := newConn(target, dialer, l, network.DirInbound) dc.rconn = tc tc.rconn = dc return dc, tc diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index 7d0d4f04d1..a01f074db1 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -188,9 +188,6 @@ func (pn *peernet) remoteOpenedConn(c *conn) { // to given remote peer over given link func (pn *peernet) addConn(c *conn) { defer c.notifLk.Unlock() - // Call this after unlocking as it might cause us to immediately close - // the connection and remove it from the swarm. - c.setup() pn.notifyAll(func(n network.Notifiee) { n.Connected(pn, c)