Skip to content

Commit

Permalink
Add /ingester/unregister-on-shutdown HTTP endpoint (#7739)
Browse files Browse the repository at this point in the history
* add initial implementation of HTTP /ingester/unregister-on-shutdown endpoint

* add method comment

* add docs

* clarify documentation

* register endpoint

* add activity tracker

* add CHANGELOG entry

* remove unnecessary error check

* make doc

* Update docs/sources/mimir/references/http-api/index.md

Co-authored-by: Jack Baldry <jack.baldry@grafana.com>

* remove duplicate /ingester in path

* replace POST method with PUT

* move prepareUnregisterBody struct declaration to PrepareUnregisterHandler method

* remove linebreaks in PrepareUnregisterHandler method

* use util.WriteJSONResponse()

* only read request body for PUT requests

* remove redundant tests

* expand comment on loop gotcha

* fix CHANGELOG number

---------

Co-authored-by: Jack Baldry <jack.baldry@grafana.com>
  • Loading branch information
LasseHels and jdbaldry committed Apr 24, 2024
1 parent 51498b2 commit 5518c5b
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [FEATURE] Continuous-test: now runnable as a module with `mimir -target=continuous-test`. #7747
* [FEATURE] Store-gateway: Allow specific tenants to be enabled or disabled via `-store-gateway.enabled-tenants` or `-store-gateway.disabled-tenants` CLI flags or their corresponding YAML settings. #7653
* [FEATURE] New `-<prefix>.s3.bucket-lookup-type` flag configures lookup style type, used to access bucket in s3 compatible providers. #7684
* [FEATURE] New `/ingester/unregister-on-shutdown` HTTP endpoint allows dynamic access to ingesters' `-ingester.ring.unregister-on-shutdown` configuration. #7739
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.promql-engine=streaming`. #7693
* [FEATURE] Server: added experimental [PROXY protocol support](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt). The PROXY protocol support can be enabled via `-server.proxy-protocol-enabled=true`. When enabled, the support is added both to HTTP and gRPC listening ports. #7698
* [ENHANCEMENT] Store-gateway: merge series from different blocks concurrently. #7456
Expand Down
26 changes: 26 additions & 0 deletions docs/sources/mimir/references/http-api/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,32 @@ This endpoint unregisters the ingester from the ring even if you disable `-inges

This API endpoint is usually used by scale down automations.

### Prepare for unregister

```
GET,PUT,DELETE /ingester/unregister-on-shutdown
```

This endpoint controls whether an ingester should unregister from the ring on its next termination, that is, the next time it receives a `SIGINT` or `SIGTERM` signal.
Via this endpoint, Mimir operators can dynamically control an ingester's `-ingester.ring.unregister-on-shutdown` state without having to restart the ingester.

A `PUT` sets the ingester's unregister state. When invoked with the `PUT` method, the endpoint takes a request body:

```
{"unregister": true}
```

A `GET` returns the ingester's current unregister state.

A `DELETE` resets the ingester's unregister state to the value that was passed via the `-ingester.ring.unregister-on-shutdown`
configuration option.

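On success, regardless of the HTTP method used, the endpoint returns a response body with the ingester's current unregister state. Requests that fail, for example a `PUT` with an invalid request body (`400`) or any request made while the ingester is not running (`503`), return only the error status code without a response body.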

```
{"unregister": true}
```

### TSDB Metrics

```
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ type Ingester interface {
ShutdownHandler(http.ResponseWriter, *http.Request)
PrepareShutdownHandler(http.ResponseWriter, *http.Request)
PreparePartitionDownscaleHandler(http.ResponseWriter, *http.Request)
PrepareUnregisterHandler(w http.ResponseWriter, r *http.Request)
UserRegistryHandler(http.ResponseWriter, *http.Request)
TenantsHandler(http.ResponseWriter, *http.Request)
TenantTSDBHandler(http.ResponseWriter, *http.Request)
Expand All @@ -300,6 +301,7 @@ func (a *API) RegisterIngester(i Ingester) {
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, true, "GET", "POST")
a.RegisterRoute("/ingester/prepare-shutdown", http.HandlerFunc(i.PrepareShutdownHandler), false, true, "GET", "POST", "DELETE")
a.RegisterRoute("/ingester/prepare-partition-downscale", http.HandlerFunc(i.PreparePartitionDownscaleHandler), false, true, "GET", "POST", "DELETE")
a.RegisterRoute("/ingester/unregister-on-shutdown", http.HandlerFunc(i.PrepareUnregisterHandler), false, false, "GET", "PUT", "DELETE")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, true, "POST")
if a.cfg.GETRequestForIngesterShutdownEnabled {
a.RegisterDeprecatedRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, true, "GET")
Expand Down
43 changes: 43 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package ingester

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -3549,6 +3550,48 @@ func (i *Ingester) unsetPrepareShutdown() {
i.metrics.shutdownMarker.Set(0)
}

// PrepareUnregisterHandler reports and controls whether the ingester unregisters from the ring the next time
// it terminates.
//
// Behavior per HTTP method:
//   - GET leaves the unregister state untouched.
//   - PUT decodes a JSON body of the form {"unregister": <bool>} and applies that value. A body that cannot be
//     decoded, or that lacks the "unregister" field, is rejected with 400 Bad Request.
//   - DELETE restores the value held in the RingConfig.UnregisterOnShutdown ring configuration option.
//
// If the ingester is not in the Running state, 503 Service Unavailable is returned and nothing is changed.
// Otherwise the handler finishes by writing the resulting unregister state as a JSON response.
//
// All methods are idempotent.
func (i *Ingester) PrepareUnregisterHandler(w http.ResponseWriter, r *http.Request) {
	// Shape shared by the PUT request body and every successful response. The field is a pointer so that a
	// missing "unregister" key can be told apart from an explicit false.
	type prepareUnregisterBody struct {
		Unregister *bool `json:"unregister"`
	}

	if i.State() != services.Running {
		w.WriteHeader(http.StatusServiceUnavailable)
		return
	}

	// The request body is only read for PUT; GET and DELETE ignore it.
	if r.Method == http.MethodPut {
		var payload prepareUnregisterBody
		decodeErr := json.NewDecoder(r.Body).Decode(&payload)
		if decodeErr != nil || payload.Unregister == nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		i.lifecycler.SetUnregisterOnShutdown(*payload.Unregister)
	} else if r.Method == http.MethodDelete {
		i.lifecycler.SetUnregisterOnShutdown(i.cfg.IngesterRing.UnregisterOnShutdown)
	}

	// Read the state back from the lifecycler so the response reflects what is actually in effect.
	current := i.lifecycler.ShouldUnregisterOnShutdown()
	util.WriteJSONResponse(w, &prepareUnregisterBody{Unregister: &current})
}

// PreparePartitionDownscaleHandler prepares the ingester's partition downscaling. The partition owned by the
// ingester will switch to INACTIVE state (read-only).
//
Expand Down
9 changes: 9 additions & 0 deletions pkg/ingester/ingester_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,15 @@ func (i *ActivityTrackerWrapper) PreparePartitionDownscaleHandler(w http.Respons
i.ing.PreparePartitionDownscaleHandler(w, r)
}

// PrepareUnregisterHandler forwards the request to the wrapped ingester's PrepareUnregisterHandler. An
// activity-tracker entry labelled "Ingester/PrepareUnregisterHandler" is inserted before the call and
// deleted once the wrapped handler returns.
func (i *ActivityTrackerWrapper) PrepareUnregisterHandler(w http.ResponseWriter, r *http.Request) {
	// The description is built lazily: the tracker decides when (and whether) to invoke the callback.
	describe := func() string {
		return requestActivity(r.Context(), "Ingester/PrepareUnregisterHandler", nil)
	}

	activityIndex := i.tracker.Insert(describe)
	defer i.tracker.Delete(activityIndex)

	i.ing.PrepareUnregisterHandler(w, r)
}

func (i *ActivityTrackerWrapper) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
ix := i.tracker.Insert(func() string {
return requestActivity(r.Context(), "Ingester/ShutdownHandler", nil)
Expand Down
122 changes: 122 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10724,6 +10724,128 @@ func TestIngester_Starting(t *testing.T) {
}
}

// TestIngester_PrepareUnregisterHandler drives PrepareUnregisterHandler through a table of requests and, for
// each one, checks the HTTP status code, the exact response body, and the unregister-on-shutdown state that
// the ingester's lifecycler reports afterwards.
func TestIngester_PrepareUnregisterHandler(t *testing.T) {
	ctx := context.Background()

	overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
	require.NoError(t, err)

	// newIngester builds a test ingester. When start is true it also starts the ingester, registers a cleanup
	// that stops it, and waits until the lifecycler reports one healthy instance.
	newIngester := func(t *testing.T, start bool, cfg Config) *Ingester {
		ing, _, _ := createTestIngesterWithIngestStorage(t, &cfg, overrides, prometheus.NewPedanticRegistry())
		if !start {
			return ing
		}

		require.NoError(t, services.StartAndAwaitRunning(ctx, ing))
		t.Cleanup(func() {
			require.NoError(t, services.StopAndAwaitTerminated(ctx, ing))
		})

		test.Poll(t, 1*time.Second, 1, func() interface{} {
			return ing.lifecycler.HealthyInstancesCount()
		})

		return ing
	}

	// Fields left out of a scenario keep their zero value: no request body, no prepare hook, an ingester that
	// is not started, and an empty expected response body.
	scenarios := []struct {
		name                     string
		startIngester            bool
		httpMethod               string
		requestBody              io.Reader
		prepare                  func(i *Ingester)
		expectedStatusCode       int
		expectedResponseBody     string
		expectedUnregisterStatus bool
	}{
		{
			name:                     "returns HTTP 503 if ingester is not running",
			httpMethod:               http.MethodGet,
			expectedStatusCode:       http.StatusServiceUnavailable,
			expectedUnregisterStatus: true,
		},
		{
			name:                     "returns HTTP 400 on PUT with request body that is not valid JSON",
			startIngester:            true,
			httpMethod:               http.MethodPut,
			requestBody:              strings.NewReader("invalid json"),
			expectedStatusCode:       http.StatusBadRequest,
			expectedUnregisterStatus: true,
		},
		{
			name:                     "returns HTTP 400 on PUT with request body that is valid JSON but has incorrect structure",
			startIngester:            true,
			httpMethod:               http.MethodPut,
			requestBody:              strings.NewReader(`{"ping": "pong"}`),
			expectedStatusCode:       http.StatusBadRequest,
			expectedUnregisterStatus: true,
		},
		{
			name:                     "returns HTTP 200 and unregister status on PUT with valid request body",
			startIngester:            true,
			httpMethod:               http.MethodPut,
			requestBody:              strings.NewReader(`{"unregister": false}`),
			expectedStatusCode:       http.StatusOK,
			expectedResponseBody:     `{"unregister":false}`,
			expectedUnregisterStatus: false,
		},
		{
			name:                     "returns HTTP 200 with unregister status on GET request",
			startIngester:            true,
			httpMethod:               http.MethodGet,
			expectedStatusCode:       http.StatusOK,
			expectedResponseBody:     `{"unregister":true}`,
			expectedUnregisterStatus: true,
		},
		{
			// The state is flipped to false first so that DELETE has something to reset.
			// NOTE(review): the expected value of true presumably matches the default test config's
			// UnregisterOnShutdown setting; confirm against defaultIngesterTestConfig.
			name:          "returns HTTP 200 with unregister status on DELETE request",
			startIngester: true,
			httpMethod:    http.MethodDelete,
			prepare: func(i *Ingester) {
				i.lifecycler.SetUnregisterOnShutdown(false)
			},
			expectedStatusCode:       http.StatusOK,
			expectedResponseBody:     `{"unregister":true}`,
			expectedUnregisterStatus: true,
		},
	}

	for idx := range scenarios {
		// Take a per-iteration copy: the subtests call t.Parallel(), so their bodies run after this loop has
		// moved on, and they must not observe a later scenario. Copying by index is safe on every Go version,
		// including those before the Go 1.22 loop variable change.
		// See https://tip.golang.org/wiki/LoopvarExperiment.
		sc := scenarios[idx]

		t.Run(sc.name, func(t *testing.T) {
			t.Parallel()

			ing := newIngester(t, sc.startIngester, defaultIngesterTestConfig(t))
			if sc.prepare != nil {
				sc.prepare(ing)
			}

			req := httptest.NewRequest(sc.httpMethod, "/ingester/unregister-on-shutdown", sc.requestBody)
			rec := httptest.NewRecorder()
			ing.PrepareUnregisterHandler(rec, req)

			require.Equal(t, sc.expectedStatusCode, rec.Code)
			require.Equal(t, sc.expectedResponseBody, rec.Body.String())
			require.Equal(t, sc.expectedUnregisterStatus, ing.lifecycler.ShouldUnregisterOnShutdown())
		})
	}
}

func setupFailingIngester(t *testing.T, cfg Config, failingCause error) *failingIngester {
// Start the first ingester. This ensures the ring will be created.
fI := newFailingIngester(t, cfg, nil, nil)
Expand Down

0 comments on commit 5518c5b

Please sign in to comment.