Add query sharding documentation (#960)
* Add query sharding documentation

* Address review comments

Co-authored-by: Karen Miller <karen.miller@grafana.com>
Signed-off-by: Christian Simon <simon@swine.de>

* Provide concrete example for shardable queries

* Remove debugging header `Sharding-Control`

* Remove outdated feature flag

* Rewrite the steps necessary to configure query-sharding

* Mention the need to raise -querier.max-query-paralleism

* Prettify markdown

* Apply suggestions from code review

Co-authored-by: Mauro Stettler <mauro.stettler@gmail.com>

* Remove comments

* Apply suggestions from code review

Co-authored-by: Marco Pracucci <marco@pracucci.com>

* Add example 3 to show flow of a query with 2 shardable portions.

* Prettifier

* Apply suggestions from code review

Co-authored-by: Marco Pracucci <marco@pracucci.com>

Co-authored-by: Karen Miller <karen.miller@grafana.com>
Co-authored-by: Mauro Stettler <mauro.stettler@gmail.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
4 people committed Feb 28, 2022
1 parent 11e4f57 commit c6a11d3
Showing 3 changed files with 277 additions and 0 deletions.
186 changes: 186 additions & 0 deletions docs/sources/guides/query-sharding.md
@@ -0,0 +1,186 @@
---
title: Query sharding
weight: 1100
---

# Query sharding

Mimir includes the ability to run a single query across multiple machines. This is
achieved by breaking the dataset into smaller pieces, called shards. Each shard is
then queried by a partial query, and the query-frontend distributes those partial
queries to run on different queriers in parallel. The query-frontend then aggregates
the results of the partial queries to return the full query result.

## Query sharding at a glance

Not all queries are shardable. Even when a query as a whole is not shardable, its
inner parts may still be.

In particular, associative aggregations (like `sum`, `min`, `max`, `count`,
`avg`) are shardable, while some query functions (like `absent`, `absent_over_time`,
`histogram_quantile`, `sort_desc`, `sort`) are not.

The following examples use a concrete shard count of `3`. All partial queries that
include the `__query_shard__` label selector are executed in parallel. The `concat()`
annotation shows where partial query results are concatenated and merged by the
query-frontend.

### Example 1: Full query is shardable

```promql
sum(rate(metric[1m]))
```

Is executed as (assuming a shard count of 3):

```promql
sum(
concat(
sum(rate(metric{__query_shard__="1_of_3"}[1m]))
sum(rate(metric{__query_shard__="2_of_3"}[1m]))
sum(rate(metric{__query_shard__="3_of_3"}[1m]))
)
)
```

### Example 2: Inner part is shardable

```promql
histogram_quantile(0.99, sum by(le) (rate(metric[1m])))
```

Is executed as (assuming a shard count of 3):

```promql
histogram_quantile(0.99, sum by(le) (
concat(
sum by(le) (rate(metric{__query_shard__="1_of_3"}[1m]))
sum by(le) (rate(metric{__query_shard__="2_of_3"}[1m]))
sum by(le) (rate(metric{__query_shard__="3_of_3"}[1m]))
)
))
```

### Example 3: Query with two shardable portions

```promql
sum(rate(failed[1m])) / sum(rate(total[1m]))
```

Is executed as (assuming a shard count of 3):

```promql
sum(
concat(
sum (rate(failed{__query_shard__="1_of_3"}[1m]))
sum (rate(failed{__query_shard__="2_of_3"}[1m]))
sum (rate(failed{__query_shard__="3_of_3"}[1m]))
)
)
/
sum(
concat(
sum (rate(total{__query_shard__="1_of_3"}[1m]))
sum (rate(total{__query_shard__="2_of_3"}[1m]))
sum (rate(total{__query_shard__="3_of_3"}[1m]))
)
)
```

![Flow of a query with two shardable portions](../../images/query-sharding.png)

## How to enable query sharding

To enable query sharding, you need to opt in by setting
`-query-frontend.parallelize-shardable-queries` to `true`.

Each shardable portion of a query is split into
`-query-frontend.query-sharding-total-shards` partial queries. If a query has multiple
inner portions that can be sharded, each portion is sharded
`-query-frontend.query-sharding-total-shards` times. In some cases, this could lead to
an explosion of queries. For this reason, the
`-query-frontend.query-sharding-max-sharded-queries` parameter lets you modify the
default hard limit of 128 on the total number of partial queries a single input query
can generate.
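
As a minimal sketch, the query-frontend flags involved could look as follows; the
shard count and limit values are purely illustrative and should be tuned to your
cluster:

```
-query-frontend.parallelize-shardable-queries=true
-query-frontend.query-sharding-total-shards=16
-query-frontend.query-sharding-max-sharded-queries=128
```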

When running a query over a large time range and
`-query-frontend.split-queries-by-interval` is enabled, the
`-query-frontend.query-sharding-max-sharded-queries` limit applies to the total
number of queries after they have been split by time (first) and by shards (second).

As an example, if `-query-frontend.query-sharding-max-sharded-queries=128` and
`-query-frontend.split-queries-by-interval=24h`, and you run a query over 8 days,
each of the 8 daily queries can generate at most 128 / 8 = 16 partial queries.

After enabling query sharding in a microservices deployment, the query-frontends
will start processing the aggregation of the partial queries. It is therefore
important to also configure some PromQL engine specific parameters on the
query-frontend (see the sketch after this list):

- `-querier.max-concurrent`
- `-querier.timeout`
- `-querier.max-samples`
- `-querier.default-evaluation-interval`
- `-querier.lookback-delta`
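
A minimal sketch of setting these flags on the query-frontend; the values below are
purely illustrative and need to be tuned to your workload:

```
-querier.max-concurrent=16
-querier.timeout=2m
-querier.max-samples=50000000
-querier.default-evaluation-interval=1m
-querier.lookback-delta=5m
```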

## Operational considerations

Splitting a single query into sharded queries increases the number of queries
that must be processed. Parallelization decreases the query processing time,
but increases the load on querier components and their underlying data stores
(ingesters for recent data and store-gateways for historic data). The
caching layer for chunks and indexes will also experience increased load.

We also recommend increasing the maximum number of queries scheduled in
parallel by the query-frontend: multiply the previously set value of
`-querier.max-query-parallelism` by
`-query-frontend.query-sharding-total-shards`.
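
For example, assuming a hypothetical starting point where
`-querier.max-query-parallelism` was previously set to `14` and
`-query-frontend.query-sharding-total-shards=16`, you would raise it to
`14 * 16 = 224`:

```
-querier.max-query-parallelism=224
```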

## Verification

### Query statistics

The query statistics logged by the query-frontend allow you to check whether query
sharding was used for an individual query. The `sharded_queries` field contains the
number of partial queries that were executed in parallel.

When `sharded_queries` is `0`, either the query is not shardable or query
sharding is disabled for the cluster or the tenant. The following log line shows an
unshardable query:

```
sharded_queries=0 param_query="absent(up{job=\"my-service\"})"
```

When `sharded_queries` matches the configured shard count, query sharding is
operational and the query has only a single leg (assuming time splitting is
disabled or the query doesn't span across multiple days). The following log
line represents that case with a shard count of `16`:

```
sharded_queries=16 query="sum(rate(prometheus_engine_queries[5m]))"
```

When `sharded_queries` is a multiple of the configured shard count, query
sharding is operational and the query has multiple legs (assuming time
splitting is disabled or the query doesn't span across multiple days). The
following log line shows a query with two legs and with a configured shard
count of `16`:

```
sharded_queries=32 query="sum(rate(prometheus_engine_queries{engine=\"ruler\"}[5m]))/sum(rate(prometheus_engine_queries[5m]))"
```

The query-frontend also exposes metrics that can be useful for understanding the
parallelism of the query workload as a whole.

You can run the following query to get the ratio of queries that have been successfully sharded:

```promql
sum(rate(cortex_frontend_query_sharding_rewrites_succeeded_total[$__rate_interval])) /
sum(rate(cortex_frontend_query_sharding_rewrites_attempted_total[$__rate_interval]))
```

The histogram `cortex_frontend_sharded_queries_per_query` lets you understand
how many sharded partial queries are generated per query.
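
For example, assuming the standard Prometheus histogram convention of a `_bucket`
suffix on the underlying series, a query along the following lines could show the
99th percentile of partial queries generated per input query:

```promql
histogram_quantile(0.99, sum by (le) (rate(cortex_frontend_sharded_queries_per_query_bucket[$__rate_interval])))
```
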
Binary file added docs/sources/images/query-sharding.png
91 changes: 91 additions & 0 deletions docs/sources/images/query-sharding.tex
@@ -0,0 +1,91 @@
% This can be compiled using:
% $ pdflatex -interaction=nonstopmode query-sharding.tex
% $ convert -density 300 query-sharding.pdf -quality 90 query-sharding.png
%
% Or by using an online service like https://www.overleaf.com/

\documentclass[border=10pt]{standalone}
\usepackage{tikz}
\usepackage{xfrac}
\usetikzlibrary{arrows.meta}
\tikzset{%
>={Latex[width=2mm,length=2mm]},
% Specifications for style of nodes:
base/.style = {rectangle, rounded corners, draw=black, minimum width=3cm, minimum height=1cm, text centered, font=\footnotesize\ttfamily},
userQuery/.style = {base, fill=blue!15, align=left, minimum width=9.4cm},
transparent/.style = {fill opacity=0, text opacity= 1},
story/.style = {transparent, text width=4cm},
storyShift/.style = {story, xshift=-6cm},
startend/.style = {base, fill=orange!15, node distance=2.9cm, minimum width=3.5cm, align=left},
query/.style = {startend, font=\footnotesize\ttfamily, node distance=4cm},
shard/.style = {base, fill=green!15, node distance = 1.2cm and 10cm, minimum width=0.4cm, minimum height = 0.4cm},
elemx/.style = {base, fill=blue!15, align=left, minimum width=9.4cm, font=\small\rmfamily},
}

\pgfdeclarelayer{background}
\pgfdeclarelayer{foreground}
\pgfsetlayers{background,main,foreground}

\begin{document}
\begin{tikzpicture}[node distance=1.5cm]
% Specification of nodes (position, etc.)
\node (user) [userQuery] {query = sum(rate(failed[1m])) / sum(rate(total[1m])) \\ start = today - 7d \\ end = today};
\node (userStory) [storyShift, left of=user] {User queries over 7 days};
\node (timesplitdots) [below of=user, yshift = -1cm, transparent] {\dots};
\node (timesplit1) [startend, left of=timesplitdots] {start = today - 7d \\ end = today - 6d};
\node (timesplitn) [startend, right of=timesplitdots] {start = today - 1d \\ end = today};
\node (timesplitStory) [storyShift, left of=timesplitdots] {Queries are split into 24 hour pieces \texttt{split\_by\_interval=24h}};
\node (shardingLeg1) [query, below of=timesplit1, yshift=2.2cm] {sum(rate(failed[1m]))};
\node (shardingLeg1M) [shard, below of=shardingLeg1] {\sfrac{2}{3}};
\node (shardingLeg1L) [shard, left of=shardingLeg1M] {\sfrac{1}{3}};
\node (shardingLeg1R) [shard, right of=shardingLeg1M] {\sfrac{3}{3}};
\node (shardingLeg2) [query, right of=shardingLeg1] {sum(rate(total[1m]))};
\node (shardingLeg2M) [shard, below of=shardingLeg2] {\sfrac{2}{3}};
\node (shardingLeg2L) [shard, left of=shardingLeg2M] {\sfrac{1}{3}};
\node (shardingLeg2R) [shard, right of=shardingLeg2M] {\sfrac{3}{3}};
\node (legStory) [story, below of=timesplitStory, yshift=-0.3cm]{Query is split into different legs};
\node (shardingStory) [story, below of=legStory] {Each leg generates a query for each shard \texttt{total\_shards=3}};
\node (process) [elemx,below of=timesplitdots,yshift=-3cm] {Process queries};
\node (aggregate) [elemx,below of=process] {Aggregate results};

% Gray box around query process
\begin{pgfonlayer}{background}
\path (aggregate.west |- timesplit1.north) node (qfa) {};

\path (aggregate.west |- timesplit1.north)+(0,0.2cm) node (qfa) {};
\path (aggregate.east) node (qfc) {};

\path[fill=black!10,rounded corners]
(qfa) rectangle (qfc);

\end{pgfonlayer}

% Specification of lines between nodes specified above
\draw[->] (user) -- (user.south |- qfa);
\draw[->] (timesplit1.south) -- +(0,-0.3cm) -| (shardingLeg2.north);
\draw[->] (timesplit1.south) -- (timesplit1.south) -- (shardingLeg1.north);
\draw[->] (shardingLeg1M.south|-shardingLeg1.south) -- (shardingLeg1M);
\draw[->] (shardingLeg1L.south|-shardingLeg1.south) -- (shardingLeg1L);
\draw[->] (shardingLeg1R.south|-shardingLeg1.south) -- (shardingLeg1R);
\draw[->] (shardingLeg2M.south|-shardingLeg2.south) -- (shardingLeg2M);
\draw[->] (shardingLeg2L.south|-shardingLeg2.south) -- (shardingLeg2L);
\draw[->] (shardingLeg2R.south|-shardingLeg2.south) -- (shardingLeg2R);

\draw[->] (shardingLeg1M.south) -- (shardingLeg1M|-process.north);
\draw[->] (shardingLeg1L.south) -- (shardingLeg1L|-process.north);
\draw[->] (shardingLeg1R.south) -- (shardingLeg1R|-process.north);
\draw[->] (shardingLeg2M.south) -- (shardingLeg2M|-process.north);
\draw[->] (shardingLeg2L.south) -- (shardingLeg2L|-process.north);
\draw[->] (shardingLeg2R.south) -- (shardingLeg2R|-process.north);

\draw[->] (shardingLeg1M|-process.south) -- (shardingLeg1M|-aggregate.north);
\draw[->] (shardingLeg1L|-process.south) -- (shardingLeg1L|-aggregate.north);
\draw[->] (shardingLeg1R|-process.south) -- (shardingLeg1R|-aggregate.north);
\draw[->] (shardingLeg2M|-process.south) -- (shardingLeg2M|-aggregate.north);
\draw[->] (shardingLeg2L|-process.south) -- (shardingLeg2L|-aggregate.north);
\draw[->] (shardingLeg2R|-process.south) -- (shardingLeg2R|-aggregate.north);

\draw[->] (timesplitn.south) -- ++(0,-0.3cm) -| ([xshift=1.2cm]shardingLeg2R |- process.north) node(processTimesplitn);
\draw[->] (processTimesplitn|-process.south) -- (processTimesplitn|-aggregate.north);
\end{tikzpicture}
\end{document}
