Skip to content

Commit

Permalink
use autoscaling limits
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jul 3, 2022
1 parent 0d7f192 commit c8044d2
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 153 deletions.
9 changes: 4 additions & 5 deletions defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,17 @@ var DefaultEnableRelay = func(cfg *Config) error {

var DefaultResourceManager = func(cfg *Config) error {
// Default memory limit: 1/8th of total memory, minimum 128MB, maximum 1GB
limiter := rcmgr.NewDefaultLimiter()
SetDefaultServiceLimits(limiter)

mgr, err := rcmgr.NewResourceManager(limiter)
limits := rcmgr.DefaultLimits
SetDefaultServiceLimits(&limits)
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(limits.AutoScale()))
if err != nil {
return err
}

return cfg.Apply(ResourceManager(mgr))
}

// DefaultConnManager creates a default connection manager
// DefaultConnectionManager creates a default connection manager
var DefaultConnectionManager = func(cfg *Config) error {
mgr, err := connmgr.NewConnManager(160, 192)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/libp2p/go-libp2p-asn-util v0.2.0
github.com/libp2p/go-libp2p-core v0.19.0
github.com/libp2p/go-libp2p-peerstore v0.7.1
github.com/libp2p/go-libp2p-resource-manager v0.4.0
github.com/libp2p/go-libp2p-resource-manager v0.4.1-0.20220702093928-7ceb0b850c67
github.com/libp2p/go-libp2p-testing v0.10.0
github.com/libp2p/go-mplex v0.7.0
github.com/libp2p/go-msgio v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ github.com/libp2p/go-libp2p-core v0.19.0 h1:KDw7hanmh0EuVdZqsHCAzmkdiYMk5uR5h0UG
github.com/libp2p/go-libp2p-core v0.19.0/go.mod h1:AkA+FUKQfYt1FLNef5fOPlo/naAWjKy/RCjkcPjqzYg=
github.com/libp2p/go-libp2p-peerstore v0.7.1 h1:7FpALlqR+3+oOBXdzm3AVt0vjMYLW1b7jM03E4iEHlw=
github.com/libp2p/go-libp2p-peerstore v0.7.1/go.mod h1:cdUWTHro83vpg6unCpGUr8qJoX3e93Vy8o97u5ppIM0=
github.com/libp2p/go-libp2p-resource-manager v0.4.0 h1:+/gSDLSJ+n8qHVdMoY7wfrk3EvvL9Ktw6sAyKKZPQRw=
github.com/libp2p/go-libp2p-resource-manager v0.4.0/go.mod h1:+5QPxFLRXYlRDZ0P1bPKE7zyZDvex5TLVOqePwRmwfc=
github.com/libp2p/go-libp2p-resource-manager v0.4.1-0.20220702093928-7ceb0b850c67 h1:LEikdvzrVWpCC+ZiV1nA5xidrWR9w4QJDN0Lk3lJfH4=
github.com/libp2p/go-libp2p-resource-manager v0.4.1-0.20220702093928-7ceb0b850c67/go.mod h1:CggtV6EZb+Y0dGh41q5ezO4udcVKyhcEFpydHD8EMe0=
github.com/libp2p/go-libp2p-testing v0.1.1/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0=
github.com/libp2p/go-libp2p-testing v0.10.0 h1:LO7wuUPPNAe1D1s0HZ+9WoROaGIn/MEl1wtugXuTRzg=
github.com/libp2p/go-libp2p-testing v0.10.0/go.mod h1:jJ4fiJwyZ3UlPTLcnz/sEmPPSviQ79Q0MVD/CykzrP0=
Expand Down
215 changes: 93 additions & 122 deletions limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,143 +15,114 @@ import (
)

// SetDefaultServiceLimits sets the default limits for bundled libp2p services
//
// More specifically this sets the following limits:
// - identify:
// 128 streams in, 128 streams out, 256 streams total, 4MB min, 64MB max svc memory
// 16/16/32 streams per peer
// - ping:
// 128 streams in, 128 sreams out, 256 streasms total, 4MB min, 64MB max svc memory
// 2/3/4 streams per peer
// - autonat
// 128 streams in, 128 streams out, 128 streams total, 4MB min, 64MB max svc memory
// 2/2/2 streams per peer
// - holepunch
// 128 streams in, 128 streams out, 128 streams total, 4MB min, 64MB max svc memory
// 2/2/2 streams per peer
// - relay v1 and v2 (separate services)
// 1024 streams in, 1024 streams out, 1024 streams total, 4MB min, 64MB max svc memory
// 64/64/64 streams per peer
func SetDefaultServiceLimits(limiter *rcmgr.BasicLimiter) {
if limiter.ServiceLimits == nil {
limiter.ServiceLimits = make(map[string]rcmgr.Limit)
}
if limiter.ServicePeerLimits == nil {
limiter.ServicePeerLimits = make(map[string]rcmgr.Limit)
}
if limiter.ProtocolLimits == nil {
limiter.ProtocolLimits = make(map[protocol.ID]rcmgr.Limit)
}
if limiter.ProtocolPeerLimits == nil {
limiter.ProtocolPeerLimits = make(map[protocol.ID]rcmgr.Limit)
}

func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) {
// identify
setServiceLimits(limiter, identify.ServiceName,
limiter.DefaultServiceLimits.
WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
WithStreamLimit(128, 128, 256), // max 256 streams -- symmetric
peerLimit(16, 16, 32))

setProtocolLimits(limiter, identify.ID,
limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 32<<20),
peerLimit(16, 16, 32))
setProtocolLimits(limiter, identify.IDPush,
limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 32<<20),
peerLimit(16, 16, 32))
setProtocolLimits(limiter, identify.IDDelta,
limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 32<<20),
peerLimit(16, 16, 32))
config.AddServiceLimit(
identify.ServiceName,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20},
rcmgr.BaseLimitIncrease{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20},
)
config.AddServicePeerLimit(
identify.ServiceName,
rcmgr.BaseLimit{StreamsInbound: 16, StreamsOutbound: 16, Streams: 32, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)
for _, id := range [...]protocol.ID{identify.ID, identify.IDDelta, identify.IDPush} {
config.AddProtocolLimit(
id,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20},
rcmgr.BaseLimitIncrease{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20},
)
config.AddProtocolPeerLimit(
id,
rcmgr.BaseLimit{StreamsInbound: 16, StreamsOutbound: 16, Streams: 32, Memory: 32 * (256<<20 + 16<<10)},
rcmgr.BaseLimitIncrease{},
)
}

// ping
setServiceLimits(limiter, ping.ServiceName,
limiter.DefaultServiceLimits.
WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
WithStreamLimit(128, 128, 128), // max 128 streams - asymmetric
peerLimit(2, 3, 4))
setProtocolLimits(limiter, ping.ID,
limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 64<<20),
peerLimit(2, 3, 4))
// ping
addServiceAndProtocolLimit(config,
ping.ServiceName, ping.ID,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 4 << 20},
rcmgr.BaseLimitIncrease{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 4 << 20},
)
addServicePeerAndProtocolPeerLimit(
config,
ping.ServiceName, ping.ID,
rcmgr.BaseLimit{StreamsInbound: 2, StreamsOutbound: 3, Streams: 4, Memory: 32 * (256<<20 + 16<<10)},
rcmgr.BaseLimitIncrease{},
)

// autonat
setServiceLimits(limiter, autonat.ServiceName,
limiter.DefaultServiceLimits.
WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
WithStreamLimit(128, 128, 128), // max 128 streams - asymmetric
peerLimit(2, 2, 2))
setProtocolLimits(limiter, autonat.AutoNATProto,
limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 64<<20),
peerLimit(2, 2, 2))
addServiceAndProtocolLimit(config,
autonat.ServiceName, autonat.AutoNATProto,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 4 << 20},
rcmgr.BaseLimitIncrease{StreamsInbound: 4, StreamsOutbound: 4, Streams: 4, Memory: 2 << 20},
)
addServicePeerAndProtocolPeerLimit(
config,
autonat.ServiceName, autonat.AutoNATProto,
rcmgr.BaseLimit{StreamsInbound: 2, StreamsOutbound: 2, Streams: 2, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)

// holepunch
setServiceLimits(limiter, holepunch.ServiceName,
limiter.DefaultServiceLimits.
WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
WithStreamLimit(128, 128, 256), // max 256 streams - symmetric
peerLimit(2, 2, 2))
setProtocolLimits(limiter, holepunch.Protocol,
limiter.DefaultProtocolLimits.WithMemoryLimit(1, 4<<20, 64<<20),
peerLimit(2, 2, 2))
addServiceAndProtocolLimit(config,
holepunch.ServiceName, holepunch.Protocol,
rcmgr.BaseLimit{StreamsInbound: 32, StreamsOutbound: 32, Streams: 64, Memory: 4 << 20},
rcmgr.BaseLimitIncrease{StreamsInbound: 8, StreamsOutbound: 8, Streams: 16, Memory: 4 << 20},
)
addServicePeerAndProtocolPeerLimit(config,
holepunch.ServiceName, holepunch.Protocol,
rcmgr.BaseLimit{StreamsInbound: 2, StreamsOutbound: 2, Streams: 2, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)

// relay/v1
setServiceLimits(limiter, relayv1.ServiceName,
limiter.DefaultServiceLimits.
WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
WithStreamLimit(1024, 1024, 1024), // max 1024 streams - asymmetric
peerLimit(64, 64, 64))
config.AddServiceLimit(
relayv1.ServiceName,
rcmgr.BaseLimit{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20},
rcmgr.BaseLimitIncrease{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20},
)
config.AddServicePeerLimit(
relayv1.ServiceName,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)

// relay/v2
setServiceLimits(limiter, relayv2.ServiceName,
limiter.DefaultServiceLimits.
WithMemoryLimit(1, 4<<20, 64<<20). // max 64MB service memory
WithStreamLimit(1024, 1024, 1024), // max 1024 streams - asymmetric
peerLimit(64, 64, 64))
config.AddServiceLimit(
relayv2.ServiceName,
rcmgr.BaseLimit{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20},
rcmgr.BaseLimitIncrease{StreamsInbound: 256, StreamsOutbound: 256, Streams: 256, Memory: 16 << 20},
)
config.AddServicePeerLimit(
relayv2.ServiceName,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 64, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)

// circuit protocols, both client and service
setProtocolLimits(limiter, circuit.ProtoIDv1,
limiter.DefaultProtocolLimits.
WithMemoryLimit(1, 4<<20, 64<<20).
WithStreamLimit(1280, 1280, 1280),
peerLimit(128, 128, 128))
setProtocolLimits(limiter, circuit.ProtoIDv2Hop,
limiter.DefaultProtocolLimits.
WithMemoryLimit(1, 4<<20, 64<<20).
WithStreamLimit(1280, 1280, 1280),
peerLimit(128, 128, 128))
setProtocolLimits(limiter, circuit.ProtoIDv2Stop,
limiter.DefaultProtocolLimits.
WithMemoryLimit(1, 4<<20, 64<<20).
WithStreamLimit(1280, 1280, 1280),
peerLimit(128, 128, 128))

}

func setServiceLimits(limiter *rcmgr.BasicLimiter, svc string, limit rcmgr.Limit, peerLimit rcmgr.Limit) {
if _, ok := limiter.ServiceLimits[svc]; !ok {
limiter.ServiceLimits[svc] = limit
}
if _, ok := limiter.ServicePeerLimits[svc]; !ok {
limiter.ServicePeerLimits[svc] = peerLimit
for _, proto := range [...]protocol.ID{circuit.ProtoIDv1, circuit.ProtoIDv2Hop, circuit.ProtoIDv2Stop} {
config.AddProtocolLimit(
proto,
rcmgr.BaseLimit{StreamsInbound: 640, StreamsOutbound: 640, Streams: 640, Memory: 16 << 20},
rcmgr.BaseLimitIncrease{StreamsInbound: 640, StreamsOutbound: 640, Streams: 640, Memory: 16 << 20},
)
config.AddProtocolPeerLimit(
proto,
rcmgr.BaseLimit{StreamsInbound: 128, StreamsOutbound: 128, Streams: 128, Memory: 32 << 20},
rcmgr.BaseLimitIncrease{},
)
}
}

func setProtocolLimits(limiter *rcmgr.BasicLimiter, proto protocol.ID, limit rcmgr.Limit, peerLimit rcmgr.Limit) {
if _, ok := limiter.ProtocolLimits[proto]; !ok {
limiter.ProtocolLimits[proto] = limit
}
if _, ok := limiter.ProtocolPeerLimits[proto]; !ok {
limiter.ProtocolPeerLimits[proto] = peerLimit
}
func addServiceAndProtocolLimit(config *rcmgr.ScalingLimitConfig, service string, proto protocol.ID, limit rcmgr.BaseLimit, increase rcmgr.BaseLimitIncrease) {
config.AddServiceLimit(service, limit, increase)
config.AddProtocolLimit(proto, limit, increase)
}

func peerLimit(numStreamsIn, numStreamsOut, numStreamsTotal int) rcmgr.Limit {
return &rcmgr.StaticLimit{
// memory: 256kb for window buffers plus some change for message buffers per stream
Memory: int64(numStreamsTotal * (256<<10 + 16384)),
BaseLimit: rcmgr.BaseLimit{
StreamsInbound: numStreamsIn,
StreamsOutbound: numStreamsOut,
Streams: numStreamsTotal,
},
}
func addServicePeerAndProtocolPeerLimit(config *rcmgr.ScalingLimitConfig, service string, proto protocol.ID, limit rcmgr.BaseLimit, increase rcmgr.BaseLimitIncrease) {
config.AddServicePeerLimit(service, limit, increase)
config.AddProtocolPeerLimit(proto, limit, increase)
}
57 changes: 34 additions & 23 deletions p2p/test/resource-manager/rcmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ import (
"github.com/stretchr/testify/require"
)

func makeRcmgrOption(t *testing.T, limiter *rcmgr.BasicLimiter, test string) func(int) libp2p.Option {
func makeRcmgrOption(t *testing.T, cfg rcmgr.LimitConfig, test string) func(int) libp2p.Option {
return func(i int) libp2p.Option {
var opts []rcmgr.Option

if os.Getenv("LIBP2P_TEST_RCMGR_TRACE") == "1" {
opts = append(opts, rcmgr.WithTrace(fmt.Sprintf("%s-%d.json.gz", test, i)))
}

mgr, err := rcmgr.NewResourceManager(limiter, opts...)
if err != nil {
t.Fatal(err)
}
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(cfg), opts...)
require.NoError(t, err)
return libp2p.ResourceManager(mgr)
}
}
Expand All @@ -50,11 +47,15 @@ func waitForConnection(t *testing.T, src, dest *Echo) {
func TestResourceManagerConnInbound(t *testing.T) {
// this test checks that we can not exceed the inbound conn limit at system level
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
limiter := rcmgr.NewDefaultLimiter()
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(3, 1024, 1024)
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(1, 1, 1)

echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerConnInbound"))
cfg := rcmgr.DefaultLimits.AutoScale()
cfg.System.ConnsInbound = 3
cfg.System.ConnsOutbound = 1024
cfg.System.Conns = 1024
cfg.PeerDefault.Conns = 1
cfg.PeerDefault.ConnsInbound = 1
cfg.PeerDefault.ConnsOutbound = 1

echos := createEchos(t, 5, makeRcmgrOption(t, cfg, "TestResourceManagerConnInbound"))
defer closeEchos(echos)
defer closeRcmgrs(echos)

Expand Down Expand Up @@ -82,10 +83,14 @@ func TestResourceManagerConnInbound(t *testing.T) {
func TestResourceManagerConnOutbound(t *testing.T) {
// this test checks that we can not exceed the inbound conn limit at system level
// we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns
limiter := rcmgr.NewDefaultLimiter()
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1024, 3, 1024)
limiter.DefaultPeerLimits = limiter.DefaultPeerLimits.WithConnLimit(1, 1, 1)
echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerConnOutbound"))
cfg := rcmgr.DefaultLimits.AutoScale()
cfg.System.ConnsInbound = 1024
cfg.System.ConnsOutbound = 3
cfg.System.Conns = 1024
cfg.PeerDefault.Conns = 1
cfg.PeerDefault.ConnsInbound = 1
cfg.PeerDefault.ConnsOutbound = 1
echos := createEchos(t, 5, makeRcmgrOption(t, cfg, "TestResourceManagerConnOutbound"))
defer closeEchos(echos)
defer closeRcmgrs(echos)

Expand Down Expand Up @@ -113,9 +118,11 @@ func TestResourceManagerConnOutbound(t *testing.T) {
func TestResourceManagerServiceInbound(t *testing.T) {
// this test checks that we can not exceed the inbound stream limit at service level
// we specify: 3 streams for the service, and we try to create 4 streams
limiter := rcmgr.NewDefaultLimiter()
limiter.DefaultServiceLimits = limiter.DefaultServiceLimits.WithStreamLimit(3, 1024, 1024)
echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerServiceInbound"))
cfg := rcmgr.DefaultLimits.AutoScale()
cfg.ServiceDefault.StreamsInbound = 3
cfg.ServiceDefault.StreamsOutbound = 1024
cfg.ServiceDefault.Streams = 1024
echos := createEchos(t, 5, makeRcmgrOption(t, cfg, "TestResourceManagerServiceInbound"))
defer closeEchos(echos)
defer closeRcmgrs(echos)

Expand Down Expand Up @@ -164,11 +171,15 @@ func TestResourceManagerServiceInbound(t *testing.T) {
func TestResourceManagerServicePeerInbound(t *testing.T) {
// this test checks that we cannot exceed the per peer inbound stream limit at service level
// we specify: 2 streams per peer for echo, and we try to create 3 streams
limiter := rcmgr.NewDefaultLimiter()
limiter.ServicePeerLimits = map[string]rcmgr.Limit{
EchoService: limiter.DefaultPeerLimits.WithStreamLimit(2, 1024, 1024),
}
echos := createEchos(t, 5, makeRcmgrOption(t, limiter, "TestResourceManagerServicePeerInbound"))
cfg := rcmgr.DefaultLimits
cfg.AddServicePeerLimit(
EchoService,
rcmgr.BaseLimit{StreamsInbound: 2, StreamsOutbound: 1024, Streams: 1024, Memory: 9999999},
rcmgr.BaseLimitIncrease{},
)
limits := cfg.AutoScale()

echos := createEchos(t, 5, makeRcmgrOption(t, limits, "TestResourceManagerServicePeerInbound"))
defer closeEchos(echos)
defer closeRcmgrs(echos)

Expand Down

0 comments on commit c8044d2

Please sign in to comment.