From 8930f293209062724b5d533251c465e8b668314d Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 30 Mar 2021 17:09:30 +0300 Subject: [PATCH 01/29] deps: update go-libp2p-core --- go.mod | 2 +- go.sum | 33 ++------------------------------- 2 files changed, 3 insertions(+), 32 deletions(-) diff --git a/go.mod b/go.mod index 5dceff46..ae61917f 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-addr-util v0.0.2 github.com/libp2p/go-conn-security-multistream v0.2.1 - github.com/libp2p/go-libp2p-core v0.8.3 + github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/libp2p/go-libp2p-quic-transport v0.10.0 diff --git a/go.sum b/go.sum index 34920884..f2cbf841 100644 --- a/go.sum +++ b/go.sum @@ -38,7 +38,6 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= -github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= @@ -58,7 +57,6 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/francoispqt/gojay v1.2.13/go.mod 
h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -70,7 +68,6 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 h1:uHTyIjqVhYRhLbJ8nIiOJHkEZZ+5YoOsAbD3sk82NiE= github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -94,7 +91,6 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= @@ 
-124,7 +120,6 @@ github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUP github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M= -github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU= github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog= github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY= github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= @@ -140,7 +135,6 @@ github.com/ipfs/go-log v1.0.4 h1:6nLQdX4W8P9yZZFH7mO+X/PzjN8Laozm/lMJ6esdgzY= github.com/ipfs/go-log v1.0.4/go.mod h1:oDCg2FkjogeFOhqqb+N39l2RpTNPL6F/StPkB3kPgcs= github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= -github.com/ipfs/go-log/v2 v2.0.5 h1:fL4YI+1g5V/b1Yxr1qAiXTMg1H8z9vx/VmJxBuQMHvU= github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= github.com/ipfs/go-log/v2 v2.1.1 h1:G4TtqN+V9y9HY9TA6BwbCVyyBZ2B9MbCjR2MtGx8FR0= github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= @@ -189,17 +183,14 @@ github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZas github.com/libp2p/go-libp2p-core v0.5.1/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y= github.com/libp2p/go-libp2p-core v0.5.4/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y= github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= -github.com/libp2p/go-libp2p-core v0.8.0 h1:5K3mT+64qDTKbV3yTdbMCzJ7O6wbNsavAEb8iqBvBcI= github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.1/go.mod 
h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= -github.com/libp2p/go-libp2p-core v0.8.2 h1:/eaSZACWftJZYm07S0nRxdI84v1hSmgnCXrGOvJdpNQ= github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= -github.com/libp2p/go-libp2p-core v0.8.3 h1:BZTReEF6o8g/n4DwxTyeFannOeae35Xy0TD+mES3CNE= -github.com/libp2p/go-libp2p-core v0.8.3/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= +github.com/libp2p/go-libp2p-core v0.8.5 h1:aEgbIcPGsKy6zYcC+5AJivYFedhYa4sW7mIpWpUaLKw= +github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE= -github.com/libp2p/go-libp2p-mplex v0.2.3 h1:2zijwaJvpdesST2MXpI5w9wWFRgYtMcpRX7rrw0jmOo= github.com/libp2p/go-libp2p-mplex v0.2.3/go.mod h1:CK3p2+9qH9x+7ER/gWWDYJ3QW5ZxWDkm+dVvjfuG3ek= github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc= github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g= @@ -210,7 +201,6 @@ github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYc github.com/libp2p/go-libp2p-quic-transport v0.10.0 h1:koDCbWD9CCHwcHZL3/WEvP2A+e/o5/W5L3QS/2SPMA0= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= -github.com/libp2p/go-libp2p-testing v0.1.1 h1:U03z3HnGI7Ni8Xx6ONVZvUFOAzWYmolWf5W5jAOPNmU= github.com/libp2p/go-libp2p-testing v0.1.1/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0= github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8/go.mod h1:Qy8sAncLKpwXtS2dSnDOP8ktexIAHKu+J+pnZOFZLTc= 
github.com/libp2p/go-libp2p-testing v0.3.0/go.mod h1:efZkql4UZ7OVsEfaxNHZPzIehtsBXMrXnCfJIgDti5g= @@ -226,15 +216,12 @@ github.com/libp2p/go-libp2p-yamux v0.5.0/go.mod h1:AyR8k5EzyM2QN9Bbdg6X1SkVVuqLw github.com/libp2p/go-maddr-filter v0.0.5/go.mod h1:Jk+36PMfIqCJhAnaASRH83bdAvfDRp/w6ENFaC9bG+M= github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU= github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU= -github.com/libp2p/go-mplex v0.1.2 h1:qOg1s+WdGLlpkrczDqmhYzyk3vCfsQ8+RxRTQjOZWwI= github.com/libp2p/go-mplex v0.1.2/go.mod h1:Xgz2RDCi3co0LeZfgjm4OgUF15+sVR8SRcu3SFXI1lk= github.com/libp2p/go-mplex v0.3.0 h1:U1T+vmCYJaEoDJPV1aq31N56hS+lJgb397GsylNSgrU= github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ= -github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA= github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-msgio v0.0.6 h1:lQ7Uc0kS1wb1EfRxO2Eir/RJoHkHn7t6o+EiwsYIKJA= github.com/libp2p/go-msgio v0.0.6/go.mod h1:4ecVB6d9f4BDSL5fqvPiC4A3KivjWn+Venn/1ALLMWA= -github.com/libp2p/go-netroute v0.1.2 h1:UHhB35chwgvcRI392znJA3RCBtZ3MpE3ahNCN5MR4Xg= github.com/libp2p/go-netroute v0.1.2/go.mod h1:jZLDV+1PE8y5XxBySEBgbuVAXbhtuHSdmLPL2n9MKbk= github.com/libp2p/go-netroute v0.1.3 h1:1ngWRx61us/EpaKkdqkMjKk/ufr/JlIFYQAxV2XX8Ig= github.com/libp2p/go-netroute v0.1.3/go.mod h1:jZLDV+1PE8y5XxBySEBgbuVAXbhtuHSdmLPL2n9MKbk= @@ -286,7 +273,6 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= -github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc= github.com/mr-tron/base58 
v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= @@ -300,7 +286,6 @@ github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lg github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo= github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4= github.com/multiformats/go-multiaddr v0.2.1/go.mod h1:s/Apk6IyxfvMjDafnhJgJ3/46z7tZ04iMk5wP4QMGGE= -github.com/multiformats/go-multiaddr v0.2.2 h1:XZLDTszBIJe6m0zF6ITBrEcZR73OPUhCBBS9rYAuUzI= github.com/multiformats/go-multiaddr v0.2.2/go.mod h1:NtfXiOtHvghW9KojvtySjH5y0u0xW5UouOmQQrn6a3Y= github.com/multiformats/go-multiaddr v0.3.0/go.mod h1:dF9kph9wfJ+3VLAaeBqo9Of8x4fJxp6ggJGteB8HQTI= github.com/multiformats/go-multiaddr v0.3.1 h1:1bxa+W7j9wZKTZREySx1vPMs2TqrYWjVZ7zE6/XLG1I= @@ -312,7 +297,6 @@ github.com/multiformats/go-multiaddr-net v0.1.3/go.mod h1:ilNnaM9HbmVFqsb/qcNysj github.com/multiformats/go-multiaddr-net v0.1.4/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA= github.com/multiformats/go-multiaddr-net v0.2.0 h1:MSXRGN0mFymt6B1yo/6BPnIRpLPEnKgQNvVfCX5VDJk= github.com/multiformats/go-multiaddr-net v0.2.0/go.mod h1:gGdH3UXny6U3cKKYCvpXI5rnK7YaOIEOPVDI9tsJbEA= -github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk= github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= @@ -320,17 +304,14 @@ github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKT github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po= 
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= -github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc= github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I= github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= -github.com/multiformats/go-multistream v0.1.1 h1:JlAdpIFhBhGRLxe9W6Om0w++Gd6KMWoFPZL/dEnm9nI= github.com/multiformats/go-multistream v0.1.1/go.mod h1:KmHZ40hzVxiaiwlj3MEbYgK9JFk2/9UktWZAF54Du38= github.com/multiformats/go-multistream v0.2.1 h1:R5exp4cKvGlePuxg/bn4cnV53K4DxCe+uldxs7QzfrE= github.com/multiformats/go-multistream v0.2.1/go.mod h1:5GZPQZbkWOLOn3J2y4Y99vVW7vOfsAflxARk3x14o6k= github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.2/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= -github.com/multiformats/go-varint v0.0.5 h1:XVZwSo04Cs3j/jS0uAEPpT3JY6DzMcVLLoWOSnCxOjg= github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY= github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= @@ -346,12 +327,10 @@ github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg= github.com/onsi/gomega 
v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= -github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= @@ -410,7 +389,6 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -431,7 +409,6 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1: go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto= 
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -446,7 +423,6 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM= go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= @@ -463,7 +439,6 @@ golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -488,7 +463,6 @@ golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190313220215-9f648a60d977/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net 
v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= @@ -522,7 +496,6 @@ golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -573,7 +546,6 @@ google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmE google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod 
h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -600,7 +572,6 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkep gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= From 9d50a8c456fe2f5dab1322912c86dfe361eb7fba Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 30 Mar 2021 17:14:50 +0300 Subject: [PATCH 02/29] implement dial worker for synchronizing simultaneous dials --- dial_sync.go | 114 +++++------- swarm.go | 8 +- swarm_dial.go | 493 ++++++++++++++++++++++++++++++-------------------- 3 files changed, 349 insertions(+), 266 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index 50f3a698..2efdf067 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -5,6 +5,7 @@ import ( "errors" "sync" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" ) @@ -12,88 +13,74 @@ import ( var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") // DialFunc is the type of function expected by DialSync. 
-type DialFunc func(context.Context, peer.ID) (*Conn, error) +type DialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) // NewDialSync constructs a new DialSync -func NewDialSync(dfn DialFunc) *DialSync { +func NewDialSync(worker DialWorkerFunc) *DialSync { return &DialSync{ - dials: make(map[peer.ID]*activeDial), - dialFunc: dfn, + dials: make(map[peer.ID]*activeDial), + dialWorker: worker, } } // DialSync is a dial synchronization helper that ensures that at most one dial // to any given peer is active at any given time. type DialSync struct { - dials map[peer.ID]*activeDial - dialsLk sync.Mutex - dialFunc DialFunc + dials map[peer.ID]*activeDial + dialsLk sync.Mutex + dialWorker DialWorkerFunc } type activeDial struct { - id peer.ID - refCnt int - refCntLk sync.Mutex - cancel func() + id peer.ID + refCnt int - err error - conn *Conn - waitch chan struct{} + ctx context.Context + cancel func() - ds *DialSync -} + reqch chan dialRequest -func (ad *activeDial) wait(ctx context.Context) (*Conn, error) { - defer ad.decref() - select { - case <-ad.waitch: - return ad.conn, ad.err - case <-ctx.Done(): - return nil, ctx.Err() - } + ds *DialSync } func (ad *activeDial) incref() { - ad.refCntLk.Lock() - defer ad.refCntLk.Unlock() ad.refCnt++ } func (ad *activeDial) decref() { - ad.refCntLk.Lock() + ad.ds.dialsLk.Lock() ad.refCnt-- - maybeZero := (ad.refCnt <= 0) - ad.refCntLk.Unlock() - - // make sure to always take locks in correct order. 
- if maybeZero { - ad.ds.dialsLk.Lock() - ad.refCntLk.Lock() - // check again after lock swap drop to make sure nobody else called incref - // in between locks - if ad.refCnt <= 0 { - ad.cancel() - delete(ad.ds.dials, ad.id) - } - ad.refCntLk.Unlock() - ad.ds.dialsLk.Unlock() + if ad.refCnt == 0 { + ad.cancel() + close(ad.reqch) + delete(ad.ds.dials, ad.id) } + ad.ds.dialsLk.Unlock() } -func (ad *activeDial) start(ctx context.Context) { - ad.conn, ad.err = ad.ds.dialFunc(ctx, ad.id) - - // This isn't the user's context so we should fix the error. - switch ad.err { - case context.Canceled: - // The dial was canceled with `CancelDial`. - ad.err = errDialCanceled - case context.DeadlineExceeded: - // We hit an internal timeout, not a context timeout. - ad.err = ErrDialTimeout +func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) { + dialCtx := ad.ctx + + if forceDirect, reason := network.GetForceDirectDial(ctx); forceDirect { + dialCtx = network.WithForceDirectDial(dialCtx, reason) + } + if simConnect, reason := network.GetSimultaneousConnect(ctx); simConnect { + dialCtx = network.WithSimultaneousConnect(dialCtx, reason) + } + + resch := make(chan dialResponse, 1) + select { + case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: + case <-ctx.Done(): + return nil, ctx.Err() + } + + select { + case res := <-resch: + return res.conn, res.err + case <-ctx.Done(): + return nil, ctx.Err() } - close(ad.waitch) - ad.cancel() } func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { @@ -109,13 +96,14 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { adctx, cancel := context.WithCancel(context.Background()) actd = &activeDial{ id: p, + ctx: adctx, cancel: cancel, - waitch: make(chan struct{}), + reqch: make(chan dialRequest), ds: ds, } ds.dials[p] = actd - go actd.start(adctx) + go ds.dialWorker(adctx, p, actd.reqch) } // increase ref count before dropping dialsLk @@ -127,14 +115,8 @@ func (ds *DialSync) getActiveDial(p peer.ID) 
*activeDial { // DialLock initiates a dial to the given peer if there are none in progress // then waits for the dial to that peer to complete. func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { - return ds.getActiveDial(p).wait(ctx) -} + ad := ds.getActiveDial(p) + defer ad.decref() -// CancelDial cancels all in-progress dials to the given peer. -func (ds *DialSync) CancelDial(p peer.ID) { - ds.dialsLk.Lock() - defer ds.dialsLk.Unlock() - if ad, ok := ds.dials[p]; ok { - ad.cancel() - } + return ad.dial(ctx, p) } diff --git a/swarm.go b/swarm.go index 3b3cf832..c57c563c 100644 --- a/swarm.go +++ b/swarm.go @@ -121,7 +121,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc } } - s.dsync = NewDialSync(s.doDial) + s.dsync = NewDialSync(s.dialWorker) s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr) s.proc = goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) @@ -259,12 +259,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.notifyLk.Lock() s.conns.Unlock() - // We have a connection now. Cancel all other in-progress dials. - // This should be fast, no reason to wait till later. 
- if dir == network.DirOutbound { - s.dsync.CancelDial(p) - } - s.notifyAll(func(f network.Notifiee) { f.Connected(s, c) }) diff --git a/swarm_dial.go b/swarm_dial.go index ccf33c2e..cc739791 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -14,7 +14,6 @@ import ( addrutil "github.com/libp2p/go-addr-util" lgbl "github.com/libp2p/go-libp2p-loggables" - logging "github.com/ipfs/go-log" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) @@ -58,6 +57,12 @@ var ( ErrGaterDisallowedConnection = errors.New("gater disallows connection to peer") ) +var ( + DelayDialPrivateAddr = 5 * time.Millisecond + DelayDialPublicAddr = 50 * time.Millisecond + DelayDialRelayAddr = 100 * time.Millisecond +) + // DialAttempts governs how many times a goroutine will try to dial a given peer. // Note: this is down to one, as we have _too many dials_ atm. To add back in, // add loop back in Dial(.) @@ -281,39 +286,306 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { return nil, err } -// doDial is an ugly shim method to retain all the logging and backoff logic -// of the old dialsync code -func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { - // Short circuit. - // By the time we take the dial lock, we may already *have* a connection - // to the peer. 
- c := s.bestAcceptableConnToPeer(ctx, p) - if c != nil { - return c, nil +/////////////////////////////////////////////////////////////////////////////////// +// lo and behold, The Dialer +// TODO explain how all this works +////////////////////////////////////////////////////////////////////////////////// +type dialRequest struct { + ctx context.Context + resch chan dialResponse +} + +type dialResponse struct { + conn *Conn + err error +} + +type dialComplete struct { + addr ma.Multiaddr + conn *Conn + err error +} + +// dialWorker is an active dial goroutine that synchronizes and executes concurrent dials +func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { + if p == s.local { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } + + req.resch <- dialResponse{err: ErrDialToSelf} + } + } } - logdial := lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) + s.dialWorkerLoop(ctx, p, reqch) +} - // ok, we have been charged to dial! let's do it. - // if it succeeds, dial will add the conn to the swarm itself. - defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done() +func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { + defer s.limiter.clearAllPeerDials(p) - conn, err := s.dial(ctx, p) - if err != nil { - conn := s.bestAcceptableConnToPeer(ctx, p) - if conn != nil { - // Hm? What error? - // Could have canceled the dial because we received a - // connection or some other random reason. - // Just ignore the error and return the connection. 
- log.Debugf("ignoring dial error because we already have a connection: %s", err) - return conn, nil + type pendRequest struct { + req dialRequest // the original request + err *DialError // dial error accumulator + addrs map[ma.Multiaddr]struct{} // pending addr dials + } + + type addrDial struct { + ctx context.Context + conn *Conn + err error + requests []int + } + + reqno := 0 + requests := make(map[int]*pendRequest) + pending := make(map[ma.Multiaddr]*addrDial) + + var triggerDial <-chan time.Time + var nextDial []ma.Multiaddr + active := 0 + done := false + + resch := make(chan dialComplete) + +loop: + for { + select { + case req, ok := <-reqch: + if !ok { + // request channel has been closed, wait for pending dials to complete + if active > 0 { + done = true + reqch = nil + triggerDial = nil + continue loop + } + + // no active dials, we are done + return + } + + c := s.bestAcceptableConnToPeer(req.ctx, p) + if c != nil { + req.resch <- dialResponse{conn: c} + continue loop + } + + addrs, err := s.addrsForDial(req.ctx, p) + if err != nil { + req.resch <- dialResponse{err: err} + continue loop + } + + // at this point, len(addrs) > 0 or else it would be error from addrsForDial + // ranke them to process in order + addrs = s.rankAddrs(addrs) + + // create the pending request object + pr := &pendRequest{ + req: req, + err: &DialError{Peer: p}, + addrs: make(map[ma.Multiaddr]struct{}), + } + for _, a := range addrs { + pr.addrs[a] = struct{}{} + } + + // check if any of the addrs has been successfully dialed and accumulate + // errors from complete dials while collecting new addrs to dial/join + var todial []ma.Multiaddr + var tojoin []*addrDial + + for _, a := range addrs { + ad, ok := pending[a] + if !ok { + todial = append(todial, a) + continue + } + + if ad.conn != nil { + // dial to this addr was successful, complete the request + req.resch <- dialResponse{conn: ad.conn} + continue loop + } + + if ad.err != nil { + // dial to this addr errored, accumulate 
the error + pr.err.recordErr(a, ad.err) + delete(pr.addrs, a) + } + + // dial is still pending, add to the join list + tojoin = append(tojoin, ad) + } + + if len(todial) == 0 && len(tojoin) == 0 { + // all request applicable addrs have been dialed, we must have errored + req.resch <- dialResponse{err: pr.err} + continue loop + } + + // the request has some pending or new dials, track it and schedule new dials + reqno++ + requests[reqno] = pr + + for _, ad := range tojoin { + ad.requests = append(ad.requests, reqno) + } + + if len(todial) > 0 { + for _, a := range todial { + pending[a] = &addrDial{ctx: req.ctx, requests: []int{reqno}} + } + + nextDial = append(nextDial, todial...) + nextDial = s.rankAddrs(nextDial) + + if triggerDial == nil { + trigger := make(chan time.Time) + close(trigger) + triggerDial = trigger + } + } + + case <-triggerDial: + if len(nextDial) == 0 { + triggerDial = nil + continue loop + } + + next := nextDial[0] + nextDial = nextDial[1:] + + // spawn the next dial + ad := pending[next] + go s.dialNextAddr(ad.ctx, p, next, resch) + active++ + + // select an appropriate delay for the next dial trigger + delay := s.delayForNextDial(next) + triggerDial = time.After(delay) + + case res := <-resch: + active-- + + if done && active == 0 { + return + } + + ad := pending[res.addr] + ad.conn = res.conn + ad.err = res.err + + dialRequests := ad.requests + ad.requests = nil + + if res.conn != nil { + // we got a connection, dispatch to still pending requests + for _, reqno := range dialRequests { + pr, ok := requests[reqno] + if !ok { + // it has already dispatched a connection + continue + } + + pr.req.resch <- dialResponse{conn: res.conn} + delete(requests, reqno) + } + + continue loop + } + + // it must be an error, accumulate it and dispatch dial error if the request has tried all addrs + for _, reqno := range dialRequests { + pr, ok := requests[reqno] + if !ok { + // has already been dispatched + continue + } + + // accumulate the error + 
pr.err.recordErr(res.addr, res.err) + + delete(pr.addrs, res.addr) + if len(pr.addrs) == 0 { + // all addrs have erred, dispatch dial error + pr.req.resch <- dialResponse{err: pr.err} + delete(requests, reqno) + } + } } + } +} - // ok, we failed. - return nil, err +func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, error) { + peerAddrs := s.peers.Addrs(p) + if len(peerAddrs) == 0 { + return nil, ErrNoAddresses + } + + goodAddrs := s.filterKnownUndialables(p, peerAddrs) + if forceDirect, _ := network.GetForceDirectDial(ctx); forceDirect { + goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr) } - return conn, nil + + if len(goodAddrs) == 0 { + return nil, ErrNoGoodAddresses + } + + return goodAddrs, nil +} + +func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialComplete) { + // check the dial backoff + if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { + if s.backf.Backoff(p, addr) { + resch <- dialComplete{addr: addr, err: ErrDialBackoff} + return + } + } + + // start the dial + dresch := make(chan dialResult) + s.limitedDial(ctx, p, addr, dresch) + select { + case res := <-dresch: + if res.Err != nil { + if res.Err != context.Canceled { + s.backf.AddBackoff(p, addr) + } + + resch <- dialComplete{addr: addr, err: res.Err} + return + } + + conn, err := s.addConn(res.Conn, network.DirOutbound) + if err != nil { + res.Conn.Close() + resch <- dialComplete{addr: addr, err: err} + return + } + + resch <- dialComplete{addr: addr, conn: conn} + + case <-ctx.Done(): + resch <- dialComplete{addr: addr, err: ctx.Err()} + } +} + +func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { + if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil { + return DelayDialRelayAddr + } + + if manet.IsPrivateAddr(addr) { + return DelayDialPrivateAddr + } + + return DelayDialPublicAddr } func (s *Swarm) canDial(addr ma.Multiaddr) bool { @@ -365,80 +637,6 @@ func (s *Swarm) 
rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...) } -// dial is the actual swarm's dial logic, gated by Dial. -func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { - forceDirect, _ := network.GetForceDirectDial(ctx) - var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) - if p == s.local { - log.Event(ctx, "swarmDialDoDialSelf", logdial) - return nil, ErrDialToSelf - } - defer log.EventBegin(ctx, "swarmDialDo", logdial).Done() - logdial["dial"] = "failure" // start off with failure. set to "success" at the end. - - sk := s.peers.PrivKey(s.local) - logdial["encrypted"] = sk != nil // log whether this will be an encrypted dial or not. - if sk == nil { - // fine for sk to be nil, just log. - log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.") - } - - ////// - peerAddrs := s.peers.Addrs(p) - if len(peerAddrs) == 0 { - return nil, &DialError{Peer: p, Cause: ErrNoAddresses} - } - goodAddrs := s.filterKnownUndialables(p, peerAddrs) - if forceDirect { - goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr) - } - if len(goodAddrs) == 0 { - return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses} - } - - if !forceDirect { - /////// Check backoff andnRank addresses - var nonBackoff bool - for _, a := range goodAddrs { - // skip addresses in back-off - if !s.backf.Backoff(p, a) { - nonBackoff = true - } - } - if !nonBackoff { - return nil, ErrDialBackoff - } - } - - connC, dialErr := s.dialAddrs(ctx, p, s.rankAddrs(goodAddrs)) - if dialErr != nil { - logdial["error"] = dialErr.Cause.Error() - switch dialErr.Cause { - case context.Canceled, context.DeadlineExceeded: - // Always prefer the context errors as we rely on being - // able to check them. - // - // Removing this will BREAK backoff (causing us to - // backoff when canceling dials). 
- return nil, dialErr.Cause - } - return nil, dialErr - } - logdial["conn"] = logging.Metadata{ - "localAddr": connC.LocalMultiaddr(), - "remoteAddr": connC.RemoteMultiaddr(), - } - swarmC, err := s.addConn(connC, network.DirOutbound) - if err != nil { - logdial["error"] = err.Error() - connC.Close() // close the connection. didn't work out :( - return nil, &DialError{Peer: p, Cause: err} - } - - logdial["dial"] = "success" - return swarmC, nil -} - // filterKnownUndialables takes a list of multiaddrs, and removes those // that we definitely don't want to dial: addresses configured to be blocked, // IPv6 link-local addresses, addresses without a dial-capable transport, @@ -466,98 +664,6 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul ) } -func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (transport.CapableConn, *DialError) { - /* - This slice-to-chan code is temporary, the peerstore can currently provide - a channel as an interface for receiving addresses, but more thought - needs to be put into the execution. For now, this allows us to use - the improved rate limiter, while maintaining the outward behaviour - that we previously had (halting a dial when we run out of addrs) - */ - var remoteAddrChan chan ma.Multiaddr - if len(remoteAddrs) > 0 { - remoteAddrChan = make(chan ma.Multiaddr, len(remoteAddrs)) - for i := range remoteAddrs { - remoteAddrChan <- remoteAddrs[i] - } - close(remoteAddrChan) - } - - log.Debugf("%s swarm dialing %s", s.local, p) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() // cancel work when we exit func - - // use a single response type instead of errs and conns, reduces complexity *a ton* - respch := make(chan dialResult) - err := &DialError{Peer: p} - - defer s.limiter.clearAllPeerDials(p) - - var active int -dialLoop: - for remoteAddrChan != nil || active > 0 { - // Check for context cancellations and/or responses first. 
- select { - case <-ctx.Done(): - break dialLoop - case resp := <-respch: - active-- - if resp.Err != nil { - // Errors are normal, lots of dials will fail - if resp.Err != context.Canceled { - s.backf.AddBackoff(p, resp.Addr) - } - - log.Infof("got error on dial: %s", resp.Err) - err.recordErr(resp.Addr, resp.Err) - } else if resp.Conn != nil { - return resp.Conn, nil - } - - // We got a result, try again from the top. - continue - default: - } - - // Now, attempt to dial. - select { - case addr, ok := <-remoteAddrChan: - if !ok { - remoteAddrChan = nil - continue - } - - s.limitedDial(ctx, p, addr, respch) - active++ - case <-ctx.Done(): - break dialLoop - case resp := <-respch: - active-- - if resp.Err != nil { - // Errors are normal, lots of dials will fail - if resp.Err != context.Canceled { - s.backf.AddBackoff(p, resp.Addr) - } - - log.Infof("got error on dial: %s", resp.Err) - err.recordErr(resp.Addr, resp.Err) - } else if resp.Conn != nil { - return resp.Conn, nil - } - } - } - - if ctxErr := ctx.Err(); ctxErr != nil { - err.Cause = ctxErr - } else if len(err.DialErrors) == 0 { - err.Cause = network.ErrNoRemoteAddrs - } else { - err.Cause = ErrAllDialsFailed - } - return nil, err -} - // limitedDial will start a dial to the given peer when // it is able, respecting the various different types of rate // limiting that occur without using extra goroutines per addr @@ -570,6 +676,7 @@ func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp }) } +// dialAddr is the actual dial for an addr, indirectly invoked through the limiter func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) { // Just to double check. Costs nothing. 
if s.local == p { From 201a8d1428b1db09b533ff037d329d5b7f80bcc0 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 30 Mar 2021 17:46:01 +0300 Subject: [PATCH 03/29] adjust next dial delays --- swarm_dial.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index cc739791..428845f3 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -59,8 +59,8 @@ var ( var ( DelayDialPrivateAddr = 5 * time.Millisecond - DelayDialPublicAddr = 50 * time.Millisecond - DelayDialRelayAddr = 100 * time.Millisecond + DelayDialPublicAddr = 25 * time.Millisecond + DelayDialRelayAddr = 50 * time.Millisecond ) // DialAttempts governs how many times a goroutine will try to dial a given peer. From 2e742fb9eddfd0fe6f57afc14f906cedb786074d Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 30 Mar 2021 20:38:48 +0300 Subject: [PATCH 04/29] fix dial_sync tests --- dial_sync.go | 12 ++++---- dial_sync_test.go | 70 ++++++++++++++++++++++++++++++++++++----------- swarm_dial.go | 38 ++++++++++++------------- 3 files changed, 79 insertions(+), 41 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index 2efdf067..54a21067 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -13,7 +13,7 @@ import ( var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") // DialFunc is the type of function expected by DialSync. 
-type DialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) +type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest) // NewDialSync constructs a new DialSync func NewDialSync(worker DialWorkerFunc) *DialSync { @@ -38,7 +38,7 @@ type activeDial struct { ctx context.Context cancel func() - reqch chan dialRequest + reqch chan DialRequest ds *DialSync } @@ -68,16 +68,16 @@ func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) { dialCtx = network.WithSimultaneousConnect(dialCtx, reason) } - resch := make(chan dialResponse, 1) + resch := make(chan DialResponse, 1) select { - case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: + case ad.reqch <- DialRequest{Ctx: dialCtx, Resch: resch}: case <-ctx.Done(): return nil, ctx.Err() } select { case res := <-resch: - return res.conn, res.err + return res.Conn, res.Err case <-ctx.Done(): return nil, ctx.Err() } @@ -98,7 +98,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { id: p, ctx: adctx, cancel: cancel, - reqch: make(chan dialRequest), + reqch: make(chan DialRequest), ds: ds, } ds.dials[p] = actd diff --git a/dial_sync_test.go b/dial_sync_test.go index 485d1a31..ef7458a5 100644 --- a/dial_sync_test.go +++ b/dial_sync_test.go @@ -12,19 +12,33 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -func getMockDialFunc() (DialFunc, func(), context.Context, <-chan struct{}) { +func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{}) { dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) - f := func(ctx context.Context, p peer.ID) (*Conn, error) { + f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { dfcalls <- struct{}{} - defer cancel() - select { - case <-ch: - return new(Conn), nil - case <-ctx.Done(): - return nil, ctx.Err() - } + go func() { + defer cancel() + for { + select { + case req, ok := <-reqch: + 
if !ok { + return + } + + select { + case <-ch: + req.Resch <- DialResponse{Conn: new(Conn)} + case <-ctx.Done(): + req.Resch <- DialResponse{Err: ctx.Err()} + return + } + case <-ctx.Done(): + return + } + } + }() } o := new(sync.Once) @@ -174,12 +188,25 @@ func TestDialSyncAllCancel(t *testing.T) { func TestFailFirst(t *testing.T) { var count int - f := func(ctx context.Context, p peer.ID) (*Conn, error) { - if count > 0 { - return new(Conn), nil + f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } + + if count > 0 { + req.Resch <- DialResponse{Conn: new(Conn)} + } else { + req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")} + } + count++ + + case <-ctx.Done(): + return + } } - count++ - return nil, fmt.Errorf("gophers ate the modem") } ds := NewDialSync(f) @@ -205,8 +232,19 @@ func TestFailFirst(t *testing.T) { } func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID) (*Conn, error) { - return nil, nil + ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } + + req.Resch <- DialResponse{} + case <-ctx.Done(): + return + } + } }) wg := sync.WaitGroup{} diff --git a/swarm_dial.go b/swarm_dial.go index 428845f3..ae4c87f1 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -290,14 +290,14 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { // lo and behold, The Dialer // TODO explain how all this works ////////////////////////////////////////////////////////////////////////////////// -type dialRequest struct { - ctx context.Context - resch chan dialResponse +type DialRequest struct { + Ctx context.Context + Resch chan DialResponse } -type dialResponse struct { - conn *Conn - err error +type DialResponse struct { + Conn *Conn + Err error } type dialComplete struct { @@ -307,7 +307,7 @@ type 
dialComplete struct { } // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { +func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { if p == s.local { for { select { @@ -316,7 +316,7 @@ func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequ return } - req.resch <- dialResponse{err: ErrDialToSelf} + req.Resch <- DialResponse{Err: ErrDialToSelf} } } } @@ -324,11 +324,11 @@ func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequ s.dialWorkerLoop(ctx, p, reqch) } -func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { +func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { defer s.limiter.clearAllPeerDials(p) type pendRequest struct { - req dialRequest // the original request + req DialRequest // the original request err *DialError // dial error accumulator addrs map[ma.Multiaddr]struct{} // pending addr dials } @@ -368,15 +368,15 @@ loop: return } - c := s.bestAcceptableConnToPeer(req.ctx, p) + c := s.bestAcceptableConnToPeer(req.Ctx, p) if c != nil { - req.resch <- dialResponse{conn: c} + req.Resch <- DialResponse{Conn: c} continue loop } - addrs, err := s.addrsForDial(req.ctx, p) + addrs, err := s.addrsForDial(req.Ctx, p) if err != nil { - req.resch <- dialResponse{err: err} + req.Resch <- DialResponse{Err: err} continue loop } @@ -408,7 +408,7 @@ loop: if ad.conn != nil { // dial to this addr was successful, complete the request - req.resch <- dialResponse{conn: ad.conn} + req.Resch <- DialResponse{Conn: ad.conn} continue loop } @@ -424,7 +424,7 @@ loop: if len(todial) == 0 && len(tojoin) == 0 { // all request applicable addrs have been dialed, we must have errored - req.resch <- dialResponse{err: pr.err} + req.Resch <- DialResponse{Err: pr.err} continue loop } @@ -438,7 +438,7 @@ loop: if 
len(todial) > 0 { for _, a := range todial { - pending[a] = &addrDial{ctx: req.ctx, requests: []int{reqno}} + pending[a] = &addrDial{ctx: req.Ctx, requests: []int{reqno}} } nextDial = append(nextDial, todial...) @@ -492,7 +492,7 @@ loop: continue } - pr.req.resch <- dialResponse{conn: res.conn} + pr.req.Resch <- DialResponse{Conn: res.conn} delete(requests, reqno) } @@ -513,7 +513,7 @@ loop: delete(pr.addrs, res.addr) if len(pr.addrs) == 0 { // all addrs have erred, dispatch dial error - pr.req.resch <- dialResponse{err: pr.err} + pr.req.Resch <- DialResponse{Err: pr.err} delete(requests, reqno) } } From 59006419d1b7a2441726c752b27ccf0df0bd4359 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 30 Mar 2021 21:35:48 +0300 Subject: [PATCH 05/29] clear address dial when they fail because of backoff makes TestDialBackoff happy --- swarm_dial.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/swarm_dial.go b/swarm_dial.go index ae4c87f1..9b10db16 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -517,6 +517,11 @@ loop: delete(requests, reqno) } } + + // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests + if res.err == ErrDialBackoff { + delete(pending, res.addr) + } } } } From de528f18f6858fb680bc97c4c5a9207775ef0087 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 12:27:51 +0300 Subject: [PATCH 06/29] nuke incref, it's useless --- dial_sync.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index 54a21067..e5f25478 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -43,10 +43,6 @@ type activeDial struct { ds *DialSync } -func (ad *activeDial) incref() { - ad.refCnt++ -} - func (ad *activeDial) decref() { ad.ds.dialsLk.Lock() ad.refCnt-- @@ -107,7 +103,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { } // increase ref count before dropping dialsLk - actd.incref() + actd.refCnt++ return actd } From 2ee7bf0a9ada537fbf343463978dfa47dd23ee9b Mon Sep 17 00:00:00 
2001 From: vyzo Date: Wed, 31 Mar 2021 12:34:54 +0300 Subject: [PATCH 07/29] make dialWorker return an error for self dials and responsible for spawning the loop --- dial_sync.go | 19 ++++++++++----- dial_sync_test.go | 61 ++++++++++++++++++++++++++--------------------- swarm_dial.go | 16 ++++--------- 3 files changed, 51 insertions(+), 45 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index e5f25478..32196cb2 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -13,7 +13,7 @@ import ( var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") // DialFunc is the type of function expected by DialSync. -type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest) +type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest) error // NewDialSync constructs a new DialSync func NewDialSync(worker DialWorkerFunc) *DialSync { @@ -79,7 +79,7 @@ func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) { } } -func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { +func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { ds.dialsLk.Lock() defer ds.dialsLk.Unlock() @@ -99,20 +99,27 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { } ds.dials[p] = actd - go ds.dialWorker(adctx, p, actd.reqch) + err := ds.dialWorker(adctx, p, actd.reqch) + if err != nil { + cancel() + return nil, err + } } // increase ref count before dropping dialsLk actd.refCnt++ - return actd + return actd, nil } // DialLock initiates a dial to the given peer if there are none in progress // then waits for the dial to that peer to complete. 
func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { - ad := ds.getActiveDial(p) - defer ad.decref() + ad, err := ds.getActiveDial(p) + if err != nil { + return nil, err + } + defer ad.decref() return ad.dial(ctx, p) } diff --git a/dial_sync_test.go b/dial_sync_test.go index ef7458a5..f1a9f8a5 100644 --- a/dial_sync_test.go +++ b/dial_sync_test.go @@ -16,7 +16,7 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { + f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { dfcalls <- struct{}{} go func() { defer cancel() @@ -39,6 +39,7 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} } } }() + return nil } o := new(sync.Once) @@ -188,25 +189,28 @@ func TestDialSyncAllCancel(t *testing.T) { func TestFailFirst(t *testing.T) { var count int - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { - for { - select { - case req, ok := <-reqch: - if !ok { - return - } + f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + go func() { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } - if count > 0 { - req.Resch <- DialResponse{Conn: new(Conn)} - } else { - req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")} - } - count++ + if count > 0 { + req.Resch <- DialResponse{Conn: new(Conn)} + } else { + req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")} + } + count++ - case <-ctx.Done(): - return + case <-ctx.Done(): + return + } } - } + }() + return nil } ds := NewDialSync(f) @@ -232,19 +236,22 @@ func TestFailFirst(t *testing.T) { } func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan 
DialRequest) { - for { - select { - case req, ok := <-reqch: - if !ok { + ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + go func() { + for { + select { + case req, ok := <-reqch: + if !ok { + return + } + + req.Resch <- DialResponse{} + case <-ctx.Done(): return } - - req.Resch <- DialResponse{} - case <-ctx.Done(): - return } - } + }() + return nil }) wg := sync.WaitGroup{} diff --git a/swarm_dial.go b/swarm_dial.go index 9b10db16..77cb8a6b 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -307,21 +307,13 @@ type dialComplete struct { } // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { +func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { if p == s.local { - for { - select { - case req, ok := <-reqch: - if !ok { - return - } - - req.Resch <- DialResponse{Err: ErrDialToSelf} - } - } + return ErrDialToSelf } - s.dialWorkerLoop(ctx, p, reqch) + go s.dialWorkerLoop(ctx, p, reqch) + return nil } func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { From 0510dcb8ac7eec7eb1baf5252518f2159b13f5b4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 13:58:16 +0300 Subject: [PATCH 08/29] don't use a goroutine for the actual dial --- swarm_dial.go | 138 ++++++++++++++++++++++++++------------------------ 1 file changed, 72 insertions(+), 66 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index 77cb8a6b..8f1e2c3c 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -290,6 +290,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { // lo and behold, The Dialer // TODO explain how all this works ////////////////////////////////////////////////////////////////////////////////// + type DialRequest struct { Ctx context.Context Resch chan DialResponse @@ -300,12 +301,6 @@ type DialResponse struct { Err 
error } -type dialComplete struct { - addr ma.Multiaddr - conn *Conn - err error -} - // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { if p == s.local { @@ -326,6 +321,7 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial } type addrDial struct { + addr ma.Multiaddr ctx context.Context conn *Conn err error @@ -336,12 +332,40 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial requests := make(map[int]*pendRequest) pending := make(map[ma.Multiaddr]*addrDial) + dispatchError := func(ad *addrDial, err error) { + ad.err = err + for _, reqno := range ad.requests { + pr, ok := requests[reqno] + if !ok { + // has already been dispatched + continue + } + + // accumulate the error + pr.err.recordErr(ad.addr, err) + + delete(pr.addrs, ad.addr) + if len(pr.addrs) == 0 { + // all addrs have erred, dispatch dial error + pr.req.Resch <- DialResponse{Err: pr.err} + delete(requests, reqno) + } + } + + ad.requests = nil + + // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests + if err == ErrDialBackoff { + delete(pending, ad.addr) + } + } + var triggerDial <-chan time.Time var nextDial []ma.Multiaddr active := 0 done := false - resch := make(chan dialComplete) + resch := make(chan dialResult) loop: for { @@ -408,6 +432,7 @@ loop: // dial to this addr errored, accumulate the error pr.err.recordErr(a, ad.err) delete(pr.addrs, a) + continue } // dial is still pending, add to the join list @@ -430,7 +455,7 @@ loop: if len(todial) > 0 { for _, a := range todial { - pending[a] = &addrDial{ctx: req.Ctx, requests: []int{reqno}} + pending[a] = &addrDial{addr: a, ctx: req.Ctx, requests: []int{reqno}} } nextDial = append(nextDial, todial...) 
@@ -454,7 +479,12 @@ loop: // spawn the next dial ad := pending[next] - go s.dialNextAddr(ad.ctx, p, next, resch) + err := s.dialNextAddr(ad.ctx, p, next, resch) + if err != nil { + dispatchError(ad, err) + continue loop + } + active++ // select an appropriate delay for the next dial trigger @@ -465,55 +495,54 @@ loop: active-- if done && active == 0 { + if res.Conn != nil { + // we got an actual connection, but the dial has been cancelled + // Should we close it? I think not, we should just add it to the swarm + _, err := s.addConn(res.Conn, network.DirOutbound) + if err != nil { + // well duh, now we have to close it + res.Conn.Close() + } + } return } - ad := pending[res.addr] - ad.conn = res.conn - ad.err = res.err + ad := pending[res.Addr] - dialRequests := ad.requests - ad.requests = nil + if res.Conn != nil { + // we got a connection, add it to the swarm + conn, err := s.addConn(res.Conn, network.DirOutbound) + if err != nil { + // oops no, we failed to add it to the swarm + res.Conn.Close() + dispatchError(ad, err) + continue loop + } - if res.conn != nil { - // we got a connection, dispatch to still pending requests - for _, reqno := range dialRequests { + // dispatch to still pending requests + for _, reqno := range ad.requests { pr, ok := requests[reqno] if !ok { // it has already dispatched a connection continue } - pr.req.Resch <- DialResponse{Conn: res.conn} + pr.req.Resch <- DialResponse{Conn: conn} delete(requests, reqno) } + ad.conn = conn + ad.requests = nil + continue loop } - // it must be an error, accumulate it and dispatch dial error if the request has tried all addrs - for _, reqno := range dialRequests { - pr, ok := requests[reqno] - if !ok { - // has already been dispatched - continue - } - - // accumulate the error - pr.err.recordErr(res.addr, res.err) - - delete(pr.addrs, res.addr) - if len(pr.addrs) == 0 { - // all addrs have erred, dispatch dial error - pr.req.Resch <- DialResponse{Err: pr.err} - delete(requests, reqno) - } + // it 
must be an error -- add backoff if applicable and dispatch + if res.Err != context.Canceled { + s.backf.AddBackoff(p, res.Addr) } - // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests - if res.err == ErrDialBackoff { - delete(pending, res.addr) - } + dispatchError(ad, res.Err) } } } @@ -536,41 +565,18 @@ func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, er return goodAddrs, nil } -func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialComplete) { +func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error { // check the dial backoff if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { if s.backf.Backoff(p, addr) { - resch <- dialComplete{addr: addr, err: ErrDialBackoff} - return + return ErrDialBackoff } } // start the dial - dresch := make(chan dialResult) - s.limitedDial(ctx, p, addr, dresch) - select { - case res := <-dresch: - if res.Err != nil { - if res.Err != context.Canceled { - s.backf.AddBackoff(p, addr) - } - - resch <- dialComplete{addr: addr, err: res.Err} - return - } + s.limitedDial(ctx, p, addr, resch) - conn, err := s.addConn(res.Conn, network.DirOutbound) - if err != nil { - res.Conn.Close() - resch <- dialComplete{addr: addr, err: err} - return - } - - resch <- dialComplete{addr: addr, conn: conn} - - case <-ctx.Done(): - resch <- dialComplete{addr: addr, err: ctx.Err()} - } + return nil } func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { From 96723d14bccf86ffe3fae92c40edc25432327ee9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 15:38:04 +0300 Subject: [PATCH 09/29] bump go version to 1.15 i want to use binary literals; technically only requires 1.13, but let's not be ancient --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index ae61917f..b8fd49cc 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ 
module github.com/libp2p/go-libp2p-swarm -go 1.12 +go 1.15 require ( github.com/ipfs/go-log v1.0.4 From bbd0a01770a5c0db4d880a2535230fa1ade1aca4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 16:12:45 +0300 Subject: [PATCH 10/29] batch dials together, rework address ranking --- swarm_dial.go | 163 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 107 insertions(+), 56 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index 8f1e2c3c..3d8ec3df 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -58,9 +58,9 @@ var ( ) var ( - DelayDialPrivateAddr = 5 * time.Millisecond - DelayDialPublicAddr = 25 * time.Millisecond - DelayDialRelayAddr = 50 * time.Millisecond + delayDialPrivateAddr = 5 * time.Millisecond + delayDialPublicAddr = 25 * time.Millisecond + delayDialRelayAddr = 50 * time.Millisecond ) // DialAttempts governs how many times a goroutine will try to dial a given peer. @@ -361,6 +361,9 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial } var triggerDial <-chan time.Time + triggerNow := make(chan time.Time) + close(triggerNow) + var nextDial []ma.Multiaddr active := 0 done := false @@ -461,34 +464,46 @@ loop: nextDial = append(nextDial, todial...) 
nextDial = s.rankAddrs(nextDial) - if triggerDial == nil { - trigger := make(chan time.Time) - close(trigger) - triggerDial = trigger - } + // trigger a new dial now to account for the new addrs we added + triggerDial = triggerNow } case <-triggerDial: - if len(nextDial) == 0 { - triggerDial = nil - continue loop - } + // we dial batches of addresses together, logically belonging to the same batch + // after a batch of addresses has been dialed, we add a delay before initiating the next batch + dialed := false + last := 0 + next := 0 + for i, addr := range nextDial { + if dialed && !s.sameAddrBatch(nextDial[last], addr) { + break + } - next := nextDial[0] - nextDial = nextDial[1:] + next = i + 1 - // spawn the next dial - ad := pending[next] - err := s.dialNextAddr(ad.ctx, p, next, resch) - if err != nil { - dispatchError(ad, err) - continue loop + // spawn the dial + ad := pending[addr] + err := s.dialNextAddr(ad.ctx, p, addr, resch) + if err != nil { + dispatchError(ad, err) + continue + } + + dialed = true + last = i + active++ } - active++ + lastDial := nextDial[last] + nextDial = nextDial[next:] + if !dialed || len(nextDial) == 0 { + // we didn't dial anything because of backoff or we don't have any more addresses + triggerDial = nil + continue loop + } - // select an appropriate delay for the next dial trigger - delay := s.delayForNextDial(next) + // select an appropriate delay for the next dial batch + delay := s.delayForNextDial(lastDial) triggerDial = time.After(delay) case res := <-resch: @@ -516,6 +531,9 @@ loop: // oops no, we failed to add it to the swarm res.Conn.Close() dispatchError(ad, err) + if active == 0 && len(nextDial) > 0 { + triggerDial = triggerNow + } continue loop } @@ -543,6 +561,9 @@ loop: } dispatchError(ad, res.Err) + if active == 0 && len(nextDial) > 0 { + triggerDial = triggerNow + } } } } @@ -579,16 +600,37 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, return nil } +func (s *Swarm) 
sameAddrBatch(a, b ma.Multiaddr) bool { + // is it a relay addr? + if s.IsRelayAddr(a) { + return s.IsRelayAddr(b) + } + + // is it an expensive addr? + if s.IsExpensiveAddr(a) { + return s.IsExpensiveAddr(b) + } + + // is it a public addr? + if !manet.IsPrivateAddr(a) { + return !manet.IsPrivateAddr(b) && + s.IsFdConsumingAddr(a) == s.IsFdConsumingAddr(b) + } + + // it's a private addr + return manet.IsPrivateAddr(b) +} + func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil { - return DelayDialRelayAddr + return delayDialRelayAddr } if manet.IsPrivateAddr(addr) { - return DelayDialPrivateAddr + return delayDialPrivateAddr } - return DelayDialPublicAddr + return delayDialPublicAddr } func (s *Swarm) canDial(addr ma.Multiaddr) bool { @@ -601,43 +643,41 @@ func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool { return !t.Proxy() } -// ranks addresses in descending order of preference for dialing -// Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server +// ranks addresses in descending order of preference for dialing, with the following rules: +// NonRelay > Relay +// NonWS > WS +// Private > Public +// UDP > TCP func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - var localUdpAddrs []ma.Multiaddr // private udp - var relayUdpAddrs []ma.Multiaddr // relay udp - var othersUdp []ma.Multiaddr // public udp + addrTier := func(a ma.Multiaddr) (tier int) { + if s.IsRelayAddr(a) { + tier |= 0b1000 + } + if s.IsExpensiveAddr(a) { + tier |= 0b0100 + } + if !manet.IsPrivateAddr(a) { + tier |= 0b0010 + } + if s.IsFdConsumingAddr(a) { + tier |= 0b0001 + } - var localFdAddrs []ma.Multiaddr // private fd consuming - var relayFdAddrs []ma.Multiaddr // relay fd consuming - var othersFd []ma.Multiaddr // public fd consuming + return tier + } + tiers := make([][]ma.Multiaddr, 16) for _, a := range addrs { - if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == 
nil { - if s.IsFdConsumingAddr(a) { - relayFdAddrs = append(relayFdAddrs, a) - continue - } - relayUdpAddrs = append(relayUdpAddrs, a) - } else if manet.IsPrivateAddr(a) { - if s.IsFdConsumingAddr(a) { - localFdAddrs = append(localFdAddrs, a) - continue - } - localUdpAddrs = append(localUdpAddrs, a) - } else { - if s.IsFdConsumingAddr(a) { - othersFd = append(othersFd, a) - continue - } - othersUdp = append(othersUdp, a) - } + tier := addrTier(a) + tiers[tier] = append(tiers[tier], a) } - relays := append(relayUdpAddrs, relayFdAddrs...) - fds := append(localFdAddrs, othersFd...) + result := make([]ma.Multiaddr, 0, len(addrs)) + for _, tier := range tiers { + result = append(result, tier...) + } - return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...) + return result } // filterKnownUndialables takes a list of multiaddrs, and removes those @@ -729,3 +769,14 @@ func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool { _, err2 := first.ValueForProtocol(ma.P_UNIX) return err1 == nil || err2 == nil } + +func (s *Swarm) IsExpensiveAddr(addr ma.Multiaddr) bool { + _, err1 := addr.ValueForProtocol(ma.P_WS) + _, err2 := addr.ValueForProtocol(ma.P_WSS) + return err1 == nil || err2 == nil +} + +func (s *Swarm) IsRelayAddr(addr ma.Multiaddr) bool { + _, err := addr.ValueForProtocol(ma.P_CIRCUIT) + return err == nil +} From a6c28389ecfb359fa9ba27f2c856bb9d3b269d57 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 16:28:10 +0300 Subject: [PATCH 11/29] tune down batch dial delays --- swarm_dial.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index 3d8ec3df..2b7f0b03 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -58,9 +58,9 @@ var ( ) var ( - delayDialPrivateAddr = 5 * time.Millisecond - delayDialPublicAddr = 25 * time.Millisecond - delayDialRelayAddr = 50 * time.Millisecond + delayDialPrivateAddr = 1 * time.Millisecond + delayDialPublicAddr = 5 * time.Millisecond + delayDialRelayAddr = 
10 * time.Millisecond ) // DialAttempts governs how many times a goroutine will try to dial a given peer. From 7ccf58e500b49e447530e1721c1a8e60369cf49a Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 16:42:04 +0300 Subject: [PATCH 12/29] use a timer instead of time.After --- swarm_dial.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/swarm_dial.go b/swarm_dial.go index 2b7f0b03..de16ba3f 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -361,9 +361,16 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial } var triggerDial <-chan time.Time + var triggerTimer *time.Timer triggerNow := make(chan time.Time) close(triggerNow) + defer func() { + if triggerTimer != nil { + triggerTimer.Stop() + } + }() + var nextDial []ma.Multiaddr active := 0 done := false @@ -504,7 +511,15 @@ loop: // select an appropriate delay for the next dial batch delay := s.delayForNextDial(lastDial) - triggerDial = time.After(delay) + if triggerTimer == nil { + triggerTimer = time.NewTimer(delay) + } else { + if !triggerTimer.Stop() && triggerDial != triggerTimer.C { + <-triggerTimer.C + } + triggerTimer.Reset(delay) + } + triggerDial = triggerTimer.C case res := <-resch: active-- From 0fc0ade25ac23fe349923132d9d8d5f116ab0037 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 17:46:14 +0300 Subject: [PATCH 13/29] kill dial jump delays --- swarm_dial.go | 43 ++----------------------------------------- 1 file changed, 2 insertions(+), 41 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index de16ba3f..1fe37a1f 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -57,12 +57,6 @@ var ( ErrGaterDisallowedConnection = errors.New("gater disallows connection to peer") ) -var ( - delayDialPrivateAddr = 1 * time.Millisecond - delayDialPublicAddr = 5 * time.Millisecond - delayDialRelayAddr = 10 * time.Millisecond -) - // DialAttempts governs how many times a goroutine will try to dial a given peer. 
// Note: this is down to one, as we have _too many dials_ atm. To add back in, // add loop back in Dial(.) @@ -360,17 +354,10 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial } } - var triggerDial <-chan time.Time - var triggerTimer *time.Timer - triggerNow := make(chan time.Time) + var triggerDial <-chan struct{} + triggerNow := make(chan struct{}) close(triggerNow) - defer func() { - if triggerTimer != nil { - triggerTimer.Stop() - } - }() - var nextDial []ma.Multiaddr active := 0 done := false @@ -501,26 +488,12 @@ loop: active++ } - lastDial := nextDial[last] nextDial = nextDial[next:] if !dialed || len(nextDial) == 0 { // we didn't dial anything because of backoff or we don't have any more addresses triggerDial = nil - continue loop } - // select an appropriate delay for the next dial batch - delay := s.delayForNextDial(lastDial) - if triggerTimer == nil { - triggerTimer = time.NewTimer(delay) - } else { - if !triggerTimer.Stop() && triggerDial != triggerTimer.C { - <-triggerTimer.C - } - triggerTimer.Reset(delay) - } - triggerDial = triggerTimer.C - case res := <-resch: active-- @@ -636,18 +609,6 @@ func (s *Swarm) sameAddrBatch(a, b ma.Multiaddr) bool { return manet.IsPrivateAddr(b) } -func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { - if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil { - return delayDialRelayAddr - } - - if manet.IsPrivateAddr(addr) { - return delayDialPrivateAddr - } - - return delayDialPublicAddr -} - func (s *Swarm) canDial(addr ma.Multiaddr) bool { t := s.TransportForDialing(addr) return t != nil && t.CanDial(addr) From 12a0cdb776f9ffa3eee75238f8fdcdc39cfc98df Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 18:10:43 +0300 Subject: [PATCH 14/29] add TestDialExistingConnection --- dial_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/dial_test.go b/dial_test.go index 9fc5df41..c8b8bbe9 100644 --- a/dial_test.go +++ 
b/dial_test.go @@ -524,3 +524,29 @@ func TestDialPeerFailed(t *testing.T) { t.Errorf("expected %d errors, got %d", expectedErrorsCount, len(dialErr.DialErrors)) } } + +func TestDialExistingConnection(t *testing.T) { + ctx := context.Background() + + swarms := makeSwarms(ctx, t, 2) + defer closeSwarms(swarms) + s1 := swarms[0] + s2 := swarms[1] + + s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL) + + c1, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + + c2, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + + if c1 != c2 { + t.Fatal("expecting the same connection from both dials") + } + +} From e7b6af60ca827a7ac737218d1dfae60e956fa294 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 18:18:22 +0300 Subject: [PATCH 15/29] do a last ditch check for acceptable connections before dispatching a dial error --- swarm_dial.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/swarm_dial.go b/swarm_dial.go index 1fe37a1f..2d3255b4 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -341,7 +341,14 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial delete(pr.addrs, ad.addr) if len(pr.addrs) == 0 { // all addrs have erred, dispatch dial error - pr.req.Resch <- DialResponse{Err: pr.err} + // but first do a last one check in case an acceptable connection has landed from + // a simultaneous dial that started later and added new acceptable addrs + c := s.bestAcceptableConnToPeer(pr.req.Ctx, p) + if c != nil { + pr.req.Resch <- DialResponse{Conn: c} + } else { + pr.req.Resch <- DialResponse{Err: pr.err} + } delete(requests, reqno) } } From 580a818b48798344c6f2ece64ba67150e3f3ae0a Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 18:30:30 +0300 Subject: [PATCH 16/29] merge dial contexts where possible --- swarm_dial.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/swarm_dial.go 
b/swarm_dial.go index 2d3255b4..513ce1c6 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -320,6 +320,7 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial conn *Conn err error requests []int + dialed bool } reqno := 0 @@ -454,6 +455,9 @@ loop: requests[reqno] = pr for _, ad := range tojoin { + if !ad.dialed { + ad.ctx = s.mergeDialContexts(ad.ctx, req.Ctx) + } ad.requests = append(ad.requests, reqno) } @@ -490,6 +494,7 @@ loop: continue } + ad.dialed = true dialed = true last = i active++ @@ -581,6 +586,18 @@ func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, er return goodAddrs, nil } +func (s *Swarm) mergeDialContexts(a, b context.Context) context.Context { + dialCtx := a + + if simConnect, reason := network.GetSimultaneousConnect(b); simConnect { + if simConnect, _ := network.GetSimultaneousConnect(a); !simConnect { + dialCtx = network.WithSimultaneousConnect(dialCtx, reason) + } + } + + return dialCtx +} + func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error { // check the dial backoff if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { From 699b4d1160fc8dca33b92517dc2b8c4d8439f9f1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 18:55:52 +0300 Subject: [PATCH 17/29] add TestDialSimultaneousJoin test --- dial_test.go | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/dial_test.go b/dial_test.go index c8b8bbe9..86390d62 100644 --- a/dial_test.go +++ b/dial_test.go @@ -9,6 +9,7 @@ import ( addrutil "github.com/libp2p/go-addr-util" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/transport" @@ -548,5 +549,105 @@ func TestDialExistingConnection(t *testing.T) { if c1 != c2 { t.Fatal("expecting the same connection from both dials") } +} + +func 
newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) { + lst, err := net.Listen("tcp4", "localhost:0") + if err != nil { + t.Fatal(err) + } + addr, err := manet.FromNetAddr(lst.Addr()) + if err != nil { + t.Fatal(err) + } + addrs := []ma.Multiaddr{addr} + addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil) + if err != nil { + t.Fatal(err) + } + return addrs, lst + +} + +func TestDialSimultaneousJoin(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + swarms := makeSwarms(ctx, t, 2) + s1 := swarms[0] + s2 := swarms[1] + defer s1.Close() + defer s2.Close() + + s2silentAddrs, s2silentListener := newSilentListener(t) + go acceptAndHang(s2silentListener) + + connch := make(chan network.Conn, 512) + + // start a dial to s2 through the silent addr + go func() { + s1.Peerstore().AddAddrs(s2.LocalPeer(), s2silentAddrs, peerstore.PermanentAddrTTL) + + c, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + + t.Logf("first dial succedded; conn: %+v", c) + + connch <- c + }() + + // wait a bit for the dial to take hold + time.Sleep(100 * time.Millisecond) + + // start a second dial to s2 that uses the real s2 addrs + go func() { + s2addrs, err := s2.InterfaceListenAddresses() + if err != nil { + t.Fatal(err) + } + s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs[:1], peerstore.PermanentAddrTTL) + + c, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + t.Logf("second dial succedded; conn: %+v", c) + + connch <- c + }() + + // wait for the second dial to finish + c2 := <-connch + + // start a third dial to s2, this should get the existing connection from the successful dial + go func() { + c, err := s1.DialPeer(ctx, s2.LocalPeer()) + if err != nil { + t.Fatal(err) + } + + t.Logf("third dial succedded; conn: %+v", c) + + connch <- c + }() + + c3 := <-connch + + if c2 != c3 { + t.Fatal("expected c2 and c3 to be the same") + } + + // next, the first dial to s2, using 
the silent addr should timeout; at this point the dial + // will error but the last chance check will see the existing connection and return it + select { + case c1 := <-connch: + if c1 != c2 { + t.Fatal("expected c1 and c2 to be the same") + } + case <-time.After(2 * transport.DialTimeout): + t.Fatal("no connection from first dial") + } } From b67b73604c789fab097fa4116131fd082dc1a563 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 20:04:42 +0300 Subject: [PATCH 18/29] don't add backoff if we have successfully connected for consistency with the old dialer behaviour. --- swarm_dial.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index 513ce1c6..1a5cb120 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -368,7 +368,8 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial var nextDial []ma.Multiaddr active := 0 - done := false + done := false // true when the request channel has been closed + connected := false // true when a connection has been successfully established resch := make(chan dialResult) @@ -509,6 +510,10 @@ loop: case res := <-resch: active-- + if res.Conn != nil { + connected = true + } + if done && active == 0 { if res.Conn != nil { // we got an actual connection, but the dial has been cancelled @@ -556,7 +561,9 @@ loop: } // it must be an error -- add backoff if applicable and dispatch - if res.Err != context.Canceled { + if res.Err != context.Canceled && !connected { + // we only add backoff if there has not been a successful connection + // for consistency with the old dialer behavior. 
s.backf.AddBackoff(p, res.Addr) } From 0538806be79cf321e79253cecaec8c9b0d2a7167 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 20:14:52 +0300 Subject: [PATCH 19/29] fix TestConnectednessCorrect we might get more connections because simultaneous dials can succeed and we have both TCP and QUIC addrs by default --- swarm_net_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/swarm_net_test.go b/swarm_net_test.go index 2ba64edb..64121bb1 100644 --- a/swarm_net_test.go +++ b/swarm_net_test.go @@ -57,8 +57,8 @@ func TestConnectednessCorrect(t *testing.T) { t.Fatal("expected net 0 to have two peers") } - if len(nets[2].Conns()) != 2 { - t.Fatal("expected net 2 to have two conns") + if len(nets[2].Peers()) != 2 { + t.Fatal("expected net 2 to have two peers") } if len(nets[1].ConnsToPeer(nets[3].LocalPeer())) != 0 { From acc35e8afbf5916f2b2f4c3daaae4833c4848a47 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 23:04:42 +0300 Subject: [PATCH 20/29] don't store the active dial if it errors while starting the worker --- dial_sync.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dial_sync.go b/dial_sync.go index 32196cb2..ae3578a5 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -97,13 +97,14 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { reqch: make(chan DialRequest), ds: ds, } - ds.dials[p] = actd err := ds.dialWorker(adctx, p, actd.reqch) if err != nil { cancel() return nil, err } + + ds.dials[p] = actd } // increase ref count before dropping dialsLk From 27f6c394db45454e0127d8f852dd2b069d535004 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 23:04:49 +0300 Subject: [PATCH 21/29] add TestSelfDial --- dial_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/dial_test.go b/dial_test.go index 86390d62..26bf47ca 100644 --- a/dial_test.go +++ b/dial_test.go @@ -651,3 +651,23 @@ func TestDialSimultaneousJoin(t *testing.T) { t.Fatal("no connection from 
first dial") } } + +func TestDialSelf(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + swarms := makeSwarms(ctx, t, 2) + s1 := swarms[0] + defer s1.Close() + + _, err := s1.DialPeer(ctx, s1.LocalPeer()) + if err != ErrDialToSelf { + t.Fatal("expected error from self dial") + } + + // do it twice to make sure we get a new active dial object that fails again + _, err = s1.DialPeer(ctx, s1.LocalPeer()) + if err != ErrDialToSelf { + t.Fatal("expected error from self dial") + } +} From 1624828080b83bf2b7ea7b0d4c8290b3f73f1b79 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 12:13:03 +0300 Subject: [PATCH 22/29] make DialRequest and DialResponse private --- dial_sync.go | 14 +++++++------- dial_sync_test.go | 20 +++++++++----------- swarm_dial.go | 42 +++++++++++++++++++++--------------------- 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index ae3578a5..24781dd1 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -12,8 +12,8 @@ import ( // TODO: change this text when we fix the bug var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") -// DialFunc is the type of function expected by DialSync. 
-type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest) error +// DialWorerFunc is used by DialSync to spawn a new dial worker +type DialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error // NewDialSync constructs a new DialSync func NewDialSync(worker DialWorkerFunc) *DialSync { @@ -38,7 +38,7 @@ type activeDial struct { ctx context.Context cancel func() - reqch chan DialRequest + reqch chan dialRequest ds *DialSync } @@ -64,16 +64,16 @@ func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) { dialCtx = network.WithSimultaneousConnect(dialCtx, reason) } - resch := make(chan DialResponse, 1) + resch := make(chan dialResponse, 1) select { - case ad.reqch <- DialRequest{Ctx: dialCtx, Resch: resch}: + case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: case <-ctx.Done(): return nil, ctx.Err() } select { case res := <-resch: - return res.Conn, res.Err + return res.conn, res.err case <-ctx.Done(): return nil, ctx.Err() } @@ -94,7 +94,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { id: p, ctx: adctx, cancel: cancel, - reqch: make(chan DialRequest), + reqch: make(chan dialRequest), ds: ds, } diff --git a/dial_sync_test.go b/dial_sync_test.go index f1a9f8a5..e414dd52 100644 --- a/dial_sync_test.go +++ b/dial_sync_test.go @@ -1,4 +1,4 @@ -package swarm_test +package swarm import ( "context" @@ -7,8 +7,6 @@ import ( "testing" "time" - . 
"github.com/libp2p/go-libp2p-swarm" - "github.com/libp2p/go-libp2p-core/peer" ) @@ -16,7 +14,7 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { dfcalls <- struct{}{} go func() { defer cancel() @@ -29,9 +27,9 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} select { case <-ch: - req.Resch <- DialResponse{Conn: new(Conn)} + req.resch <- dialResponse{conn: new(Conn)} case <-ctx.Done(): - req.Resch <- DialResponse{Err: ctx.Err()} + req.resch <- dialResponse{err: ctx.Err()} return } case <-ctx.Done(): @@ -189,7 +187,7 @@ func TestDialSyncAllCancel(t *testing.T) { func TestFailFirst(t *testing.T) { var count int - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { go func() { for { select { @@ -199,9 +197,9 @@ func TestFailFirst(t *testing.T) { } if count > 0 { - req.Resch <- DialResponse{Conn: new(Conn)} + req.resch <- dialResponse{conn: new(Conn)} } else { - req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")} + req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")} } count++ @@ -236,7 +234,7 @@ func TestFailFirst(t *testing.T) { } func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { go func() { for { select { @@ -245,7 +243,7 @@ func TestStressActiveDial(t *testing.T) { return } - req.Resch <- DialResponse{} + req.resch <- dialResponse{} case <-ctx.Done(): return } diff --git 
a/swarm_dial.go b/swarm_dial.go index 1a5cb120..ab95a461 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -285,18 +285,18 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { // TODO explain how all this works ////////////////////////////////////////////////////////////////////////////////// -type DialRequest struct { - Ctx context.Context - Resch chan DialResponse +type dialRequest struct { + ctx context.Context + resch chan dialResponse } -type DialResponse struct { - Conn *Conn - Err error +type dialResponse struct { + conn *Conn + err error } // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { +func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { if p == s.local { return ErrDialToSelf } @@ -305,11 +305,11 @@ func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequ return nil } -func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { +func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { defer s.limiter.clearAllPeerDials(p) type pendRequest struct { - req DialRequest // the original request + req dialRequest // the original request err *DialError // dial error accumulator addrs map[ma.Multiaddr]struct{} // pending addr dials } @@ -344,11 +344,11 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial // all addrs have erred, dispatch dial error // but first do a last one check in case an acceptable connection has landed from // a simultaneous dial that started later and added new acceptable addrs - c := s.bestAcceptableConnToPeer(pr.req.Ctx, p) + c := s.bestAcceptableConnToPeer(pr.req.ctx, p) if c != nil { - pr.req.Resch <- DialResponse{Conn: c} + pr.req.resch <- dialResponse{conn: c} } else { - pr.req.Resch <- DialResponse{Err: pr.err} + 
pr.req.resch <- dialResponse{err: pr.err} } delete(requests, reqno) } @@ -390,15 +390,15 @@ loop: return } - c := s.bestAcceptableConnToPeer(req.Ctx, p) + c := s.bestAcceptableConnToPeer(req.ctx, p) if c != nil { - req.Resch <- DialResponse{Conn: c} + req.resch <- dialResponse{conn: c} continue loop } - addrs, err := s.addrsForDial(req.Ctx, p) + addrs, err := s.addrsForDial(req.ctx, p) if err != nil { - req.Resch <- DialResponse{Err: err} + req.resch <- dialResponse{err: err} continue loop } @@ -430,7 +430,7 @@ loop: if ad.conn != nil { // dial to this addr was successful, complete the request - req.Resch <- DialResponse{Conn: ad.conn} + req.resch <- dialResponse{conn: ad.conn} continue loop } @@ -447,7 +447,7 @@ loop: if len(todial) == 0 && len(tojoin) == 0 { // all request applicable addrs have been dialed, we must have errored - req.Resch <- DialResponse{Err: pr.err} + req.resch <- dialResponse{err: pr.err} continue loop } @@ -457,14 +457,14 @@ loop: for _, ad := range tojoin { if !ad.dialed { - ad.ctx = s.mergeDialContexts(ad.ctx, req.Ctx) + ad.ctx = s.mergeDialContexts(ad.ctx, req.ctx) } ad.requests = append(ad.requests, reqno) } if len(todial) > 0 { for _, a := range todial { - pending[a] = &addrDial{addr: a, ctx: req.Ctx, requests: []int{reqno}} + pending[a] = &addrDial{addr: a, ctx: req.ctx, requests: []int{reqno}} } nextDial = append(nextDial, todial...) 
@@ -550,7 +550,7 @@ loop: continue } - pr.req.Resch <- DialResponse{Conn: conn} + pr.req.resch <- dialResponse{conn: conn} delete(requests, reqno) } From 13d355657bc492a627db7ed863c69c27132a9491 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 12:16:06 +0300 Subject: [PATCH 23/29] add comment about the necessity of removing the address tracking when a dial backoff occurs --- swarm_dial.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/swarm_dial.go b/swarm_dial.go index ab95a461..29e3a8d7 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -356,7 +356,12 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dial ad.requests = nil - // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests + // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests. + // this is necessary to support active listen scenarios, where a new dial comes in while + // another dial is in progress, and needs to do a direct connection without inhibitions from + // dial backoff. + // it is also necessary to preserve consistent behaviour with the old dialer -- TestDialBackoff + // regresses without this.
if err == ErrDialBackoff { delete(pending, ad.addr) } From 4a69fa2f85aaad952b9d7d6ccece688dc8880543 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 12:20:25 +0300 Subject: [PATCH 24/29] remove dial batching --- swarm_dial.go | 47 +++-------------------------------------------- 1 file changed, 3 insertions(+), 44 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index 29e3a8d7..c656df5a 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -480,37 +480,17 @@ loop: } case <-triggerDial: - // we dial batches of addresses together, logically belonging to the same batch - // after a batch of addresses has been dialed, we add a delay before initiating the next batch - dialed := false - last := 0 - next := 0 - for i, addr := range nextDial { - if dialed && !s.sameAddrBatch(nextDial[last], addr) { - break - } - - next = i + 1 - + for _, addr := range nextDial { // spawn the dial ad := pending[addr] err := s.dialNextAddr(ad.ctx, p, addr, resch) if err != nil { dispatchError(ad, err) - continue } - - ad.dialed = true - dialed = true - last = i - active++ } - nextDial = nextDial[next:] - if !dialed || len(nextDial) == 0 { - // we didn't dial anything because of backoff or we don't have any more addresses - triggerDial = nil - } + nextDial = nil + triggerDial = nil case res := <-resch: active-- @@ -624,27 +604,6 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, return nil } -func (s *Swarm) sameAddrBatch(a, b ma.Multiaddr) bool { - // is it a relay addr? - if s.IsRelayAddr(a) { - return s.IsRelayAddr(b) - } - - // is it an expensive addr? - if s.IsExpensiveAddr(a) { - return s.IsExpensiveAddr(b) - } - - // is it a public addr? 
- if !manet.IsPrivateAddr(a) { - return !manet.IsPrivateAddr(b) && - s.IsFdConsumingAddr(a) == s.IsFdConsumingAddr(b) - } - - // it's a private addr - return manet.IsPrivateAddr(b) -} - func (s *Swarm) canDial(addr ma.Multiaddr) bool { t := s.TransportForDialing(addr) return t != nil && t.CanDial(addr) From 43b03828ad884e250a9727d3da184d5f0777d895 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 12:41:58 +0300 Subject: [PATCH 25/29] add new TestDialSelf so that we exercise the dialWorker dial to self error path --- dial_sync_test.go | 23 +++++++++++++++++++++++ dial_test.go | 8 +------- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/dial_sync_test.go b/dial_sync_test.go index e414dd52..e5a7da69 100644 --- a/dial_sync_test.go +++ b/dial_sync_test.go @@ -270,3 +270,26 @@ func TestStressActiveDial(t *testing.T) { wg.Wait() } + +func TestDialSelf(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + self := peer.ID("ABC") + s := NewSwarm(ctx, self, nil, nil) + defer s.Close() + + ds := NewDialSync(s.dialWorker) + + // this should fail + _, err := ds.DialLock(ctx, self) + if err != ErrDialToSelf { + t.Fatal("expected error from self dial") + } + + // do it twice to make sure we get a new active dial object that fails again + _, err = ds.DialLock(ctx, self) + if err != ErrDialToSelf { + t.Fatal("expected error from self dial") + } +} diff --git a/dial_test.go b/dial_test.go index 26bf47ca..2a966a46 100644 --- a/dial_test.go +++ b/dial_test.go @@ -652,7 +652,7 @@ func TestDialSimultaneousJoin(t *testing.T) { } } -func TestDialSelf(t *testing.T) { +func TestDialSelf2(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -664,10 +664,4 @@ func TestDialSelf(t *testing.T) { if err != ErrDialToSelf { t.Fatal("expected error from self dial") } - - // do it twice to make sure we get a new active dial object that fails again - _, err = s1.DialPeer(ctx, s1.LocalPeer()) - if err != 
ErrDialToSelf { - t.Fatal("expected error from self dial") - } } From 17bc04b6ef33ad41afff444f9abc5858fdc43208 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 15:36:55 +0300 Subject: [PATCH 26/29] make DialWorkerFunc, NewDialSync private they work with private data types, so there is no point in having them public --- dial_sync.go | 8 ++++---- dial_sync_test.go | 18 ++++++++---------- swarm.go | 2 +- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index 24781dd1..31790166 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -13,10 +13,10 @@ import ( var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") // DialWorerFunc is used by DialSync to spawn a new dial worker -type DialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error +type dialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error -// NewDialSync constructs a new DialSync -func NewDialSync(worker DialWorkerFunc) *DialSync { +// newDialSync constructs a new DialSync +func newDialSync(worker dialWorkerFunc) *DialSync { return &DialSync{ dials: make(map[peer.ID]*activeDial), dialWorker: worker, @@ -28,7 +28,7 @@ func NewDialSync(worker DialWorkerFunc) *DialSync { type DialSync struct { dials map[peer.ID]*activeDial dialsLk sync.Mutex - dialWorker DialWorkerFunc + dialWorker dialWorkerFunc } type activeDial struct { diff --git a/dial_sync_test.go b/dial_sync_test.go index e5a7da69..59ace9ae 100644 --- a/dial_sync_test.go +++ b/dial_sync_test.go @@ -10,7 +10,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{}) { +func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}) { dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) @@ -48,7 +48,7 @@ func getMockDialFunc() 
(DialWorkerFunc, func(), context.Context, <-chan struct{} func TestBasicDialSync(t *testing.T) { df, done, _, callsch := getMockDialFunc() - dsync := NewDialSync(df) + dsync := newDialSync(df) p := peer.ID("testpeer") @@ -86,7 +86,7 @@ func TestBasicDialSync(t *testing.T) { func TestDialSyncCancel(t *testing.T) { df, done, _, dcall := getMockDialFunc() - dsync := NewDialSync(df) + dsync := newDialSync(df) p := peer.ID("testpeer") @@ -137,7 +137,7 @@ func TestDialSyncCancel(t *testing.T) { func TestDialSyncAllCancel(t *testing.T) { df, done, dctx, _ := getMockDialFunc() - dsync := NewDialSync(df) + dsync := newDialSync(df) p := peer.ID("testpeer") @@ -211,7 +211,7 @@ func TestFailFirst(t *testing.T) { return nil } - ds := NewDialSync(f) + ds := newDialSync(f) p := peer.ID("testing") @@ -234,7 +234,7 @@ func TestFailFirst(t *testing.T) { } func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { + ds := newDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { go func() { for { select { @@ -279,16 +279,14 @@ func TestDialSelf(t *testing.T) { s := NewSwarm(ctx, self, nil, nil) defer s.Close() - ds := NewDialSync(s.dialWorker) - // this should fail - _, err := ds.DialLock(ctx, self) + _, err := s.dsync.DialLock(ctx, self) if err != ErrDialToSelf { t.Fatal("expected error from self dial") } // do it twice to make sure we get a new active dial object that fails again - _, err = ds.DialLock(ctx, self) + _, err = s.dsync.DialLock(ctx, self) if err != ErrDialToSelf { t.Fatal("expected error from self dial") } diff --git a/swarm.go b/swarm.go index c57c563c..95b164d1 100644 --- a/swarm.go +++ b/swarm.go @@ -121,7 +121,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc } } - s.dsync = NewDialSync(s.dialWorker) + s.dsync = newDialSync(s.dialWorker) s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr) s.proc = 
goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) From be3e9404cc91b9fd10b76c54a101c1104bebae57 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 23:11:25 +0300 Subject: [PATCH 27/29] rename dialWorker to startDialWorker --- swarm.go | 2 +- swarm_dial.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/swarm.go b/swarm.go index 95b164d1..1abe1227 100644 --- a/swarm.go +++ b/swarm.go @@ -121,7 +121,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc } } - s.dsync = newDialSync(s.dialWorker) + s.dsync = newDialSync(s.startDialWorker) s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr) s.proc = goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) diff --git a/swarm_dial.go b/swarm_dial.go index c656df5a..d60c9f39 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -295,8 +295,8 @@ type dialResponse struct { err error } -// dialWorker is an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { +// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials +func (s *Swarm) startDialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { if p == s.local { return ErrDialToSelf } From df0ab8b92e26c745eae752f64c58cd03fa0e7610 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 23:14:59 +0300 Subject: [PATCH 28/29] make addr utility funcs standalone and not exported --- swarm_dial.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index d60c9f39..3cc3275e 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -621,10 +621,10 @@ func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool { // UDP > TCP func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { addrTier := func(a ma.Multiaddr) (tier int) { - if s.IsRelayAddr(a) { + if 
isRelayAddr(a) { tier |= 0b1000 } - if s.IsExpensiveAddr(a) { + if isExpensiveAddr(a) { tier |= 0b0100 } if !manet.IsPrivateAddr(a) { @@ -741,13 +741,13 @@ func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool { return err1 == nil || err2 == nil } -func (s *Swarm) IsExpensiveAddr(addr ma.Multiaddr) bool { +func isExpensiveAddr(addr ma.Multiaddr) bool { _, err1 := addr.ValueForProtocol(ma.P_WS) _, err2 := addr.ValueForProtocol(ma.P_WSS) return err1 == nil || err2 == nil } -func (s *Swarm) IsRelayAddr(addr ma.Multiaddr) bool { +func isRelayAddr(addr ma.Multiaddr) bool { _, err := addr.ValueForProtocol(ma.P_CIRCUIT) return err == nil } From 084fffea6139193b9af3e91cb63476ffda664beb Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 23:31:29 +0300 Subject: [PATCH 29/29] make IsFdConsumingAddr a standalone utility func --- swarm.go | 2 +- swarm_dial.go | 4 ++-- swarm_test.go | 48 ---------------------------------------------- util_test.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 56 insertions(+), 51 deletions(-) create mode 100644 util_test.go diff --git a/swarm.go b/swarm.go index 1abe1227..00c43f7a 100644 --- a/swarm.go +++ b/swarm.go @@ -122,7 +122,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc } s.dsync = newDialSync(s.startDialWorker) - s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr) + s.limiter = newDialLimiter(s.dialAddr, isFdConsumingAddr) s.proc = goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) s.backf.init(s.ctx) diff --git a/swarm_dial.go b/swarm_dial.go index 3cc3275e..14129257 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -630,7 +630,7 @@ func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { if !manet.IsPrivateAddr(a) { tier |= 0b0010 } - if s.IsFdConsumingAddr(a) { + if isFdConsumingAddr(a) { tier |= 0b0001 } @@ -726,7 +726,7 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra // A 
Non-circuit address which has the TCP/UNIX protocol is deemed FD consuming. // For a circuit-relay address, we look at the address of the relay server/proxy // and use the same logic as above to decide. -func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool { +func isFdConsumingAddr(addr ma.Multiaddr) bool { first, _ := ma.SplitFunc(addr, func(c ma.Component) bool { return c.Protocol().Code == ma.P_CIRCUIT }) diff --git a/swarm_test.go b/swarm_test.go index 9b1e9c42..4e9801ad 100644 --- a/swarm_test.go +++ b/swarm_test.go @@ -15,7 +15,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - "github.com/libp2p/go-libp2p-core/test" . "github.com/libp2p/go-libp2p-swarm" . "github.com/libp2p/go-libp2p-swarm/testing" @@ -387,53 +386,6 @@ func TestConnectionGating(t *testing.T) { } } -func TestIsFdConsuming(t *testing.T) { - tcs := map[string]struct { - addr string - isFdConsuming bool - }{ - "tcp": { - addr: "/ip4/127.0.0.1/tcp/20", - isFdConsuming: true, - }, - "quic": { - addr: "/ip4/127.0.0.1/udp/0/quic", - isFdConsuming: false, - }, - "addr-without-registered-transport": { - addr: "/ip4/127.0.0.1/tcp/20/ws", - isFdConsuming: true, - }, - "relay-tcp": { - addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), - isFdConsuming: true, - }, - "relay-quic": { - addr: fmt.Sprintf("/ip4/127.0.0.1/udp/20/quic/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), - isFdConsuming: false, - }, - "relay-without-serveraddr": { - addr: fmt.Sprintf("/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), - isFdConsuming: true, - }, - "relay-without-registered-transport-server": { - addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/ws/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), - isFdConsuming: true, - }, - } - - ctx := context.Background() - sw := GenSwarm(t, ctx) - sk := sw.Peerstore().PrivKey(sw.LocalPeer()) - require.NotNil(t, sk) - - for name := range tcs { - maddr, err := 
ma.NewMultiaddr(tcs[name].addr) - require.NoError(t, err, name) - require.Equal(t, tcs[name].isFdConsuming, sw.IsFdConsumingAddr(maddr), name) - } -} - func TestNoDial(t *testing.T) { ctx := context.Background() swarms := makeSwarms(ctx, t, 2) diff --git a/util_test.go b/util_test.go new file mode 100644 index 00000000..11124adb --- /dev/null +++ b/util_test.go @@ -0,0 +1,53 @@ +package swarm + +import ( + "fmt" + "testing" + + "github.com/libp2p/go-libp2p-core/test" + ma "github.com/multiformats/go-multiaddr" + + "github.com/stretchr/testify/require" +) + +func TestIsFdConsuming(t *testing.T) { + tcs := map[string]struct { + addr string + isFdConsuming bool + }{ + "tcp": { + addr: "/ip4/127.0.0.1/tcp/20", + isFdConsuming: true, + }, + "quic": { + addr: "/ip4/127.0.0.1/udp/0/quic", + isFdConsuming: false, + }, + "addr-without-registered-transport": { + addr: "/ip4/127.0.0.1/tcp/20/ws", + isFdConsuming: true, + }, + "relay-tcp": { + addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), + isFdConsuming: true, + }, + "relay-quic": { + addr: fmt.Sprintf("/ip4/127.0.0.1/udp/20/quic/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), + isFdConsuming: false, + }, + "relay-without-serveraddr": { + addr: fmt.Sprintf("/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), + isFdConsuming: true, + }, + "relay-without-registered-transport-server": { + addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/ws/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)), + isFdConsuming: true, + }, + } + + for name := range tcs { + maddr, err := ma.NewMultiaddr(tcs[name].addr) + require.NoError(t, err, name) + require.Equal(t, tcs[name].isFdConsuming, isFdConsumingAddr(maddr), name) + } +}