chain/ethereum, graph, store: use block cache to load blocks by numbers
incrypto32 committed Aug 22, 2024
1 parent b4ad24f commit 74291bb
Showing 3 changed files with 282 additions and 15 deletions.
96 changes: 85 additions & 11 deletions chain/ethereum/src/ethereum_adapter.rs
@@ -782,6 +782,45 @@ impl EthereumAdapter {
.buffered(ENV_VARS.block_batch_size)
}

/// Request blocks by number through JSON-RPC.
fn load_blocks_by_numbers_rpc(
&self,
logger: Logger,
numbers: Vec<BlockNumber>,
) -> impl Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send {
let web3 = self.web3.clone();

stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| {
let web3 = web3.clone();
retry(format!("load block {}", number), &logger)
.limit(ENV_VARS.request_retries)
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
Box::pin(
web3.eth()
.block_with_txs(BlockId::Number(Web3BlockNumber::Number(
number.into(),
))),
)
.compat()
.from_err::<Error>()
.and_then(move |block| {
block.map(Arc::new).ok_or_else(|| {
anyhow::anyhow!(
"Ethereum node did not find block with number {:?}",
number
)
})
})
.compat()
})
.boxed()
.compat()
.from_err()
}))
.buffered(ENV_VARS.block_batch_size)
}
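
For readers unfamiliar with graph-node's `retry` builder used above: it wraps each block request in a bounded retry loop with a per-attempt timeout. A minimal standalone sketch of just the retry part, with hypothetical names (the real helper also applies the timeout and logs each failed attempt):

    use std::future::Future;

    /// Retry `attempt` up to `retries` extra times, returning the first
    /// success or the last error observed.
    async fn fetch_with_retries<T, E, F, Fut>(mut attempt: F, retries: usize) -> Result<T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let mut last_err = None;
        for _ in 0..=retries {
            match attempt().await {
                Ok(value) => return Ok(value),
                Err(e) => last_err = Some(e),
            }
        }
        // The loop body ran at least once, so an error was recorded.
        Err(last_err.expect("at least one attempt"))
    }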

    /// Request block ptrs for numbers through JSON-RPC.
    ///
    /// Reorg safety: If ids are numbers, they must be final blocks.
@@ -1650,26 +1689,61 @@ impl EthereumAdapterTrait for EthereumAdapter {
Ok(decoded)
}

-    // This is a ugly temporary implementation to get the block ptrs for a range of blocks
+    /// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream.
     async fn load_blocks_by_numbers(
         &self,
         logger: Logger,
         chain_store: Arc<dyn ChainStore>,
         block_numbers: HashSet<BlockNumber>,
     ) -> Box<dyn Stream<Item = Arc<LightEthereumBlock>, Error = Error> + Send> {
-        let block_hashes = block_numbers
-            .into_iter()
-            .map(|number| {
-                chain_store
-                    .block_hashes_by_block_number(number)
-                    .unwrap()
-                    .first()
-                    .unwrap()
-                    .as_h256()
-            })
-            .collect::<HashSet<_>>();
-
-        self.load_blocks(logger, chain_store, block_hashes).await
+        let blocks_map: BTreeMap<i32, Vec<json::Value>> = chain_store
+            .cheap_clone()
+            .blocks_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::<Vec<_>>())
+            .await
+            .map_err(|e| {
+                error!(&logger, "Error accessing block cache {}", e);
+                e
+            })
+            .unwrap_or_default();
+
+        let mut blocks: Vec<Arc<LightEthereumBlock>> = blocks_map
+            .into_iter()
+            .filter_map(|(_number, values)| {
+                if values.len() == 1 {
+                    json::from_value(values[0].clone()).ok()
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let missing_blocks: Vec<i32> = block_numbers
+            .into_iter()
+            .filter(|&number| !blocks.iter().any(|block| block.number() == number))
+            .collect();
+
+        debug!(logger, "Loading {} block(s)", missing_blocks.len());
+
+        Box::new(
+            self.load_blocks_by_numbers_rpc(logger.clone(), missing_blocks)
+                .collect()
+                .map(move |new_blocks| {
+                    let upsert_blocks: Vec<_> = new_blocks
+                        .iter()
+                        .map(|block| BlockFinality::Final(block.clone()))
+                        .collect();
+                    let block_refs: Vec<_> = upsert_blocks
+                        .iter()
+                        .map(|block| block as &dyn graph::blockchain::Block)
+                        .collect();
+                    if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) {
+                        error!(logger, "Error writing to block cache {}", e);
+                    }
+                    blocks.extend(new_blocks);
+                    blocks.sort_by_key(|block| block.number);
+                    stream::iter_ok(blocks)
+                })
+                .flatten_stream(),
+        )
}
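
Taken together, the method above implements a cache-first lookup: blocks found in the store are returned directly, only the misses go to the node, and freshly fetched blocks are written back before the merged, number-sorted result is streamed out. A simplified, synchronous sketch of that flow with toy types (all names here are illustrative, not graph-node APIs):

    use std::collections::{BTreeMap, HashSet};

    fn load_by_numbers(
        cache: &mut BTreeMap<i32, String>,  // block number -> block payload
        fetch_rpc: impl Fn(i32) -> String,  // stand-in for the JSON-RPC fetch
        numbers: HashSet<i32>,
    ) -> Vec<(i32, String)> {
        // Serve cache hits first.
        let mut found: Vec<(i32, String)> = numbers
            .iter()
            .filter_map(|n| cache.get(n).map(|b| (*n, b.clone())))
            .collect();
        // Fetch the misses and write them back to the cache.
        let missing: Vec<i32> = numbers
            .into_iter()
            .filter(|n| !found.iter().any(|(m, _)| m == n))
            .collect();
        for n in missing {
            let block = fetch_rpc(n);
            cache.insert(n, block.clone()); // analogous to upsert_light_blocks
            found.push((n, block));
        }
        found.sort_by_key(|(n, _)| *n);
        found
    }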

/// Load Ethereum blocks in bulk, returning results as they come back as a Stream.
6 changes: 6 additions & 0 deletions graph/src/components/store/traits.rs
@@ -494,6 +494,12 @@ pub trait ChainStore: Send + Sync + 'static {
hashes: Vec<BlockHash>,
) -> Result<Vec<serde_json::Value>, Error>;

    /// Returns the blocks present in the store for the given block numbers,
    /// grouped by number. A number can map to more than one block if the
    /// store has seen multiple forks at that height.
async fn blocks_by_numbers(
self: Arc<Self>,
numbers: Vec<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<serde_json::Value>>, Error>;
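
Since a block number near the chain head does not uniquely identify a block, each number maps to a Vec of candidates. A hedged sketch of how a caller might keep only the unambiguous entries (the helper name is hypothetical):

    use std::collections::BTreeMap;

    /// Keep only numbers the store resolves to exactly one block; ambiguous
    /// entries (e.g. around a reorg) are left for the caller to refetch.
    fn unambiguous(map: &BTreeMap<i32, Vec<serde_json::Value>>) -> Vec<&serde_json::Value> {
        map.values()
            .filter(|blocks| blocks.len() == 1)
            .map(|blocks| &blocks[0])
            .collect()
    }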

/// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching
    /// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding
    /// a child of `root`. Returns None if unable to complete due to missing blocks in the chain
195 changes: 191 additions & 4 deletions store/postgres/src/chain_store.rs
@@ -13,6 +13,7 @@ use graph::slog::Logger;
use graph::stable_hash::crypto_stable_hash;
use graph::util::herd_cache::HerdCache;

use std::collections::BTreeMap;
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
@@ -579,6 +580,50 @@ mod data {
Ok(())
}

pub(super) fn blocks_by_numbers(
&self,
conn: &mut PgConnection,
chain: &str,
numbers: &[BlockNumber],
) -> Result<Vec<JsonBlock>, StoreError> {
let x = match self {
Storage::Shared => {
use public::ethereum_blocks as b;

b::table
.select((
b::hash,
b::number,
b::parent_hash,
sql::<Jsonb>("coalesce(data -> 'block', data)"),
))
.filter(b::network_name.eq(chain))
.filter(b::number.eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64))))
.load::<(BlockHash, i64, BlockHash, json::Value)>(conn)
}
Storage::Private(Schema { blocks, .. }) => blocks
.table()
.select((
blocks.hash(),
blocks.number(),
blocks.parent_hash(),
sql::<Jsonb>("coalesce(data -> 'block', data)"),
))
.filter(
blocks
.number()
.eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64))),
)
.load::<(BlockHash, i64, BlockHash, json::Value)>(conn),
}?;

Ok(x.into_iter()
.map(|(hash, nr, parent, data)| {
JsonBlock::new(BlockPtr::new(hash, nr as i32), parent, Some(data))
})
.collect())
}
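
The `coalesce(data -> 'block', data)` projection appears to normalize two JSON shapes the cache can hold: rows whose `data` wraps the light block under a `block` key, and rows that store the block object directly; that reading is inferred from the query itself rather than from store documentation. The equivalent logic in Rust terms:

    /// Prefer the nested `block` object when present, otherwise use the
    /// value as-is; mirrors `coalesce(data -> 'block', data)` above.
    fn light_block(data: serde_json::Value) -> serde_json::Value {
        match data.get("block") {
            Some(block) => block.clone(),
            None => data,
        }
    }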

pub(super) fn blocks(
&self,
conn: &mut PgConnection,
@@ -1651,7 +1696,10 @@ impl ChainStoreMetrics {
}

#[derive(Clone, CheapClone)]
-struct BlocksLookupResult(Arc<Result<Vec<JsonBlock>, StoreError>>);
+enum BlocksLookupResult {
+    ByHash(Arc<Result<Vec<JsonBlock>, StoreError>>),
+    ByNumber(Arc<Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError>>),
+}

pub struct ChainStore {
logger: Logger,
@@ -1870,6 +1918,35 @@ impl ChainStore {
.await?;
Ok(values)
}

async fn blocks_from_store_by_numbers(
self: &Arc<Self>,
numbers: Vec<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError> {
let store = self.cheap_clone();
let pool = self.pool.clone();

let values = pool
.with_conn(move |conn, _| {
store
.storage
.blocks_by_numbers(conn, &store.chain, &numbers)
.map_err(CancelableError::from)
})
.await?;

let mut block_map = BTreeMap::new();

for block in values {
let block_number = block.ptr.block_number();
block_map
.entry(block_number)
.or_insert_with(Vec::new)
.push(block);
}

Ok(block_map)
}
}

#[async_trait]
@@ -2065,6 +2142,85 @@ impl ChainStoreTrait for ChainStore {
Ok(())
}

async fn blocks_by_numbers(
self: Arc<Self>,
numbers: Vec<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<json::Value>>, Error> {
if ENV_VARS.store.disable_block_cache_for_lookup {
let values = self
.blocks_from_store_by_numbers(numbers)
.await?
.into_iter()
.map(|(num, blocks)| {
(
num,
blocks
.into_iter()
.filter_map(|block| block.data)
.collect::<Vec<_>>(),
)
})
.collect();
Ok(values)
} else {
let cached = self.recent_blocks_cache.get_blocks_by_numbers(&numbers);

let stored = if cached.len() < numbers.len() {
let missing_numbers = numbers
.iter()
.filter(|num| !cached.iter().any(|(ptr, _)| ptr.block_number() == **num))
.cloned()
.collect::<Vec<_>>();

let hash = crypto_stable_hash(&missing_numbers);
let this = self.clone();
let lookup_fut = async move {
let res = this.blocks_from_store_by_numbers(missing_numbers).await;
BlocksLookupResult::ByNumber(Arc::new(res))
};
let lookup_herd = self.lookup_herd.cheap_clone();
let logger = self.logger.cheap_clone();
let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await {
(BlocksLookupResult::ByNumber(res), _) => res,
_ => unreachable!(),
};
let res = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone());

match res {
Ok(blocks) => {
for (_, blocks_for_num) in &blocks {
    // Only cache numbers that resolved to exactly one block.
    if blocks_for_num.len() == 1 {
self.recent_blocks_cache
.insert_block(blocks_for_num[0].clone());
}
}
blocks
}
Err(e) => {
return Err(e.into());
}
}
} else {
BTreeMap::new()
};

let cached_map = cached
.into_iter()
.map(|(ptr, data)| (ptr.block_number(), vec![data]))
.collect::<BTreeMap<_, _>>();

let mut result: BTreeMap<BlockNumber, Vec<json::Value>> = cached_map;
for (num, blocks) in stored {
result
.entry(num)
.or_default()
.extend(blocks.into_iter().filter_map(|block| block.data));
}

Ok(result)
}
}
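
The `lookup_herd` call deduplicates identical lookups keyed by a stable hash of the request, which is why both lookup kinds can share one cache: a key derived from a by-number request can only ever be associated with a `ByNumber` value, making the `unreachable!()` arm safe. A toy, single-threaded sketch of that shape (the real HerdCache also collapses concurrent in-flight queries):

    use std::collections::HashMap;

    #[derive(Clone)]
    enum LookupResult {
        ByHash(Vec<String>),
        ByNumber(Vec<i32>),
    }

    /// One map serves both lookup kinds; each call site matches back out
    /// the variant it stored under its own request key.
    fn cached_query(
        cache: &mut HashMap<u64, LookupResult>,
        key: u64,
        compute: impl FnOnce() -> LookupResult,
    ) -> LookupResult {
        cache.entry(key).or_insert_with(compute).clone()
    }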

async fn blocks(self: Arc<Self>, hashes: Vec<BlockHash>) -> Result<Vec<json::Value>, Error> {
if ENV_VARS.store.disable_block_cache_for_lookup {
let values = self
@@ -2094,12 +2250,15 @@ impl ChainStoreTrait for ChainStore {
let this = self.clone();
let lookup_fut = async move {
let res = this.blocks_from_store(hashes).await;
-            BlocksLookupResult(Arc::new(res))
+            BlocksLookupResult::ByHash(Arc::new(res))
         };
         let lookup_herd = self.lookup_herd.cheap_clone();
         let logger = self.logger.cheap_clone();
-        let (BlocksLookupResult(res), _) =
-            lookup_herd.cached_query(hash, lookup_fut, &logger).await;
+        // TODO(krishna): Add comments explaining the return value of cached_query
+        let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await {
+            (BlocksLookupResult::ByHash(res), _) => res,
+            _ => unreachable!(),
+        };
// Try to avoid cloning a non-concurrent lookup; it's not
        // entirely clear whether that will actually avoid a clone
        // since it depends on a lot of the details of how the
@@ -2361,6 +2520,12 @@ mod recent_blocks_cache {
.and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data)))
}

fn get_block_by_number(&self, number: BlockNumber) -> Option<(&BlockPtr, &json::Value)> {
self.blocks
.get(&number)
.and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data)))
}

fn get_ancestor(
&self,
child_ptr: &BlockPtr,
@@ -2483,6 +2648,28 @@ mod recent_blocks_cache {
blocks
}

pub fn get_blocks_by_numbers(
&self,
numbers: &[BlockNumber],
) -> Vec<(BlockPtr, json::Value)> {
let inner = self.inner.read();
let mut blocks: Vec<(BlockPtr, json::Value)> = Vec::new();

for &number in numbers {
if let Some((ptr, block)) = inner.get_block_by_number(number) {
blocks.push((ptr.clone(), block.clone()));
}
}

inner.metrics.record_hit_and_miss(
&inner.network,
blocks.len(),
numbers.len() - blocks.len(),
);

blocks
}

/// Tentatively caches the `ancestor` of a [`BlockPtr`] (`child`), together with
    /// its associated `data`. Note that for this to work, `child` must be
    /// in the cache already. The first block in the cache should be
