From c8044d2b710c976a266999f5e5dc8ffbb24ac24c Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 2 Jul 2022 12:59:57 +0200 Subject: [PATCH] use autoscaling limits --- defaults.go | 9 +- go.mod | 2 +- go.sum | 4 +- limits.go | 215 ++++++++++-------------- p2p/test/resource-manager/rcmgr_test.go | 57 ++++--- 5 files changed, 134 insertions(+), 153 deletions(-) diff --git a/defaults.go b/defaults.go index 7cca89a6cf..d95a6a9582 100644 --- a/defaults.go +++ b/defaults.go @@ -87,10 +87,9 @@ var DefaultEnableRelay = func(cfg *Config) error { var DefaultResourceManager = func(cfg *Config) error { // Default memory limit: 1/8th of total memory, minimum 128MB, maximum 1GB - limiter := rcmgr.NewDefaultLimiter() - SetDefaultServiceLimits(limiter) - - mgr, err := rcmgr.NewResourceManager(limiter) + limits := rcmgr.DefaultLimits + SetDefaultServiceLimits(&limits) + mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(limits.AutoScale())) if err != nil { return err } @@ -98,7 +97,7 @@ var DefaultResourceManager = func(cfg *Config) error { return cfg.Apply(ResourceManager(mgr)) } -// DefaultConnManager creates a default connection manager +// DefaultConnectionManager creates a default connection manager var DefaultConnectionManager = func(cfg *Config) error { mgr, err := connmgr.NewConnManager(160, 192) if err != nil { diff --git a/go.mod b/go.mod index 3a521902b2..9b44b3ee47 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/libp2p/go-libp2p-asn-util v0.2.0 github.com/libp2p/go-libp2p-core v0.19.0 github.com/libp2p/go-libp2p-peerstore v0.7.1 - github.com/libp2p/go-libp2p-resource-manager v0.4.0 + github.com/libp2p/go-libp2p-resource-manager v0.4.1-0.20220702093928-7ceb0b850c67 github.com/libp2p/go-libp2p-testing v0.10.0 github.com/libp2p/go-mplex v0.7.0 github.com/libp2p/go-msgio v0.2.0 diff --git a/go.sum b/go.sum index 5d651d33b9..1cfe007d70 100644 --- a/go.sum +++ b/go.sum @@ -362,8 +362,8 @@ github.com/libp2p/go-libp2p-core v0.19.0 h1:KDw7hanmh0EuVdZqsHCAzmkdiYMk5uR5h0UG github.com/libp2p/go-libp2p-core v0.19.0/go.mod h1:AkA+FUKQfYt1FLNef5fOPlo/naAWjKy/RCjkcPjqzYg= github.com/libp2p/go-libp2p-peerstore v0.7.1 h1:7FpALlqR+3+oOBXdzm3AVt0vjMYLW1b7jM03E4iEHlw= github.com/libp2p/go-libp2p-peerstore v0.7.1/go.mod h1:cdUWTHro83vpg6unCpGUr8qJoX3e93Vy8o97u5ppIM0= -github.com/libp2p/go-libp2p-resource-manager v0.4.0 h1:+/gSDLSJ+n8qHVdMoY7wfrk3EvvL9Ktw6sAyKKZPQRw= -github.com/libp2p/go-libp2p-resource-manager v0.4.0/go.mod h1:+5QPxFLRXYlRDZ0P1bPKE7zyZDvex5TLVOqePwRmwfc= +github.com/libp2p/go-libp2p-resource-manager v0.4.1-0.20220702093928-7ceb0b850c67 h1:LEikdvzrVWpCC+ZiV1nA5xidrWR9w4QJDN0Lk3lJfH4= +github.com/libp2p/go-libp2p-resource-manager v0.4.1-0.20220702093928-7ceb0b850c67/go.mod h1:CggtV6EZb+Y0dGh41q5ezO4udcVKyhcEFpydHD8EMe0= github.com/libp2p/go-libp2p-testing v0.1.1/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0= github.com/libp2p/go-libp2p-testing v0.10.0 h1:LO7wuUPPNAe1D1s0HZ+9WoROaGIn/MEl1wtugXuTRzg= github.com/libp2p/go-libp2p-testing v0.10.0/go.mod h1:jJ4fiJwyZ3UlPTLcnz/sEmPPSviQ79Q0MVD/CykzrP0= diff --git a/limits.go b/limits.go index a2a7664f43..fd71bca76c 100644 --- a/limits.go +++ b/limits.go @@ -15,143 +15,114 @@ import ( ) // SetDefaultServiceLimits sets the default limits for bundled libp2p services -// -// More specifically this sets the following limits: -// - identify: -// 128 streams in, 128 streams out, 256 streams total, 4MB min, 64MB max svc memory -// 16/16/32 streams per peer -// - ping: -// 128 streams in, 128 sreams out, 256 streasms total, 4MB min, 64MB max svc memory -// 2/3/4 streams per peer -// - autonat -// 128 streams in, 128 streams out, 128 streams total, 4MB min, 64MB max svc memory -// 2/2/2 streams per peer -// - holepunch -// 128 streams in, 128 streams out, 128 streams total, 4MB min, 64MB max svc memory -// 2/2/2 streams per peer -// - relay v1 and v2 (separate services) -// 1024 streams in, 1024 streams out, 1024 streams total, 4MB min, 64MB max svc memory -// 64/64/64 streams per peer -func SetDefaultServiceLimits(limiter *rcmgr.BasicLimiter) { - if limiter.ServiceLimits == nil { - limiter.ServiceLimits = make(map[string]rcmgr.Limit) - } - if limiter.ServicePeerLimits == nil { - limiter.ServicePeerLimits = make(map[string]rcmgr.Limit) - } - if limiter.ProtocolLimits == nil { - limiter.ProtocolLimits = make(map[protocol.ID]rcmgr.Limit) - } - if limiter.ProtocolPeerLimits == nil { - limiter.ProtocolPeerLimits = make(map[protocol.ID]rcmgr.Limit) - } - +func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) { // identify - setServiceLimits(limiter, identify.ServiceName, - limiter.DefaultServiceLimits. - WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory - WithStreamLimit(128, 128, 256), // max 256 streams -- symmetric - peerLimit(16, 16, 32)) - - setProtocolLimits(limiter, identify.ID, - limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 32<<20), - peerLimit(16, 16, 32)) - setProtocolLimits(limiter, identify.IDPush, - limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 32<<20), - peerLimit(16, 16, 32)) - setProtocolLimits(limiter, identify.IDDelta, - limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 32<<20), - peerLimit(16, 16, 32)) + config.AddServiceLimit( + identify.ServiceName, + rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20}, + rcmgr.BaseLimitIncrease{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20}, + ) + config.AddServicePeerLimit( + identify.ServiceName, + rcmgr.BaseLimit{StreamsInbound: 16, StreamsOutbound: 16, Streams: 32, Memory: 1 << 20}, + rcmgr.BaseLimitIncrease{}, + ) + for _, id := range [...]protocol.ID{identify.ID, identify.IDDelta, identify.IDPush} { + config.AddProtocolLimit( + id, + rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20}, + rcmgr.BaseLimitIncrease{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20}, + ) + config.AddProtocolPeerLimit( + id, + rcmgr.BaseLimit{StreamsInbound: 16, StreamsOutbound: 16, Streams: 32, Memory: 32 * (256<<20 + 16<<10)}, + rcmgr.BaseLimitIncrease{}, + ) + } - // ping - setServiceLimits(limiter, ping.ServiceName, - limiter.DefaultServiceLimits. - WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory - WithStreamLimit(128, 128, 128), // max 128 streams - asymmetric - peerLimit(2, 3, 4)) - setProtocolLimits(limiter, ping.ID, - limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 64<<20), - peerLimit(2, 3, 4)) + // ping + addServiceAndProtocolLimit(config, + ping.ServiceName, ping.ID, + rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 4 << 20}, + rcmgr.BaseLimitIncrease{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 4 << 20}, + ) + addServicePeerAndProtocolPeerLimit( + config, + ping.ServiceName, ping.ID, + rcmgr.BaseLimit{StreamsInbound: 2, StreamsOutbound: 3, Streams: 4, Memory: 32 * (256<<20 + 16<<10)}, + rcmgr.BaseLimitIncrease{}, + ) // autonat - setServiceLimits(limiter, autonat.ServiceName, - limiter.DefaultServiceLimits. - WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory - WithStreamLimit(128, 128, 128), // max 128 streams - asymmetric - peerLimit(2, 2, 2)) - setProtocolLimits(limiter, autonat.AutoNATProto, - limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 64<<20), - peerLimit(2, 2, 2)) + addServiceAndProtocolLimit(config, + autonat.ServiceName, autonat.AutoNATProto, + rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 4 << 20}, + rcmgr.BaseLimitIncrease{StreamsInbound: 4, StreamsOutbound: 4, Streams: 4, Memory: 2 << 20}, + ) + addServicePeerAndProtocolPeerLimit( + config, + autonat.ServiceName, autonat.AutoNATProto, + rcmgr.BaseLimit{StreamsInbound: 2, StreamsOutbound: 2, Streams: 2, Memory: 1 << 20}, + rcmgr.BaseLimitIncrease{}, + ) // holepunch - setServiceLimits(limiter, holepunch.ServiceName, - limiter.DefaultServiceLimits. - WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory - WithStreamLimit(128, 128, 256), // max 256 streams - symmetric - peerLimit(2, 2, 2)) - setProtocolLimits(limiter, holepunch.Protocol, - limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 64<<20), - peerLimit(2, 2, 2)) + addServiceAndProtocolLimit(config, + holepunch.ServiceName, holepunch.Protocol, + rcmgr.BaseLimit{StreamsInbound: 32, StreamsOutbound: 32, Streams: 64, Memory: 4 << 20}, + rcmgr.BaseLimitIncrease{StreamsInbound: 8, StreamsOutbound: 8, Streams: 16, Memory: 4 << 20}, + ) + addServicePeerAndProtocolPeerLimit(config, + holepunch.ServiceName, holepunch.Protocol, + rcmgr.BaseLimit{StreamsInbound: 2, StreamsOutbound: 2, Streams: 2, Memory: 1 << 20}, + rcmgr.BaseLimitIncrease{}, + ) // relay/v1 - setServiceLimits(limiter, relayv1.ServiceName, - limiter.DefaultServiceLimits. - WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory - WithStreamLimit(1024, 1024, 1024), // max 1024 streams - asymmetric - peerLimit(64, 64, 64)) + config.AddServiceLimit( + relayv1.ServiceName, + rcmgr.BaseLimit{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20}, + rcmgr.BaseLimitIncrease{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20}, + ) + config.AddServicePeerLimit( + relayv1.ServiceName, + rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 1 << 20}, + rcmgr.BaseLimitIncrease{}, + ) // relay/v2 - setServiceLimits(limiter, relayv2.ServiceName, - limiter.DefaultServiceLimits. - WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory - WithStreamLimit(1024, 1024, 1024), // max 1024 streams - asymmetric - peerLimit(64, 64, 64)) + config.AddServiceLimit( + relayv2.ServiceName, + rcmgr.BaseLimit{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20}, + rcmgr.BaseLimitIncrease{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20}, + ) + config.AddServicePeerLimit( + relayv2.ServiceName, + rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 1 << 20}, + rcmgr.BaseLimitIncrease{}, + ) // circuit protocols, both client and service - setProtocolLimits(limiter, circuit.ProtoIDv1, - limiter.DefaultProtocolLimits. - WithMemoryLimit(1, 4<<20, 64<<20). - WithStreamLimit(1280, 1280, 1280), - peerLimit(128, 128, 128)) - setProtocolLimits(limiter, circuit.ProtoIDv2Hop, - limiter.DefaultProtocolLimits. - WithMemoryLimit(1, 4<<20, 64<<20). - WithStreamLimit(1280, 1280, 1280), - peerLimit(128, 128, 128)) - setProtocolLimits(limiter, circuit.ProtoIDv2Stop, - limiter.DefaultProtocolLimits. - WithMemoryLimit(1, 4<<20, 64<<20). - WithStreamLimit(1280, 1280, 1280), - peerLimit(128, 128, 128)) - -} - -func setServiceLimits(limiter *rcmgr.BasicLimiter, svc string, limit rcmgr.Limit, peerLimit rcmgr.Limit) { - if _, ok := limiter.ServiceLimits[svc]; !ok { - limiter.ServiceLimits[svc] = limit - } - if _, ok := limiter.ServicePeerLimits[svc]; !ok { - limiter.ServicePeerLimits[svc] = peerLimit + for _, proto := range [...]protocol.ID{circuit.ProtoIDv1, circuit.ProtoIDv2Hop, circuit.ProtoIDv2Stop} { + config.AddProtocolLimit( + proto, + rcmgr.BaseLimit{StreamsInbound: 640, StreamsOutbound: 640, Streams: 640, Memory: 16 << 20}, + rcmgr.BaseLimitIncrease{StreamsInbound: 640, StreamsOutbound: 640, Streams: 640, Memory: 16 << 20}, + ) + config.AddProtocolPeerLimit( + proto, + rcmgr.BaseLimit{StreamsInbound: 128, StreamsOutbound: 128, Streams: 128, Memory: 32 << 20}, + rcmgr.BaseLimitIncrease{}, + ) } } -func setProtocolLimits(limiter *rcmgr.BasicLimiter, proto protocol.ID, limit rcmgr.Limit, peerLimit rcmgr.Limit) { - if _, ok := limiter.ProtocolLimits[proto]; !ok { - limiter.ProtocolLimits[proto] = limit - } - if _, ok := limiter.ProtocolPeerLimits[proto]; !ok { - limiter.ProtocolPeerLimits[proto] = peerLimit - } +func addServiceAndProtocolLimit(config *rcmgr.ScalingLimitConfig, service string, proto protocol.ID, limit rcmgr.BaseLimit, increase rcmgr.BaseLimitIncrease) { + config.AddServiceLimit(service, limit, increase) + config.AddProtocolLimit(proto, limit, increase) } -func peerLimit(numStreamsIn, numStreamsOut, numStreamsTotal int) rcmgr.Limit { - return &rcmgr.StaticLimit{ - // memory: 256kb for window buffers plus some change for message buffers per stream - Memory: int64(numStreamsTotal * (256<<10 + 16384)), - BaseLimit: rcmgr.BaseLimit{ - StreamsInbound: numStreamsIn, - StreamsOutbound: numStreamsOut, - Streams: numStreamsTotal, - }, - } +func addServicePeerAndProtocolPeerLimit(config *rcmgr.ScalingLimitConfig, service string, proto protocol.ID, limit rcmgr.BaseLimit, increase rcmgr.BaseLimitIncrease) { + config.AddServicePeerLimit(service, limit, increase) + config.AddProtocolPeerLimit(proto, limit, increase) } diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index e2a40d5f1b..ae361113c1 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -18,18 +18,15 @@ import ( "github.com/stretchr/testify/require" ) -func makeRcmgrOption(t *testing.T, limiter *rcmgr.BasicLimiter, test string) func(int) libp2p.Option { +func makeRcmgrOption(t *testing.T, cfg rcmgr.LimitConfig, test string) func(int) libp2p.Option { return func(i int) libp2p.Option { var opts []rcmgr.Option - if os.Getenv("LIBP2P_TEST_RCMGR_TRACE") == "1" { opts = append(opts, rcmgr.WithTrace(fmt.Sprintf("%s-%d.json.gz", test, i))) } - mgr, err := rcmgr.NewResourceManager(limiter, opts...) - if err != nil { - t.Fatal(err) - } + mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(cfg), opts...) + require.NoError(t, err) return libp2p.ResourceManager(mgr) } } @@ -50,11 +47,15 @@ func waitForConnection(t *testing.T, src, dest *Echo) { func TestResourceManagerConnInbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns - limiter := rcmgr.NewDefaultLimiter() - limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(3, 1024, 1024) - limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(1, 1, 1) - - echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerConnInbound")) + cfg := rcmgr.DefaultLimits.AutoScale() + cfg.System.ConnsInbound = 3 + cfg.System.ConnsOutbound = 1024 + cfg.System.Conns = 1024 + cfg.PeerDefault.Conns = 1 + cfg.PeerDefault.ConnsInbound = 1 + cfg.PeerDefault.ConnsOutbound = 1 + + echos := createEchos(t, 5, makeRcmgrOption(t, cfg, "TestResourceManagerConnInbound")) defer closeEchos(echos) defer closeRcmgrs(echos) @@ -82,10 +83,14 @@ func TestResourceManagerConnInbound(t *testing.T) { func TestResourceManagerConnOutbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns - limiter := rcmgr.NewDefaultLimiter() - limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1024, 3, 1024) - limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(1, 1, 1) - echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerConnOutbound")) + cfg := rcmgr.DefaultLimits.AutoScale() + cfg.System.ConnsInbound = 1024 + cfg.System.ConnsOutbound = 3 + cfg.System.Conns = 1024 + cfg.PeerDefault.Conns = 1 + cfg.PeerDefault.ConnsInbound = 1 + cfg.PeerDefault.ConnsOutbound = 1 + echos := createEchos(t, 5, makeRcmgrOption(t, cfg, "TestResourceManagerConnOutbound")) defer closeEchos(echos) defer closeRcmgrs(echos) @@ -113,9 +118,11 @@ func TestResourceManagerConnOutbound(t *testing.T) { func TestResourceManagerServiceInbound(t *testing.T) { // this test checks that we can not exceed the inbound stream limit at service level // we specify: 3 streams for the service, and we try to create 4 streams - limiter := rcmgr.NewDefaultLimiter() - limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.WithStreamLimit(3, 1024, 1024) - echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerServiceInbound")) + cfg := rcmgr.DefaultLimits.AutoScale() + cfg.ServiceDefault.StreamsInbound = 3 + cfg.ServiceDefault.StreamsOutbound = 1024 + cfg.ServiceDefault.Streams = 1024 + echos := createEchos(t, 5, makeRcmgrOption(t, cfg, "TestResourceManagerServiceInbound")) defer closeEchos(echos) defer closeRcmgrs(echos) @@ -164,11 +171,15 @@ func TestResourceManagerServiceInbound(t *testing.T) { func TestResourceManagerServicePeerInbound(t *testing.T) { // this test checks that we cannot exceed the per peer inbound stream limit at service level // we specify: 2 streams per peer for echo, and we try to create 3 streams - limiter := rcmgr.NewDefaultLimiter() - limiter.ServicePeerLimits = map[string]rcmgr.Limit{ - EchoService: limiter.DefaultPeerLimits.WithStreamLimit(2, 1024, 1024), - } - echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerServicePeerInbound")) + cfg := rcmgr.DefaultLimits + cfg.AddServicePeerLimit( + EchoService, + rcmgr.BaseLimit{StreamsInbound: 2, StreamsOutbound: 1024, Streams: 1024, Memory: 9999999}, + rcmgr.BaseLimitIncrease{}, + ) + limits := cfg.AutoScale() + + echos := createEchos(t, 5, makeRcmgrOption(t, limits, "TestResourceManagerServicePeerInbound")) defer closeEchos(echos) defer closeRcmgrs(echos)