Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query sharding documentation #960

Merged
merged 14 commits into from
Feb 24, 2022
Merged
142 changes: 142 additions & 0 deletions docs/sources/guides/query-sharding.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
---
title: Query sharding
weight: 1100
---

# Query sharding

<span style="background-color:#f3f973;">Query sharding is an experimental feature.</span>
simonswine marked this conversation as resolved.
Show resolved Hide resolved

Mimir includes the ability to process range queries in parallel. This is
simonswine marked this conversation as resolved.
Show resolved Hide resolved
achieved by breaking the dataset into smaller pieces. These smaller pieces are
called shards. Each shard becomes its own query, and the shards run in
parallel. The results from the shard queries are aggregated to become the full
query result.

## Shardable query requirements
simonswine marked this conversation as resolved.
Show resolved Hide resolved

[//]: <> (The conditions were derived from https://github.com/grafana/mimir/blob/cad5243915a739e026ba3352ce9b7bdff3de97d6/pkg/frontend/querymiddleware/astmapper/parallel.go)

These conditions must be met for the query to be shardable:

- The query is a range query.
- All binary expressions contain a scalar value on one of the sides.
simonswine marked this conversation as resolved.
Show resolved Hide resolved

[//]: <> (List of functions should be kept in sync with https://github.com/grafana/mimir/blob/cad5243915a739e026ba3352ce9b7bdff3de97d6/pkg/frontend/querymiddleware/astmapper/parallel.go#L24-L32)

- The query does not invoke any of the query functions:
- `absent`
- `absent_over_time`
- `histogram_quantile`
- `sort_desc`
- `sort`

[//]: <> (List of functions should be kept in sync with https://github.com/grafana/mimir/blob/cad5243915a739e026ba3352ce9b7bdff3de97d6/pkg/frontend/querymiddleware/astmapper/parallel.go#L16-L22)

- The only aggregations used are:

- `sum`
- `min`
- `max`
- `count`
- `avg`

- The query does not contain nested aggregations.

## Query-related configuration

Configure these options for query sharding:
simonswine marked this conversation as resolved.
Show resolved Hide resolved

- Set the command-line flag
`-query-frontend.parallelize-shardable-queries=true` or set the query
frontend configuration parameter `parallelize-shardable-queries` to true.

- Set the shard count for each query to be an integer greater than two. The
shard count for a query is set by the first item set in this ordered list:

- The HTTP header `Sharding-Control` specified as part of the query request
simonswine marked this conversation as resolved.
Show resolved Hide resolved

- The tenant override value for the limit
`query_sharding_total_shards`

- The value of the command-line configuration flag
`-frontend.query-sharding-total-shards`

- For a microservices deployment, set the query frontend configuration to the
same values as are in their equivalent querier configuration command-line
flags:

- -querier.max-concurrent
- -querier.timeout
- -querier.max-samples
- -querier.at-modifier-enabled
simonswine marked this conversation as resolved.
Show resolved Hide resolved
- -querier.default-evaluation-interval
- -querier.active-query-tracker-dir
- -querier.lookback-delta

## Flow of a sharded query

![Flow of a query with query sharding](../../images/query-sharding.png)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of including it here all alone (poor diagram 😉 ) we could add a 3rd example above with the same query in the diagram. I think it's a good query because it shows the case of 2 shardable portions.


[//]: <> (TODO: Line out how `-frontend.query-sharding-max-sharded-queries` related to all of this)

[//]: <> (TODO: Finish an overview, which shows how the query-frontend splits queries to explain)
[Mermaid Graph]:https://mermaid.live/edit/#eyJjb2RlIjoiZmxvd2NoYXJ0IExSXG4gIHN1YmdyYXBoIFFGW1F1ZXJ5LUZyb250ZW5kXVxuICAgIGRpcmVjdGlvbiBMUlxuICAgIHN1YmdyYXBoIFRTXG4gICAgICAgIGRpcmVjdGlvbiBSTFxuICAgICAgICBUUzFbc3RhcnQ9dHMgZW5kIHQ9bl1cbiAgICAgICAgVFMyW3N0YXJ0PXRuIGVuZCB0PW4rMV1cbiAgICAgICAgVFNuWy4uLi5dXG4gICAgZW5kXG4gICAgc3ViZ3JhcGggUVNbU3BsaXR0aW5nIHF1ZXJpZXMgXFxuIGludG8gMyBxdWVyeSBzaGFyZHNdXG4gICAgICAgIGRpcmVjdGlvbiBSTFxuICAgICAgICBRUzFhWzEvM11cbiAgICAgICAgUVMxYlsyLzNdXG4gICAgICAgIFFTMWNbMy8zXVxuICAgICAgICBRUzJhWzEvM11cbiAgICAgICAgUVMyYlsyLzNdXG4gICAgICAgIFFTMmNbMy8zXVxuICAgICAgICBRU25hWzEvM11cbiAgICAgICAgUVNuYlsyLzNdXG4gICAgICAgIFFTbmNbMy8zXVxuICAgIGVuZFxuICBlbmRcbiAgVFMxIC0tPiBRUzFhXG4gIFRTMSAtLT4gUVMxYlxuICBUUzEgLS0-IFFTMWNcbiAgVFMyIC0tPiBRUzJhXG4gIFRTMiAtLT4gUVMyYlxuICBUUzIgLS0-IFFTMmNcbiAgVFNuIC0tPiBRU25hXG4gIFRTbiAtLT4gUVNuYlxuICBUU24gLS0-IFFTbmNcbiAgVVtVc2VyXSAtLT4gUUZcbiIsIm1lcm1haWQiOiJ7XG4gIFwidGhlbWVcIjogXCJkZWZhdWx0XCJcbn0iLCJ1cGRhdGVFZGl0b3IiOnRydWUsImF1dG9TeW5jIjp0cnVlLCJ1cGRhdGVEaWFncmFtIjp0cnVlfQ

[//]: <> (TODO query_range.split_queries_by_interval: "24h")

## Operational considerations
simonswine marked this conversation as resolved.
Show resolved Hide resolved

Splitting a single query into sharded queries increases the quantity of queries
that must be processed. Parallelization decreases the query processing time
latency, but increases the load on querier components and their underlying data
simonswine marked this conversation as resolved.
Show resolved Hide resolved
stores (ingesters for recent data and store-gateway for historic data). The
caching layer for chunks and indexes will also experience an increased load.

## Verification

### Query statistics

The query statistics of the query-frontend allow to check if query sharding was
simonswine marked this conversation as resolved.
Show resolved Hide resolved
used for an individual query. The field `sharded_queries` contains the amount
of parallelly executed sub-queries.
simonswine marked this conversation as resolved.
Show resolved Hide resolved

When `sharded_queries` is `0`, either the query is not shardable or query
sharding is disabled for cluster or tenant. This is a log line of an
unshardable query:

```
sharded_queries=0 param_query="absent(up{job=\"my-service\"})"
```

When `sharded_queries` matches the configured shard count, query sharding is
operational and the query has only a single leg (assuming time splitting is
disabled). The following log line represents that case with a shard count of
simonswine marked this conversation as resolved.
Show resolved Hide resolved
`16`:

```
sharded_queries=16 query="sum(rate(prometheus_engine_queries[5m]))"
```

When `sharded_queries` is a multiple of the configured shard count, query
sharding is operational and the query has only a multiple legs (assuming time
simonswine marked this conversation as resolved.
Show resolved Hide resolved
splitting is disabled). The following log line shows a query with two legs and
with a configured shard count of `16`:

```
sharded_queries=32 query="sum(rate(prometheus_engine_queries{engine=\"ruler\"}[5m]))/sum(rate(prometheus_engine_queries[5m]))"
```

The query-frontend also exposes metrics, which can be useful to understand the
query workload's parallelism as a whole.

To get the ratio of queries which are shardable:
queries the following PromQL query can be used. A value of 1.0 would mean all queries are shardable wh:
simonswine marked this conversation as resolved.
Show resolved Hide resolved

```promql
sum(rate(cortex_frontend_query_sharding_rewrites_succeeded_total[$__rate_interval])) /
sum(rate(cortex_frontend_query_sharding_rewrites_attempted_total[$__rate_interval]))
```

The histogram `cortex_frontend_sharded_queries_per_query` allows to understand
how many sharded sub queries are generated per query.
Binary file added docs/sources/images/query-sharding.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
91 changes: 91 additions & 0 deletions docs/sources/images/query-sharding.tex
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
% This can be compiled using:
% $ pdflatex -interaction=nonstopmode query-sharding.tex
% $ convert -density 300 query-sharding.pdf -quality 90 query-sharding.png
%
% Or by using an online service like https://www.overleaf.com/

\documentclass[border=10pt]{standalone}
\usepackage{tikz}
\usepackage{xfrac}
\usetikzlibrary{arrows.meta}
\tikzset{%
>={Latex[width=2mm,length=2mm]},
% Specifications for style of nodes:
base/.style = {rectangle, rounded corners, draw=black, minimum width=3cm, minimum height=1cm, text centered, font=\footnotesize\ttfamily},
userQuery/.style = {base, fill=blue!15, align=left, minimum width=9.4cm},
transparent/.style = {fill opacity=0, text opacity= 1},
story/.style = {transparent, text width=4cm},
storyShift/.style = {story, xshift=-6cm},
startend/.style = {base, fill=orange!15, node distance=2.9cm, minimum width=3.5cm, align=left},
query/.style = {startend, font=\footnotesize\ttfamily, node distance=4cm},
shard/.style = {base, fill=green!15, node distance = 1.2cm and 10cm, minimum width=0.4cm, minimum height = 0.4cm},
elemx/.style = {base, fill=blue!15, align=left, minimum width=9.4cm, font=\small\rmfamily},
}

\pgfdeclarelayer{background}
\pgfdeclarelayer{foreground}
\pgfsetlayers{background,main,foreground}

\begin{document}
\begin{tikzpicture}[node distance=1.5cm]
% Specification of nodes (position, etc.)
\node (user) [userQuery] {query = sum(rate(failed[5m])) / sum(rate(total[5m])) \\ start = today - 7d \\ end = today};
\node (userStory) [storyShift, left of=user] {User queries over 7 days};
\node (timesplitdots) [below of=user, yshift = -1cm, transparent] {\dots};
\node (timesplit1) [startend, left of=timesplitdots] {start = today - 7d \\ end = today - 6d};
\node (timesplitn) [startend, right of=timesplitdots] {start = today - 1d \\ end = today};
\node (timesplitStory) [storyShift, left of=timesplitdots] {Queries are split into 24 hour pieces \texttt{split\_by\_interval=24h}};
\node (shardingLeg1) [query, below of=timesplit1, yshift=2.2cm] {sum(rate(failed[5m]))};
\node (shardingLeg1M) [shard, below of=shardingLeg1] {\sfrac{2}{3}};
\node (shardingLeg1L) [shard, left of=shardingLeg1M] {\sfrac{1}{3}};
\node (shardingLeg1R) [shard, right of=shardingLeg1M] {\sfrac{3}{3}};
\node (shardingLeg2) [query, right of=shardingLeg1] {sum(rate(total[5m]))};
\node (shardingLeg2M) [shard, below of=shardingLeg2] {\sfrac{2}{3}};
\node (shardingLeg2L) [shard, left of=shardingLeg2M] {\sfrac{1}{3}};
\node (shardingLeg2R) [shard, right of=shardingLeg2M] {\sfrac{3}{3}};
\node (legStory) [story, below of=timesplitStory, yshift=-0.3cm]{Query is splitted into different legs};
simonswine marked this conversation as resolved.
Show resolved Hide resolved
\node (shardingStory) [story, below of=legStory] {Each leg generates a query for each shard \texttt{total\_shards=3}};
\node (process) [elemx,below of=timesplitdots,yshift=-3cm] {Process queries};
\node (aggregate) [elemx,below of=process] {Aggregate result};
simonswine marked this conversation as resolved.
Show resolved Hide resolved

% Gray box around query process
\begin{pgfonlayer}{background}
\path (aggregate.west |- timesplit1.north) node (qfa) {};

\path (aggregate.west |- timesplit1.north)+(0,0.2cm) node (qfa) {};
\path (aggregate.east) node (qfc) {};

\path[fill=black!10,rounded corners]
(qfa) rectangle (qfc);

\end{pgfonlayer}

% Specification of lines between nodes specified above
\draw[->] (user) -- (user.south |- qfa);
\draw[->] (timesplit1.south) -- +(0,-0.3cm) -| (shardingLeg2.north);
\draw[->] (timesplit1.south) -- (timesplit1.south) -- (shardingLeg1.north);
\draw[->] (shardingLeg1M.south|-shardingLeg1.south) -- (shardingLeg1M);
\draw[->] (shardingLeg1L.south|-shardingLeg1.south) -- (shardingLeg1L);
\draw[->] (shardingLeg1R.south|-shardingLeg1.south) -- (shardingLeg1R);
\draw[->] (shardingLeg2M.south|-shardingLeg2.south) -- (shardingLeg2M);
\draw[->] (shardingLeg2L.south|-shardingLeg2.south) -- (shardingLeg2L);
\draw[->] (shardingLeg2R.south|-shardingLeg2.south) -- (shardingLeg2R);

\draw[->] (shardingLeg1M.south) -- (shardingLeg1M|-process.north);
\draw[->] (shardingLeg1L.south) -- (shardingLeg1L|-process.north);
\draw[->] (shardingLeg1R.south) -- (shardingLeg1R|-process.north);
\draw[->] (shardingLeg2M.south) -- (shardingLeg2M|-process.north);
\draw[->] (shardingLeg2L.south) -- (shardingLeg2L|-process.north);
\draw[->] (shardingLeg2R.south) -- (shardingLeg2R|-process.north);

\draw[->] (shardingLeg1M|-process.south) -- (shardingLeg1M|-aggregate.north);
\draw[->] (shardingLeg1L|-process.south) -- (shardingLeg1L|-aggregate.north);
\draw[->] (shardingLeg1R|-process.south) -- (shardingLeg1R|-aggregate.north);
\draw[->] (shardingLeg2M|-process.south) -- (shardingLeg2M|-aggregate.north);
\draw[->] (shardingLeg2L|-process.south) -- (shardingLeg2L|-aggregate.north);
\draw[->] (shardingLeg2R|-process.south) -- (shardingLeg2R|-aggregate.north);

\draw[->] (timesplitn.south) -- ++(0,-0.3cm) -| ([xshift=1.2cm]shardingLeg2R |- process.north) node(processTimesplitn);
\draw[->] (processTimesplitn|-process.south) -- (processTimesplitn|-aggregate.north);
\end{tikzpicture}
\end{document}