Merge pull request #58880 from gmarek/origin/automated-cherry-pick-of-#58340-#58342-upstream-release-1.8

Automatic merge from submit-queue.

Automated cherry pick of #58340: Add apiserver metric for number of requests dropped by #58342: Add a metric to track usage of inflight request limit.

Cherry pick of #58340 #58342 on release-1.8.

#58340: Add apiserver metric for number of requests dropped by
#58342: Add a metric to track usage of inflight request limit.

```release-note
Add apiserver metric for current inflight-request usage and number of requests dropped because of inflight limit.
```
Kubernetes Submit Queue committed Jan 29, 2018
2 parents ce87b2b + 7ae2f56 commit 8eff38d
Showing 3 changed files with 112 additions and 5 deletions.
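The release note above adds two metric families: `apiserver_current_inflight_requests` and `apiserver_dropped_requests`. Below is a minimal sketch of spot-checking them against a running apiserver; the address, port, and lack of auth are assumptions for a local dev setup, not part of this change.

```go
// Sketch: scrape /metrics and print only the two families added by this cherry pick.
// The endpoint http://127.0.0.1:8080/metrics is a hypothetical insecure local port;
// adjust the URL and authentication for a real cluster.
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	resp, err := http.Get("http://127.0.0.1:8080/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "apiserver_current_inflight_requests") ||
			strings.HasPrefix(line, "apiserver_dropped_requests") {
			fmt.Println(line)
		}
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}
```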
31 changes: 31 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go
@@ -71,15 +71,46 @@ var (
},
[]string{"verb", "resource", "subresource", "scope"},
)
// DroppedRequests is the number of requests dropped with 'Try again later' response
DroppedRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "apiserver_dropped_requests",
Help: "Number of requests dropped with 'Try again later' reponse",
},
[]string{"requestKind"},
)
// Because of the volatility of the base metric, this is a pre-aggregated one. Instead of reporting current usage all the time,
// it reports the maximal usage during the last second.
currentInflightRequests = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "apiserver_current_inflight_requests",
Help: "Maximal mumber of currently used inflight request limit of this apiserver per request kind in last second.",
},
[]string{"requestKind"},
)
kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`)
)

const (
// ReadOnlyKind is a string identifying read only request kind
ReadOnlyKind = "readOnly"
// MutatingKind is a string identifying mutating request kind
MutatingKind = "mutating"
)

// Register all metrics.
func Register() {
prometheus.MustRegister(requestCounter)
prometheus.MustRegister(requestLatencies)
prometheus.MustRegister(requestLatenciesSummary)
prometheus.MustRegister(responseSizes)
prometheus.MustRegister(DroppedRequests)
prometheus.MustRegister(currentInflightRequests)
}

func UpdateInflightRequestMetrics(nonmutating, mutating int) {
currentInflightRequests.WithLabelValues(ReadOnlyKind).Set(float64(nonmutating))
currentInflightRequests.WithLabelValues(MutatingKind).Set(float64(mutating))
}

// Monitor records a request to the apiserver endpoints that follow the Kubernetes API conventions. verb must be
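The hunk above exports `DroppedRequests`, the `ReadOnlyKind`/`MutatingKind` label values, and `UpdateInflightRequestMetrics`. A minimal sketch of driving them from a standalone binary, assuming `k8s.io/apiserver` at this revision and the Prometheus Go client are importable; the reported values are invented for illustration.

```go
// Sketch: register the apiserver endpoint metrics and feed the two new ones by hand.
package main

import (
	"log"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"k8s.io/apiserver/pkg/endpoints/metrics"
)

func main() {
	// Registers DroppedRequests and currentInflightRequests (among the
	// pre-existing request metrics) with the default Prometheus registry.
	metrics.Register()

	// A request rejected by the max-inflight filter is counted like this.
	metrics.DroppedRequests.WithLabelValues(metrics.MutatingKind).Inc()

	// The filter reports per-second watermarks; emulate that with fixed numbers.
	go func() {
		for range time.Tick(time.Second) {
			metrics.UpdateInflightRequestMetrics(12 /* readOnly */, 3 /* mutating */)
		}
	}()

	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9090", nil))
}
```

Since `UpdateInflightRequestMetrics` simply overwrites the gauge on every call, whatever feeds it has to pre-aggregate within the reporting period, which is what the watermark added to `maxinflight.go` below does.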
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/server/filters/BUILD
@@ -44,6 +44,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
85 changes: 80 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
@@ -20,9 +20,11 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
@@ -31,9 +33,16 @@ import (
"github.com/golang/glog"
)

// Constant for the retry-after interval on rate limiting.
// TODO: maybe make this dynamic? or user-adjustable?
const retryAfter = "1"
const (
// Constant for the retry-after interval on rate limiting.
// TODO: maybe make this dynamic? or user-adjustable?
retryAfter = "1"

// How often the inflight usage metric should be updated. Because
// the metric tracks the maximal value over a period, making this
// longer will increase the reported metric value.
inflightUsageMetricUpdatePeriod = time.Second
)

var nonMutatingRequestVerbs = sets.NewString("get", "list", "watch")

@@ -43,6 +52,49 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
glog.Errorf(err.Error())
}

// requestWatermark is used to track maximal usage of inflight requests.
type requestWatermark struct {
lock sync.Mutex
readOnlyWatermark, mutatingWatermark int
}

func (w *requestWatermark) recordMutating(mutatingVal int) {
w.lock.Lock()
defer w.lock.Unlock()

if w.mutatingWatermark < mutatingVal {
w.mutatingWatermark = mutatingVal
}
}

func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
w.lock.Lock()
defer w.lock.Unlock()

if w.readOnlyWatermark < readOnlyVal {
w.readOnlyWatermark = readOnlyVal
}
}

var watermark = &requestWatermark{}

func startRecordingUsage() {
go func() {
wait.Forever(func() {
watermark.lock.Lock()
readOnlyWatermark := watermark.readOnlyWatermark
mutatingWatermark := watermark.mutatingWatermark
watermark.readOnlyWatermark = 0
watermark.mutatingWatermark = 0
watermark.lock.Unlock()

metrics.UpdateInflightRequestMetrics(readOnlyWatermark, mutatingWatermark)
}, inflightUsageMetricUpdatePeriod)
}()
}

var startOnce sync.Once

// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
func WithMaxInFlightLimit(
handler http.Handler,
@@ -51,6 +103,7 @@ func WithMaxInFlightLimit(
requestContextMapper genericapirequest.RequestContextMapper,
longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler {
startOnce.Do(startRecordingUsage)
if nonMutatingLimit == 0 && mutatingLimit == 0 {
return handler
}
@@ -82,7 +135,8 @@
}

var c chan bool
if !nonMutatingRequestVerbs.Has(requestInfo.Verb) {
isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
if isMutatingRequest {
c = mutatingChan
} else {
c = nonMutatingChan
@@ -94,10 +148,31 @@

select {
case c <- true:
defer func() { <-c }()
var mutatingLen, readOnlyLen int
if isMutatingRequest {
mutatingLen = len(mutatingChan)
} else {
readOnlyLen = len(nonMutatingChan)
}

defer func() {
<-c
if isMutatingRequest {
watermark.recordMutating(mutatingLen)
} else {
watermark.recordReadOnly(readOnlyLen)
}

}()
handler.ServeHTTP(w, r)

default:
// We need to split this data between buckets used for throttling.
if isMutatingRequest {
metrics.DroppedRequests.WithLabelValues(metrics.MutatingKind).Inc()
} else {
metrics.DroppedRequests.WithLabelValues(metrics.ReadOnlyKind).Inc()
}
// at this point we're about to return a 429, BUT not all actors should be rate limited. A system:master is so powerful
// that it should always get an answer. It's a super-admin or a loopback connection.
if currUser, ok := apirequest.UserFrom(ctx); ok {
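To make the watermark semantics concrete, here is the record-the-peak-then-flush pattern from `maxinflight.go` reduced to the standard library. The struct, channel capacity, and printed output are illustrative only; the real filter publishes the flushed values via `metrics.UpdateInflightRequestMetrics` from a `wait.Forever` loop instead of printing them.

```go
// Sketch: track the peak occupancy of an in-flight channel and flush it once per period.
package main

import (
	"fmt"
	"sync"
	"time"
)

type watermark struct {
	mu       sync.Mutex
	readOnly int
	mutating int
}

// record keeps the highest occupancy seen since the last flush.
func (w *watermark) record(mutating bool, val int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if mutating && val > w.mutating {
		w.mutating = val
	}
	if !mutating && val > w.readOnly {
		w.readOnly = val
	}
}

// flush returns the peaks seen since the previous call and resets them,
// mirroring what startRecordingUsage does once per second.
func (w *watermark) flush() (readOnly, mutating int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	readOnly, mutating = w.readOnly, w.mutating
	w.readOnly, w.mutating = 0, 0
	return
}

func main() {
	wm := &watermark{}
	inflight := make(chan bool, 4) // stands in for the mutating channel

	// Simulate a burst of mutating requests acquiring and releasing slots.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			inflight <- true
			wm.record(true, len(inflight)) // like len(mutatingChan) in the filter
			time.Sleep(50 * time.Millisecond)
			<-inflight
		}()
	}
	wg.Wait()

	ro, mu := wm.flush()
	fmt.Printf("readOnly watermark=%d, mutating watermark=%d\n", ro, mu)
}
```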
