Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime: Add WatchStreamExt::subscribe #1131

Merged
merged 15 commits into from
Feb 22, 2023
Merged

Conversation

danrspencer
Copy link
Contributor

Motivation

It may be desirable to have subscribe to the events from a watcher without taking ownership of the stream itself. For example this allows a controller to be created via the applier without consuming the original watcher stream.

Solution

Added an extension method in watch_ext which produces a stream_subscriber. Success events (T must implement clone, so we can't send errors to subscribers) are propagated to all subscribers as the original stream is consumed.

@danrspencer danrspencer marked this pull request as ready for review February 2, 2023 11:13
@nightkr
Copy link
Member

nightkr commented Feb 2, 2023

Sadly, I'm not sure this is a good fit for us, for a couple of reasons:

  • This doesn't inherently have anything to do with Kubernetes, it could be a generic stream adapter
  • This implementation involves a few footguns:

As for the clone issue, I think I'd generally rather Arc-wrap the messages than drop errors (or demand that users do so if required). That should also help efficiency in the fast case since K8s objects tend to be relatively costly to clone (lots of small strings and vecs that need to be allocated separately).

@danrspencer
Copy link
Contributor Author

@nightkr see #1128 (comment) for some context on this PR

@danrspencer
Copy link
Contributor Author

danrspencer commented Feb 2, 2023

To address the other comments more specifically:

  • Wrapping the stream items in an Arc would change the signature of the original stream. I'm not sure if that's problematic but it would stop anything else that expects to be able to take an Stream<Item = Result<watcher::Event<K>, watcher::Error>> from using the new stream. Wrapping the item before pushing it into the channel to prevent >1 clone of the Kubernetes item seems reasonable though.
  • A Store will remain always empty if its stream is never polled, I'm unsure how the subscription not producing items is more of a foot gun than that?
  • The lagging concern seems reasonable; I'm not sure of the best way to solve it. Calling stream.subscribable() could return a wrapper over the original stream and a single subscription stream in a tuple since the primary uses cases I see would be for a very small number of subscribers. The downside of that approach is each additional stream is another layer on top of the original stream.

@nightkr
Copy link
Member

nightkr commented Feb 2, 2023

I'm sorry for the tone in my original reply. I don't want to shut this down, it's just a complicated topic that I'm scared of getting wrong.

I'll go over the other PR to try to understand the use case better.

Wrapping the stream items in an Arc would change the signature of the original stream. I'm not sure if that's problematic but it would stop anything else that expects to be able to take an Stream<Item = Result<watcher::Event, watcher::Error>> from using the new stream. Wrapping the item before pushing it into the channel to prevent >1 clone of the Kubernetes item seems reasonable though.

Yeah, that's a concern. In theory most consumers "should" be happy to take anything that can be derefed into the correct types, but that's kind of a pain to model properly in the type system.

A Store will remain always empty if its stream is never polled, I'm unsure how the subscription not producing items is more of a foot gun than that?

I'd generally expect (when dealing with async stuff) that a sync function operates on the current state, while an async function (or stream) would try to make progress. Implicitly waiting for another external thing to make said progress exposes the user to deadlocks which are a pain to debug.

We might be able to get around that by having the poller try to "steal" the backing stream, but I suspect that would cause a lot of thrashing.

The lagging concern seems reasonable; I'm not sure of the best way to solve it. Calling stream.subscribable() could return a wrapper over the original stream and a single subscription stream in a tuple since the primary uses cases I see would be for a very small number of subscribers. The downside of that approach is each additional stream is another layer on top of the original stream.

I think that would still cause the same issue, you still have a "leader" stream and a "follower" stream, and you need to decide what happens when one consumer is making progress faster than the other.

The other way to handle this would be to pause the faster consumers to let the slower ones catch up. But this also means that we're back to worrying about even more deadlocks.


Maybe I'm overblowing the deadlock issue itself, and it'd be more pressing to make sure we have proper diagnostics for it when we suspect it. I don't know.

@nightkr
Copy link
Member

nightkr commented Feb 2, 2023

Hm, as far as I can tell, subscribe_ok() is "just" propagated as subscribe_all()/subscribe_namespaced() in that PR, I'd be curious to understand how these are intended to be used in turn.

@danrspencer
Copy link
Contributor Author

danrspencer commented Feb 2, 2023

Hm, as far as I can tell, subscribe_ok() is "just" propagated as subscribe_all()/subscribe_namespaced() in that PR, I'd be curious to understand how these are intended to be used in turn.

If you go back to the first commits on the PR I had an implementation which allowed a controller to run from the same stream of events as a store. I encountered the problem that the applier essentially ate the watcher stream, so you lost the original stream if you wanted to add logging or anything to it as it was consumed. I dropped that part of the PR for now because there was a lot going on in one PR and it seemed like it could be tackled in a few separate parts.

The current subscribe_all and subscribe_namespaced are more brain storming that concrete suggestions on how a controller could be setup from a SharedStore to solve the split brain problem that can (hypothetically) currently occur.

It's entirely possible I'm going down the wrong path here and the correct solution is to stop the applier eating the watcher stream, or let it eat the stream and accept you can only have a single controller per stream and that you lose access to the watch stream when it's setup.

@danrspencer
Copy link
Contributor Author

I'd generally expect (when dealing with async stuff) that a sync function operates on the current state, while an async function (or stream) would try to make progress. Implicitly waiting for another external thing to make said progress exposes the user to deadlocks which are a pain to debug.

I think if it were an async function that would just deadlock I'd agree, but for a stream it feels entirely reasonable for it to just poll pending if there are no events on it yet. If Kubernetes has supplied no events then the watcher itself will not progress.

@nightkr
Copy link
Member

nightkr commented Feb 2, 2023

I think if it were an async function that would just deadlock I'd agree, but for a stream it feels entirely reasonable for it to just poll pending if there are no events on it yet. If Kubernetes has supplied no events then the watcher itself will not progress.

Rust streams are pulled, not pushed, just like futures.

If you go back to the first commits on the PR I had an implementation which allowed a controller to run from the same stream of events as a store.

Ideally I think we'd share the same store too. I believe Go's controller library basically creates a new local stream by watching the store, something like that might be viable for us too.

@danrspencer
Copy link
Contributor Author

Ideally I think we'd share the same store too. I believe Go's controller library basically creates a new local stream by watching the store, something like that might be viable for us too.

The initial implementation I was playing with did have the Controller sharing the same store too.

Wouldn't a new stream created by watching the store have exactly the same problem you're worried about with this PR? If you didn't consume the original watch stream then the stream watching the store would also never progress. Apologies if I'm completely missing some nuance in the details here 😅.

@nightkr
Copy link
Member

nightkr commented Feb 3, 2023

You're right. I don't know, for these cases that will probably just end up being a fact of life that we'll have to live with (and warn about in the documentation).

Reasonable memory usage, guaranteed progress, reliable delivery... pick any two. :/

@codecov-commenter
Copy link

codecov-commenter commented Feb 13, 2023

Codecov Report

Merging #1131 (7f91a6f) into main (53ad9ee) will decrease coverage by 0.31%.
The diff coverage is 0.00%.

📣 This organization is not using Codecov’s GitHub App Integration. We recommend you install it so Codecov can continue to function properly for your repositories. Learn more

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1131      +/-   ##
==========================================
- Coverage   72.79%   72.49%   -0.31%     
==========================================
  Files          66       67       +1     
  Lines        5066     5087      +21     
==========================================
  Hits         3688     3688              
- Misses       1378     1399      +21     
Impacted Files Coverage Δ
kube-runtime/src/utils/mod.rs 64.15% <ø> (ø)
kube-runtime/src/utils/stream_subscribe.rs 0.00% <0.00%> (ø)
kube-runtime/src/utils/watch_ext.rs 0.00% <0.00%> (ø)

@danrspencer
Copy link
Contributor Author

danrspencer commented Feb 13, 2023

I've added in an update where the StreamSubscribe wraps the events in Arcs which alleviates some of the concerns raised. Some additional thoughts on it that I wanted to discuss:

  1. This makes the subscribe streams no longer compatible with applier without additional work, which was the original motivation of this functionality. My suggestion would be to push the Arc further up so that the initial watcher function wraps all items in an Arc and then cascade that change down into all of the consumers.

  2. I considered an implementation where the output stream of StreamSubscribe would be another receiver. Every subscription and the StreamSubscribe stream would then check themselves when polled, if pending they'd poll the original stream to see if there's a new item that can be pulled into the channel. I decided against it because while it would fix the blocking issue, if there is a slow consumer then the StreamSubscribe itself may miss events. I think it makes more sense that the output stream of StreamSubscribe is considered the original and should always get everything, at the expense of forcing it to be consumed for subscribers to get their events.

Copy link
Member

@clux clux left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks great with the Arc, but want to document and test the edge cases here.

kube-runtime/src/utils/stream_subscribe.rs Show resolved Hide resolved
kube-runtime/src/utils/watch_ext.rs Outdated Show resolved Hide resolved
@clux
Copy link
Member

clux commented Feb 15, 2023

  1. This makes the subscribe streams no longer compatible with applier without additional work, which was the original motivation of this functionality. My suggestion would be to push the Arc further up so that the initial watcher function wraps all items in an Arc and then cascade that change down into all of the consumers.

That is possible.. but probably a pretty extensive change. I would want to see how much this affects the examples and general codebase before commit to an answer here. It is also possible that we make a new watcher like thing does this to let users move over in a non-breaking way somehow (we also have another generic extension in the works for watcher to allow doing watches on PartialObjectMeta types so there's more than one need) - but that does also limit the immediate controller usage (but we kind of also need some redesigning here to handle input streams so i think that is potentially OK).

@danrspencer
Copy link
Contributor Author

  1. This makes the subscribe streams no longer compatible with applier without additional work, which was the original motivation of this functionality. My suggestion would be to push the Arc further up so that the initial watcher function wraps all items in an Arc and then cascade that change down into all of the consumers.

That is possible.. but probably a pretty extensive change. I would want to see how much this affects the examples and general codebase before commit to an answer here. It is also possible that we make a new watcher like thing does this to let users move over in a non-breaking way somehow (we also have another generic extension in the works for watcher to allow doing watches on PartialObjectMeta types so there's more than one need) - but that does also limit the immediate controller usage (but we kind of also need some redesigning here to handle input streams so i think that is potentially OK).

I guess an alternative would be to push the change down to the applier first. I imagine 99% of people are just using Controller::new so the wrapping of the events in an Arc could be wrapped inside that?

@danrspencer danrspencer force-pushed the stream_subscribe branch 3 times, most recently from eb24e11 to d4f30b1 Compare February 16, 2023 10:48
Copy link
Member

@clux clux left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests are good now 👍
minor nits on comments, will leave another comment on integration after.

kube-runtime/src/utils/stream_subscribe.rs Show resolved Hide resolved
kube-runtime/src/utils/stream_subscribe.rs Show resolved Hide resolved
kube-runtime/src/utils/stream_subscribe.rs Show resolved Hide resolved
kube-runtime/src/utils/watch_ext.rs Outdated Show resolved Hide resolved
@clux
Copy link
Member

clux commented Feb 17, 2023

I guess an alternative would be to push the change down to the applier first. I imagine 99% of people are just using Controller::new so the wrapping of the events in an Arc could be wrapped inside that?

As it stands we are creating the distinction between two types of streams:

  • a root stream of watcher events (not arcd, currently compatible with everything)
  • an arcd subscriber stream of watcher events (arcd, not compatible with anything atm)

Currently, this distinction makes it hard for us to use (even within controller i think):

  • WatchStreamExt will not be compatible; subscribers can't use applied_objects() style filters on a subscribed stream
  • Subscribers have fundamentally different errors also - a LagError has to be handled whereas normally there are no errors bubbled up from a watcher stream

as a longer term solution; I'm wondering if this should be Arc'd at the start and we should start sketching out a more comprehensive/configurable Watcher struct that can help us solve other issues watcher at the same time;

  • Arc's the items
  • unifies the error enum (possibly unioning in Subscriber::Error)
  • allows configuring the watcher errors - currently i don't believe we bubble up a single error :|
  • pagination on initial list call to handle busier resources better
  • abstracting over watch and watch_metadata to allow metadata watchers
  • maybe use client-v2 isms to avoid hardcoding up against Api

I'll write up some thoughts about that in a separate issue.

In either case, I think as it stands this PR is OK without further integration for now provided it's feature flagged while we sketch these things out; maybe under feat=unstable_runtime_subscribe inside a unstable_runtime master feat.

@danrspencer danrspencer force-pushed the stream_subscribe branch 2 times, most recently from 70c6ccb to 3610807 Compare February 17, 2023 12:57
danrspencer and others added 4 commits February 17, 2023 13:25
…event subscribers

Signed-off-by: Dan Spencer <danrspen@gmail.com>
1. Remove expensive cloning of Kubernetes objects
2. Allow propogation of err events

Signed-off-by: Dan Spencer <danrspen@gmail.com>
Co-authored-by: Eirik A <sszynrae@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>
Also renamed WatchStreamExt::subscribe to WatchStreamExt::stream_subscribe. The compiler was unable to tell if we were trying to call WatchStreamExt::subscribe or StreamSubscribe::subscribe when they were named the same.

e.g. this code would not compile:

let stream_subscribe = stream.subscribe();
let subscription = stream_subscribe.subscribe();

Signed-off-by: Dan Spencer <danrspen@gmail.com>
@clux clux added the changelog-add changelog added category for prs label Feb 17, 2023
@clux clux added this to the 0.79.0 milestone Feb 17, 2023
@clux clux changed the title Added stream_subscribe which allows watch streams to have additional event subscribers Runtime: Add WatchStreamExt::subscribe Feb 17, 2023
Signed-off-by: Dan Spencer <danrspen@gmail.com>
kube-runtime/src/utils/mod.rs Outdated Show resolved Hide resolved
Co-authored-by: Eirik A <sszynrae@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>
Copy link
Member

@clux clux left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome work, thank you so much. This is all good to me, apart from the DCO check that needs a --signoff, but beyond that I am all happy to merge.

This PR has also helped me clarify some thoughts on how we can do unstable features (yes, people can accidentally enable these, but our most advanced use generally makes the most sense as a direct dependency so people generally have to opt-in to this).

Signed-off-by: Dan Spencer <danrspen@gmail.com>
… feature

Signed-off-by: Dan Spencer <danrspen@gmail.com>
@clux
Copy link
Member

clux commented Feb 22, 2023

Was looking to merge this yesterday @danrspencer , but when updating with main and got the doctests to run properly, it has surfaced a minor problem with them. Are you able to have a look?

Signed-off-by: Dan Spencer <danrspen@gmail.com>
@clux clux merged commit 5afa31e into kube-rs:main Feb 22, 2023
jmintb pushed a commit to jmintb/kube-rs that referenced this pull request Mar 9, 2023
* Added stream_subscribe which allows watch streams to have additional event subscribers

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* StreamSubscribe now wraps items in arcs so that:

1. Remove expensive cloning of Kubernetes objects
2. Allow propogation of err events

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Renamed watch_ext subscribable to subscribe

Co-authored-by: Eirik A <sszynrae@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>

* StreamSubscribe now allows subscribers how to handle lagging

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed clippy errors in StreamSubscribe

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed grammar in StreamSubscribe docs

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed rustfmt errors in StreamSubscribe

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Improved the documentation for WatchStreamExt::stream_subscribe method.

Also renamed WatchStreamExt::subscribe to WatchStreamExt::stream_subscribe. The compiler was unable to tell if we were trying to call WatchStreamExt::subscribe or StreamSubscribe::subscribe when they were named the same.

e.g. this code would not compile:

let stream_subscribe = stream.subscribe();
let subscription = stream_subscribe.subscribe();

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Put StreamSubscribe behind a feature flag unstable_runtime_subscribe

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Update kube-runtime/src/utils/mod.rs

Co-authored-by: Eirik A <sszynrae@gmail.com>
Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed rustfmt error in kube-runtime utils mod.rs

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed incorrect feature flag usage for the unstable-runtime-subscribe feature

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Made substream_subscribe pub so that its error can be accessed / matched on by consumers

Signed-off-by: Dan Spencer <danrspen@gmail.com>

* Fixed issue with doctest for stream_subscribe

Signed-off-by: Dan Spencer <danrspen@gmail.com>

---------

Signed-off-by: Dan Spencer <danrspen@gmail.com>
Co-authored-by: Eirik A <sszynrae@gmail.com>
@clux clux mentioned this pull request Aug 7, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
changelog-add changelog added category for prs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants