Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an HTTP hedging library. #115

Merged
merged 6 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97
* [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95
* [ENHANCEMENT] Trigger metrics update on ring changes instead of doing it periodically to speed up tests that wait for certain metrics. #107
* [ENHANCEMENT] Add an HTTP hedging library. #115
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/armon/go-metrics v0.3.0
github.com/coreos/etcd v3.3.25+incompatible // indirect
github.com/cristalhq/hedgedhttp v0.7.0
github.com/go-kit/log v0.1.0
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf h1:CAKfRE2YtTUIjjh1bkBt
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/cristalhq/hedgedhttp v0.7.0 h1:C2XPDC+AQH4QJt6vZI4jB5WNyF86QbSJD4C4fW3H3ro=
github.com/cristalhq/hedgedhttp v0.7.0/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
126 changes: 126 additions & 0 deletions hedging/hedging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package hedging

import (
"errors"
"flag"
"net/http"
"sync"
"time"

"github.com/cristalhq/hedgedhttp"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/time/rate"
)

var (
ErrTooManyHedgeRequests = errors.New("too many hedge requests")
totalHedgeRequests prometheus.Counter
totalRateLimitedHedgeRequests prometheus.Counter
once sync.Once
)

// Config is the configuration for hedging requests.
type Config struct {
// At is the duration after which a second request will be issued.
At time.Duration `yaml:"at"`
// UpTo is the maximum number of requests that will be issued.
UpTo int `yaml:"up_to"`
// The maximun of hedge requests allowed per second.
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
MaxPerSecond int `yaml:"max_per_second"`
}

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximun of hedge requests allowed.")
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
f.DurationVar(&cfg.At, prefix+"hedge-requests-at", 0, "If set to a non-zero value a second request will be issued at the provided duration. Default is 0 (disabled)")
f.IntVar(&cfg.MaxPerSecond, prefix+"hedge-max-per-second", 5, "The maximun of hedge requests allowed per seconds.")
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
}

// Client returns a hedged http client.
// The client transport will be mutated to use the hedged roundtripper.
func (cfg *Config) Client(client *http.Client) (*http.Client, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're typically not used to have logic functions on Config. WDYT about moving it to standalone functions taking cfg as input?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah doesn't change much, let's do it.

return cfg.ClientWithRegisterer(client, prometheus.DefaultRegisterer)
}

// ClientWithRegisterer returns a hedged http client with instrumentation registered to the provided registerer.
// The client transport will be mutated to use the hedged roundtripper.
func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Registerer) (*http.Client, error) {
if reg == nil {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
reg = prometheus.DefaultRegisterer
}
if client == nil {
client = http.DefaultClient
}
if cfg.At == 0 {
return client, nil
}
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
var err error
client.Transport, err = cfg.RoundTripperWithRegisterer(client.Transport, reg)
if err != nil {
return nil, err
}
return client, nil
}

// RoundTripperWithRegisterer returns a hedged roundtripper with instrumentation registered to the provided registerer.
func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg prometheus.Registerer) (http.RoundTripper, error) {
if reg == nil {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
reg = prometheus.DefaultRegisterer
}
if next == nil {
next = http.DefaultTransport
}
if cfg.At == 0 {
return next, nil
}
// register metrics only once
once.Do(func() {
totalHedgeRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
Name: "hedged_requests_total",
Help: "The total number of hedged requests.",
})
totalRateLimitedHedgeRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "hedged_requests_rate_limited_total",
Help: "The total number of hedged requests rejected via rate limiting.",
})
})
return hedgedhttp.NewRoundTripper(
cfg.At,
cfg.UpTo,
newLimitedHedgingRoundTripper(cfg.MaxPerSecond, next),
)
}

// RoundTripper returns a hedged roundtripper.
func (cfg *Config) RoundTripper(next http.RoundTripper) (http.RoundTripper, error) {
return cfg.RoundTripperWithRegisterer(next, prometheus.DefaultRegisterer)
}

type limitedHedgingRoundTripper struct {
next http.RoundTripper
limiter *rate.Limiter
}

func newLimitedHedgingRoundTripper(max int, next http.RoundTripper) *limitedHedgingRoundTripper {
return &limitedHedgingRoundTripper{
next: next,
limiter: rate.NewLimiter(rate.Limit(max), max),
}
}

func (rt *limitedHedgingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if hedgedhttp.IsHedgedRequest(req) {
if !rt.limiter.Allow() {
totalRateLimitedHedgeRequests.Inc()
return nil, ErrTooManyHedgeRequests
}
totalHedgeRequests.Inc()
}
return rt.next.RoundTrip(req)
}
97 changes: 97 additions & 0 deletions hedging/hedging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package hedging

import (
"net/http"
"strings"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

type RoundTripperFunc func(*http.Request) (*http.Response, error)

func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return fn(req)
}

func resetMetrics() {
once = sync.Once{}
reg := prometheus.NewRegistry()
prometheus.DefaultRegisterer = reg
prometheus.DefaultGatherer = reg
}

func TestHedging(t *testing.T) {
resetMetrics()
cfg := &Config{
At: time.Duration(1),
UpTo: 3,
MaxPerSecond: 1000,
}
count := atomic.NewInt32(0)
client, err := cfg.Client(&http.Client{
Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return &http.Response{
StatusCode: http.StatusOK,
}, nil
}),
})
if err != nil {
t.Fatal(err)
}
_, _ = client.Get("http://example.com")

require.Equal(t, int32(3), count.Load())
require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer,
strings.NewReader(`
# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting.
# TYPE hedged_requests_rate_limited_total counter
hedged_requests_rate_limited_total 0
# HELP hedged_requests_total The total number of hedged requests.
# TYPE hedged_requests_total counter
hedged_requests_total 2
`,
), "hedged_requests_total", "hedged_requests_rate_limited_total"))
}

func TestHedgingRateLimit(t *testing.T) {
resetMetrics()
cfg := &Config{
At: time.Duration(1),
UpTo: 20,
MaxPerSecond: 1,
}
count := atomic.NewInt32(0)
client, err := cfg.Client(&http.Client{
Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
count.Inc()
time.Sleep(200 * time.Millisecond)
return &http.Response{
StatusCode: http.StatusOK,
}, nil
}),
})
if err != nil {
t.Fatal(err)
}
_, _ = client.Get("http://example.com")

require.Equal(t, int32(2), count.Load())
require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer,
strings.NewReader(`
# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting.
# TYPE hedged_requests_rate_limited_total counter
hedged_requests_rate_limited_total 18
# HELP hedged_requests_total The total number of hedged requests.
# TYPE hedged_requests_total counter
hedged_requests_total 1
`,
), "hedged_requests_total", "hedged_requests_rate_limited_total"))
}