Track push inflight requests via grpc server handlers. #5976

Merged

@pstibrany merged 4 commits into main from ing-push-requests on Sep 11, 2023

Conversation

@pstibrany (Member) commented Sep 8, 2023

What this PR does

This PR moves tracking of inflight push requests to gRPC handlers. We install 1) a tap handle and 2) a stats handler, which work together to check whether another push request is allowed and to track the number of ongoing push requests.

The tap handle is called as soon as request headers are received, but before the request body is read into memory. At that point we can either reject the request early, or accept it and increase the ongoing-requests counter. No interceptor or stats handler has run yet, so if we reject the request, the only record of it will be in the cortex_ingester_instance_rejected_requests_total metric.

Only if the tap handle allows the request to proceed are the stats handlers and interceptors called. The stats handler is also called after the request has finished -- that is where we decrease the number of inflight requests again.

The big benefit of this change is that we can reject requests early, before they are read into memory. This protects the ingester from crashing when it receives many push requests.
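As a rough sketch of how these two pieces can fit together (the inflightLimiter type, the pushMethod string, and the limit value below are illustrative assumptions, not Mimir's actual code): the tap handle increments the counter or rejects, and the stats handler decrements it when the request ends.

// Sketch only: inflightLimiter, pushMethod and the limit are hypothetical.
package main

import (
    "context"
    "sync/atomic"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/stats"
    "google.golang.org/grpc/status"
    "google.golang.org/grpc/tap"
)

// pushMethod is illustrative; the real name comes from the service definition.
const pushMethod = "/cortex.Ingester/Push"

type ctxKey struct{}

type inflightLimiter struct {
    inflight atomic.Int64
    limit    int64
}

// TapHandle runs as soon as request headers arrive, before the body is read.
// It rejects early when the limit is reached, otherwise counts the request in.
func (l *inflightLimiter) TapHandle(ctx context.Context, info *tap.Info) (context.Context, error) {
    if info.FullMethodName != pushMethod {
        return ctx, nil
    }
    if l.inflight.Add(1) > l.limit {
        l.inflight.Add(-1)
        return ctx, status.Error(codes.ResourceExhausted, "too many inflight push requests")
    }
    return ctx, nil
}

// TagRPC marks push requests in the context so HandleRPC can recognize them.
// It only runs for requests that the tap handle allowed through.
func (l *inflightLimiter) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
    if info.FullMethodName == pushMethod {
        return context.WithValue(ctx, ctxKey{}, true)
    }
    return ctx
}

// HandleRPC decrements the counter when a push request finishes (*stats.End).
func (l *inflightLimiter) HandleRPC(ctx context.Context, s stats.RPCStats) {
    if _, ok := s.(*stats.End); ok && ctx.Value(ctxKey{}) != nil {
        l.inflight.Add(-1)
    }
}

func (l *inflightLimiter) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
    return ctx
}

func (l *inflightLimiter) HandleConn(context.Context, stats.ConnStats) {}

func main() {
    l := &inflightLimiter{limit: 100}
    srv := grpc.NewServer(grpc.InTapHandle(l.TapHandle), grpc.StatsHandler(l))
    _ = srv // register services and call srv.Serve(listener) in real use
}

Because a request rejected by the tap handle never reaches TagRPC or HandleRPC, there is no spurious decrement -- which leads to the question below.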

Since we increase the number of ongoing requests in the tap handle, are we guaranteed that the stats handler is called each time a request finishes?

The tap handle is called in operateHeaders:

if t.inTapHandle != nil {
    var err error
    if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
        t.mu.Unlock()
        if t.logger.V(logLevel) {
            t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
        }
        stat, ok := status.FromError(err)
        if !ok {
            stat = status.New(codes.PermissionDenied, err.Error())
        }
        t.controlBuf.put(&earlyAbortStream{
            httpStatus:     200,
            streamID:       s.id,
            contentSubtype: s.contentSubtype,
            status:         stat,
            rst:            !frame.StreamEnded(),
        })
        return nil
    }
}
t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 {
    t.idle = time.Time{}
}
t.mu.Unlock()
if channelz.IsOn() {
    atomic.AddInt64(&t.czData.streamsStarted, 1)
    atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
s.requestRead = func(n int) {
    t.adjustWindow(s, uint32(n))
}
s.ctx = traceCtx(s.ctx, s.method)
for _, sh := range t.stats {
    s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
    inHeader := &stats.InHeader{
        FullMethod:  s.method,
        RemoteAddr:  t.remoteAddr,
        LocalAddr:   t.localAddr,
        Compression: s.recvCompress,
        WireLength:  int(frame.Header().Length),
        Header:      mdata.Copy(),
    }
    sh.HandleRPC(s.ctx, inHeader)
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
    reader: &recvBufferReader{
        ctx:        s.ctx,
        ctxDone:    s.ctxDone,
        recv:       s.buf,
        freeBuffer: t.bufferPool.put,
    },
    windowHandler: func(n int) {
        t.updateWindow(s, uint32(n))
    },
}
// Register the stream with loopy.
t.controlBuf.put(&registerStream{
    streamID: s.id,
    wq:       s.wq,
})
handle(s)
return nil

If the request is not rejected, the handle function is eventually called at the end of operateHeaders. handle is really a function passed to HandleStreams:

st.HandleStreams(func(stream *transport.Stream) {
    wg.Add(1)
    if s.opts.numServerWorkers > 0 {
        data := &serverWorkerData{st: st, wg: &wg, stream: stream}
        select {
        case s.serverWorkerChannel <- data:
            return
        default:
            // If all stream workers are busy, fallback to the default code path.
        }
    }
    go func() {
        defer wg.Done()
        s.handleStream(st, stream, s.traceInfo(st, stream))
    }()
}, func(ctx context.Context, method string) context.Context {

This function either passes the request to a worker, or starts a new goroutine. Either way, processing of the request moves to the handleStream method:

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    sm := stream.Method()
    if sm != "" && sm[0] == '/' {
        sm = sm[1:]
    }
    pos := strings.LastIndex(sm, "/")
    if pos == -1 {
        if trInfo != nil {
            trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
            trInfo.tr.SetError()
        }
        errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
        if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
            if trInfo != nil {
                trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
                trInfo.tr.SetError()
            }
            channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
        }
        if trInfo != nil {
            trInfo.tr.Finish()
        }
        return
    }
    service := sm[:pos]
    method := sm[pos+1:]
    srv, knownService := s.services[service]
    if knownService {
        if md, ok := srv.methods[method]; ok {
            s.processUnaryRPC(t, stream, srv, md, trInfo)
            return
        }
        if sd, ok := srv.streams[method]; ok {
            s.processStreamingRPC(t, stream, srv, sd, trInfo)
            return
        }
    }
    // Unknown service, or known server unknown method.
    if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
        s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
        return
    }
    var errDesc string
    if !knownService {
        errDesc = fmt.Sprintf("unknown service %v", service)
    } else {
        errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
    }
    if trInfo != nil {
        trInfo.tr.LazyPrintf("%s", errDesc)
        trInfo.tr.SetError()
    }
    if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
        if trInfo != nil {
            trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
            trInfo.tr.SetError()
        }
        channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
    }
    if trInfo != nil {
        trInfo.tr.Finish()
    }
}

This method can return early with an error, without calling the stats handler, in several cases:

  1. The method name is malformed. This should not be a problem in our usage, because the tap handle only increases inflight push requests for the specific, correct method name.
  2. The service is unknown. This should also not happen in our case, since the tap handle requires the Mimir ingester component to be initialized, and the ingester registers itself into the gRPC server.
  3. The method is unknown. In that case the request is processed as a streaming RPC, not a unary one. Our unit test covers this. (However, in this case the stats handler is actually called.)

Once processing moves to processUnaryRPC or processStreamingRPC, we are guaranteed that the stats handlers are called when the request finishes, via a defer statement. (Push requests are unary.)
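That defer sits near the top of processUnaryRPC and, paraphrased from grpc-go (exact fields and surrounding trace/channelz bookkeeping vary by version; it is only registered when stats handlers are installed), looks roughly like this:

defer func() {
    for _, sh := range shs {
        // Deliver *stats.End to every installed stats handler, even on error,
        // so inflight tracking started by the tap handle is always balanced.
        end := &stats.End{
            BeginTime: statsBegin.BeginTime,
            EndTime:   time.Now(),
        }
        if err != nil && err != io.EOF {
            end.Error = toRPCErr(err)
        }
        sh.HandleRPC(stream.Context(), end)
    }
}()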

TODO:

  • make this feature optional, and revert to the previous way of tracking when the new tracking is not enabled
  • update ingester unit tests (not needed for now, since the new limiting is disabled by default)
  • add an integration test checking both ways of limiting push requests

Which issue(s) this PR fixes or relates to

Related to

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

@pstibrany requested a review from a team as a code owner on September 8, 2023 12:55
@dimitarvdimitrov (Contributor) left a comment:

LGTM

pkg/ingester/ingester.go (outdated, resolved)
@jhalterman (Member) left a comment:

LGTM

@pstibrany (Member Author):

Merging; will add an integration test in a separate PR.

@pstibrany merged commit d10688b into main on Sep 11, 2023
28 checks passed
@pstibrany deleted the ing-push-requests branch on September 11, 2023 10:56
@pstibrany changed the title from "WIP: Track push inflight requests via grpc server handlers." to "Track push inflight requests via grpc server handlers." on Sep 11, 2023
@pstibrany (Member Author) commented Sep 14, 2023

grafana/dskit#377 moves this code into dskit, and introduces a global gRPC server inflight limit plus a way of limiting individual methods (as we do in this PR, where only the Ingester's Push method is limited).


# (experimental) Use experimental method of limiting push requests
# CLI flag: -ingester.limit-inflight-requests-using-grpc-handlers
[limit_inflight_requests_using_grpc_tap_handle: <boolean> | default = false]
Collaborator:

YAML parameter and CLI flag don't match. Can you fix it, please?

@pstibrany (Member Author):

Sure

@pstibrany (Member Author):

Will be fixed in #6073 (draft for now)

if err := i.checkRunning(); err != nil {
    return nil, util_log.DoNotLogError{Err: err}
Collaborator:

Why was DoNotLogError{} removed?

@pstibrany (Member Author):

It was moved from StartPushRequest to PushWithCleanup.

When StartPushRequest is called from the gRPC tap handle, no log messages will be logged anyway, because we do logging from an interceptor, and the interceptor will not run at all.

Collaborator:

When StartPushRequest is called from the gRPC tap handle, no log messages will be logged anyway, because we do logging from an interceptor, and the interceptor will not run at all.

I see. I think it's a bit obscure. Maybe we can improve the code comments to explain it?

@pstibrany (Member Author):

Will be fixed in #6073 (draft for now)
