Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add users stats http api to ingester #6178

Merged
merged 5 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
* [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163
* [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173
* [ENHANCEMENT] Ingester: Add new API `/ingester/all_user_stats` which shows loaded blocks, active timeseries and ingestion rate for a specific ingester. #6178
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to to track the number of histogram samples which resolution was reduced. #6182

## 1.18.0 in progress
Expand Down
12 changes: 11 additions & 1 deletion docs/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi
| [Flush blocks](#flush-blocks) | Ingester || `GET,POST /ingester/flush` |
| [Shutdown](#shutdown) | Ingester || `GET,POST /ingester/shutdown` |
| [Ingesters ring status](#ingesters-ring-status) | Ingester || `GET /ingester/ring` |
| [Ingester tenants stats](#ingester-tenants-stats) | Ingester || `GET /ingester/all_user_stats` |
| [Ingester mode](#ingester-mode) | Ingester || `GET,POST /ingester/mode` |
| [Instant query](#instant-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query` |
| [Range query](#range-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query_range` |
Expand Down Expand Up @@ -242,7 +243,7 @@ GET /distributor/all_user_stats
GET /all_user_stats
```

Displays a web page with per-tenant statistics updated in realtime, including the total number of active series across all ingesters and the current ingestion rate (samples / sec).
Displays a web page with per-tenant statistics updated in realtime, including the total number of loaded blocks and active series across all ingesters as well as the current ingestion rate (samples / sec).

### HA tracker status

Expand Down Expand Up @@ -297,6 +298,15 @@ GET /ring

Displays a web page with the ingesters hash ring status, including the state, healthy and last heartbeat time of each ingester.

### Ingester tenants stats

```
GET /ingester/all_user_stats

```

Displays a web page with per-tenant statistics updated in realtime, including the total number of loaded blocks and active series from a specific ingester as well as the current ingestion rate (samples / sec).

### Ingester mode

```
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ type Ingester interface {
FlushHandler(http.ResponseWriter, *http.Request)
ShutdownHandler(http.ResponseWriter, *http.Request)
RenewTokenHandler(http.ResponseWriter, *http.Request)
AllUserStatsHandler(http.ResponseWriter, *http.Request)
ModeHandler(http.ResponseWriter, *http.Request)
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
}
Expand All @@ -292,6 +293,8 @@ type Ingester interface {
func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
client.RegisterIngesterServer(a.server.GRPC, i)

a.indexPage.AddLink(SectionAdminEndpoints, "/ingester/all_user_stats", "Usage Statistics")

a.indexPage.AddLink(SectionDangerous, "/ingester/flush", "Trigger a Flush of data from Ingester to storage")
a.indexPage.AddLink(SectionDangerous, "/ingester/shutdown", "Trigger Ingester Shutdown (Dangerous)")
a.indexPage.AddLink(SectionDangerous, "/ingester/renewTokens", "Renew Ingester Tokens (10%)")
Expand All @@ -300,6 +303,7 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
danielblando marked this conversation as resolved.
Show resolved Hide resolved
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.

Expand Down
34 changes: 21 additions & 13 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ha"
"github.com/cortexproject/cortex/pkg/ingester"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
Expand Down Expand Up @@ -1319,7 +1320,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad
}

// UserStats returns statistics about the current user.
func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error) {
replicationSet, err := d.GetIngestersForMetadata(ctx)
if err != nil {
return nil, err
Expand All @@ -1336,7 +1337,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
return nil, err
}

totalStats := &UserStats{}
totalStats := &ingester.UserStats{}
for _, resp := range resps {
r := resp.(*ingester_client.UserStatsResponse)
totalStats.IngestionRate += r.IngestionRate
Expand All @@ -1354,17 +1355,11 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
return totalStats, nil
}

// UserIDStats models ingestion statistics for one user, including the user ID
type UserIDStats struct {
UserID string `json:"userID"`
UserStats
}

// AllUserStats returns statistics about all users.
// Note it does not divide by the ReplicationFactor like UserStats()
func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, error) {
// Add up by user, across all responses from ingesters
perUserTotals := make(map[string]UserStats)
perUserTotals := make(map[string]ingester.UserStats)

req := &ingester_client.UserStatsRequest{}
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
Expand All @@ -1389,28 +1384,41 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
s.RuleIngestionRate += u.Data.RuleIngestionRate
s.NumSeries += u.Data.NumSeries
s.ActiveSeries += u.Data.ActiveSeries
s.LoadedBlocks += u.Data.LoadedBlocks
perUserTotals[u.UserId] = s
}
}

// Turn aggregated map into a slice for return
response := make([]UserIDStats, 0, len(perUserTotals))
response := make([]ingester.UserIDStats, 0, len(perUserTotals))
for id, stats := range perUserTotals {
response = append(response, UserIDStats{
response = append(response, ingester.UserIDStats{
UserID: id,
UserStats: UserStats{
UserStats: ingester.UserStats{
IngestionRate: stats.IngestionRate,
APIIngestionRate: stats.APIIngestionRate,
RuleIngestionRate: stats.RuleIngestionRate,
NumSeries: stats.NumSeries,
ActiveSeries: stats.ActiveSeries,
LoadedBlocks: stats.LoadedBlocks,
},
})
}

return response, nil
}

// AllUserStatsHandler shows stats for all users.
func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats, err := d.AllUserStats(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

ingester.AllUserStatsRender(w, r, stats, d.ingestersRing.ReplicationFactor())
}

func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if d.distributorsRing != nil {
d.distributorsRing.ServeHTTP(w, req)
Expand Down
9 changes: 0 additions & 9 deletions pkg/distributor/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@ import (
"github.com/cortexproject/cortex/pkg/util"
)

// UserStats models ingestion statistics for one user.
type UserStats struct {
IngestionRate float64 `json:"ingestionRate"`
NumSeries uint64 `json:"numSeries"`
APIIngestionRate float64 `json:"APIIngestionRate"`
RuleIngestionRate float64 `json:"RuleIngestionRate"`
ActiveSeries uint64 `json:"activeSeries"`
}

// UserStatsHandler handles user stats to the Distributor.
func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats, err := d.UserStats(r.Context())
Expand Down
Loading
Loading