diff --git a/chain/arweave/src/data_source.rs b/chain/arweave/src/data_source.rs
index a4189c8df5e..bb68d59be7a 100644
--- a/chain/arweave/src/data_source.rs
+++ b/chain/arweave/src/data_source.rs
@@ -47,11 +47,8 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.source.start_block
     }
 
-    fn has_expired(&self, block: BlockNumber) -> bool {
-        match self.source.end_block {
-            Some(end_block) => block > end_block,
-            None => false,
-        }
+    fn end_block(&self) -> Option<BlockNumber> {
+        self.source.end_block
     }
 
     fn match_and_decode(
diff --git a/chain/cosmos/src/data_source.rs b/chain/cosmos/src/data_source.rs
index 3ef74735eaf..d5d8963fd3a 100644
--- a/chain/cosmos/src/data_source.rs
+++ b/chain/cosmos/src/data_source.rs
@@ -49,11 +49,8 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.source.start_block
     }
 
-    fn has_expired(&self, block: BlockNumber) -> bool {
-        match self.source.end_block {
-            Some(end_block) => block > end_block,
-            None => false,
-        }
+    fn end_block(&self) -> Option<BlockNumber> {
+        self.source.end_block
     }
 
     fn match_and_decode(
diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs
index 7e934d5fc08..3864eb57991 100644
--- a/chain/ethereum/src/data_source.rs
+++ b/chain/ethereum/src/data_source.rs
@@ -109,11 +109,8 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.start_block
     }
 
-    fn has_expired(&self, block: BlockNumber) -> bool {
-        match self.end_block {
-            Some(end_block) => block > end_block,
-            None => false,
-        }
+    fn end_block(&self) -> Option<BlockNumber> {
+        self.end_block
     }
 
     fn match_and_decode(
diff --git a/chain/near/src/data_source.rs b/chain/near/src/data_source.rs
index d5be019d40d..60acf011118 100644
--- a/chain/near/src/data_source.rs
+++ b/chain/near/src/data_source.rs
@@ -75,11 +75,8 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.source.start_block
     }
 
-    fn has_expired(&self, block: BlockNumber) -> bool {
-        match self.source.end_block {
-            Some(end_block) => block > end_block,
-            None => false,
-        }
+    fn end_block(&self) -> Option<BlockNumber> {
+        self.source.end_block
     }
 
     fn match_and_decode(
diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs
index 1b0464214c8..f8d731f712d 100644
--- a/chain/substreams/src/data_source.rs
+++ b/chain/substreams/src/data_source.rs
@@ -51,8 +51,8 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.initial_block.unwrap_or(0)
     }
 
-    fn has_expired(&self, _: BlockNumber) -> bool {
-        false
+    fn end_block(&self) -> Option<BlockNumber> {
+        None
     }
 
     fn name(&self) -> &str {
diff --git a/core/src/subgraph/context/instance.rs b/core/src/subgraph/context/instance.rs
index 5a1cfe30826..2fcb68c9304 100644
--- a/core/src/subgraph/context/instance.rs
+++ b/core/src/subgraph/context/instance.rs
@@ -15,8 +15,9 @@ pub struct SubgraphInstance<C: Blockchain, T: RuntimeHostBuilder<C>> {
     subgraph_id: DeploymentHash,
     network: String,
     host_builder: T,
-    templates: Arc<Vec<DataSourceTemplate<C>>>,
-    static_data_sources: Arc<Vec<DataSource<C>>>,
+    pub templates: Arc<Vec<DataSourceTemplate<C>>>,
+    /// The data sources declared in the subgraph manifest. This does not include dynamic data sources.
+    pub data_sources: Arc<Vec<DataSource<C>>>,
     host_metrics: Arc<HostMetrics>,
 
     /// The hosts represent the data sources in the subgraph. There is one host per data source.
@@ -51,7 +52,7 @@ where
             host_builder,
             subgraph_id,
             network,
-            static_data_sources: Arc::new(manifest.data_sources),
+            data_sources: Arc::new(manifest.data_sources),
             hosts: Hosts::new(),
             module_cache: HashMap::new(),
             templates,
@@ -221,14 +222,6 @@ where
     pub fn hosts(&self) -> &[Arc<T::Host>] {
         &self.hosts.hosts()
     }
-
-    pub fn static_data_sources(&self) -> Arc<Vec<DataSource<C>>> {
-        self.static_data_sources.clone()
-    }
-
-    pub fn templates(&self) -> Arc<Vec<DataSourceTemplate<C>>> {
-        self.templates.clone()
-    }
 }
 
 /// Runtime hosts, one for each data source mapping.
diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs
index d578f69cc3b..9d16d084eb4 100644
--- a/core/src/subgraph/runner.rs
+++ b/core/src/subgraph/runner.rs
@@ -106,6 +106,62 @@ where
         self.run_inner(break_on_restart).await
     }
 
+    /// Builds the trigger filter that the block stream is created with.
+    ///
+    /// Data sources that have expired as of `current_ptr` are left out of the filter. When
+    /// `current_ptr` is `None`, no data source is left out.
+    fn build_filter(&self, current_ptr: Option<BlockPtr>) -> C::TriggerFilter {
+        let static_filters = self.inputs.static_filters
+            || self.ctx.instance().hosts().len() > ENV_VARS.static_filters_threshold;
+
+        // if static_filters is enabled, build a minimal filter with the static data sources and
+        // add the necessary filters based on templates.
+        // if not enabled we just stick to the filter based on all the data sources.
+        // This specifically removes dynamic data sources based filters because these can be derived
+        // from templates AND this reduces the cost of egress traffic by making the payloads smaller.
+        let filter = if static_filters {
+            if !self.inputs.static_filters {
+                info!(self.logger, "forcing subgraph to use static filters.")
+            }
+
+            let data_sources = self.ctx.instance().data_sources.clone();
+
+            let mut filter = C::TriggerFilter::from_data_sources(
+                data_sources
+                    .iter()
+                    .filter_map(|ds| ds.as_onchain())
+                    // Filter out data sources that have reached their end block.
+                    .filter(|ds| match current_ptr.as_ref() {
+                        Some(block) => !ds.has_expired(block.number),
+                        // No current block pointer: keep every data source.
+                        None => true,
+                    }),
+            );
+
+            let templates = self.ctx.instance().templates.clone();
+
+            filter.extend_with_template(templates.iter().filter_map(|ds| ds.as_onchain()).cloned());
+
+            filter
+        } else {
+            C::TriggerFilter::from_data_sources(
+                self.ctx
+                    .instance()
+                    .hosts()
+                    .iter()
+                    .filter_map(|h| h.data_source().as_onchain())
+                    // Filter out data sources that have reached their end block.
+                    .filter(|ds| match current_ptr.as_ref() {
+                        Some(block) => !ds.has_expired(block.number),
+                        // No current block pointer: keep every data source.
+                        None => true,
+                    }),
+            )
+        };
+
+        filter
+    }
+
     pub async fn run(self) -> Result<(), Error> {
         self.run_inner(false).await.map(|_| ())
     }
@@ -146,51 +202,7 @@ where
             let block_stream_cancel_handle = block_stream_canceler.handle();
 
             let current_ptr = self.inputs.store.block_ptr();
-            let static_filters = self.inputs.static_filters
-                || self.ctx.instance().hosts().len() > ENV_VARS.static_filters_threshold;
-
-            // if static_filters is enabled, build a minimal filter with the static data sources and
-            // add the necessary filters based on templates.
-            // if not enabled we just stick to the filter based on all the data sources.
-            // This specifically removes dynamic data sources based filters because these can be derived
-            // from templates AND this reduces the cost of egress traffic by making the payloads smaller.
-            let filter = if static_filters {
-                if !self.inputs.static_filters {
-                    info!(self.logger, "forcing subgraph to use static filters.")
-                }
-
-                let static_data_sources = self.ctx.instance().static_data_sources();
-
-                let mut filter = C::TriggerFilter::from_data_sources(
-                    static_data_sources
-                        .iter()
-                        .filter_map(|ds| ds.as_onchain())
-                        // Filter out data sources that have reached their end block.
-                        .filter(|ds| match current_ptr.as_ref() {
-                            Some(block) => !ds.has_expired(block.number),
-                            None => true,
-                        }),
-                );
-
-                filter.extend_with_template(
-                    self.ctx
-                        .instance()
-                        .templates()
-                        .iter()
-                        .filter_map(|ds| ds.as_onchain())
-                        .cloned(),
-                );
-
-                filter
-            } else {
-                C::TriggerFilter::from_data_sources(
-                    self.ctx
-                        .instance()
-                        .hosts()
-                        .iter()
-                        .filter_map(|h| h.data_source().as_onchain()),
-                )
-            };
+            let filter = self.build_filter(current_ptr.clone());
 
             let mut block_stream = new_block_stream(&self.inputs, &filter, &self.metrics.subgraph)
                 .await?
diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs
index a2287f39e29..527067381d9 100644
--- a/graph/src/blockchain/mock.rs
+++ b/graph/src/blockchain/mock.rs
@@ -70,7 +70,7 @@ impl<C: Blockchain> DataSource<C> for MockDataSource {
         todo!()
     }
 
-    fn has_expired(&self, _: BlockNumber) -> bool {
+    fn end_block(&self) -> Option<BlockNumber> {
         todo!()
     }
 
diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs
index 8d307b8ba98..dfcc93977b3 100644
--- a/graph/src/blockchain/mod.rs
+++ b/graph/src/blockchain/mod.rs
@@ -268,7 +268,7 @@ pub trait DataSource<C: Blockchain>: 'static + Sized + Send + Sync + Clone {
-    /// If the data source has an `endBlock`, check whether the trigger block is
-    /// within the range of blocks that the data source is supposed to handle.
-    /// Otherwise, ignore the trigger.
-    fn has_expired(&self, block: BlockNumber) -> bool;
+    /// The data source's end block, if it has one. Blocks after the end block are
+    /// treated as expired by `has_expired`; a data source without an end block never
+    /// expires.
+    fn end_block(&self) -> Option<BlockNumber>;
     fn name(&self) -> &str;
     fn kind(&self) -> &str;
     fn network(&self) -> Option<&str>;
@@ -301,6 +301,13 @@ pub trait DataSource<C: Blockchain>: 'static + Sized + Send + Sync + Clone {
 
     /// Used as part of manifest validation. If there are no errors, return an empty vector.
     fn validate(&self) -> Vec<Error>;
+
+    /// Returns `true` if `block` is past this data source's end block. A data source
+    /// without an end block never expires.
+    fn has_expired(&self, block: BlockNumber) -> bool {
+        self.end_block()
+            .map_or(false, |end_block| block > end_block)
+    }
 }
 
 #[async_trait]