From 139f3144d8d878b528416fcd4f9a34353dc3dfb1 Mon Sep 17 00:00:00 2001
From: Krishnanand V P <44740264+incrypto32@users.noreply.github.com>
Date: Mon, 6 Nov 2023 21:10:38 +0530
Subject: [PATCH] [Feature] Add support for `endBlock` in data sources (#4787)

* graph,chain,store/test-store: Allow new param `endBlock` in manifest
* core,graph,store: ignore end_block reached data sources in match_and_decode, include them in processed data sources
* tests: add runner tests for end-block
* core: move TriggerFilter construction into SubgraphRunner.run_inner
* core: filter out endBlock reached subgraphs when constructing TriggerFilter
* chain,core: refactor endBlock implementation
* refactor `SubgraphRunner.run_inner` to extract `build_filter`
* core: handle reverts for endBlock
* chain,graph: set min_spec_version requirements for endBlock
* core: refactor `build_filter`
* tests: runner test for endBlock on reorg
* core: restart block stream in the next block for endBlock reached ds
* graph: bump specVersion requirement for endBlock
* core: refactor build_filter logic
* core, tests, graph: make TriggerFilters testable
* chain/starknet: endBlock support for starknet
* chain,core,graph: refactor end_block implementation
* core: refactor build_filter
* Add comments for end-block runner tests
---
 .gitignore | 1 +
 chain/arweave/src/adapter.rs | 6 +-
 chain/arweave/src/data_source.rs | 8 +-
 chain/cosmos/src/data_source.rs | 18 ++-
 chain/ethereum/src/adapter.rs | 31 ++++-
 chain/ethereum/src/data_source.rs | 10 +-
 chain/near/src/chain.rs | 1 +
 chain/near/src/data_source.rs | 8 +-
 chain/starknet/src/data_source.rs | 5 +
 chain/substreams/src/data_source.rs | 4 +
 core/src/subgraph/context.rs | 6 +-
 core/src/subgraph/context/instance.rs | 11 +-
 core/src/subgraph/inputs.rs | 3 +
 core/src/subgraph/instance_manager.rs | 89 ++++--------
 core/src/subgraph/runner.rs | 116 +++++++++++++---
 graph/src/blockchain/mock.rs | 4 +
 graph/src/blockchain/mod.rs | 8 ++
 graph/src/data/subgraph/api_version.rs | 3 +
 graph/src/data/subgraph/mod.rs | 13 +-
 graph/src/data_source/mod.rs | 10 +-
 graph/src/env/mod.rs | 2 +-
 runtime/test/src/common.rs | 1 +
 .../tests/chain/ethereum/manifest.rs | 41 +++++-
 store/test-store/tests/postgres/store.rs | 1 +
 .../runner-tests/end-block/abis/Contract.abi | 15 +++
 tests/runner-tests/end-block/package.json | 13 ++
 tests/runner-tests/end-block/schema.graphql | 12 ++
 tests/runner-tests/end-block/src/mapping.ts | 23 ++++
 .../{api-version => end-block}/subgraph.yaml | 9 +-
 tests/runner-tests/yarn.lock | 32 +++++
 tests/tests/runner_tests.rs | 127 ++++++++++++++++++
 31 files changed, 529 insertions(+), 102 deletions(-)
 create mode 100644 tests/runner-tests/end-block/abis/Contract.abi
 create mode 100644 tests/runner-tests/end-block/package.json
 create mode 100644 tests/runner-tests/end-block/schema.graphql
 create mode 100644 tests/runner-tests/end-block/src/mapping.ts
 rename tests/runner-tests/{api-version => end-block}/subgraph.yaml (82%)

diff --git a/.gitignore b/.gitignore
index a0d8e5475b4..15ad2465251 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,6 +24,7 @@ lcov.info
 /tests/**/generated
 /tests/**/node_modules
 /tests/**/yarn-error.log
+/tests/**/pnpm-lock.yaml

 # Built solidity contracts.
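Before the per-file changes, a minimal self-contained sketch of the semantics this patch introduces. `Ds`, `keep`, and the literal block numbers are illustrative stand-ins; only `has_expired` mirrors the default method the patch adds to `graph::blockchain::DataSource` (see graph/src/blockchain/mod.rs below), and the strictly-less-than check mirrors the runner's `end_block_filter`:

type BlockNumber = i32;

struct Ds {
    end_block: Option<BlockNumber>,
}

impl Ds {
    // Mirrors the default `DataSource::has_expired` added in this patch:
    // a data source only expires strictly after its end block.
    fn has_expired(&self, block: BlockNumber) -> bool {
        self.end_block.map_or(false, |end| block > end)
    }
}

fn main() {
    let ds = Ds { end_block: Some(8) };
    assert!(!ds.has_expired(8)); // triggers at the end block itself still match
    assert!(ds.has_expired(9)); // past it, match_and_decode returns Ok(None)

    // Mirrors the runner's `end_block_filter`: a rebuilt TriggerFilter keeps a
    // data source only while the current block is strictly below its end block.
    let keep = |ds: &Ds, current: BlockNumber| ds.end_block.map_or(true, |end| current < end);
    assert!(keep(&ds, 7));
    assert!(!keep(&ds, 8)); // at the end block, the restarted stream drops it
}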
/tests/**/bin diff --git a/chain/arweave/src/adapter.rs b/chain/arweave/src/adapter.rs index fd2d962e31e..9b25016b4c6 100644 --- a/chain/arweave/src/adapter.rs +++ b/chain/arweave/src/adapter.rs @@ -239,7 +239,11 @@ mod test { kind: "".into(), network: None, name: "".into(), - source: Source { owner, start_block }, + source: Source { + owner, + start_block, + end_block: None, + }, mapping: Mapping { api_version: Version::new(1, 2, 3), language: "".into(), diff --git a/chain/arweave/src/data_source.rs b/chain/arweave/src/data_source.rs index 86a5878df4e..b4f21301c9f 100644 --- a/chain/arweave/src/data_source.rs +++ b/chain/arweave/src/data_source.rs @@ -63,6 +63,10 @@ impl blockchain::DataSource for DataSource { kinds } + fn end_block(&self) -> Option { + self.source.end_block + } + fn match_and_decode( &self, trigger: &::TriggerData, @@ -392,9 +396,11 @@ pub struct TransactionHandler { } #[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] pub(crate) struct Source { // A data source that does not have an owner can only have block handlers. pub(crate) owner: Option, - #[serde(rename = "startBlock", default)] + #[serde(default)] pub(crate) start_block: BlockNumber, + pub(crate) end_block: Option, } diff --git a/chain/cosmos/src/data_source.rs b/chain/cosmos/src/data_source.rs index bedff2f9251..08e146456da 100644 --- a/chain/cosmos/src/data_source.rs +++ b/chain/cosmos/src/data_source.rs @@ -83,6 +83,10 @@ impl blockchain::DataSource for DataSource { kinds } + fn end_block(&self) -> Option { + self.source.end_block + } + fn match_and_decode( &self, trigger: &::TriggerData, @@ -502,9 +506,11 @@ pub struct MappingMessageHandler { } #[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct Source { - #[serde(rename = "startBlock", default)] + #[serde(default)] pub start_block: BlockNumber, + pub(crate) end_block: Option, } #[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, Deserialize)] @@ -657,7 +663,10 @@ mod tests { kind: "cosmos".to_string(), network: None, name: "Test".to_string(), - source: Source { start_block: 1 }, + source: Source { + start_block: 1, + end_block: None, + }, mapping: Mapping { api_version: semver::Version::new(0, 0, 0), language: "".to_string(), @@ -679,7 +688,10 @@ mod tests { kind: "cosmos".to_string(), network: None, name: "Test".to_string(), - source: Source { start_block: 1 }, + source: Source { + start_block: 1, + end_block: None, + }, mapping: Mapping { api_version: semver::Version::new(0, 0, 0), language: "".to_string(), diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 4fc4d307d0c..064d2c10ab4 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -121,6 +121,21 @@ impl TriggerFilter { pub(crate) fn requires_traces(&self) -> bool { !self.call.is_empty() || self.block.requires_traces() } + + #[cfg(debug_assertions)] + pub fn log(&self) -> &EthereumLogFilter { + &self.log + } + + #[cfg(debug_assertions)] + pub fn call(&self) -> &EthereumCallFilter { + &self.call + } + + #[cfg(debug_assertions)] + pub fn block(&self) -> &EthereumBlockFilter { + &self.block + } } impl bc::TriggerFilter for TriggerFilter { @@ -185,7 +200,7 @@ impl bc::TriggerFilter for TriggerFilter { } #[derive(Clone, Debug, Default)] -pub(crate) struct EthereumLogFilter { +pub struct EthereumLogFilter { /// Log filters can be represented as a bipartite graph between contracts and events. 
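The doc comment continues below with the edge rule. As a rough illustration of the bipartite-graph idea, here is a plain edge set standing in for the patch's `contracts_and_events_graph` with its `LogFilterNode::Contract`/`LogFilterNode::Event` nodes; the string-typed `Contract`/`Event` and the sample signatures are assumptions for brevity:

use std::collections::HashSet;

type Contract = &'static str;
type Event = &'static str;

fn main() {
    // One edge per (contract, event) pair that some data source declares a
    // trigger for; nodes within the same set are never connected.
    let mut edges: HashSet<(Contract, Event)> = HashSet::new();
    edges.insert(("0xfactory", "PairCreated(address,address)"));
    edges.insert(("0xfactory", "OwnerChanged(address)"));
    edges.insert(("0xpair", "Swap(uint256,uint256)"));

    // A log is relevant iff an edge joins its address and its event signature.
    let matches = |addr: Contract, event: Event| edges.contains(&(addr, event));
    assert!(matches("0xfactory", "PairCreated(address,address)"));
    assert!(!matches("0xpair", "PairCreated(address,address)"));
}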
An edge /// exists between a contract and an event if a data source for the contract has a trigger for /// the event. @@ -382,10 +397,20 @@ impl EthereumLogFilter { } filters.into_iter() } + + #[cfg(debug_assertions)] + pub fn contract_addresses(&self) -> impl Iterator + '_ { + self.contracts_and_events_graph + .nodes() + .filter_map(|node| match node { + LogFilterNode::Contract(address) => Some(address), + LogFilterNode::Event(_) => None, + }) + } } #[derive(Clone, Debug, Default)] -pub(crate) struct EthereumCallFilter { +pub struct EthereumCallFilter { // Each call filter has a map of filters keyed by address, each containing a tuple with // start_block and the set of function signatures pub contract_addresses_function_signatures: @@ -583,7 +608,7 @@ impl From<&EthereumBlockFilter> for EthereumCallFilter { } #[derive(Clone, Debug, Default)] -pub(crate) struct EthereumBlockFilter { +pub struct EthereumBlockFilter { /// Used for polling block handlers, a hashset of (start_block, polling_interval) pub polling_intervals: HashSet<(BlockNumber, i32)>, pub contract_addresses: HashSet<(BlockNumber, Address)>, diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs index bd1855965a3..b7782f87044 100644 --- a/chain/ethereum/src/data_source.rs +++ b/chain/ethereum/src/data_source.rs @@ -51,6 +51,7 @@ pub struct DataSource { pub manifest_idx: u32, pub address: Option
, pub start_block: BlockNumber, + pub end_block: Option, pub mapping: Mapping, pub context: Arc>, pub creation_block: Option, @@ -99,6 +100,7 @@ impl blockchain::DataSource for DataSource { manifest_idx: template.manifest_idx, address: Some(address), start_block: creation_block, + end_block: None, mapping: template.mapping, context: Arc::new(context), creation_block: Some(creation_block), @@ -137,6 +139,10 @@ impl blockchain::DataSource for DataSource { self.start_block } + fn end_block(&self) -> Option { + self.end_block + } + fn match_and_decode( &self, trigger: &::TriggerData, @@ -176,12 +182,12 @@ impl blockchain::DataSource for DataSource { address, mapping, context, - // The creation block is ignored for detection duplicate data sources. // Contract ABI equality is implicit in `mapping.abis` equality. creation_block: _, contract_abi: _, start_block: _, + end_block: _, } = self; // mapping_request_sender, host_metrics, and (most of) host_exports are operational structs @@ -247,6 +253,7 @@ impl blockchain::DataSource for DataSource { manifest_idx, address, start_block: creation_block.unwrap_or(0), + end_block: None, mapping: template.mapping.clone(), context: Arc::new(context), creation_block, @@ -382,6 +389,7 @@ impl DataSource { manifest_idx, address: source.address, start_block: source.start_block, + end_block: source.end_block, mapping, context: Arc::new(context), creation_block, diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 3fdf5b1bee7..321a2650094 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -1010,6 +1010,7 @@ mod test { source: crate::data_source::Source { account, start_block: 10, + end_block: None, accounts: partial_accounts, }, mapping: Mapping { diff --git a/chain/near/src/data_source.rs b/chain/near/src/data_source.rs index 7fcac3ca9a0..a2651edb51d 100644 --- a/chain/near/src/data_source.rs +++ b/chain/near/src/data_source.rs @@ -92,6 +92,10 @@ impl blockchain::DataSource for DataSource { kinds } + fn end_block(&self) -> Option { + self.source.end_block + } + fn match_and_decode( &self, trigger: &::TriggerData, @@ -493,10 +497,12 @@ impl PartialAccounts { } #[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] pub(crate) struct Source { // A data source that does not have an account or accounts can only have block handlers. 
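The NEAR `Source` struct continues just below. The serde pattern repeated across these chains, a container-level `rename_all = "camelCase"` replacing the per-field `rename = "startBlock"` while the new `end_block` is picked up as an optional `endBlock` key, can be exercised standalone; this sketch assumes `serde` and `serde_yaml` as dependencies and simplifies `BlockNumber` to `i32`:

use serde::Deserialize;

type BlockNumber = i32;

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Source {
    // A data source without an account can only have block handlers.
    account: Option<String>,
    #[serde(default)]
    start_block: BlockNumber,
    end_block: Option<BlockNumber>,
}

fn main() {
    // Both camelCase keys map onto the snake_case fields automatically.
    let src: Source = serde_yaml::from_str("account: test.near\nstartBlock: 10\nendBlock: 20\n").unwrap();
    assert_eq!(src.start_block, 10);
    assert_eq!(src.end_block, Some(20));

    // `endBlock` stays optional: omitting it yields None, and
    // `#[serde(default)]` keeps `startBlock` optional as before.
    let src: Source = serde_yaml::from_str("account: test.near\n").unwrap();
    assert_eq!(src.start_block, 0);
    assert_eq!(src.end_block, None);
}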
pub(crate) account: Option, - #[serde(rename = "startBlock", default)] + #[serde(default)] pub(crate) start_block: BlockNumber, + pub(crate) end_block: Option, pub(crate) accounts: Option, } diff --git a/chain/starknet/src/data_source.rs b/chain/starknet/src/data_source.rs index 144dcf8560c..00ed1c1c290 100644 --- a/chain/starknet/src/data_source.rs +++ b/chain/starknet/src/data_source.rs @@ -48,6 +48,7 @@ pub struct UnresolvedDataSource { #[serde(rename_all = "camelCase")] pub struct Source { pub start_block: BlockNumber, + pub end_block: Option, #[serde(default, deserialize_with = "deserialize_address")] pub address: Option, } @@ -98,6 +99,10 @@ impl blockchain::DataSource for DataSource { self.source.start_block } + fn end_block(&self) -> Option { + self.source.end_block + } + fn handler_kinds(&self) -> HashSet<&str> { let mut kinds = HashSet::new(); diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs index b78b0c90463..fec9b015dd1 100644 --- a/chain/substreams/src/data_source.rs +++ b/chain/substreams/src/data_source.rs @@ -47,6 +47,10 @@ impl blockchain::DataSource for DataSource { self.initial_block.unwrap_or(0) } + fn end_block(&self) -> Option { + None + } + fn name(&self) -> &str { &self.name } diff --git a/core/src/subgraph/context.rs b/core/src/subgraph/context.rs index d455fe64601..460dd3e9d00 100644 --- a/core/src/subgraph/context.rs +++ b/core/src/subgraph/context.rs @@ -66,8 +66,8 @@ where { instance: SubgraphInstance, pub instances: SubgraphKeepAlive, - pub filter: C::TriggerFilter, pub offchain_monitor: OffchainMonitor, + pub filter: Option, trigger_processor: Box>, } @@ -75,15 +75,14 @@ impl> IndexingContext { pub fn new( instance: SubgraphInstance, instances: SubgraphKeepAlive, - filter: C::TriggerFilter, offchain_monitor: OffchainMonitor, trigger_processor: Box>, ) -> Self { Self { instance, instances, - filter, offchain_monitor, + filter: None, trigger_processor, } } @@ -182,7 +181,6 @@ impl> IndexingContext { self.instance.causality_region_next_value() } - #[cfg(debug_assertions)] pub fn instance(&self) -> &SubgraphInstance { &self.instance } diff --git a/core/src/subgraph/context/instance.rs b/core/src/subgraph/context/instance.rs index d566bd8081d..e8c849f1a5f 100644 --- a/core/src/subgraph/context/instance.rs +++ b/core/src/subgraph/context/instance.rs @@ -15,7 +15,9 @@ pub struct SubgraphInstance> { subgraph_id: DeploymentHash, network: String, host_builder: T, - templates: Arc>>, + pub templates: Arc>>, + /// The data sources declared in the subgraph manifest. This does not include dynamic data sources. + pub data_sources: Arc>>, host_metrics: Arc, /// The hosts represent the data sources in the subgraph. There is one host per data source. @@ -33,9 +35,12 @@ where C: Blockchain, T: RuntimeHostBuilder, { + /// Create a new subgraph instance from the given manifest and data sources. + /// `data_sources` must contain all data sources declared in the manifest + all dynamic data sources. 
pub fn from_manifest( logger: &Logger, manifest: SubgraphManifest, + data_sources: Vec>, host_builder: T, host_metrics: Arc, offchain_monitor: &mut OffchainMonitor, @@ -49,6 +54,7 @@ where host_builder, subgraph_id, network, + data_sources: Arc::new(manifest.data_sources), hosts: Hosts::new(), module_cache: HashMap::new(), templates, @@ -59,7 +65,7 @@ where // Create a new runtime host for each data source in the subgraph manifest; // we use the same order here as in the subgraph manifest to make the // event processing behavior predictable - for ds in manifest.data_sources { + for ds in data_sources { // TODO: This is duplicating code from `IndexingContext::add_dynamic_data_source` and // `SubgraphInstance::add_dynamic_data_source`. Ideally this should be refactored into // `IndexingContext`. @@ -215,7 +221,6 @@ where self.causality_region_seq.next_val() } - #[cfg(debug_assertions)] pub fn hosts(&self) -> &[Arc] { &self.hosts.hosts() } diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index e0c8a655b73..b2e95c753f5 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -15,6 +15,7 @@ pub struct IndexingInputs { pub deployment: DeploymentLocator, pub features: BTreeSet, pub start_blocks: Vec, + pub end_blocks: BTreeSet, pub stop_block: Option, pub store: Arc, pub debug_fork: Option>, @@ -37,6 +38,7 @@ impl IndexingInputs { deployment, features, start_blocks, + end_blocks, stop_block, store: _, debug_fork, @@ -53,6 +55,7 @@ impl IndexingInputs { deployment: deployment.clone(), features: features.clone(), start_blocks: start_blocks.clone(), + end_blocks: end_blocks.clone(), stop_block: stop_block.clone(), store, debug_fork: debug_fork.clone(), diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 80751e7105c..6a3d97d589c 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -2,12 +2,11 @@ use crate::polling_monitor::{ArweaveService, IpfsService}; use crate::subgraph::context::{IndexingContext, SubgraphKeepAlive}; use crate::subgraph::inputs::IndexingInputs; use crate::subgraph::loader::load_dynamic_data_sources; +use std::collections::BTreeSet; use crate::subgraph::runner::SubgraphRunner; use graph::blockchain::block_stream::BlockStreamMetrics; -use graph::blockchain::Blockchain; -use graph::blockchain::NodeCapabilities; -use graph::blockchain::{BlockchainKind, TriggerFilter}; +use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities}; use graph::components::subgraph::ProofOfIndexingVersion; use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6}; use graph::data_source::causality_region::CausalityRegionSeq; @@ -245,7 +244,7 @@ impl SubgraphInstanceManager { "n_templates" => manifest.templates.len(), ); - let mut manifest = manifest + let manifest = manifest .resolve(&link_resolver, &logger, ENV_VARS.max_spec_version.clone()) .await?; @@ -289,36 +288,22 @@ impl SubgraphInstanceManager { // that is done store.start_subgraph_deployment(&logger).await?; - // Dynamic data sources are loaded by appending them to the manifest. - // - // Refactor: Preferrably we'd avoid any mutation of the manifest. 
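A hedged sketch of what the replacement hunk just below computes: `start_blocks` for every on-chain data source plus an `end_blocks` BTreeSet, which `SubgraphRunner::needs_restart` (in the runner.rs diff further down) later probes with a range query after a revert. The tuple representation and block numbers here are hypothetical; the real code maps over `C::DataSource`:

use std::collections::BTreeSet;

type BlockNumber = i32;

fn main() {
    // (start_block, end_block) for three hypothetical on-chain data sources.
    let data_sources: Vec<(BlockNumber, Option<BlockNumber>)> =
        vec![(0, Some(8)), (5, None), (10, Some(20))];

    let start_blocks: Vec<BlockNumber> = data_sources.iter().map(|(s, _)| *s).collect();
    let end_blocks: BTreeSet<BlockNumber> = data_sources.iter().filter_map(|(_, e)| *e).collect();
    assert_eq!(start_blocks, vec![0, 5, 10]);
    assert_eq!(end_blocks, BTreeSet::from([8, 20]));

    // Mirrors `needs_restart`: restart the block stream if any data source's
    // end block lies in the inclusive [revert_to, subgraph_ptr] window, since
    // the current filter may wrongly exclude that data source after the revert.
    let needs_restart = |revert_to: BlockNumber, head: BlockNumber| {
        end_blocks.range(revert_to..=head).next().is_some()
    };
    assert!(needs_restart(6, 9)); // the end block at 8 sits inside the reverted range
    assert!(!needs_restart(21, 25));
}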
- let (manifest, static_data_sources) = { - let data_sources = load_dynamic_data_sources(store.clone(), logger.clone(), &manifest) + let dynamic_data_sources = + load_dynamic_data_sources(store.clone(), logger.clone(), &manifest) .await .context("Failed to load dynamic data sources")?; - let static_data_sources = manifest.data_sources.clone(); - - // Add dynamic data sources to the subgraph - manifest.data_sources.extend(data_sources); - - info!( - logger, - "Data source count at start: {}", - manifest.data_sources.len() - ); - - (manifest, static_data_sources) - }; + // Combine the data sources from the manifest with the dynamic data sources + let mut data_sources = manifest.data_sources.clone(); + data_sources.extend(dynamic_data_sources); - let static_filters = - self.static_filters || manifest.data_sources.len() >= ENV_VARS.static_filters_threshold; + info!(logger, "Data source count at start: {}", data_sources.len()); - let onchain_data_sources = manifest - .data_sources + let onchain_data_sources = data_sources .iter() .filter_map(|d| d.as_onchain().cloned()) .collect::>(); + let required_capabilities = C::NodeCapabilities::from_data_sources(&onchain_data_sources); let network = manifest.network_name(); @@ -328,33 +313,20 @@ impl SubgraphInstanceManager { .with_context(|| format!("no chain configured for network {}", network))? .clone(); - // if static_filters is enabled, build a minimal filter with the static data sources and - // add the necessary filters based on templates. - // if not enabled we just stick to the filter based on all the data sources. - // This specifically removes dynamic data sources based filters because these can be derived - // from templates AND this reduces the cost of egress traffic by making the payloads smaller. - let filter = if static_filters { - if !self.static_filters { - info!(logger, "forcing subgraph to use static filters.") - } - - let onchain_data_sources = static_data_sources.iter().filter_map(|d| d.as_onchain()); - - let mut filter = C::TriggerFilter::from_data_sources(onchain_data_sources); - - filter.extend_with_template( - manifest - .templates - .iter() - .filter_map(|ds| ds.as_onchain()) - .cloned(), - ); - filter - } else { - C::TriggerFilter::from_data_sources(onchain_data_sources.iter()) - }; + let start_blocks: Vec = data_sources + .iter() + .filter_map(|d| d.as_onchain().map(|d: &C::DataSource| d.start_block())) + .collect(); - let start_blocks = manifest.start_blocks(); + let end_blocks: BTreeSet = manifest + .data_sources + .iter() + .filter_map(|d| { + d.as_onchain() + .map(|d: &C::DataSource| d.end_block()) + .flatten() + }) + .collect(); let templates = Arc::new(manifest.templates.clone()); @@ -433,6 +405,7 @@ impl SubgraphInstanceManager { let instance = super::context::instance::SubgraphInstance::from_manifest( &logger, manifest, + data_sources, host_builder, host_metrics.clone(), &mut offchain_monitor, @@ -443,6 +416,7 @@ impl SubgraphInstanceManager { deployment: deployment.clone(), features, start_blocks, + end_blocks, stop_block, store, debug_fork, @@ -450,20 +424,15 @@ impl SubgraphInstanceManager { chain, templates, unified_api_version, - static_filters, + static_filters: self.static_filters, poi_version, network, instrument, }; // The subgraph state tracks the state of the subgraph instance over time - let ctx = IndexingContext::new( - instance, - self.instances.cheap_clone(), - filter, - offchain_monitor, - tp, - ); + let ctx = + IndexingContext::new(instance, self.instances.cheap_clone(), offchain_monitor, tp); let 
metrics = RunnerMetrics {
         subgraph: subgraph_metrics,
diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs
index 5db94b7b286..14277de4e0e 100644
--- a/core/src/subgraph/runner.rs
+++ b/core/src/subgraph/runner.rs
@@ -106,6 +106,58 @@ where
         self.run_inner(break_on_restart).await
     }

+    fn build_filter(&self) -> C::TriggerFilter {
+        let current_ptr = self.inputs.store.block_ptr();
+        let static_filters = self.inputs.static_filters
+            || self.ctx.instance().hosts().len() > ENV_VARS.static_filters_threshold;
+
+        // Filter out data sources that have reached their end block
+        let end_block_filter = |ds: &&C::DataSource| match current_ptr.as_ref() {
+            // We filter out data sources for which the current block is at or past their end block.
+            Some(block) => ds.end_block().map_or(true, |end| block.number < end),
+            // If there is no current block, we keep all data sources.
+            None => true,
+        };
+
+        // if static_filters is not enabled we just stick to the filter based on all the data sources.
+        if !static_filters {
+            return C::TriggerFilter::from_data_sources(
+                self.ctx
+                    .instance()
+                    .hosts()
+                    .iter()
+                    .filter_map(|h| h.data_source().as_onchain())
+                    // Filter out data sources that have reached their end block if the block is final.
+                    .filter(end_block_filter),
+            );
+        }
+
+        // if static_filters is enabled, build a minimal filter with the static data sources and
+        // add the necessary filters based on templates.
+        // This specifically removes dynamic data sources based filters because these can be derived
+        // from templates AND this reduces the cost of egress traffic by making the payloads smaller.
+
+        if !self.inputs.static_filters {
+            info!(self.logger, "forcing subgraph to use static filters.")
+        }
+
+        let data_sources = self.ctx.instance().data_sources.clone();
+
+        let mut filter = C::TriggerFilter::from_data_sources(
+            data_sources
+                .iter()
+                .filter_map(|ds| ds.as_onchain())
+                // Filter out data sources that have reached their end block if the block is final.
+                .filter(end_block_filter),
+        );
+
+        let templates = self.ctx.instance().templates.clone();
+
+        filter.extend_with_template(templates.iter().filter_map(|ds| ds.as_onchain()).cloned());
+
+        filter
+    }
+
     pub async fn run(self) -> Result<(), Error> {
         self.run_inner(false).await.map(|_| ())
     }
@@ -144,12 +196,17 @@ where
         let block_stream_canceler = CancelGuard::new();
         let block_stream_cancel_handle = block_stream_canceler.handle();

+        // The TriggerFilter needs to be rebuilt every time the block stream is restarted
+        self.ctx.filter = Some(self.build_filter());

-        let mut block_stream =
-            new_block_stream(&self.inputs, &self.ctx.filter, &self.metrics.subgraph)
-                .await?
-                .map_err(CancelableError::Error)
-                .cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));
+        let mut block_stream = new_block_stream(
+            &self.inputs,
+            self.ctx.filter.as_ref().unwrap(), // Safe to unwrap as we just called `build_filter` in the previous line
+            &self.metrics.subgraph,
+        )
+        .await?
+        .map_err(CancelableError::Error)
+        .cancelable(&block_stream_canceler, || Err(CancelableError::Cancel));

         // Keep the stream's cancel guard around to be able to shut it down when the subgraph
         // deployment is unassigned
@@ -275,10 +332,18 @@ where
                 }
             };

+            // Check if there are any data sources that have expired in this block, i.e. the end_block
+ let has_expired_data_sources = self.inputs.end_blocks.contains(&block_ptr.number); + // If new onchain data sources have been created, and static filters are not in use, it is necessary // to restart the block stream with the new filters. - let needs_restart = - block_state.has_created_on_chain_data_sources() && !self.inputs.static_filters; + let created_data_sources_needs_restart = + !self.inputs.static_filters && block_state.has_created_on_chain_data_sources(); + + // Determine if the block stream needs to be restarted due to newly created on-chain data sources + // or data sources that have reached their end block. + let needs_restart = created_data_sources_needs_restart || has_expired_data_sources; { let _section = self @@ -437,9 +502,9 @@ where // Check for offchain events and process them, including their entity modifications in the // set to be transacted. let offchain_events = self.ctx.offchain_monitor.ready_offchain_events()?; - let (offchain_mods, processed_data_sources, persisted_off_chain_data_sources) = self - .handle_offchain_triggers(offchain_events, &block) - .await?; + let (offchain_mods, processed_offchain_data_sources, persisted_off_chain_data_sources) = + self.handle_offchain_triggers(offchain_events, &block) + .await?; mods.extend(offchain_mods); // Put the cache back in the state, asserting that the placeholder cache was not used. @@ -495,7 +560,7 @@ where &self.metrics.host.stopwatch, persisted_data_sources, deterministic_errors, - processed_data_sources, + processed_offchain_data_sources, is_non_fatal_errors_active, ) .await @@ -657,11 +722,6 @@ where ); block_state.persist_data_source(data_source.as_stored_dynamic_data_source()); } - - // Merge filters from data sources into the block stream builder - self.ctx - .filter - .extend(data_sources.iter().filter_map(|ds| ds.as_onchain())); } } @@ -811,6 +871,7 @@ trait StreamEventHandler { err: CancelableError, cancel_handle: &CancelHandle, ) -> Result; + fn needs_restart(&self, revert_to_ptr: BlockPtr, subgraph_ptr: BlockPtr) -> bool; } #[async_trait] @@ -1001,6 +1062,17 @@ where } } + /// Determines if the subgraph needs to be restarted. + /// Currently returns true when there are data sources that have reached their end block + /// in the range between `revert_to_ptr` and `subgraph_ptr`. + fn needs_restart(&self, revert_to_ptr: BlockPtr, subgraph_ptr: BlockPtr) -> bool { + self.inputs + .end_blocks + .range(revert_to_ptr.number..=subgraph_ptr.number) + .next() + .is_some() + } + async fn handle_revert( &mut self, revert_to_ptr: BlockPtr, @@ -1021,7 +1093,7 @@ where if let Err(e) = self .inputs .store - .revert_block_operations(revert_to_ptr, cursor) + .revert_block_operations(revert_to_ptr.clone(), cursor) .await { error!(&self.logger, "Could not revert block. 
Retrying"; "error" => %e); @@ -1041,7 +1113,15 @@ where self.revert_state(subgraph_ptr.number)?; - Ok(Action::Continue) + let needs_restart: bool = self.needs_restart(revert_to_ptr, subgraph_ptr); + + let action = if needs_restart { + Action::Restart + } else { + Action::Continue + }; + + Ok(action) } async fn handle_err( diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 03eb80453d5..a8829f22dab 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -76,6 +76,10 @@ impl DataSource for MockDataSource { .collect() } + fn end_block(&self) -> Option { + todo!() + } + fn name(&self) -> &str { todo!() } diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 837037db019..26df7f8b37c 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -255,15 +255,18 @@ pub trait DataSource: 'static + Sized + Send + Sync + Clone { fn address(&self) -> Option<&[u8]>; fn start_block(&self) -> BlockNumber; + fn end_block(&self) -> Option; fn name(&self) -> &str; fn kind(&self) -> &str; fn network(&self) -> Option<&str>; fn context(&self) -> Arc>; fn creation_block(&self) -> Option; fn api_version(&self) -> semver::Version; + fn min_spec_version(&self) -> semver::Version { MIN_SPEC_VERSION } + fn runtime(&self) -> Option>>; fn handler_kinds(&self) -> HashSet<&str>; @@ -292,6 +295,11 @@ pub trait DataSource: 'static + Sized + Send + Sync + Clone { /// Used as part of manifest validation. If there are no errors, return an empty vector. fn validate(&self) -> Vec; + + fn has_expired(&self, block: BlockNumber) -> bool { + self.end_block() + .map_or(false, |end_block| block > end_block) + } } #[async_trait] diff --git a/graph/src/data/subgraph/api_version.rs b/graph/src/data/subgraph/api_version.rs index 5e642719d95..1900f12a188 100644 --- a/graph/src/data/subgraph/api_version.rs +++ b/graph/src/data/subgraph/api_version.rs @@ -38,6 +38,9 @@ pub const SPEC_VERSION_0_0_7: Version = Version::new(0, 0, 7); /// Enables polling block handlers and initialisation handlers. pub const SPEC_VERSION_0_0_8: Version = Version::new(0, 0, 8); +// Enables `endBlock` feature. +pub const SPEC_VERSION_0_0_9: Version = Version::new(0, 0, 9); + pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2); #[derive(Clone, PartialEq, Debug)] diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 5fbe307d72c..dbfdec4dd71 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -419,6 +419,7 @@ impl UnresolvedSchema { } #[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct Source { /// The contract address for the data source. We allow data sources /// without an address for 'wildcard' triggers that catch all possible @@ -426,8 +427,9 @@ pub struct Source { #[serde(default, deserialize_with = "deserialize_address")] pub address: Option
, pub abi: String, - #[serde(rename = "startBlock", default)] + #[serde(default)] pub start_block: BlockNumber, + pub end_block: Option, } pub fn calls_host_fn(runtime: &[u8], host_fn: &str) -> anyhow::Result { @@ -858,6 +860,15 @@ impl UnresolvedSubgraphManifest { ); } + if spec_version < SPEC_VERSION_0_0_9 + && data_sources.iter().any(|ds| ds.end_block().is_some()) + { + bail!( + "Defining `endBlock` in the manifest is not supported prior to {}", + SPEC_VERSION_0_0_9 + ); + } + // Check the min_spec_version of each data source against the spec version of the subgraph let min_spec_version_mismatch = data_sources .iter() diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs index 90c0597805f..e4f216350bf 100644 --- a/graph/src/data_source/mod.rs +++ b/graph/src/data_source/mod.rs @@ -8,7 +8,7 @@ mod tests; use crate::{ blockchain::{ - BlockPtr, Blockchain, DataSource as _, DataSourceTemplate as _, MappingTriggerTrait, + Block, BlockPtr, Blockchain, DataSource as _, DataSourceTemplate as _, MappingTriggerTrait, TriggerData as _, UnresolvedDataSource as _, UnresolvedDataSourceTemplate as _, }, components::{ @@ -126,6 +126,13 @@ impl DataSource { } } + pub fn end_block(&self) -> Option { + match self { + Self::Onchain(ds) => ds.end_block(), + Self::Offchain(_) => None, + } + } + pub fn creation_block(&self) -> Option { match self { Self::Onchain(ds) => ds.creation_block(), @@ -177,6 +184,7 @@ impl DataSource { logger: &Logger, ) -> Result>>, Error> { match (self, trigger) { + (Self::Onchain(ds), _) if ds.has_expired(block.number()) => Ok(None), (Self::Onchain(ds), TriggerData::Onchain(trigger)) => ds .match_and_decode(trigger, block, logger) .map(|t| t.map(|t| t.map(MappingTrigger::Onchain))), diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index a3aba7bd53e..1522998d6e9 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -288,7 +288,7 @@ struct Inner { default = "false" )] allow_non_deterministic_fulltext_search: EnvVarBoolean, - #[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "0.0.8")] + #[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "0.0.9")] max_spec_version: Version, #[envconfig(from = "GRAPH_LOAD_WINDOW_SIZE", default = "300")] load_window_size_in_secs: u64, diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index dde5b24e1bd..b31cbbe7dab 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -135,6 +135,7 @@ pub fn mock_data_source(path: &str, api_version: Version) -> DataSource { network: Some(String::from("mainnet")), address: Some(Address::from_str("0123123123012312312301231231230123123123").unwrap()), start_block: 0, + end_block: None, mapping: Mapping { kind: String::from("ethereum/events"), api_version, diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index c59d6f4793d..495e517e6fc 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -8,7 +8,9 @@ use graph::blockchain::DataSource; use graph::data::store::scalar::Bytes; use graph::data::store::Value; use graph::data::subgraph::schema::SubgraphError; -use graph::data::subgraph::{SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7, SPEC_VERSION_0_0_8}; +use graph::data::subgraph::{ + SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7, SPEC_VERSION_0_0_8, SPEC_VERSION_0_0_9, +}; use graph::data_source::offchain::OffchainDataSourceKind; use graph::data_source::DataSourceTemplate; use graph::entity; @@ -547,6 +549,43 @@ specVersion: 
0.0.8 assert_eq!("Qmmanifest", manifest.id.as_str()); } +#[tokio::test] +async fn parse_data_source_with_end_block() { + const YAML: &str = " +dataSources: + - kind: ethereum/contract + name: Factory + network: mainnet + source: + abi: Factory + startBlock: 9562480 + endBlock: 9562481 + mapping: + kind: ethereum/events + apiVersion: 0.0.4 + language: wasm/assemblyscript + entities: + - TestEntity + file: + /: /ipfs/Qmmapping + abis: + - name: Factory + file: + /: /ipfs/Qmabi +schema: + file: + /: /ipfs/Qmschema +specVersion: 0.0.9 +"; + + let manifest = resolve_manifest(YAML, SPEC_VERSION_0_0_9).await; + // Check if end block is parsed correctly + let data_source = manifest.data_sources.first().unwrap(); + let end_block = data_source.as_onchain().unwrap().end_block; + + assert_eq!(Some(9562481), end_block); +} + #[tokio::test] async fn parse_block_handlers_with_both_polling_and_once_filter() { const YAML: &str = " diff --git a/store/test-store/tests/postgres/store.rs b/store/test-store/tests/postgres/store.rs index 2e3bb5f9430..8215f9e62e9 100644 --- a/store/test-store/tests/postgres/store.rs +++ b/store/test-store/tests/postgres/store.rs @@ -1117,6 +1117,7 @@ fn mock_data_source() -> graph_chain_ethereum::DataSource { network: Some(String::from("mainnet")), address: Some(Address::from_str("0123123123012312312301231231230123123123").unwrap()), start_block: 0, + end_block: None, mapping: Mapping { kind: String::from("ethereum/events"), api_version: Version::parse("0.1.0").unwrap(), diff --git a/tests/runner-tests/end-block/abis/Contract.abi b/tests/runner-tests/end-block/abis/Contract.abi new file mode 100644 index 00000000000..9d9f56b9263 --- /dev/null +++ b/tests/runner-tests/end-block/abis/Contract.abi @@ -0,0 +1,15 @@ +[ + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "testCommand", + "type": "string" + } + ], + "name": "TestEvent", + "type": "event" + } +] diff --git a/tests/runner-tests/end-block/package.json b/tests/runner-tests/end-block/package.json new file mode 100644 index 00000000000..b01a779c4bb --- /dev/null +++ b/tests/runner-tests/end-block/package.json @@ -0,0 +1,13 @@ +{ + "name": "end-block", + "version": "0.1.0", + "scripts": { + "codegen": "graph codegen --skip-migrations", + "create:test": "graph create test/end-block --node $GRAPH_NODE_ADMIN_URI", + "deploy:test": "graph deploy test/end-block --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI" + }, + "devDependencies": { + "@graphprotocol/graph-cli": "0.54.0-alpha-20230727052453-1e0e6e5", + "@graphprotocol/graph-ts": "0.30.0" + } +} diff --git a/tests/runner-tests/end-block/schema.graphql b/tests/runner-tests/end-block/schema.graphql new file mode 100644 index 00000000000..c1afe145b8c --- /dev/null +++ b/tests/runner-tests/end-block/schema.graphql @@ -0,0 +1,12 @@ +type Block @entity { + id: ID! + hash: String! + number: BigInt! +} + +type TestEventEntity @entity { + id: ID! + blockHash: String! + blockNumber: BigInt! + command: String! 
+} diff --git a/tests/runner-tests/end-block/src/mapping.ts b/tests/runner-tests/end-block/src/mapping.ts new file mode 100644 index 00000000000..0ce2ef94eb3 --- /dev/null +++ b/tests/runner-tests/end-block/src/mapping.ts @@ -0,0 +1,23 @@ +import { + ethereum, +} from '@graphprotocol/graph-ts'; +import { TestEvent } from '../generated/Contract/Contract'; +import { Block, TestEventEntity } from '../generated/schema'; + +export function handleBlock(block: ethereum.Block): void { + let entity = new Block(block.number.toHex()); + entity.number = block.number; + entity.hash = block.hash.toHexString(); + entity.save(); +} + +export function handleTestEvent(event: TestEvent): void { + let command = event.params.testCommand; + let entity = new TestEventEntity( + event.transaction.hash.toHex() + "-" + event.logIndex.toString(), + ); + entity.blockNumber = event.block.number; + entity.blockHash = event.block.hash.toHexString(); + entity.command = command; + entity.save(); +} diff --git a/tests/runner-tests/api-version/subgraph.yaml b/tests/runner-tests/end-block/subgraph.yaml similarity index 82% rename from tests/runner-tests/api-version/subgraph.yaml rename to tests/runner-tests/end-block/subgraph.yaml index 39d6a73190c..a20a593e8b8 100644 --- a/tests/runner-tests/api-version/subgraph.yaml +++ b/tests/runner-tests/end-block/subgraph.yaml @@ -1,4 +1,4 @@ -specVersion: 0.0.4 +specVersion: 0.0.9 schema: file: ./schema.graphql dataSources: @@ -8,15 +8,18 @@ dataSources: source: address: "0x0000000000000000000000000000000000000000" abi: Contract + endBlock: 8 mapping: kind: ethereum/events apiVersion: 0.0.7 language: wasm/assemblyscript + entities: + - Gravatar abis: - name: Contract file: ./abis/Contract.abi - entities: - - Call + blockHandlers: + - handler: handleBlock eventHandlers: - event: TestEvent(string) handler: handleTestEvent diff --git a/tests/runner-tests/yarn.lock b/tests/runner-tests/yarn.lock index 945a4ce31d1..e484370f844 100644 --- a/tests/runner-tests/yarn.lock +++ b/tests/runner-tests/yarn.lock @@ -249,6 +249,38 @@ which "2.0.2" yaml "1.10.2" +"@graphprotocol/graph-cli@0.54.0-alpha-20230727052453-1e0e6e5": + version "0.54.0-alpha-20230727052453-1e0e6e5" + resolved "https://registry.yarnpkg.com/@graphprotocol/graph-cli/-/graph-cli-0.54.0-alpha-20230727052453-1e0e6e5.tgz#7c36225484d503ab410ea03861d701dc30bc8b1e" + integrity sha512-pxZAJvUXHRMtPIoMTSvVyIjqrfMGCtaqWG9qdRDrLMxUKrIuGWniMKntxaFnHPlgz6OQznN9Zt8wV6uScD/4Sg== + dependencies: + "@float-capital/float-subgraph-uncrashable" "^0.0.0-alpha.4" + "@oclif/core" "2.8.6" + "@whatwg-node/fetch" "^0.8.4" + assemblyscript "0.19.23" + binary-install-raw "0.0.13" + chalk "3.0.0" + chokidar "3.5.3" + debug "4.3.4" + docker-compose "0.23.19" + dockerode "2.5.8" + fs-extra "9.1.0" + glob "9.3.5" + gluegun "5.1.2" + graphql "15.5.0" + immutable "4.2.1" + ipfs-http-client "55.0.0" + jayson "4.0.0" + js-yaml "3.14.1" + prettier "1.19.1" + request "2.88.2" + semver "7.4.0" + sync-request "6.1.0" + tmp-promise "3.0.3" + web3-eth-abi "1.7.0" + which "2.0.2" + yaml "1.10.2" + "@graphprotocol/graph-cli@0.60.0": version "0.60.0" resolved "https://registry.yarnpkg.com/@graphprotocol/graph-cli/-/graph-cli-0.60.0.tgz#afcae7a966ad348886f49372d36c4ca6c35b9434" diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index e8eafef5e8e..8b7471366e9 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -1,5 +1,6 @@ use std::marker::PhantomData; use std::process::Command; +use std::str::FromStr; use std::sync::atomic::{self, 
AtomicBool};
 use std::sync::Arc;
 use std::time::Duration;
@@ -15,6 +16,7 @@ use graph::env::EnvVars;
 use graph::ipfs_client::IpfsClient;
 use graph::object;
 use graph::prelude::ethabi::ethereum_types::H256;
+use graph::prelude::web3::types::Address;
 use graph::prelude::{
     hex, CheapClone, DeploymentHash, SubgraphAssignmentProvider, SubgraphName, SubgraphStore,
 };
@@ -24,6 +26,7 @@ use graph_tests::fixture::ethereum::{
 };
 use graph_tests::fixture::{
     self, stores, test_ptr, test_ptr_reorged, MockAdapterSelector, NoopAdapterSelector, Stores,
+    TestContext,
 };
 use graph_tests::helpers::run_cmd;
 use slog::{o, Discard, Logger};
@@ -428,6 +431,130 @@ async fn derived_loaders() {
     );
 }

+#[tokio::test]
+async fn end_block() -> anyhow::Result<()> {
+    let RunnerTestRecipe {
+        stores,
+        subgraph_name,
+        hash,
+    } = RunnerTestRecipe::new("end-block").await;
+    // This test exercises the end_block feature, which lets a data source stop indexing
+    // at a user-specified block. It checks that the subgraph stops indexing at that
+    // block and that the filters are rebuilt correctly when a revert occurs.
+
+    // Test whether the TriggerFilter includes the given contract address
+    async fn test_filter(
+        ctx: &TestContext,
+        block_ptr: BlockPtr,
+        addr: &Address,
+        should_contain_addr: bool,
+    ) {
+        dbg!(block_ptr.number, should_contain_addr);
+        let runner = ctx.runner(block_ptr.clone()).await;
+        let runner = runner.run_for_test(false).await.unwrap();
+        let filter = runner.context().filter.as_ref().unwrap();
+        let addresses = filter.log().contract_addresses().collect::<Vec<_>>();
+
+        if should_contain_addr {
+            assert!(addresses.contains(&addr));
+        } else {
+            assert!(!addresses.contains(&addr));
+        };
+    }
+
+    let blocks = {
+        let block_0 = genesis();
+        let block_1 = empty_block(block_0.ptr(), test_ptr(1));
+        let block_2 = empty_block(block_1.ptr(), test_ptr(2));
+        let block_3 = empty_block(block_2.ptr(), test_ptr(3));
+        let block_4 = empty_block(block_3.ptr(), test_ptr(4));
+        let block_5 = empty_block(block_4.ptr(), test_ptr(5));
+        let block_6 = empty_block(block_5.ptr(), test_ptr(6));
+        let block_7 = empty_block(block_6.ptr(), test_ptr(7));
+        let block_8 = empty_block(block_7.ptr(), test_ptr(8));
+        let block_9 = empty_block(block_8.ptr(), test_ptr(9));
+        let block_10 = empty_block(block_9.ptr(), test_ptr(10));
+        vec![
+            block_0, block_1, block_2, block_3, block_4, block_5, block_6, block_7, block_8,
+            block_9, block_10,
+        ]
+    };
+
+    let stop_block = blocks.last().unwrap().block.ptr();
+
+    let chain = chain(blocks.clone(), &stores, None).await;
+    let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, &chain, None, None).await;
+
+    let addr = Address::from_str("0x0000000000000000000000000000000000000000").unwrap();
+
+    // Test if the filter includes the contract address before the stop block.
+    test_filter(&ctx, test_ptr(5), &addr, true).await;
+
+    // Test if the filter excludes the contract address after the stop block.
+    test_filter(&ctx, stop_block, &addr, false).await;
+
+    // Query the subgraph to ensure the last indexed block is number 8, indicating the end block feature works.
+    let query_res = ctx
+        .query(r#"{ blocks(first: 1, orderBy: number, orderDirection: desc) { number hash } }"#)
+        .await
+        .unwrap();
+
+    assert_eq!(
+        query_res,
+        Some(
+            object! { blocks: vec![object!{ number: "8", hash:"0x0000000000000000000000000000000000000000000000000000000000000008" }] }
+        )
+    );
+
+    // Simulate a chain reorg and ensure the filter rebuilds accurately post-reorg.
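Before the reorg block that follows: with `endBlock: 8` in this test's manifest, every `test_filter` expectation above and below reduces to the runner's strictly-less-than predicate, sketched here with plain integers rather than the fixture's real types:

fn main() {
    let end_block = 8;
    // `build_filter` keeps the data source while current < end_block.
    let filter_contains_addr = |current: i32| current < end_block;
    assert!(filter_contains_addr(5)); // pre-reorg check at block 5 above
    assert!(filter_contains_addr(7)); // post-reorg check at block 7 below
    assert!(!filter_contains_addr(8)); // at the end block the address is dropped
    assert!(!filter_contains_addr(10)); // pre-reorg stop_block check above
}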
+ { + ctx.rewind(test_ptr(6)); + + let mut blocks = blocks[0..8].to_vec().clone(); + + // Create new blocks to represent a fork from block 7 onwards, including a reorged block 8. + let block_8_1_ptr = test_ptr_reorged(8, 1); + let block_8_1 = empty_block(test_ptr(7), block_8_1_ptr.clone()); + blocks.push(block_8_1); + blocks.push(empty_block(block_8_1_ptr, test_ptr(9))); + + let stop_block = blocks.last().unwrap().block.ptr(); + + chain.set_block_stream(blocks.clone()); + + // Test the filter behavior in the presence of the reorganized chain. + test_filter(&ctx, test_ptr(7), &addr, true).await; + test_filter(&ctx, stop_block, &addr, false).await; + + // Verify that after the reorg, the last Block entity still reflects block number 8, but with a different hash. + let query_res = ctx + .query( + r#"{ + blocks(first: 1, orderBy: number, orderDirection: desc) { + number + hash + } + }"#, + ) + .await + .unwrap(); + + assert_eq!( + query_res, + Some(object! { + blocks: vec![ + object!{ + number: "8", + hash: "0x0000000100000000000000000000000000000000000000000000000000000008" + } + ], + }) + ); + } + + Ok(()) +} + #[tokio::test] async fn file_data_sources() { let RunnerTestRecipe {