diff --git a/p2p/host/resource-manager/README.md b/p2p/host/resource-manager/README.md
index 96edd1d0cf..f81b90d9f3 100644
--- a/p2p/host/resource-manager/README.md
+++ b/p2p/host/resource-manager/README.md
@@ -28,9 +28,21 @@ scalingLimits := rcmgr.DefaultLimits
 // Add limits around included libp2p protocols
 libp2p.SetDefaultServiceLimits(&scalingLimits)
 
-// Turn the scaling limits into a static set of limits using `.AutoScale`. This
+// Turn the scaling limits into a concrete set of limits using `.AutoScale`. This
 // scales the limits proportional to your system memory.
-limits := scalingLimits.AutoScale()
+scaledDefaultLimits := scalingLimits.AutoScale()
+
+// Tweak certain settings
+cfg := rcmgr.PartialLimitConfig{
+  System: rcmgr.ResourceLimits{
+    // Allow unlimited outbound streams
+    StreamsOutbound: rcmgr.Unlimited,
+  },
+  // Everything else is default. The exact values will come from `scaledDefaultLimits` above.
+}
+
+// Create our limits by using our cfg and replacing the default values with values from `scaledDefaultLimits`
+limits := cfg.Build(scaledDefaultLimits)
 
 // The resource manager expects a limiter, so we create one from our limits.
 limiter := rcmgr.NewFixedLimiter(limits)
@@ -51,6 +63,54 @@ if err != nil {
 host, err := libp2p.New(libp2p.ResourceManager(rm))
 ```
 
+### Saving the limits config
+The easiest way to save the defined limits is to serialize the `PartialLimitConfig`
+type as JSON.
+
+```go
+noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf")
+cfg := rcmgr.PartialLimitConfig{
+  System: rcmgr.ResourceLimits{
+    // Allow unlimited outbound streams
+    StreamsOutbound: rcmgr.Unlimited,
+  },
+  Peer: map[peer.ID]rcmgr.ResourceLimits{
+    noisyNeighbor: {
+      // No inbound connections from this peer
+      ConnsInbound: rcmgr.BlockAllLimit,
+      // But let me open connections to them
+      Conns: rcmgr.DefaultLimit,
+      ConnsOutbound: rcmgr.DefaultLimit,
+      // No inbound streams from this peer
+      StreamsInbound: rcmgr.BlockAllLimit,
+      // And let me open unlimited (by me) outbound streams (the peer may have their own limits on me)
+      StreamsOutbound: rcmgr.Unlimited,
+    },
+  },
+}
+jsonBytes, _ := json.Marshal(&cfg)
+
+// string(jsonBytes)
+// {
+//   "System": {
+//     "StreamsOutbound": "unlimited"
+//   },
+//   "Peer": {
+//     "QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf": {
+//       "StreamsInbound": "blockAll",
+//       "StreamsOutbound": "unlimited",
+//       "ConnsInbound": "blockAll"
+//     }
+//   }
+// }
+```
+
+This will omit defaults from the JSON output. It will also serialize the
+blockAll and unlimited values explicitly.
+
+The `Memory` field is serialized as a string to work around JSON's limited
+precision for large integers (`Memory` is an int64).
+
 ## Basic Resources
 
 ### Memory
@@ -278,7 +338,7 @@ This is done using the `ScalingLimitConfig`. For every scope, this configuration
 struct defines the absolutely bare minimum limits, and an (optional) increase of
 these limits, which will be applied on nodes that have sufficient memory.
 
-A `ScalingLimitConfig` can be converted into a `LimitConfig` (which can then be
+A `ScalingLimitConfig` can be converted into a `ConcreteLimitConfig` (which can then be
 used to initialize a fixed limiter with `NewFixedLimiter`) by calling the `Scale`
 method. The `Scale` method takes two parameters: the amount of memory and the number
 of file descriptors that an application is willing to dedicate to libp2p.
@@ -346,7 +406,7 @@ go-libp2p process.
For the default definitions see [`DefaultLimits` and If the defaults seem mostly okay, but you want to adjust one facet you can simply copy the default struct object and update the field you want to change. You can -apply changes to a `BaseLimit`, `BaseLimitIncrease`, and `LimitConfig` with +apply changes to a `BaseLimit`, `BaseLimitIncrease`, and `ConcreteLimitConfig` with `.Apply`. Example diff --git a/p2p/host/resource-manager/limit.go b/p2p/host/resource-manager/limit.go index c3d6dd88c8..ef7fcdc9b4 100644 --- a/p2p/host/resource-manager/limit.go +++ b/p2p/host/resource-manager/limit.go @@ -12,6 +12,7 @@ package rcmgr import ( "encoding/json" "io" + "math" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -56,7 +57,7 @@ func NewDefaultLimiterFromJSON(in io.Reader) (Limiter, error) { } // NewLimiterFromJSON creates a new limiter by parsing a json configuration. -func NewLimiterFromJSON(in io.Reader, defaults LimitConfig) (Limiter, error) { +func NewLimiterFromJSON(in io.Reader, defaults ConcreteLimitConfig) (Limiter, error) { cfg, err := readLimiterConfigFromJSON(in, defaults) if err != nil { return nil, err @@ -64,25 +65,24 @@ func NewLimiterFromJSON(in io.Reader, defaults LimitConfig) (Limiter, error) { return &fixedLimiter{cfg}, nil } -func readLimiterConfigFromJSON(in io.Reader, defaults LimitConfig) (LimitConfig, error) { - var cfg LimitConfig +func readLimiterConfigFromJSON(in io.Reader, defaults ConcreteLimitConfig) (ConcreteLimitConfig, error) { + var cfg PartialLimitConfig if err := json.NewDecoder(in).Decode(&cfg); err != nil { - return LimitConfig{}, err + return ConcreteLimitConfig{}, err } - cfg.Apply(defaults) - return cfg, nil + return cfg.Build(defaults), nil } // fixedLimiter is a limiter with fixed limits. type fixedLimiter struct { - LimitConfig + ConcreteLimitConfig } var _ Limiter = (*fixedLimiter)(nil) -func NewFixedLimiter(conf LimitConfig) Limiter { +func NewFixedLimiter(conf ConcreteLimitConfig) Limiter { log.Debugw("initializing new limiter with config", "limits", conf) - return &fixedLimiter{LimitConfig: conf} + return &fixedLimiter{conf} } // BaseLimit is a mixin type for basic resource limits. @@ -97,6 +97,37 @@ type BaseLimit struct { Memory int64 `json:",omitempty"` } +func valueOrBlockAll(n int) LimitVal { + if n == 0 { + return BlockAllLimit + } else if n == math.MaxInt { + return Unlimited + } + return LimitVal(n) +} +func valueOrBlockAll64(n int64) LimitVal64 { + if n == 0 { + return BlockAllLimit64 + } else if n == math.MaxInt { + return Unlimited64 + } + return LimitVal64(n) +} + +// ToResourceLimits converts the BaseLimit to a ResourceLimits +func (l BaseLimit) ToResourceLimits() ResourceLimits { + return ResourceLimits{ + Streams: valueOrBlockAll(l.Streams), + StreamsInbound: valueOrBlockAll(l.StreamsInbound), + StreamsOutbound: valueOrBlockAll(l.StreamsOutbound), + Conns: valueOrBlockAll(l.Conns), + ConnsInbound: valueOrBlockAll(l.ConnsInbound), + ConnsOutbound: valueOrBlockAll(l.ConnsOutbound), + FD: valueOrBlockAll(l.FD), + Memory: valueOrBlockAll64(l.Memory), + } +} + // Apply overwrites all zero-valued limits with the values of l2 // Must not use a pointer receiver. 
func (l *BaseLimit) Apply(l2 BaseLimit) { @@ -169,7 +200,7 @@ func (l *BaseLimitIncrease) Apply(l2 BaseLimitIncrease) { } } -func (l *BaseLimit) GetStreamLimit(dir network.Direction) int { +func (l BaseLimit) GetStreamLimit(dir network.Direction) int { if dir == network.DirInbound { return l.StreamsInbound } else { @@ -177,11 +208,11 @@ func (l *BaseLimit) GetStreamLimit(dir network.Direction) int { } } -func (l *BaseLimit) GetStreamTotalLimit() int { +func (l BaseLimit) GetStreamTotalLimit() int { return l.Streams } -func (l *BaseLimit) GetConnLimit(dir network.Direction) int { +func (l BaseLimit) GetConnLimit(dir network.Direction) int { if dir == network.DirInbound { return l.ConnsInbound } else { @@ -189,78 +220,78 @@ func (l *BaseLimit) GetConnLimit(dir network.Direction) int { } } -func (l *BaseLimit) GetConnTotalLimit() int { +func (l BaseLimit) GetConnTotalLimit() int { return l.Conns } -func (l *BaseLimit) GetFDLimit() int { +func (l BaseLimit) GetFDLimit() int { return l.FD } -func (l *BaseLimit) GetMemoryLimit() int64 { +func (l BaseLimit) GetMemoryLimit() int64 { return l.Memory } func (l *fixedLimiter) GetSystemLimits() Limit { - return &l.System + return &l.system } func (l *fixedLimiter) GetTransientLimits() Limit { - return &l.Transient + return &l.transient } func (l *fixedLimiter) GetAllowlistedSystemLimits() Limit { - return &l.AllowlistedSystem + return &l.allowlistedSystem } func (l *fixedLimiter) GetAllowlistedTransientLimits() Limit { - return &l.AllowlistedTransient + return &l.allowlistedTransient } func (l *fixedLimiter) GetServiceLimits(svc string) Limit { - sl, ok := l.Service[svc] + sl, ok := l.service[svc] if !ok { - return &l.ServiceDefault + return &l.serviceDefault } return &sl } func (l *fixedLimiter) GetServicePeerLimits(svc string) Limit { - pl, ok := l.ServicePeer[svc] + pl, ok := l.servicePeer[svc] if !ok { - return &l.ServicePeerDefault + return &l.servicePeerDefault } return &pl } func (l *fixedLimiter) GetProtocolLimits(proto protocol.ID) Limit { - pl, ok := l.Protocol[proto] + pl, ok := l.protocol[proto] if !ok { - return &l.ProtocolDefault + return &l.protocolDefault } return &pl } func (l *fixedLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit { - pl, ok := l.ProtocolPeer[proto] + pl, ok := l.protocolPeer[proto] if !ok { - return &l.ProtocolPeerDefault + return &l.protocolPeerDefault } return &pl } func (l *fixedLimiter) GetPeerLimits(p peer.ID) Limit { - pl, ok := l.Peer[p] + pl, ok := l.peer[p] if !ok { - return &l.PeerDefault + return &l.peerDefault } return &pl } func (l *fixedLimiter) GetStreamLimits(_ peer.ID) Limit { - return &l.Stream + return &l.stream } func (l *fixedLimiter) GetConnLimits() Limit { - return &l.Conn + return &l.conn } diff --git a/p2p/host/resource-manager/limit_config_test.backwards-compat.json b/p2p/host/resource-manager/limit_config_test.backwards-compat.json new file mode 100644 index 0000000000..b1a5e9ecb7 --- /dev/null +++ b/p2p/host/resource-manager/limit_config_test.backwards-compat.json @@ -0,0 +1,45 @@ +{ + "System": { + "Memory": 65536, + "Conns": 16, + "ConnsInbound": 8, + "ConnsOutbound": 16, + "FD": 16 + }, + "ServiceDefault": { + "Memory": 8765 + }, + "Service": { + "A": { + "Memory": 8192 + }, + "B": {} + }, + "ServicePeerDefault": { + "Memory": 2048 + }, + "ServicePeer": { + "A": { + "Memory": 4096 + } + }, + "ProtocolDefault": { + "Memory": 2048 + }, + "ProtocolPeerDefault": { + "Memory": 1024 + }, + "Protocol": { + "/A": { + "Memory": 8192 + } + }, + "PeerDefault": { + "Memory": 4096 + }, + 
"Peer": { + "12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS": { + "Memory": 4097 + } + } +} \ No newline at end of file diff --git a/p2p/host/resource-manager/limit_config_test.go b/p2p/host/resource-manager/limit_config_test.go index 21dede5641..d1c5a81619 100644 --- a/p2p/host/resource-manager/limit_config_test.go +++ b/p2p/host/resource-manager/limit_config_test.go @@ -17,46 +17,134 @@ func withMemoryLimit(l BaseLimit, m int64) BaseLimit { return l2 } +func TestLimitConfigParserBackwardsCompat(t *testing.T) { + // Tests that we can parse the old limit config format. + in, err := os.Open("limit_config_test.backwards-compat.json") + require.NoError(t, err) + defer in.Close() + + defaultScaledLimits := DefaultLimits + defaultScaledLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaultScaledLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaults := defaultScaledLimits.AutoScale() + cfg, err := readLimiterConfigFromJSON(in, defaults) + require.NoError(t, err) + + require.Equal(t, int64(65536), cfg.system.Memory) + require.Equal(t, defaults.system.Streams, cfg.system.Streams) + require.Equal(t, defaults.system.StreamsInbound, cfg.system.StreamsInbound) + require.Equal(t, defaults.system.StreamsOutbound, cfg.system.StreamsOutbound) + require.Equal(t, 16, cfg.system.Conns) + require.Equal(t, 8, cfg.system.ConnsInbound) + require.Equal(t, 16, cfg.system.ConnsOutbound) + require.Equal(t, 16, cfg.system.FD) + + require.Equal(t, defaults.transient, cfg.transient) + require.Equal(t, int64(8765), cfg.serviceDefault.Memory) + + require.Contains(t, cfg.service, "A") + require.Equal(t, withMemoryLimit(cfg.serviceDefault, 8192), cfg.service["A"]) + require.Contains(t, cfg.service, "B") + require.Equal(t, cfg.serviceDefault, cfg.service["B"]) + require.Contains(t, cfg.service, "C") + require.Equal(t, defaults.service["C"], cfg.service["C"]) + + require.Equal(t, int64(4096), cfg.peerDefault.Memory) + peerID, err := peer.Decode("12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS") + require.NoError(t, err) + require.Contains(t, cfg.peer, peerID) + require.Equal(t, int64(4097), cfg.peer[peerID].Memory) +} + func TestLimitConfigParser(t *testing.T) { in, err := os.Open("limit_config_test.json") require.NoError(t, err) defer in.Close() - DefaultLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) - DefaultLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) - defaults := DefaultLimits.AutoScale() + defaultScaledLimits := DefaultLimits + defaultScaledLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaultScaledLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{}) + defaults := defaultScaledLimits.AutoScale() cfg, err := readLimiterConfigFromJSON(in, defaults) require.NoError(t, err) - require.Equal(t, int64(65536), cfg.System.Memory) - require.Equal(t, defaults.System.Streams, cfg.System.Streams) - require.Equal(t, defaults.System.StreamsInbound, cfg.System.StreamsInbound) - require.Equal(t, defaults.System.StreamsOutbound, cfg.System.StreamsOutbound) - require.Equal(t, 16, cfg.System.Conns) - require.Equal(t, 8, cfg.System.ConnsInbound) - require.Equal(t, 16, cfg.System.ConnsOutbound) - require.Equal(t, 16, cfg.System.FD) + require.Equal(t, int64(65536), cfg.system.Memory) + require.Equal(t, defaults.system.Streams, cfg.system.Streams) + require.Equal(t, defaults.system.StreamsInbound, 
cfg.system.StreamsInbound)
+	require.Equal(t, defaults.system.StreamsOutbound, cfg.system.StreamsOutbound)
+	require.Equal(t, 16, cfg.system.Conns)
+	require.Equal(t, 8, cfg.system.ConnsInbound)
+	require.Equal(t, 16, cfg.system.ConnsOutbound)
+	require.Equal(t, 16, cfg.system.FD)
 
-	require.Equal(t, defaults.Transient, cfg.Transient)
-	require.Equal(t, int64(8765), cfg.ServiceDefault.Memory)
+	require.Equal(t, defaults.transient, cfg.transient)
+	require.Equal(t, int64(8765), cfg.serviceDefault.Memory)
 
-	require.Contains(t, cfg.Service, "A")
-	require.Equal(t, withMemoryLimit(cfg.ServiceDefault, 8192), cfg.Service["A"])
-	require.Contains(t, cfg.Service, "B")
-	require.Equal(t, cfg.ServiceDefault, cfg.Service["B"])
-	require.Contains(t, cfg.Service, "C")
-	require.Equal(t, defaults.Service["C"], cfg.Service["C"])
+	require.Contains(t, cfg.service, "A")
+	require.Equal(t, withMemoryLimit(cfg.serviceDefault, 8192), cfg.service["A"])
+	require.Contains(t, cfg.service, "B")
+	require.Equal(t, cfg.serviceDefault, cfg.service["B"])
+	require.Contains(t, cfg.service, "C")
+	require.Equal(t, defaults.service["C"], cfg.service["C"])
 
-	require.Equal(t, int64(4096), cfg.PeerDefault.Memory)
+	require.Equal(t, int64(4096), cfg.peerDefault.Memory)
 	peerID, err := peer.Decode("12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS")
 	require.NoError(t, err)
-	require.Contains(t, cfg.Peer, peerID)
-	require.Equal(t, int64(4097), cfg.Peer[peerID].Memory)
+	require.Contains(t, cfg.peer, peerID)
+	require.Equal(t, int64(4097), cfg.peer[peerID].Memory)
 
 	// Roundtrip
-	jsonBytes, err := json.Marshal(&cfg)
+	limitConfig := cfg.ToPartialLimitConfig()
+	jsonBytes, err := json.Marshal(&limitConfig)
 	require.NoError(t, err)
 	cfgAfterRoundTrip, err := readLimiterConfigFromJSON(bytes.NewReader(jsonBytes), defaults)
 	require.NoError(t, err)
-	require.Equal(t, cfg, cfgAfterRoundTrip)
+	require.Equal(t, limitConfig, cfgAfterRoundTrip.ToPartialLimitConfig())
+}
+
+func TestLimitConfigRoundTrip(t *testing.T) {
+	// Tests that we can roundtrip a PartialLimitConfig to a ConcreteLimitConfig and back.
+	in, err := os.Open("limit_config_test.json")
+	require.NoError(t, err)
+	defer in.Close()
+
+	defaults := DefaultLimits
+	defaults.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{})
+	defaults.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{})
+	concreteCfg, err := readLimiterConfigFromJSON(in, defaults.AutoScale())
+	require.NoError(t, err)
+
+	// Roundtrip
+	limitConfig := concreteCfg.ToPartialLimitConfig()
+	// Using InfiniteLimits because it's different than the defaults used above.
+	// If anything was marked "default" in the round trip, it would show up as a
+	// difference here.
+ concreteCfgRT := limitConfig.Build(InfiniteLimits) + require.Equal(t, concreteCfg, concreteCfgRT) +} + +func TestReadmeLimitConfigSerialization(t *testing.T) { + noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf") + cfg := PartialLimitConfig{ + System: ResourceLimits{ + // Allow unlimited outbound streams + StreamsOutbound: Unlimited, + }, + Peer: map[peer.ID]ResourceLimits{ + noisyNeighbor: { + // No inbound connections from this peer + ConnsInbound: BlockAllLimit, + // But let me open connections to them + Conns: DefaultLimit, + ConnsOutbound: DefaultLimit, + // No inbound streams from this peer + StreamsInbound: BlockAllLimit, + // And let me open unlimited (by me) outbound streams (the peer may have their own limits on me) + StreamsOutbound: Unlimited, + }, + }, + } + jsonBytes, err := json.Marshal(&cfg) + require.NoError(t, err) + require.Equal(t, `{"Peer":{"QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf":{"StreamsInbound":"blockAll","StreamsOutbound":"unlimited","ConnsInbound":"blockAll"}},"System":{"StreamsOutbound":"unlimited"}}`, string(jsonBytes)) } diff --git a/p2p/host/resource-manager/limit_defaults.go b/p2p/host/resource-manager/limit_defaults.go index a9c73a4d9e..e7489c45d1 100644 --- a/p2p/host/resource-manager/limit_defaults.go +++ b/p2p/host/resource-manager/limit_defaults.go @@ -2,8 +2,11 @@ package rcmgr import ( "encoding/json" + "fmt" "math" + "strconv" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -108,53 +111,339 @@ func (cfg *ScalingLimitConfig) AddProtocolPeerLimit(proto protocol.ID, base Base } } -type LimitConfig struct { - System BaseLimit `json:",omitempty"` - Transient BaseLimit `json:",omitempty"` +type LimitVal int + +const ( + // DefaultLimit is the default value for resources. The exact value depends on the context, but will get values from `DefaultLimits`. + DefaultLimit LimitVal = 0 + // Unlimited is the value for unlimited resources. An arbitrarily high number will also work. + Unlimited LimitVal = -1 + // BlockAllLimit is the LimitVal for allowing no amount of resources. + BlockAllLimit LimitVal = -2 +) + +func (l LimitVal) MarshalJSON() ([]byte, error) { + if l == Unlimited { + return json.Marshal("unlimited") + } else if l == DefaultLimit { + return json.Marshal("default") + } else if l == BlockAllLimit { + return json.Marshal("blockAll") + } + return json.Marshal(int(l)) +} + +func (l *LimitVal) UnmarshalJSON(b []byte) error { + if string(b) == `"default"` { + *l = DefaultLimit + return nil + } else if string(b) == `"unlimited"` { + *l = Unlimited + return nil + } else if string(b) == `"blockAll"` { + *l = BlockAllLimit + return nil + } + + var val int + if err := json.Unmarshal(b, &val); err != nil { + return err + } + + if val == 0 { + // If there is an explicit 0 in the JSON we should interpret this as block all. + *l = BlockAllLimit + return nil + } + + *l = LimitVal(val) + return nil +} + +func (l LimitVal) Build(defaultVal int) int { + if l == DefaultLimit { + return defaultVal + } + if l == Unlimited { + return math.MaxInt + } + if l == BlockAllLimit { + return 0 + } + return int(l) +} + +type LimitVal64 int64 + +const ( + // Default is the default value for resources. + DefaultLimit64 LimitVal64 = 0 + // Unlimited is the value for unlimited resources. + Unlimited64 LimitVal64 = -1 + // BlockAllLimit64 is the LimitVal for allowing no amount of resources. 
+ BlockAllLimit64 LimitVal64 = -2 +) + +func (l LimitVal64) MarshalJSON() ([]byte, error) { + if l == Unlimited64 { + return json.Marshal("unlimited") + } else if l == DefaultLimit64 { + return json.Marshal("default") + } else if l == BlockAllLimit64 { + return json.Marshal("blockAll") + } + + // Convert this to a string because JSON doesn't support 64-bit integers. + return json.Marshal(strconv.FormatInt(int64(l), 10)) +} + +func (l *LimitVal64) UnmarshalJSON(b []byte) error { + if string(b) == `"default"` { + *l = DefaultLimit64 + return nil + } else if string(b) == `"unlimited"` { + *l = Unlimited64 + return nil + } else if string(b) == `"blockAll"` { + *l = BlockAllLimit64 + return nil + } + + var val string + if err := json.Unmarshal(b, &val); err != nil { + // Is this an integer? Possible because of backwards compatibility. + var val int + if err := json.Unmarshal(b, &val); err != nil { + return fmt.Errorf("failed to unmarshal limit value: %w", err) + } + + if val == 0 { + // If there is an explicit 0 in the JSON we should interpret this as block all. + *l = BlockAllLimit64 + return nil + } + + *l = LimitVal64(val) + return nil + } + + i, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return err + } + + if i == 0 { + // If there is an explicit 0 in the JSON we should interpret this as block all. + *l = BlockAllLimit64 + return nil + } + + *l = LimitVal64(i) + return nil +} + +func (l LimitVal64) Build(defaultVal int64) int64 { + if l == DefaultLimit64 { + return defaultVal + } + if l == Unlimited64 { + return math.MaxInt64 + } + if l == BlockAllLimit64 { + return 0 + } + return int64(l) +} + +// ResourceLimits is the type for basic resource limits. +type ResourceLimits struct { + Streams LimitVal `json:",omitempty"` + StreamsInbound LimitVal `json:",omitempty"` + StreamsOutbound LimitVal `json:",omitempty"` + Conns LimitVal `json:",omitempty"` + ConnsInbound LimitVal `json:",omitempty"` + ConnsOutbound LimitVal `json:",omitempty"` + FD LimitVal `json:",omitempty"` + Memory LimitVal64 `json:",omitempty"` +} + +func (l *ResourceLimits) IsDefault() bool { + if l == nil { + return true + } + + if l.Streams == DefaultLimit && + l.StreamsInbound == DefaultLimit && + l.StreamsOutbound == DefaultLimit && + l.Conns == DefaultLimit && + l.ConnsInbound == DefaultLimit && + l.ConnsOutbound == DefaultLimit && + l.FD == DefaultLimit && + l.Memory == DefaultLimit64 { + return true + } + return false +} + +func (l *ResourceLimits) ToMaybeNilPtr() *ResourceLimits { + if l.IsDefault() { + return nil + } + return l +} + +// Apply overwrites all default limits with the values of l2 +func (l *ResourceLimits) Apply(l2 ResourceLimits) { + if l.Streams == DefaultLimit { + l.Streams = l2.Streams + } + if l.StreamsInbound == DefaultLimit { + l.StreamsInbound = l2.StreamsInbound + } + if l.StreamsOutbound == DefaultLimit { + l.StreamsOutbound = l2.StreamsOutbound + } + if l.Conns == DefaultLimit { + l.Conns = l2.Conns + } + if l.ConnsInbound == DefaultLimit { + l.ConnsInbound = l2.ConnsInbound + } + if l.ConnsOutbound == DefaultLimit { + l.ConnsOutbound = l2.ConnsOutbound + } + if l.FD == DefaultLimit { + l.FD = l2.FD + } + if l.Memory == DefaultLimit64 { + l.Memory = l2.Memory + } +} + +func (l *ResourceLimits) Build(defaults Limit) BaseLimit { + if l == nil { + return BaseLimit{ + Streams: defaults.GetStreamTotalLimit(), + StreamsInbound: defaults.GetStreamLimit(network.DirInbound), + StreamsOutbound: defaults.GetStreamLimit(network.DirOutbound), + Conns: defaults.GetConnTotalLimit(), + 
ConnsInbound: defaults.GetConnLimit(network.DirInbound), + ConnsOutbound: defaults.GetConnLimit(network.DirOutbound), + FD: defaults.GetFDLimit(), + Memory: defaults.GetMemoryLimit(), + } + } + + return BaseLimit{ + Streams: l.Streams.Build(defaults.GetStreamTotalLimit()), + StreamsInbound: l.StreamsInbound.Build(defaults.GetStreamLimit(network.DirInbound)), + StreamsOutbound: l.StreamsOutbound.Build(defaults.GetStreamLimit(network.DirOutbound)), + Conns: l.Conns.Build(defaults.GetConnTotalLimit()), + ConnsInbound: l.ConnsInbound.Build(defaults.GetConnLimit(network.DirInbound)), + ConnsOutbound: l.ConnsOutbound.Build(defaults.GetConnLimit(network.DirOutbound)), + FD: l.FD.Build(defaults.GetFDLimit()), + Memory: l.Memory.Build(defaults.GetMemoryLimit()), + } +} + +type PartialLimitConfig struct { + System ResourceLimits `json:",omitempty"` + Transient ResourceLimits `json:",omitempty"` // Limits that are applied to resources with an allowlisted multiaddr. // These will only be used if the normal System & Transient limits are // reached. - AllowlistedSystem BaseLimit `json:",omitempty"` - AllowlistedTransient BaseLimit `json:",omitempty"` + AllowlistedSystem ResourceLimits `json:",omitempty"` + AllowlistedTransient ResourceLimits `json:",omitempty"` - ServiceDefault BaseLimit `json:",omitempty"` - Service map[string]BaseLimit `json:",omitempty"` + ServiceDefault ResourceLimits `json:",omitempty"` + Service map[string]ResourceLimits `json:",omitempty"` - ServicePeerDefault BaseLimit `json:",omitempty"` - ServicePeer map[string]BaseLimit `json:",omitempty"` + ServicePeerDefault ResourceLimits `json:",omitempty"` + ServicePeer map[string]ResourceLimits `json:",omitempty"` - ProtocolDefault BaseLimit `json:",omitempty"` - Protocol map[protocol.ID]BaseLimit `json:",omitempty"` + ProtocolDefault ResourceLimits `json:",omitempty"` + Protocol map[protocol.ID]ResourceLimits `json:",omitempty"` - ProtocolPeerDefault BaseLimit `json:",omitempty"` - ProtocolPeer map[protocol.ID]BaseLimit `json:",omitempty"` + ProtocolPeerDefault ResourceLimits `json:",omitempty"` + ProtocolPeer map[protocol.ID]ResourceLimits `json:",omitempty"` - PeerDefault BaseLimit `json:",omitempty"` - Peer map[peer.ID]BaseLimit `json:",omitempty"` + PeerDefault ResourceLimits `json:",omitempty"` + Peer map[peer.ID]ResourceLimits `json:",omitempty"` - Conn BaseLimit `json:",omitempty"` - Stream BaseLimit `json:",omitempty"` + Conn ResourceLimits `json:",omitempty"` + Stream ResourceLimits `json:",omitempty"` } -func (cfg *LimitConfig) MarshalJSON() ([]byte, error) { +func (cfg *PartialLimitConfig) MarshalJSON() ([]byte, error) { // we want to marshal the encoded peer id - encodedPeerMap := make(map[string]BaseLimit, len(cfg.Peer)) + encodedPeerMap := make(map[string]ResourceLimits, len(cfg.Peer)) for p, v := range cfg.Peer { encodedPeerMap[p.String()] = v } - type Alias LimitConfig + type Alias PartialLimitConfig return json.Marshal(&struct { *Alias - Peer map[string]BaseLimit `json:",omitempty"` + // String so we can have the properly marshalled peer id + Peer map[string]ResourceLimits `json:",omitempty"` + + // The rest of the fields as pointers so that we omit empty values in the serialized result + System *ResourceLimits `json:",omitempty"` + Transient *ResourceLimits `json:",omitempty"` + AllowlistedSystem *ResourceLimits `json:",omitempty"` + AllowlistedTransient *ResourceLimits `json:",omitempty"` + + ServiceDefault *ResourceLimits `json:",omitempty"` + + ServicePeerDefault *ResourceLimits `json:",omitempty"` + + 
ProtocolDefault *ResourceLimits `json:",omitempty"` + + ProtocolPeerDefault *ResourceLimits `json:",omitempty"` + + PeerDefault *ResourceLimits `json:",omitempty"` + + Conn *ResourceLimits `json:",omitempty"` + Stream *ResourceLimits `json:",omitempty"` }{ Alias: (*Alias)(cfg), Peer: encodedPeerMap, + + System: cfg.System.ToMaybeNilPtr(), + Transient: cfg.Transient.ToMaybeNilPtr(), + AllowlistedSystem: cfg.AllowlistedSystem.ToMaybeNilPtr(), + AllowlistedTransient: cfg.AllowlistedTransient.ToMaybeNilPtr(), + ServiceDefault: cfg.ServiceDefault.ToMaybeNilPtr(), + ServicePeerDefault: cfg.ServicePeerDefault.ToMaybeNilPtr(), + ProtocolDefault: cfg.ProtocolDefault.ToMaybeNilPtr(), + ProtocolPeerDefault: cfg.ProtocolPeerDefault.ToMaybeNilPtr(), + PeerDefault: cfg.PeerDefault.ToMaybeNilPtr(), + Conn: cfg.Conn.ToMaybeNilPtr(), + Stream: cfg.Stream.ToMaybeNilPtr(), }) } -func (cfg *LimitConfig) Apply(c LimitConfig) { +func applyResourceLimitsMap[K comparable](this *map[K]ResourceLimits, other map[K]ResourceLimits, fallbackDefault ResourceLimits) { + for k, l := range *this { + r := fallbackDefault + if l2, ok := other[k]; ok { + r = l2 + } + l.Apply(r) + (*this)[k] = l + } + if *this == nil && other != nil { + *this = make(map[K]ResourceLimits) + } + for k, l := range other { + if _, ok := (*this)[k]; !ok { + (*this)[k] = l + } + } +} + +func (cfg *PartialLimitConfig) Apply(c PartialLimitConfig) { cfg.System.Apply(c.System) cfg.Transient.Apply(c.Transient) cfg.AllowlistedSystem.Apply(c.AllowlistedSystem) @@ -167,90 +456,123 @@ func (cfg *LimitConfig) Apply(c LimitConfig) { cfg.Conn.Apply(c.Conn) cfg.Stream.Apply(c.Stream) - // TODO: the following could be solved a lot nicer, if only we could use generics - for s, l := range cfg.Service { - r := cfg.ServiceDefault - if l2, ok := c.Service[s]; ok { - r = l2 - } - l.Apply(r) - cfg.Service[s] = l - } - if c.Service != nil && cfg.Service == nil { - cfg.Service = make(map[string]BaseLimit) - } - for s, l := range c.Service { - if _, ok := cfg.Service[s]; !ok { - cfg.Service[s] = l - } - } + applyResourceLimitsMap(&cfg.Service, c.Service, cfg.ServiceDefault) + applyResourceLimitsMap(&cfg.ServicePeer, c.ServicePeer, cfg.ServicePeerDefault) + applyResourceLimitsMap(&cfg.Protocol, c.Protocol, cfg.ProtocolDefault) + applyResourceLimitsMap(&cfg.ProtocolPeer, c.ProtocolPeer, cfg.ProtocolPeerDefault) + applyResourceLimitsMap(&cfg.Peer, c.Peer, cfg.PeerDefault) +} - for s, l := range cfg.ServicePeer { - r := cfg.ServicePeerDefault - if l2, ok := c.ServicePeer[s]; ok { - r = l2 - } - l.Apply(r) - cfg.ServicePeer[s] = l - } - if c.ServicePeer != nil && cfg.ServicePeer == nil { - cfg.ServicePeer = make(map[string]BaseLimit) - } - for s, l := range c.ServicePeer { - if _, ok := cfg.ServicePeer[s]; !ok { - cfg.ServicePeer[s] = l - } - } +func (cfg PartialLimitConfig) Build(defaults ConcreteLimitConfig) ConcreteLimitConfig { + out := defaults + + out.system = cfg.System.Build(defaults.system) + out.transient = cfg.Transient.Build(defaults.transient) + out.allowlistedSystem = cfg.AllowlistedSystem.Build(defaults.allowlistedSystem) + out.allowlistedTransient = cfg.AllowlistedTransient.Build(defaults.allowlistedTransient) + out.serviceDefault = cfg.ServiceDefault.Build(defaults.serviceDefault) + out.servicePeerDefault = cfg.ServicePeerDefault.Build(defaults.servicePeerDefault) + out.protocolDefault = cfg.ProtocolDefault.Build(defaults.protocolDefault) + out.protocolPeerDefault = cfg.ProtocolPeerDefault.Build(defaults.protocolPeerDefault) + out.peerDefault = 
cfg.PeerDefault.Build(defaults.peerDefault) + out.conn = cfg.Conn.Build(defaults.conn) + out.stream = cfg.Stream.Build(defaults.stream) + + out.service = buildMapWithDefault(cfg.Service, defaults.service, out.serviceDefault) + out.servicePeer = buildMapWithDefault(cfg.ServicePeer, defaults.servicePeer, out.servicePeerDefault) + out.protocol = buildMapWithDefault(cfg.Protocol, defaults.protocol, out.protocolDefault) + out.protocolPeer = buildMapWithDefault(cfg.ProtocolPeer, defaults.protocolPeer, out.protocolPeerDefault) + out.peer = buildMapWithDefault(cfg.Peer, defaults.peer, out.peerDefault) + + return out +} - for s, l := range cfg.Protocol { - r := cfg.ProtocolDefault - if l2, ok := c.Protocol[s]; ok { - r = l2 - } - l.Apply(r) - cfg.Protocol[s] = l - } - if c.Protocol != nil && cfg.Protocol == nil { - cfg.Protocol = make(map[protocol.ID]BaseLimit) - } - for s, l := range c.Protocol { - if _, ok := cfg.Protocol[s]; !ok { - cfg.Protocol[s] = l - } +func buildMapWithDefault[K comparable](definedLimits map[K]ResourceLimits, defaults map[K]BaseLimit, fallbackDefault BaseLimit) map[K]BaseLimit { + if definedLimits == nil && defaults == nil { + return nil } - for s, l := range cfg.ProtocolPeer { - r := cfg.ProtocolPeerDefault - if l2, ok := c.ProtocolPeer[s]; ok { - r = l2 - } - l.Apply(r) - cfg.ProtocolPeer[s] = l - } - if c.ProtocolPeer != nil && cfg.ProtocolPeer == nil { - cfg.ProtocolPeer = make(map[protocol.ID]BaseLimit) + out := make(map[K]BaseLimit) + for k, l := range defaults { + out[k] = l } - for s, l := range c.ProtocolPeer { - if _, ok := cfg.ProtocolPeer[s]; !ok { - cfg.ProtocolPeer[s] = l + + for k, l := range definedLimits { + if defaultForKey, ok := out[k]; ok { + out[k] = l.Build(defaultForKey) + } else { + out[k] = l.Build(fallbackDefault) } } - for s, l := range cfg.Peer { - r := cfg.PeerDefault - if l2, ok := c.Peer[s]; ok { - r = l2 - } - l.Apply(r) - cfg.Peer[s] = l + return out +} + +// ConcreteLimitConfig is similar to PartialLimitConfig, but all values are defined. +// There is no unset "default" value. Commonly constructed by calling +// PartialLimitConfig.Build(rcmgr.DefaultLimits.AutoScale()) +type ConcreteLimitConfig struct { + system BaseLimit + transient BaseLimit + + // Limits that are applied to resources with an allowlisted multiaddr. + // These will only be used if the normal System & Transient limits are + // reached. + allowlistedSystem BaseLimit + allowlistedTransient BaseLimit + + serviceDefault BaseLimit + service map[string]BaseLimit + + servicePeerDefault BaseLimit + servicePeer map[string]BaseLimit + + protocolDefault BaseLimit + protocol map[protocol.ID]BaseLimit + + protocolPeerDefault BaseLimit + protocolPeer map[protocol.ID]BaseLimit + + peerDefault BaseLimit + peer map[peer.ID]BaseLimit + + conn BaseLimit + stream BaseLimit +} + +func resourceLimitsMapFromBaseLimitMap[K comparable](baseLimitMap map[K]BaseLimit) map[K]ResourceLimits { + if baseLimitMap == nil { + return nil } - if c.Peer != nil && cfg.Peer == nil { - cfg.Peer = make(map[peer.ID]BaseLimit) + + out := make(map[K]ResourceLimits) + for k, l := range baseLimitMap { + out[k] = l.ToResourceLimits() } - for s, l := range c.Peer { - if _, ok := cfg.Peer[s]; !ok { - cfg.Peer[s] = l - } + + return out +} + +// ToPartialLimitConfig converts a ConcreteLimitConfig to a PartialLimitConfig. +// The returned PartialLimitConfig will have no default values. 
+func (cfg ConcreteLimitConfig) ToPartialLimitConfig() PartialLimitConfig { + return PartialLimitConfig{ + System: cfg.system.ToResourceLimits(), + Transient: cfg.transient.ToResourceLimits(), + AllowlistedSystem: cfg.allowlistedSystem.ToResourceLimits(), + AllowlistedTransient: cfg.allowlistedTransient.ToResourceLimits(), + ServiceDefault: cfg.serviceDefault.ToResourceLimits(), + Service: resourceLimitsMapFromBaseLimitMap(cfg.service), + ServicePeerDefault: cfg.servicePeerDefault.ToResourceLimits(), + ServicePeer: resourceLimitsMapFromBaseLimitMap(cfg.servicePeer), + ProtocolDefault: cfg.protocolDefault.ToResourceLimits(), + Protocol: resourceLimitsMapFromBaseLimitMap(cfg.protocol), + ProtocolPeerDefault: cfg.protocolPeerDefault.ToResourceLimits(), + ProtocolPeer: resourceLimitsMapFromBaseLimitMap(cfg.protocolPeer), + PeerDefault: cfg.peerDefault.ToResourceLimits(), + Peer: resourceLimitsMapFromBaseLimitMap(cfg.peer), + Conn: cfg.conn.ToResourceLimits(), + Stream: cfg.stream.ToResourceLimits(), } } @@ -258,54 +580,54 @@ func (cfg *LimitConfig) Apply(c LimitConfig) { // memory is the amount of memory that the stack is allowed to consume, // for a dedicated node it's recommended to use 1/8 of the installed system memory. // If memory is smaller than 128 MB, the base configuration will be used. -func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) LimitConfig { - lc := LimitConfig{ - System: scale(cfg.SystemBaseLimit, cfg.SystemLimitIncrease, memory, numFD), - Transient: scale(cfg.TransientBaseLimit, cfg.TransientLimitIncrease, memory, numFD), - AllowlistedSystem: scale(cfg.AllowlistedSystemBaseLimit, cfg.AllowlistedSystemLimitIncrease, memory, numFD), - AllowlistedTransient: scale(cfg.AllowlistedTransientBaseLimit, cfg.AllowlistedTransientLimitIncrease, memory, numFD), - ServiceDefault: scale(cfg.ServiceBaseLimit, cfg.ServiceLimitIncrease, memory, numFD), - ServicePeerDefault: scale(cfg.ServicePeerBaseLimit, cfg.ServicePeerLimitIncrease, memory, numFD), - ProtocolDefault: scale(cfg.ProtocolBaseLimit, cfg.ProtocolLimitIncrease, memory, numFD), - ProtocolPeerDefault: scale(cfg.ProtocolPeerBaseLimit, cfg.ProtocolPeerLimitIncrease, memory, numFD), - PeerDefault: scale(cfg.PeerBaseLimit, cfg.PeerLimitIncrease, memory, numFD), - Conn: scale(cfg.ConnBaseLimit, cfg.ConnLimitIncrease, memory, numFD), - Stream: scale(cfg.StreamBaseLimit, cfg.ConnLimitIncrease, memory, numFD), +func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) ConcreteLimitConfig { + lc := ConcreteLimitConfig{ + system: scale(cfg.SystemBaseLimit, cfg.SystemLimitIncrease, memory, numFD), + transient: scale(cfg.TransientBaseLimit, cfg.TransientLimitIncrease, memory, numFD), + allowlistedSystem: scale(cfg.AllowlistedSystemBaseLimit, cfg.AllowlistedSystemLimitIncrease, memory, numFD), + allowlistedTransient: scale(cfg.AllowlistedTransientBaseLimit, cfg.AllowlistedTransientLimitIncrease, memory, numFD), + serviceDefault: scale(cfg.ServiceBaseLimit, cfg.ServiceLimitIncrease, memory, numFD), + servicePeerDefault: scale(cfg.ServicePeerBaseLimit, cfg.ServicePeerLimitIncrease, memory, numFD), + protocolDefault: scale(cfg.ProtocolBaseLimit, cfg.ProtocolLimitIncrease, memory, numFD), + protocolPeerDefault: scale(cfg.ProtocolPeerBaseLimit, cfg.ProtocolPeerLimitIncrease, memory, numFD), + peerDefault: scale(cfg.PeerBaseLimit, cfg.PeerLimitIncrease, memory, numFD), + conn: scale(cfg.ConnBaseLimit, cfg.ConnLimitIncrease, memory, numFD), + stream: scale(cfg.StreamBaseLimit, cfg.ConnLimitIncrease, memory, numFD), } if 
cfg.ServiceLimits != nil { - lc.Service = make(map[string]BaseLimit) + lc.service = make(map[string]BaseLimit) for svc, l := range cfg.ServiceLimits { - lc.Service[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.service[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } if cfg.ProtocolLimits != nil { - lc.Protocol = make(map[protocol.ID]BaseLimit) + lc.protocol = make(map[protocol.ID]BaseLimit) for proto, l := range cfg.ProtocolLimits { - lc.Protocol[proto] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.protocol[proto] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } if cfg.PeerLimits != nil { - lc.Peer = make(map[peer.ID]BaseLimit) + lc.peer = make(map[peer.ID]BaseLimit) for p, l := range cfg.PeerLimits { - lc.Peer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.peer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } if cfg.ServicePeerLimits != nil { - lc.ServicePeer = make(map[string]BaseLimit) + lc.servicePeer = make(map[string]BaseLimit) for svc, l := range cfg.ServicePeerLimits { - lc.ServicePeer[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.servicePeer[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } if cfg.ProtocolPeerLimits != nil { - lc.ProtocolPeer = make(map[protocol.ID]BaseLimit) + lc.protocolPeer = make(map[protocol.ID]BaseLimit) for p, l := range cfg.ProtocolPeerLimits { - lc.ProtocolPeer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) + lc.protocolPeer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, memory, numFD) } } return lc } -func (cfg *ScalingLimitConfig) AutoScale() LimitConfig { +func (cfg *ScalingLimitConfig) AutoScale() ConcreteLimitConfig { return cfg.Scale( int64(memory.TotalMemory())/8, getNumFDs()/2, @@ -540,18 +862,18 @@ var infiniteBaseLimit = BaseLimit{ Memory: math.MaxInt64, } -// InfiniteLimits are a limiter configuration that uses infinite limits, thus effectively not limiting anything. +// InfiniteLimits are a limiter configuration that uses unlimited limits, thus effectively not limiting anything. // Keep in mind that the operating system limits the number of file descriptors that an application can use. 
-var InfiniteLimits = LimitConfig{ - System: infiniteBaseLimit, - Transient: infiniteBaseLimit, - AllowlistedSystem: infiniteBaseLimit, - AllowlistedTransient: infiniteBaseLimit, - ServiceDefault: infiniteBaseLimit, - ServicePeerDefault: infiniteBaseLimit, - ProtocolDefault: infiniteBaseLimit, - ProtocolPeerDefault: infiniteBaseLimit, - PeerDefault: infiniteBaseLimit, - Conn: infiniteBaseLimit, - Stream: infiniteBaseLimit, +var InfiniteLimits = ConcreteLimitConfig{ + system: infiniteBaseLimit, + transient: infiniteBaseLimit, + allowlistedSystem: infiniteBaseLimit, + allowlistedTransient: infiniteBaseLimit, + serviceDefault: infiniteBaseLimit, + servicePeerDefault: infiniteBaseLimit, + protocolDefault: infiniteBaseLimit, + protocolPeerDefault: infiniteBaseLimit, + peerDefault: infiniteBaseLimit, + conn: infiniteBaseLimit, + stream: infiniteBaseLimit, } diff --git a/p2p/host/resource-manager/limit_test.go b/p2p/host/resource-manager/limit_test.go index 9070045bc7..68d6bf29cd 100644 --- a/p2p/host/resource-manager/limit_test.go +++ b/p2p/host/resource-manager/limit_test.go @@ -2,6 +2,7 @@ package rcmgr import ( "encoding/json" + "math" "runtime" "testing" @@ -32,7 +33,7 @@ func TestScaling(t *testing.T) { t.Run("no scaling if no increase is defined", func(t *testing.T) { cfg := ScalingLimitConfig{ServiceBaseLimit: base} scaled := cfg.Scale(8<<30, 100) - require.Equal(t, base, scaled.ServiceDefault) + require.Equal(t, base, scaled.serviceDefault) }) t.Run("scaling", func(t *testing.T) { @@ -50,14 +51,14 @@ func TestScaling(t *testing.T) { }, } scaled := cfg.Scale(128<<20+4<<30, 1000) - require.Equal(t, 500, scaled.Transient.FD) - require.Equal(t, base.Streams+4, scaled.Transient.Streams) - require.Equal(t, base.StreamsInbound+4*2, scaled.Transient.StreamsInbound) - require.Equal(t, base.StreamsOutbound+4*3, scaled.Transient.StreamsOutbound) - require.Equal(t, base.Conns+4*4, scaled.Transient.Conns) - require.Equal(t, base.ConnsInbound+4*5, scaled.Transient.ConnsInbound) - require.Equal(t, base.ConnsOutbound+4*6, scaled.Transient.ConnsOutbound) - require.Equal(t, base.Memory+4*7, scaled.Transient.Memory) + require.Equal(t, 500, scaled.transient.FD) + require.Equal(t, base.Streams+4, scaled.transient.Streams) + require.Equal(t, base.StreamsInbound+4*2, scaled.transient.StreamsInbound) + require.Equal(t, base.StreamsOutbound+4*3, scaled.transient.StreamsOutbound) + require.Equal(t, base.Conns+4*4, scaled.transient.Conns) + require.Equal(t, base.ConnsInbound+4*5, scaled.transient.ConnsInbound) + require.Equal(t, base.ConnsOutbound+4*6, scaled.transient.ConnsOutbound) + require.Equal(t, base.Memory+4*7, scaled.transient.Memory) }) t.Run("scaling and using the base amounts", func(t *testing.T) { @@ -75,14 +76,14 @@ func TestScaling(t *testing.T) { }, } scaled := cfg.Scale(1, 10) - require.Equal(t, 1, scaled.Transient.FD) - require.Equal(t, base.Streams, scaled.Transient.Streams) - require.Equal(t, base.StreamsInbound, scaled.Transient.StreamsInbound) - require.Equal(t, base.StreamsOutbound, scaled.Transient.StreamsOutbound) - require.Equal(t, base.Conns, scaled.Transient.Conns) - require.Equal(t, base.ConnsInbound, scaled.Transient.ConnsInbound) - require.Equal(t, base.ConnsOutbound, scaled.Transient.ConnsOutbound) - require.Equal(t, base.Memory, scaled.Transient.Memory) + require.Equal(t, 1, scaled.transient.FD) + require.Equal(t, base.Streams, scaled.transient.Streams) + require.Equal(t, base.StreamsInbound, scaled.transient.StreamsInbound) + require.Equal(t, base.StreamsOutbound, 
scaled.transient.StreamsOutbound) + require.Equal(t, base.Conns, scaled.transient.Conns) + require.Equal(t, base.ConnsInbound, scaled.transient.ConnsInbound) + require.Equal(t, base.ConnsOutbound, scaled.transient.ConnsOutbound) + require.Equal(t, base.Memory, scaled.transient.Memory) }) t.Run("scaling limits in maps", func(t *testing.T) { @@ -99,16 +100,16 @@ func TestScaling(t *testing.T) { } scaled := cfg.Scale(128<<20+4<<30, 1000) - require.Len(t, scaled.Service, 2) - require.Contains(t, scaled.Service, "A") - require.Equal(t, 10, scaled.Service["A"].Streams) - require.Equal(t, int64(100), scaled.Service["A"].Memory) - require.Equal(t, 9, scaled.Service["A"].FD) + require.Len(t, scaled.service, 2) + require.Contains(t, scaled.service, "A") + require.Equal(t, 10, scaled.service["A"].Streams) + require.Equal(t, int64(100), scaled.service["A"].Memory) + require.Equal(t, 9, scaled.service["A"].FD) - require.Contains(t, scaled.Service, "B") - require.Equal(t, 20+4*2, scaled.Service["B"].Streams) - require.Equal(t, int64(200+4*3), scaled.Service["B"].Memory) - require.Equal(t, 400, scaled.Service["B"].FD) + require.Contains(t, scaled.service, "B") + require.Equal(t, 20+4*2, scaled.service["B"].Streams) + require.Equal(t, int64(200+4*3), scaled.service["B"].Memory) + require.Equal(t, 400, scaled.service["B"].FD) }) } @@ -139,8 +140,74 @@ func TestReadmeExample(t *testing.T) { limitConf := scalingLimits.Scale(4<<30, 1000) - require.Equal(t, 384, limitConf.System.Conns) - require.Equal(t, 1000, limitConf.System.FD) + require.Equal(t, 384, limitConf.system.Conns) + require.Equal(t, 1000, limitConf.system.FD) +} + +func TestJSONMarshalling(t *testing.T) { + bl := ResourceLimits{ + Streams: DefaultLimit, + StreamsInbound: 10, + StreamsOutbound: BlockAllLimit, + Conns: 10, + // ConnsInbound: DefaultLimit, + ConnsOutbound: Unlimited, + Memory: Unlimited64, + } + + jsonEncoded, err := json.Marshal(bl) + require.NoError(t, err) + require.Equal(t, string(jsonEncoded), `{"StreamsInbound":10,"StreamsOutbound":"blockAll","Conns":10,"ConnsOutbound":"unlimited","Memory":"unlimited"}`) + + // Roundtrip + var blDecoded ResourceLimits + err = json.Unmarshal(jsonEncoded, &blDecoded) + require.NoError(t, err) + + require.Equal(t, bl, blDecoded) +} + +func TestJSONRoundTripInt64(t *testing.T) { + bl := ResourceLimits{ + Memory: math.MaxInt64, + } + + jsonEncoded, err := json.Marshal(bl) + require.NoError(t, err) + + require.Equal(t, string(jsonEncoded), `{"Memory":"9223372036854775807"}`) + + // Roundtrip + var blDecoded ResourceLimits + err = json.Unmarshal(jsonEncoded, &blDecoded) + require.NoError(t, err) + + require.Equal(t, bl, blDecoded) +} + +func TestRoundTripFromConcreteAndBack(t *testing.T) { + l := PartialLimitConfig{ + System: ResourceLimits{ + Conns: 1234, + Memory: 54321, + }, + + ServiceDefault: ResourceLimits{ + Conns: 2, + }, + + Service: map[string]ResourceLimits{ + "foo": { + Conns: 3, + }, + }, + } + + concrete := l.Build(InfiniteLimits) + + // Roundtrip + fromConcrete := concrete.ToPartialLimitConfig().Build(InfiniteLimits) + require.Equal(t, concrete, fromConcrete) } func TestSerializeJSON(t *testing.T) { @@ -160,3 +227,24 @@ func TestSerializeJSON(t *testing.T) { require.NoError(t, err) require.Equal(t, "{\"Streams\":10}", string(out)) } + +func TestWhatIsZeroInResourceLimits(t *testing.T) { + l := ResourceLimits{ + Streams: BlockAllLimit, + Memory: BlockAllLimit64, + } + + out, err := json.Marshal(l) + require.NoError(t, err) + require.Equal(t, `{"Streams":"blockAll","Memory":"blockAll"}`, 
string(out)) + + l2 := ResourceLimits{} + err = json.Unmarshal([]byte(`{"Streams":0,"Memory":0}`), &l2) + require.NoError(t, err) + require.Equal(t, l, l2) + + l3 := ResourceLimits{} + err = json.Unmarshal([]byte(`{"Streams":0,"Memory":"0"}`), &l3) + require.NoError(t, err) + require.Equal(t, l, l3) +} diff --git a/p2p/host/resource-manager/rcmgr_test.go b/p2p/host/resource-manager/rcmgr_test.go index 12420ac712..61d53bebb3 100644 --- a/p2p/host/resource-manager/rcmgr_test.go +++ b/p2p/host/resource-manager/rcmgr_test.go @@ -21,8 +21,8 @@ func TestResourceManager(t *testing.T) { svcA := "A.svc" svcB := "B.svc" nmgr, err := NewResourceManager( - NewFixedLimiter(LimitConfig{ - System: BaseLimit{ + NewFixedLimiter(ConcreteLimitConfig{ + system: BaseLimit{ Memory: 16384, StreamsInbound: 3, StreamsOutbound: 3, @@ -32,7 +32,7 @@ func TestResourceManager(t *testing.T) { Conns: 6, FD: 2, }, - Transient: BaseLimit{ + transient: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, @@ -42,7 +42,7 @@ func TestResourceManager(t *testing.T) { Conns: 2, FD: 1, }, - ServiceDefault: BaseLimit{ + serviceDefault: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, @@ -52,13 +52,13 @@ func TestResourceManager(t *testing.T) { Conns: 2, FD: 1, }, - ServicePeerDefault: BaseLimit{ + servicePeerDefault: BaseLimit{ Memory: 4096, StreamsInbound: 5, StreamsOutbound: 5, Streams: 10, }, - Service: map[string]BaseLimit{ + service: map[string]BaseLimit{ svcA: { Memory: 8192, StreamsInbound: 2, @@ -80,7 +80,7 @@ func TestResourceManager(t *testing.T) { FD: 1, }, }, - ServicePeer: map[string]BaseLimit{ + servicePeer: map[string]BaseLimit{ svcB: { Memory: 8192, StreamsInbound: 1, @@ -88,13 +88,13 @@ func TestResourceManager(t *testing.T) { Streams: 2, }, }, - ProtocolDefault: BaseLimit{ + protocolDefault: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, Streams: 2, }, - Protocol: map[protocol.ID]BaseLimit{ + protocol: map[protocol.ID]BaseLimit{ protoA: { Memory: 8192, StreamsInbound: 2, @@ -102,7 +102,7 @@ func TestResourceManager(t *testing.T) { Streams: 2, }, }, - ProtocolPeer: map[protocol.ID]BaseLimit{ + protocolPeer: map[protocol.ID]BaseLimit{ protoB: { Memory: 8192, StreamsInbound: 1, @@ -110,7 +110,7 @@ func TestResourceManager(t *testing.T) { Streams: 2, }, }, - PeerDefault: BaseLimit{ + peerDefault: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, @@ -120,13 +120,13 @@ func TestResourceManager(t *testing.T) { Conns: 2, FD: 1, }, - ProtocolPeerDefault: BaseLimit{ + protocolPeerDefault: BaseLimit{ Memory: 4096, StreamsInbound: 5, StreamsOutbound: 5, Streams: 10, }, - Peer: map[peer.ID]BaseLimit{ + peer: map[peer.ID]BaseLimit{ peerA: { Memory: 8192, StreamsInbound: 2, @@ -138,14 +138,14 @@ func TestResourceManager(t *testing.T) { FD: 1, }, }, - Conn: BaseLimit{ + conn: BaseLimit{ Memory: 4096, ConnsInbound: 1, ConnsOutbound: 1, Conns: 1, FD: 1, }, - Stream: BaseLimit{ + stream: BaseLimit{ Memory: 4096, StreamsInbound: 1, StreamsOutbound: 1, @@ -979,24 +979,24 @@ func TestResourceManagerWithAllowlist(t *testing.T) { peerA := test.RandPeerIDFatal(t) limits := DefaultLimits.AutoScale() - limits.System.Conns = 0 - limits.Transient.Conns = 0 + limits.system.Conns = 0 + limits.transient.Conns = 0 baseLimit := BaseLimit{ Conns: 2, ConnsInbound: 2, ConnsOutbound: 1, } - baseLimit.Apply(limits.AllowlistedSystem) - limits.AllowlistedSystem = baseLimit + baseLimit.Apply(limits.allowlistedSystem) + limits.allowlistedSystem = baseLimit baseLimit = BaseLimit{ Conns: 1, 
ConnsInbound: 1, ConnsOutbound: 1, } - baseLimit.Apply(limits.AllowlistedTransient) - limits.AllowlistedTransient = baseLimit + baseLimit.Apply(limits.allowlistedTransient) + limits.allowlistedTransient = baseLimit rcmgr, err := NewResourceManager(NewFixedLimiter(limits), WithAllowlistedMultiaddrs([]multiaddr.Multiaddr{ multiaddr.StringCast("/ip4/1.2.3.4"), diff --git a/p2p/test/resource-manager/rcmgr_test.go b/p2p/test/resource-manager/rcmgr_test.go index ae546c5af9..017b49293b 100644 --- a/p2p/test/resource-manager/rcmgr_test.go +++ b/p2p/test/resource-manager/rcmgr_test.go @@ -13,11 +13,12 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" + rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" "github.com/stretchr/testify/require" ) -func makeRcmgrOption(t *testing.T, cfg rcmgr.LimitConfig) func(int) libp2p.Option { +func makeRcmgrOption(t *testing.T, cfg rcmgr.ConcreteLimitConfig) func(int) libp2p.Option { return func(i int) libp2p.Option { var opts []rcmgr.Option if os.Getenv("LIBP2P_TEST_RCMGR_TRACE") == "1" { @@ -46,13 +47,19 @@ func waitForConnection(t *testing.T, src, dest *Echo) { func TestResourceManagerConnInbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns - cfg := rcmgr.DefaultLimits.AutoScale() - cfg.System.ConnsInbound = 3 - cfg.System.ConnsOutbound = 1024 - cfg.System.Conns = 1024 - cfg.PeerDefault.Conns = 1 - cfg.PeerDefault.ConnsInbound = 1 - cfg.PeerDefault.ConnsOutbound = 1 + cfg := rcmgr.PartialLimitConfig{ + System: rcmgr.ResourceLimits{ + ConnsInbound: 3, + ConnsOutbound: 1024, + Conns: 1024, + StreamsOutbound: rcmgr.Unlimited, + }, + PeerDefault: rcmgr.ResourceLimits{ + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + }, + }.Build(rcmgr.DefaultLimits.AutoScale()) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) @@ -82,13 +89,18 @@ func TestResourceManagerConnInbound(t *testing.T) { func TestResourceManagerConnOutbound(t *testing.T) { // this test checks that we can not exceed the inbound conn limit at system level // we specify: 1 conn per peer, 3 conns total, and we try to create 4 conns - cfg := rcmgr.DefaultLimits.AutoScale() - cfg.System.ConnsInbound = 1024 - cfg.System.ConnsOutbound = 3 - cfg.System.Conns = 1024 - cfg.PeerDefault.Conns = 1 - cfg.PeerDefault.ConnsInbound = 1 - cfg.PeerDefault.ConnsOutbound = 1 + cfg := rcmgr.PartialLimitConfig{ + System: rcmgr.ResourceLimits{ + ConnsInbound: 1024, + ConnsOutbound: 3, + Conns: 1024, + }, + PeerDefault: rcmgr.ResourceLimits{ + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + }, + }.Build(rcmgr.DefaultLimits.AutoScale()) echos := createEchos(t, 5, makeRcmgrOption(t, cfg)) defer closeEchos(echos) defer closeRcmgrs(echos) @@ -117,10 +129,13 @@ func TestResourceManagerConnOutbound(t *testing.T) { func TestResourceManagerServiceInbound(t *testing.T) { // this test checks that we can not exceed the inbound stream limit at service level // we specify: 3 streams for the service, and we try to create 4 streams - cfg := rcmgr.DefaultLimits.AutoScale() - cfg.ServiceDefault.StreamsInbound = 3 - cfg.ServiceDefault.StreamsOutbound = 1024 - cfg.ServiceDefault.Streams = 1024 + cfg := rcmgr.PartialLimitConfig{ + ServiceDefault: rcmgr.ResourceLimits{ + StreamsInbound: 3, + StreamsOutbound: 1024, + Streams: 1024, + }, + 
}.Build(rcmgr.DefaultLimits.AutoScale())
 	echos := createEchos(t, 5, makeRcmgrOption(t, cfg))
 	defer closeEchos(echos)
 	defer closeRcmgrs(echos)
@@ -276,3 +291,50 @@ func waitForChannel(ready chan struct{}, timeout time.Duration) func() error {
 		}
 	}
 }
+
+func TestReadmeExample(t *testing.T) {
+	// Start with the default scaling limits.
+	scalingLimits := rcmgr.DefaultLimits
+
+	// Add limits around included libp2p protocols
+	libp2p.SetDefaultServiceLimits(&scalingLimits)
+
+	// Turn the scaling limits into a concrete set of limits using `.AutoScale`. This
+	// scales the limits proportional to your system memory.
+	scaledDefaultLimits := scalingLimits.AutoScale()
+
+	// Tweak certain settings
+	cfg := rcmgr.PartialLimitConfig{
+		System: rcmgr.ResourceLimits{
+			// Allow unlimited outbound streams
+			StreamsOutbound: rcmgr.Unlimited,
+		},
+		// Everything else is default. The exact values will come from `scaledDefaultLimits` above.
+	}
+
+	// Create our limits by using our cfg and replacing the default values with values from `scaledDefaultLimits`
+	limits := cfg.Build(scaledDefaultLimits)
+
+	// The resource manager expects a limiter, so we create one from our limits.
+	limiter := rcmgr.NewFixedLimiter(limits)
+
+	// (Optional if you want metrics) Construct the OpenCensus metrics reporter.
+	str, err := rcmgrObs.NewStatsTraceReporter()
+	if err != nil {
+		panic(err)
+	}
+
+	// Initialize the resource manager
+	rm, err := rcmgr.NewResourceManager(limiter, rcmgr.WithTraceReporter(str))
+	if err != nil {
+		panic(err)
+	}
+
+	// Create a libp2p host
+	host, err := libp2p.New(libp2p.ResourceManager(rm))
+	if err != nil {
+		panic(err)
+	}
+
+	host.Close()
+}
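The diff above also reworks `NewLimiterFromJSON`: it now decodes the input as a `PartialLimitConfig` and calls `Build` with the supplied `ConcreteLimitConfig` defaults, so anything left as "default" in the file falls back to the scaled defaults, while an explicit `0` is decoded as blockAll. A minimal sketch of loading such a file into a host with the new API follows; the `limits.json` path and the panic-style error handling are illustrative assumptions, not part of this change:

```go
package main

import (
	"os"

	"github.com/libp2p/go-libp2p"
	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

func main() {
	// Hypothetical limits file, e.g. produced by serializing a PartialLimitConfig
	// as shown in the README's "Saving the limits config" section.
	f, err := os.Open("limits.json")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Build the concrete defaults used to fill in every value left as "default" in the file.
	scalingLimits := rcmgr.DefaultLimits
	libp2p.SetDefaultServiceLimits(&scalingLimits)

	// NewLimiterFromJSON decodes the file as a PartialLimitConfig and builds it
	// against the provided ConcreteLimitConfig, yielding a fixed limiter.
	limiter, err := rcmgr.NewLimiterFromJSON(f, scalingLimits.AutoScale())
	if err != nil {
		panic(err)
	}

	rm, err := rcmgr.NewResourceManager(limiter)
	if err != nil {
		panic(err)
	}

	host, err := libp2p.New(libp2p.ResourceManager(rm))
	if err != nil {
		panic(err)
	}
	defer host.Close()
}
```

Files written in the old `LimitConfig` format still parse, since `LimitVal` and `LimitVal64` accept plain integers (and `LimitVal64` additionally accepts the string form used for `Memory`), as exercised by `TestLimitConfigParserBackwardsCompat` above.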