Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental fetch support #168

Merged

Conversation

mmaslankaprv
Copy link
Member

@mmaslankaprv mmaslankaprv commented Nov 24, 2020

Incremental fetch handing in redpanda.

Fixes #25

Checklist

When referencing a related issue, remember to migrate duplicate stories from the
external tracker. This is not relevant for most users.

@emaxerrno
Copy link
Contributor

can you rebase w/ dev

@mmaslankaprv mmaslankaprv marked this pull request as ready for review November 24, 2020 21:52
@dotnwat
Copy link
Member

dotnwat commented Nov 24, 2020

@mmaslankaprv merge conflict

@mmaslankaprv
Copy link
Member Author

@senior7515 @dotnwat rebased.

topic_partition() = default;
topic_partition(model::topic t, model::partition_id i)
: topic(std::move(t))
, partition(i) {}

topic_partition(model::topic_partition_view view) // NOLINT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why nolint? does clang tidy want explcit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly, it is complaining about explicit

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from what type are we taking implicit conversions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I think of it this conversion i.e. topic_partition_view -> topic_partition as it involves copying.

src/v/kafka/fetch_session.h Show resolved Hide resolved
src/v/kafka/fetch_session.h Outdated Show resolved Hide resolved
src/v/kafka/fetch_session.h Outdated Show resolved Hide resolved
src/v/kafka/fetch_session.h Outdated Show resolved Hide resolved
src/v/kafka/fetch_session_cache.cc Show resolved Hide resolved
src/v/kafka/fetch_session_cache.cc Outdated Show resolved Hide resolved
src/v/kafka/fetch_session_cache.cc Show resolved Hide resolved
src/v/kafka/requests/fetch_request.cc Outdated Show resolved Hide resolved
src/v/kafka/requests/fetch_request.cc Show resolved Hide resolved
+ partitions.size() * sizeof(fetch_partition);
}

underlying_t::iterator begin() { return partitions.begin(); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use type alias at the top.

also define begin() const and end() const

see iobuf for alias examples.

@dotnwat
Copy link
Member

dotnwat commented Nov 25, 2020

clang-release ci on google cloud build

Step #6 - "test redpanda": The following tests FAILED:
Step #6 - "test redpanda": 	 73 - test_kafka_rpunit (Failed)

@mmaslankaprv mmaslankaprv force-pushed the incremental-fetch-support branch 2 times, most recently from b953d42 to ab1ffc4 Compare November 25, 2020 08:38
@mmaslankaprv mmaslankaprv force-pushed the incremental-fetch-support branch 2 times, most recently from ff998bc to 4d8b90b Compare November 25, 2020 18:56
Introduced `model::topic_partition_view` to avoid excessive
topic_partition copying.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
Introduced `kafka::fetch_session` type. The `fetch_session` represents a
single session maintained in memory on server for a consumer.
Maintaining a list of partition that consumer is interested in on the
server sides allow to reduce overhead of transmitting the whole
partitions list between server and consumer every time the consumer
fetches data.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
Added configuration option for controlling fetch session eviction
timeout. i.e. the time after inactive session is being evicted.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
Implemented fetch session cache.

Fetch session cache is a core local cache holding incremental fetch
sessions. Each core local cache instance assigns session ids that are
unique for the node (non overlapping ranges of ids are assigned to each core).
The cache evicts not used sessions after configurable period of inactivity.
Fetch session cache will stop adding new sessions after its max
memory usage is reached.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
When processing the fetch request we have to be able to access core
local fetch sessions cache. Added the sessions cache to
`kafka::request_context`.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
Using the fetch sessions cache to implement incremental fetch requests
handling. Caching fetch sessions allow to reduce overhead of sending
whole list of partitions every time consumer issues the fetch request.

Signed-off-by: Michal Maslanka <michal@vectorized.io>
Signed-off-by: Michal Maslanka <michal@vectorized.io>
@mmaslankaprv mmaslankaprv merged commit ad96980 into redpanda-data:dev Nov 25, 2020
@mmaslankaprv mmaslankaprv deleted the incremental-fetch-support branch November 21, 2022 08:36
This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Incremental fetch support in Redpanda
3 participants