Skip to content

Commit

Permalink
refactor SubgraphRunner.run_inner to extract build_filter
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Sep 4, 2023
1 parent d960f49 commit 87ba763
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 80 deletions.
8 changes: 3 additions & 5 deletions chain/arweave/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ impl blockchain::DataSource<Chain> for DataSource {
self.source.start_block
}

fn has_expired(&self, block: BlockNumber) -> bool {
match self.source.end_block {
Some(end_block) => block > end_block,
None => false,
}

/// Returns the end block configured on this data source's `source`, or `None` when no end
/// block is set.
fn end_block(&self) -> Option<BlockNumber> {
self.source.end_block
}

fn match_and_decode(
Expand Down
7 changes: 2 additions & 5 deletions chain/cosmos/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@ impl blockchain::DataSource<Chain> for DataSource {
self.source.start_block
}

fn has_expired(&self, block: BlockNumber) -> bool {
match self.source.end_block {
Some(end_block) => block > end_block,
None => false,
}
/// Returns the end block configured on this data source's `source`, or `None` when no end
/// block is set.
fn end_block(&self) -> Option<BlockNumber> {
self.source.end_block
}

fn match_and_decode(
Expand Down
7 changes: 2 additions & 5 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,8 @@ impl blockchain::DataSource<Chain> for DataSource {
self.start_block
}

fn has_expired(&self, block: BlockNumber) -> bool {
match self.end_block {
Some(end_block) => block > end_block,
None => false,
}
/// Returns this data source's `end_block` field, or `None` when no end block is set.
fn end_block(&self) -> Option<BlockNumber> {
self.end_block
}

fn match_and_decode(
Expand Down
7 changes: 2 additions & 5 deletions chain/near/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,8 @@ impl blockchain::DataSource<Chain> for DataSource {
self.source.start_block
}

fn has_expired(&self, block: BlockNumber) -> bool {
match self.source.end_block {
Some(end_block) => block > end_block,
None => false,
}
/// Returns the end block configured on this data source's `source`, or `None` when no end
/// block is set.
fn end_block(&self) -> Option<BlockNumber> {
self.source.end_block
}

fn match_and_decode(
Expand Down
4 changes: 2 additions & 2 deletions chain/substreams/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ impl blockchain::DataSource<Chain> for DataSource {
self.initial_block.unwrap_or(0)
}

fn has_expired(&self, _: BlockNumber) -> bool {
false
/// Always returns `None`: this data source implementation reports no end block.
fn end_block(&self) -> Option<BlockNumber> {
None
}

fn name(&self) -> &str {
Expand Down
15 changes: 4 additions & 11 deletions core/src/subgraph/context/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ pub struct SubgraphInstance<C: Blockchain, T: RuntimeHostBuilder<C>> {
subgraph_id: DeploymentHash,
network: String,
host_builder: T,
templates: Arc<Vec<DataSourceTemplate<C>>>,
static_data_sources: Arc<Vec<DataSource<C>>>,
pub templates: Arc<Vec<DataSourceTemplate<C>>>,
/// The data sources declared in the subgraph manifest. This does not include dynamic data sources.
pub data_sources: Arc<Vec<DataSource<C>>>,
host_metrics: Arc<HostMetrics>,

/// The hosts represent the data sources in the subgraph. There is one host per data source.
Expand Down Expand Up @@ -51,7 +52,7 @@ where
host_builder,
subgraph_id,
network,
static_data_sources: Arc::new(manifest.data_sources),
data_sources: Arc::new(manifest.data_sources),
hosts: Hosts::new(),
module_cache: HashMap::new(),
templates,
Expand Down Expand Up @@ -221,14 +222,6 @@ where
/// Returns the runtime hosts held by this instance as a borrowed slice.
pub fn hosts(&self) -> &[Arc<T::Host>] {
    self.hosts.hosts()
}

pub fn static_data_sources(&self) -> Arc<Vec<DataSource<C>>> {
self.static_data_sources.clone()
}

pub fn templates(&self) -> Arc<Vec<DataSourceTemplate<C>>> {
self.templates.clone()
}
}

/// Runtime hosts, one for each data source mapping.
Expand Down
95 changes: 50 additions & 45 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,55 @@ where
self.run_inner(break_on_restart).await
}

/// Builds the trigger filter that is handed to the block stream.
///
/// `current_ptr` is the block the caller treats as current (`run_inner` passes the store's
/// block pointer). Data sources for which `has_expired` is true at that block are left out
/// of the filter. When `current_ptr` is `None` nothing can have expired yet, so every data
/// source is kept.
fn build_filter(&self, current_ptr: Option<BlockPtr>) -> C::TriggerFilter {
    let static_filters = self.inputs.static_filters
        || self.ctx.instance().hosts().len() > ENV_VARS.static_filters_threshold;

    // Only the block number is needed to decide whether a data source has expired.
    let current_block = current_ptr.as_ref().map(|ptr| ptr.number);

    // if static_filters is enabled, build a minimal filter with the static data sources and
    // add the necessary filters based on templates.
    // if not enabled we just stick to the filter based on all the data sources.
    // This specifically removes dynamic data sources based filters because these can be derived
    // from templates AND this reduces the cost of egress traffic by making the payloads smaller.
    if static_filters {
        if !self.inputs.static_filters {
            info!(self.logger, "forcing subgraph to use static filters.");
        }

        let data_sources = self.ctx.instance().data_sources.clone();

        let mut filter = C::TriggerFilter::from_data_sources(
            data_sources
                .iter()
                .filter_map(|ds| ds.as_onchain())
                // Drop data sources whose end block is behind the current block. Without a
                // current block every data source must be kept; returning `false` here would
                // produce an empty filter.
                .filter(|ds| current_block.map_or(true, |number| !ds.has_expired(number))),
        );

        let templates = self.ctx.instance().templates.clone();

        filter.extend_with_template(templates.iter().filter_map(|ds| ds.as_onchain()).cloned());

        filter
    } else {
        C::TriggerFilter::from_data_sources(
            self.ctx
                .instance()
                .hosts()
                .iter()
                .filter_map(|h| h.data_source().as_onchain())
                // Same expiry rule as the static branch: keep everything when there is no
                // current block.
                .filter(|ds| current_block.map_or(true, |number| !ds.has_expired(number))),
        )
    }
}

pub async fn run(self) -> Result<(), Error> {
self.run_inner(false).await.map(|_| ())
}
Expand Down Expand Up @@ -146,51 +195,7 @@ where
let block_stream_cancel_handle = block_stream_canceler.handle();
let current_ptr = self.inputs.store.block_ptr();

let static_filters = self.inputs.static_filters
|| self.ctx.instance().hosts().len() > ENV_VARS.static_filters_threshold;

// if static_filters is enabled, build a minimal filter with the static data sources and
// add the necessary filters based on templates.
// if not enabled we just stick to the filter based on all the data sources.
// This specifically removes dynamic data sources based filters because these can be derived
// from templates AND this reduces the cost of egress traffic by making the payloads smaller.
let filter = if static_filters {
if !self.inputs.static_filters {
info!(self.logger, "forcing subgraph to use static filters.")
}

let static_data_sources = self.ctx.instance().static_data_sources();

let mut filter = C::TriggerFilter::from_data_sources(
static_data_sources
.iter()
.filter_map(|ds| ds.as_onchain())
// Filter out data sources that have reached their end block.
.filter(|ds| match current_ptr.as_ref() {
Some(block) => !ds.has_expired(block.number),
None => true,
}),
);

filter.extend_with_template(
self.ctx
.instance()
.templates()
.iter()
.filter_map(|ds| ds.as_onchain())
.cloned(),
);

filter
} else {
C::TriggerFilter::from_data_sources(
self.ctx
.instance()
.hosts()
.iter()
.filter_map(|h| h.data_source().as_onchain()),
)
};
let filter = self.build_filter(current_ptr.clone());

let mut block_stream = new_block_stream(&self.inputs, &filter, &self.metrics.subgraph)
.await?
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<C: Blockchain> DataSource<C> for MockDataSource {
todo!()
}

fn has_expired(&self, _: BlockNumber) -> bool {
/// Not implemented for the mock data source; calling this panics via `todo!()`.
fn end_block(&self) -> Option<BlockNumber> {
todo!()
}

Expand Down
7 changes: 6 additions & 1 deletion graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ pub trait DataSource<C: Blockchain>: 'static + Sized + Send + Sync + Clone {
/// If the data source has an `endBlock`, check whether the trigger block is
/// within the range of blocks that the data source is supposed to handle.
/// Otherwise, ignore the trigger.
fn has_expired(&self, block: BlockNumber) -> bool;
fn end_block(&self) -> Option<BlockNumber>;
fn name(&self) -> &str;
fn kind(&self) -> &str;
fn network(&self) -> Option<&str>;
Expand Down Expand Up @@ -301,6 +301,11 @@ pub trait DataSource<C: Blockchain>: 'static + Sized + Send + Sync + Clone {

/// Used as part of manifest validation. If there are no errors, return an empty vector.
fn validate(&self) -> Vec<Error>;

/// Reports whether `block` lies strictly past this data source's end block.
///
/// A data source without an end block never expires, and the end block itself still counts
/// as in range.
fn has_expired(&self, block: BlockNumber) -> bool {
    match self.end_block() {
        Some(end_block) => block > end_block,
        None => false,
    }
}
}

#[async_trait]
Expand Down

0 comments on commit 87ba763

Please sign in to comment.