Skip to content

Commit

Permalink
Bandwidth limits (#22)
Browse files Browse the repository at this point in the history
### Description

Add configurable bandwidth limits that can be shared between any group
of backends.

### Type of change

* [x] New feature
* [ ] Feature improvement
* [ ] Bug fix
* [ ] Documentation
* [ ] Cleanup / refactoring
* [ ] Other (please explain)


### How is this change tested ?

* [x] Unit tests
* [x] Manual tests (explain)
* [ ] Tests are not needed
  • Loading branch information
rthellend committed Nov 11, 2023
1 parent ecda1ec commit 8904f80
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 8 deletions.
41 changes: 37 additions & 4 deletions proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,20 @@ type Config struct {
// PKI is a list of locally hosted and managed Certificate Authorities
// that can be used to authenticate TLS clients and backend servers.
PKI []*ConfigPKI `yaml:"pki,omitempty"`
// BWLimits is the list of named bandwidth limit groups.
// Each backend can be associated with one group. The group's limits
// are shared between all the backends associated with it.
BWLimits []*BWLimit `yaml:"bwLimits,omitempty"`
}

// BWLimit is a named bandwidth limit configuration.
type BWLimit struct {
// Name is the name of the group.
Name string `yaml:"name"`
// Ingress is the ingress limit, in bytes per second.
Ingress float64 `yaml:"ingress"`
// Egress is the engress limit, in bytes per second.
Egress float64 `yaml:"egress"`
}

// Backend encapsulates the data of one backend.
Expand Down Expand Up @@ -190,6 +204,10 @@ type Backend struct {
// When more than one address are specified, requests are distributed
// using a simple round robin.
Addresses []string `yaml:"addresses,omitempty"`
// BWLimit is the name of the bandwidth limit policy to apply to this
// backend. All backends using the same policy are subject to common
// limits.
BWLimit string `yaml:"bwLimit,omitempty"`
// InsecureSkipVerify disabled the verification of the backend server's
// TLS certificate. See https://pkg.go.dev/crypto/tls#Config
InsecureSkipVerify bool `yaml:"insecureSkipVerify,omitempty"`
Expand Down Expand Up @@ -253,7 +271,8 @@ type Backend struct {
tlsConfig *tls.Config
forwardRootCAs *x509.CertPool
pkiMap map[string]*pki.PKIManager
limiter *rate.Limiter
bwLimit *bwLimit
connLimit *rate.Limiter

allowIPs *[]*net.IPNet
denyIPs *[]*net.IPNet
Expand Down Expand Up @@ -605,8 +624,19 @@ func (cfg *Config) Check() error {
}
}
pkis := make(map[string]bool)
for _, i := range cfg.PKI {
pkis[i.Name] = true
for i, p := range cfg.PKI {
if pkis[p.Name] {
return fmt.Errorf("pki[%d].Name: duplicate name %q", i, p.Name)
}
pkis[p.Name] = true
}

bwLimits := make(map[string]bool)
for i, l := range cfg.BWLimits {
if bwLimits[l.Name] {
return fmt.Errorf("bwLimit[%d].Name: duplicate name %q", i, l.Name)
}
bwLimits[l.Name] = true
}

serverNames := make(map[string]bool)
Expand Down Expand Up @@ -641,6 +671,9 @@ func (cfg *Config) Check() error {
}
serverNames[sn] = true
}
if n := be.BWLimit; n != "" && !bwLimits[n] {
return fmt.Errorf("backend[%d].BWLimit: undefined name %q", i, n)
}
if be.ClientAuth != nil {
pool := x509.NewCertPool()
for j, n := range be.ClientAuth.RootCAs {
Expand Down Expand Up @@ -728,7 +761,7 @@ func (cfg *Config) Check() error {
if be.ForwardRateLimit == 0 {
be.ForwardRateLimit = 5
}
be.limiter = rate.NewLimiter(rate.Limit(be.ForwardRateLimit), be.ForwardRateLimit)
be.connLimit = rate.NewLimiter(rate.Limit(be.ForwardRateLimit), be.ForwardRateLimit)
}
return os.MkdirAll(cfg.CacheDir, 0o700)
}
Expand Down
29 changes: 29 additions & 0 deletions proxy/internal/netw/netw.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
package netw

import (
"context"
"io"
"net"
"sync"
"time"

"golang.org/x/time/rate"
)

// Listen creates a net listener that is instrumented to store per connection
Expand All @@ -52,8 +55,11 @@ func (l listener) Accept() (net.Conn, error) {
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
return &Conn{
Conn: c,
ctx: ctx,
cancel: cancel,
annotations: make(map[string]any),
}, nil
}
Expand All @@ -62,6 +68,11 @@ func (l listener) Accept() (net.Conn, error) {
type Conn struct {
net.Conn

ctx context.Context
cancel func()
ingressLimiter *rate.Limiter
egressLimiter *rate.Limiter

mu sync.Mutex
onClose func()
annotations map[string]any
Expand Down Expand Up @@ -98,6 +109,13 @@ func (c *Conn) Annotation(key string, defaultValue any) any {
return defaultValue
}

// SetLimiter sets the rate limiters for this connection.
// It must be called before the first Read() or Write(). Peek() is OK.
func (c *Conn) SetLimiters(ingress, egress *rate.Limiter) {
c.ingressLimiter = ingress
c.egressLimiter = egress
}

// BytesSent returns the number of bytes sent on this connection so far.
func (c *Conn) BytesSent() int64 {
c.mu.Lock()
Expand Down Expand Up @@ -141,6 +159,11 @@ func (c *Conn) Peek(b []byte) (int, error) {
}

func (c *Conn) Read(b []byte) (int, error) {
if l := c.ingressLimiter; l != nil {
if err := l.WaitN(c.ctx, len(b)); err != nil {
return 0, err
}
}
c.mu.Lock()
defer c.mu.Unlock()
if len(c.peekBuf) > 0 {
Expand All @@ -157,6 +180,11 @@ func (c *Conn) Read(b []byte) (int, error) {
}

func (c *Conn) Write(b []byte) (int, error) {
if l := c.egressLimiter; l != nil {
if err := l.WaitN(c.ctx, len(b)); err != nil {
return 0, err
}
}
n, err := c.Conn.Write(b)
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -169,6 +197,7 @@ func (c *Conn) Close() error {
f := c.onClose
c.onClose = nil
c.mu.Unlock()
c.cancel()
if f != nil {
f()
}
Expand Down
2 changes: 1 addition & 1 deletion proxy/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (p *Proxy) metricsHandler(w http.ResponseWriter, req *http.Request) {
sort.Strings(serverNames)
fmt.Fprintln(&buf, "Backend metrics:")
fmt.Fprintln(&buf)
fmt.Fprintf(&buf, " %*s %12s %12s %12s\n", -maxLen, "Server", "Count", "Sent", "Recv")
fmt.Fprintf(&buf, " %*s %12s %12s %12s\n", -maxLen, "Server", "Count", "Egress", "Ingress")
for _, s := range serverNames {
fmt.Fprintf(&buf, " %*s %12d %12d %12d\n", -maxLen, s, totals[s].numConnections, totals[s].numBytesSent, totals[s].numBytesReceived)
}
Expand Down
37 changes: 34 additions & 3 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/c2FmZQ/storage/crypto"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/time/rate"
yaml "gopkg.in/yaml.v3"

"github.com/c2FmZQ/tlsproxy/certmanager"
Expand Down Expand Up @@ -102,6 +103,7 @@ type Proxy struct {
backends map[string]*Backend
connections map[connKey]*netw.Conn
pkis map[string]*pki.PKIManager
bwLimits map[string]*bwLimit

metrics map[string]*backendMetrics
startTime time.Time
Expand All @@ -115,6 +117,11 @@ type connKey struct {
src net.Addr
}

type bwLimit struct {
ingress *rate.Limiter
egress *rate.Limiter
}

type backendMetrics struct {
numConnections int64
numBytesSent int64
Expand Down Expand Up @@ -169,6 +176,7 @@ func New(cfg *Config, passphrase []byte) (*Proxy, error) {
tokenManager: tm,
connections: make(map[connKey]*netw.Conn),
pkis: make(map[string]*pki.PKIManager),
bwLimits: make(map[string]*bwLimit),
}
if err := p.Reconfigure(cfg); err != nil {
return nil, err
Expand Down Expand Up @@ -212,6 +220,7 @@ func NewTestProxy(cfg *Config) (*Proxy, error) {
store: store,
tokenManager: tm,
pkis: make(map[string]*pki.PKIManager),
bwLimits: make(map[string]*bwLimit),
}
if err := p.Reconfigure(cfg); err != nil {
return nil, err
Expand Down Expand Up @@ -342,6 +351,22 @@ func (p *Proxy) Reconfigure(cfg *Config) error {
pkis[pp.Name] = m
}

for _, bwl := range cfg.BWLimits {
const minBurst = 1 << 17 // 128 KB
name := strings.ToLower(bwl.Name)
if l, ok := p.bwLimits[name]; ok {
l.ingress.SetLimit(rate.Limit(bwl.Ingress))
l.ingress.SetBurst(int(max(bwl.Ingress, minBurst)))
l.egress.SetLimit(rate.Limit(bwl.Egress))
l.egress.SetBurst(int(max(bwl.Egress, minBurst)))
continue
}
p.bwLimits[name] = &bwLimit{
ingress: rate.NewLimiter(rate.Limit(bwl.Ingress), int(max(bwl.Ingress, minBurst))),
egress: rate.NewLimiter(rate.Limit(bwl.Egress), int(max(bwl.Egress, minBurst))),
}
}

backends := make(map[string]*Backend, len(cfg.Backends))
for _, be := range cfg.Backends {
be.recordEvent = p.recordEvent
Expand All @@ -350,6 +375,9 @@ func (p *Proxy) Reconfigure(cfg *Config) error {
for _, sn := range be.ServerNames {
backends[sn] = be
}
if l, ok := p.bwLimits[be.BWLimit]; ok {
be.bwLimit = l
}
if be.SSO != nil {
idp, ok := identityProviders[be.SSO.Provider]
if !ok {
Expand Down Expand Up @@ -841,6 +869,9 @@ func (p *Proxy) handleConnection(conn *netw.Conn) {
return
}
conn.SetAnnotation(backendKey, be)
if l := be.bwLimit; l != nil {
conn.SetLimiters(l.ingress, l.egress)
}
be.incInFlight(1)
switch {
case be.Mode == ModeTLSPassthrough:
Expand Down Expand Up @@ -950,7 +981,7 @@ func (p *Proxy) handleHTTPConnection(conn *tls.Conn) {
}
serverName := connServerName(conn)
be := connBackend(conn)
if err := be.limiter.Wait(p.ctx); err != nil {
if err := be.connLimit.Wait(p.ctx); err != nil {
p.recordEvent(err.Error())
log.Printf("ERR [-] %s ➔ %q Wait: %v", conn.RemoteAddr(), serverName, err)
conn.Close()
Expand Down Expand Up @@ -979,7 +1010,7 @@ func (p *Proxy) handleTLSConnection(extConn *tls.Conn) {
}
serverName := connServerName(extConn)
be := connBackend(extConn)
if err := be.limiter.Wait(p.ctx); err != nil {
if err := be.connLimit.Wait(p.ctx); err != nil {
p.recordEvent(err.Error())
log.Printf("ERR [-] %s ➔ %q Wait: %v", extConn.RemoteAddr(), serverName, err)
return
Expand Down Expand Up @@ -1020,7 +1051,7 @@ func (p *Proxy) handleTLSConnection(extConn *tls.Conn) {
func (p *Proxy) handleTLSPassthroughConnection(extConn net.Conn) {
serverName := connServerName(extConn)
be := connBackend(extConn)
if err := be.limiter.Wait(p.ctx); err != nil {
if err := be.connLimit.Wait(p.ctx); err != nil {
p.recordEvent(err.Error())
log.Printf("ERR [-] %s ➔ %q Wait: %v", extConn.RemoteAddr(), serverName, err)
sendInternalError(extConn)
Expand Down
Loading

0 comments on commit 8904f80

Please sign in to comment.