Skip to content

Commit

Permalink
graph, tests : Fix loadDerived not taking entity cache into conside…
Browse files Browse the repository at this point in the history
…ration when loading derived entities (#4799)

- graph: Consider `udpates` and `handler_updates` in `load_related`
- Add more tests for derived loaders
  • Loading branch information
incrypto32 committed Aug 14, 2023
1 parent 27cbcdd commit c350e4f
Show file tree
Hide file tree
Showing 20 changed files with 772 additions and 247 deletions.
111 changes: 103 additions & 8 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::anyhow;
use inflector::Inflector;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
Expand Down Expand Up @@ -203,18 +202,114 @@ impl EntityCache {

let query = DerivedEntityQuery {
entity_type: EntityType::new(base_type.to_string()),
entity_field: field.name.clone().to_snake_case().into(),
entity_field: field.name.clone().into(),
value: eref.entity_id.clone(),
causality_region: eref.causality_region,
id_is_bytes: id_is_bytes,
};

let entities = self.store.get_derived(&query)?;
entities.iter().for_each(|(key, e)| {
self.current.insert(key.clone(), Some(e.clone()));
});
let entities: Vec<Entity> = entities.values().cloned().collect();
Ok(entities)
let mut entity_map = self.store.get_derived(&query)?;

for (key, entity) in entity_map.iter() {
// Only insert to the cache if it's not already there
if !self.current.contains_key(&key) {
self.current.insert(key.clone(), Some(entity.clone()));
}
}

let mut keys_to_remove = Vec::new();

// Apply updates from `updates` and `handler_updates` directly to entities in `entity_map` that match the query
for (key, entity) in entity_map.iter_mut() {
let mut entity_cow = Some(Cow::Borrowed(entity));

if let Some(op) = self.updates.get(key).cloned() {
op.apply_to(&mut entity_cow)
.map_err(|e| key.unknown_attribute(e))?;
}

if let Some(op) = self.handler_updates.get(key).cloned() {
op.apply_to(&mut entity_cow)
.map_err(|e| key.unknown_attribute(e))?;
}

if let Some(updated_entity) = entity_cow {
*entity = updated_entity.into_owned();
} else {
// if entity_cow is None, it means that the entity was removed by an update
// mark the key for removal from the map
keys_to_remove.push(key.clone());
}
}

// A helper function that checks if an update matches the query and returns the updated entity if it does
fn matches_query(
op: &EntityOp,
query: &DerivedEntityQuery,
key: &EntityKey,
) -> Result<Option<Entity>, anyhow::Error> {
match op {
EntityOp::Update(entity) | EntityOp::Overwrite(entity)
if query.matches(key, entity) =>
{
Ok(Some(entity.clone()))
}
EntityOp::Remove => Ok(None),
_ => Ok(None),
}
}

// Iterate over self.updates to find entities that:
// - Aren't already present in the entity_map
// - Match the query
// If these conditions are met:
// - Check if there's an update for the same entity in handler_updates and apply it.
// - Add the entity to entity_map.
for (key, op) in self.updates.iter() {
if !entity_map.contains_key(key) {
if let Some(entity) = matches_query(op, &query, key)? {
if let Some(handler_op) = self.handler_updates.get(key).cloned() {
// If there's a corresponding update in handler_updates, apply it to the entity
// and insert the updated entity into entity_map
let mut entity_cow = Some(Cow::Borrowed(&entity));
handler_op
.apply_to(&mut entity_cow)
.map_err(|e| key.unknown_attribute(e))?;

if let Some(updated_entity) = entity_cow {
entity_map.insert(key.clone(), updated_entity.into_owned());
}
} else {
// If there isn't a corresponding update in handler_updates or the update doesn't match the query, just insert the entity from self.updates
entity_map.insert(key.clone(), entity);
}
}
}
}

// Iterate over handler_updates to find entities that:
// - Aren't already present in the entity_map.
// - Aren't present in self.updates.
// - Match the query.
// If these conditions are met, add the entity to entity_map.
for (key, handler_op) in self.handler_updates.iter() {
if !entity_map.contains_key(key) && !self.updates.contains_key(key) {
if let Some(entity) = matches_query(handler_op, &query, key)? {
entity_map.insert(key.clone(), entity);
}
}
}

// Remove entities that are in the store but have been removed by an update.
// We do this last since the loops over updates and handler_updates are only
// concerned with entities that are not in the store yet and by leaving removed
// keys in entity_map we avoid processing these updates a second time when we
// already looked at them when we went through entity_map
for key in keys_to_remove {
entity_map.remove(&key);
}

Ok(entity_map.into_values().collect())
}

pub fn remove(&mut self, key: EntityKey) {
Expand Down
17 changes: 17 additions & 0 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::borrow::Borrow;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fmt::Display;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
Expand Down Expand Up @@ -199,6 +200,22 @@ pub struct DerivedEntityQuery {
pub causality_region: CausalityRegion,
}

impl DerivedEntityQuery {
/// Checks if a given key and entity match this query.
pub fn matches(&self, key: &EntityKey, entity: &Entity) -> bool {
key.entity_type == self.entity_type
&& entity
.get(&self.entity_field)
.map(|v| match v {
Value::String(s) => s.as_str() == self.value.as_str(),
Value::Bytes(b) => Bytes::from_str(self.value.as_str())
.map_or(false, |bytes_value| &bytes_value == b),
_ => false,
})
.unwrap_or(false)
}
}

impl EntityKey {
// For use in tests only
#[cfg(debug_assertions)]
Expand Down
6 changes: 6 additions & 0 deletions graph/src/util/lfu_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ impl<K: Clone + Ord + Eq + Hash + Debug + CacheWeight, V: CacheWeight + Default>
})
}

pub fn iter<'a>(&'a self) -> impl Iterator<Item = (&K, &V)> {
self.queue
.iter()
.map(|entry| (&entry.0.key, &entry.0.value))
}

pub fn get(&mut self, key: &K) -> Option<&V> {
self.get_mut(key.clone()).map(|x| &x.value)
}
Expand Down
14 changes: 12 additions & 2 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use graph::{
components::store::{AttributeNames, EntityType},
data::store::scalar,
};
use inflector::Inflector;
use itertools::Itertools;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::convert::TryFrom;
Expand Down Expand Up @@ -1760,11 +1761,20 @@ impl<'a> QueryFragment<Pg> for FindDerivedQuery<'a> {
if i > 0 {
out.push_sql(", ");
}
out.push_bind_param::<Text, _>(&value.entity_id.as_str())?;

if *id_is_bytes {
out.push_sql("decode(");
out.push_bind_param::<Text, _>(
&value.entity_id.as_str().strip_prefix("0x").unwrap(),
)?;
out.push_sql(", 'hex')");
} else {
out.push_bind_param::<Text, _>(&value.entity_id.as_str())?;
}
}
out.push_sql(") and ");
}
out.push_identifier(entity_field.as_str())?;
out.push_identifier(entity_field.to_snake_case().as_str())?;
out.push_sql(" = ");
if *id_is_bytes {
out.push_sql("decode(");
Expand Down
10 changes: 9 additions & 1 deletion store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::BTreeSet;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex, RwLock, TryLockError as RwLockError};
use std::time::{Duration, Instant};
Expand All @@ -10,6 +11,8 @@ use graph::components::store::{
Batch, DeploymentCursorTracker, DerivedEntityQuery, EntityKey, ReadStore,
};
use graph::constraint_violation;
use graph::data::store::scalar::Bytes;
use graph::data::store::Value;
use graph::data::subgraph::schema;
use graph::data_source::CausalityRegion;
use graph::prelude::{
Expand Down Expand Up @@ -1100,7 +1103,12 @@ impl Queue {
fn is_related(derived_query: &DerivedEntityQuery, entity: &Entity) -> bool {
entity
.get(&derived_query.entity_field)
.map(|related_id| related_id.as_str() == Some(&derived_query.value))
.map(|v| match v {
Value::String(s) => s.as_str() == derived_query.value.as_str(),
Value::Bytes(b) => Bytes::from_str(derived_query.value.as_str())
.map_or(false, |bytes_value| &bytes_value == b),
_ => false,
})
.unwrap_or(false)
}

Expand Down
33 changes: 0 additions & 33 deletions tests/integration-tests/derived-loaders/abis/Contract.abi

This file was deleted.

21 changes: 0 additions & 21 deletions tests/integration-tests/derived-loaders/schema.graphql

This file was deleted.

34 changes: 0 additions & 34 deletions tests/integration-tests/derived-loaders/src/mapping.ts

This file was deleted.

Loading

0 comments on commit c350e4f

Please sign in to comment.