Skip to content

Commit

Permalink
experiment with a Store<K> parameter for Reflectors - #102
Browse files Browse the repository at this point in the history
in an effort to solve #102 we try to factor out the implicit data store
inside Reflector<K> as a trait object that can be customized.

The abstraction, feels very rigid, and not sure how useable it is.
Ultimately, I wanted a nicer interface for users of library that wasnt
"give me a CLONE of ALL state" or "give me one thing with a matching
ID".

It's potentially possible to take type that implemented iterator with
this approach, as well as MAYBE handing out an arc'd clone to users
(provided they are told not to keep it locked for ages).

Another problem I wanted to solve was having the stored state be a
simplified version of the underlying data (i.e. add/modify would perform
some kind of transformation before storing it). This was super awkward
with the old Reflector setup because you'd effectively force two whole
clones of the two types, and you'd have to recreate your shadow tree
every time you asked for the state (which was ALL of the input).
  • Loading branch information
clux committed Apr 8, 2020
1 parent c88b261 commit 2637056
Showing 1 changed file with 82 additions and 71 deletions.
153 changes: 82 additions & 71 deletions kube/src/runtime/reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use tokio::{

use std::{collections::BTreeMap, sync::Arc, time::Duration};

/// A Reflector with a default MapCache
pub type Reflector<K> = GenericReflector<K, MapCache<K>>;

/// A reflection of state for a Kubernetes ['Api'] resource
///
/// This builds on top of the ['Informer'] by tracking the events received,
Expand All @@ -20,29 +23,33 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration};
/// It is prone to the same desync problems as an informer, but it will self-heal,
/// as best as possible - though this means that you might occasionally see a full
/// reset (boot equivalent) when network issues are encountered.
/// During a reset, the state is cleared before it is rebuilt.
/// During a reset, the state is cleared and rebuilt in an atomic operation.
///
/// The internal state is exposed readably through a getter.
#[derive(Clone)]
pub struct Reflector<K>
pub struct GenericReflector<K, S>
where
K: Clone + DeserializeOwned + Meta,
S: Store<K> + Default,
{
state: Arc<Mutex<State<K>>>,
store: Arc<Mutex<S>>,
version: Arc<Mutex<String>>,
params: ListParams,
api: Api<K>,
}

impl<K> Reflector<K>
impl<K, S> GenericReflector<K, S>
where
K: Clone + DeserializeOwned + Meta,
S: Store<K> + Default,
{
/// Create a reflector on an api resource
pub fn new(api: Api<K>) -> Self {
Reflector {
api,
params: ListParams::default(),
state: Default::default(),
version: Arc::new(Mutex::new(0.to_string())),
store: Default::default(),
}

Check failure on line 53 in kube/src/runtime/reflector.rs

View workflow job for this annotation

GitHub Actions / clippy

mismatched types

error[E0308]: mismatched types --> kube/src/runtime/reflector.rs:48:9 | 41 | impl<K, S> GenericReflector<K, S> | - expected this type parameter ... 47 | pub fn new(api: Api<K>) -> Self { | ---- expected `runtime::reflector::GenericReflector<K, S>` because of return type 48 | / Reflector { 49 | | api, 50 | | params: ListParams::default(), 51 | | version: Arc::new(Mutex::new(0.to_string())), 52 | | store: Default::default(), 53 | | } | |_________^ expected `GenericReflector<K, S>`, found `GenericReflector<K, ...>` | = note: expected struct `runtime::reflector::GenericReflector<_, S>` found struct `runtime::reflector::GenericReflector<_, std::collections::BTreeMap<runtime::reflector::ObjectId, K>>`
}

Expand Down Expand Up @@ -94,14 +101,13 @@ where
/// A single poll call to modify the internal state
async fn poll(&self) -> Result<()> {
let kind = &self.api.resource.kind;
let resource_version = self.state.lock().await.version.clone();
let resource_version = self.version.lock().await.clone();
trace!("Polling {} from resourceVersion={}", kind, resource_version);
let stream = self.api.watch(&self.params, &resource_version).await?;
pin_mut!(stream);

// For every event, modify our state
while let Some(ev) = stream.try_next().await? {
let mut state = self.state.lock().await;
// Informer-like version tracking:
match &ev {
WatchEvent::Added(o)
Expand All @@ -111,85 +117,64 @@ where
// always store the last seen resourceVersion
if let Some(nv) = Meta::resource_ver(o) {
trace!("Updating reflector version for {} to {}", kind, nv);
state.version = nv.clone();
*self.version.lock().await = nv.clone();
}
}
_ => {}
WatchEvent::Error(e) => {
warn!("Failed to watch {}: {:?}", kind, e);
return Err(Error::Api(e.to_owned()));
}
}

let data = &mut state.data;
// Core Reflector logic
let mut store = self.store.lock().await;
match ev {
WatchEvent::Added(o) => {
debug!("Adding {} to {}", Meta::name(&o), kind);
data.entry(ObjectId::key_for(&o)).or_insert_with(|| o.clone());
}
WatchEvent::Modified(o) => {
debug!("Modifying {} in {}", Meta::name(&o), kind);
data.entry(ObjectId::key_for(&o)).and_modify(|e| *e = o.clone());
}
WatchEvent::Deleted(o) => {
debug!("Removing {} from {}", Meta::name(&o), kind);
data.remove(&ObjectId::key_for(&o));
}
WatchEvent::Bookmark(o) => {
debug!("Bookmarking {} from {}", Meta::name(&o), kind);
}
WatchEvent::Error(e) => {
warn!("Failed to watch {}: {:?}", kind, e);
return Err(Error::Api(e));
}
WatchEvent::Added(o) => store.add(o),
WatchEvent::Modified(o) => store.modify(o),
WatchEvent::Deleted(o) => store.delete(o),
_ => {}
}
}
Ok(())
}

/// Reset the state of the underlying informer and clear the cache
pub async fn reset(&self) -> Result<()> {
/// Reset the resource version and clear cache
async fn reset(&self) -> Result<()> {
trace!("Resetting {}", self.api.resource.kind);
// Simplified for k8s >= 1.16
//*self.state.lock().await = Default::default();
//self.informer.reset().await

// For now:
let (data, version) = self.get_full_resource_entries().await?;
*self.state.lock().await = State { data, version };
*self.version.lock().await = version;
let mut store = self.store.lock().await;
store.clear();
for d in data {
store.add(d);
}
Ok(())
}

/// Legacy helper for kubernetes < 1.16
///
/// Needed to do an initial list operation because of https://github.com/clux/kube-rs/issues/219
/// Soon, this goes away as we drop support for k8s < 1.16
async fn get_full_resource_entries(&self) -> Result<(Cache<K>, String)> {
async fn get_full_resource_entries(&self) -> Result<(Vec<K>, String)> {
let res = self.api.list(&self.params).await?;
debug!("Initializing {}", K::KIND);
let version = res.metadata.resource_version.unwrap_or_default();
trace!(
"Got {} {} at resourceVersion={:?}",
debug!(
"Initialized {} with {} objects at {}",
K::KIND,
res.items.len(),
self.api.resource.kind,
version
);
let mut data = BTreeMap::new();
for i in res.items {
// The non-generic parts we care about are spec + status
data.insert(ObjectId::key_for(&i), i);
}
let keys = data
.keys()
.map(ObjectId::to_string)
.collect::<Vec<_>>()
.join(", ");
debug!("Initialized with: [{}]", keys);
Ok((data, version))
Ok((res.items, version))
}

/// Read data for users of the reflector
///
/// This is instant if you are reading and writing from the same context.
pub async fn state(&self) -> Result<Vec<K>> {
let state = self.state.lock().await;
Ok(state.data.values().cloned().collect::<Vec<K>>())
let store = self.store.lock().await;
Ok(store.values())
}

/// Read a single entry by name
Expand All @@ -203,7 +188,7 @@ where
namespace: self.api.resource.namespace.clone(),
};

Ok(self.state.lock().await.data.get(&id).map(Clone::clone))
Ok(self.store.lock().await.get(&id).map(Clone::clone))
}

/// Read a single entry by name within a specific namespace
Expand All @@ -215,15 +200,15 @@ where
name: name.into(),
namespace: Some(ns.into()),
};
Ok(self.state.lock().await.data.get(&id).map(Clone::clone))
Ok(self.store.lock().await.get(&id).map(Clone::clone))
}
}

/// ObjectId represents an object by name and namespace (if any)
///
/// This is an internal subset of ['k8s_openapi::api::core::v1::ObjectReference']
#[derive(Ord, PartialOrd, Hash, Eq, PartialEq, Clone)]
struct ObjectId {
pub struct ObjectId {
name: String,
namespace: Option<String>,
}
Expand All @@ -246,21 +231,47 @@ impl ObjectId {
}
}

/// Internal shared state of Reflector
///
/// Can remove this in k8s >= 1.16 once this uses Informer
struct State<K> {
data: Cache<K>,
version: String,
/// A store that can be plugged into a Reflector
pub trait Store<K> {
fn clear(&mut self);
fn get(&self, id: &ObjectId) -> Option<&K>;
fn add(&mut self, k: K);
fn values(&self) -> Vec<K>;
fn modify(&mut self, k: K);
fn delete(&mut self, k: K);
}

impl<K> Default for State<K> {
fn default() -> Self {
State {
data: Default::default(),
version: 0.to_string(),
}
/// Default Store for a Reflector
pub type MapCache<K> = BTreeMap<ObjectId, K>;

impl<K> Store<K> for MapCache<K>
where
K: Clone + DeserializeOwned + Meta,
{
fn clear(&mut self) {
self.clear()
}

fn get(&self, id: &ObjectId) -> Option<&K> {
self.get(id)
}

fn values(&self) -> Vec<K> {
self.values().cloned().collect()
}

fn add(&mut self, k: K) {
debug!("Adding {} to {}", Meta::name(&k), K::KIND);
self.entry(ObjectId::key_for(&k)).or_insert_with(|| k);
}

fn modify(&mut self, k: K) {
debug!("Modifying {} in {}", Meta::name(&k), K::KIND);
self.entry(ObjectId::key_for(&k)).and_modify(|e| *e = k);
}

fn delete(&mut self, k: K) {
debug!("Removing {} from {}", Meta::name(&k), K::KIND);
self.remove(&ObjectId::key_for(&k));
}
}
/// Internal representation for Reflector
type Cache<K> = BTreeMap<ObjectId, K>;

0 comments on commit 2637056

Please sign in to comment.