Skip to content

Commit

Permalink
chain,core,graph: refactor end_block implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Oct 24, 2023
1 parent 5d93fb6 commit d572536
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 34 deletions.
4 changes: 2 additions & 2 deletions chain/arweave/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,11 @@ pub struct TransactionHandler {
}

#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Source {
// A data source that does not have an owner can only have block handlers.
pub(crate) owner: Option<String>,
#[serde(rename = "startBlock", default)]
#[serde(default)]
pub(crate) start_block: BlockNumber,
#[serde(rename = "endBlock")]
pub(crate) end_block: Option<BlockNumber>,
}
4 changes: 2 additions & 2 deletions chain/cosmos/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,10 @@ pub struct MappingMessageHandler {
}

#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Source {
#[serde(rename = "startBlock", default)]
#[serde(default)]
pub start_block: BlockNumber,
#[serde(rename = "endBlock")]
pub(crate) end_block: Option<BlockNumber>,
}

Expand Down
2 changes: 1 addition & 1 deletion chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl blockchain::DataSource<Chain> for DataSource {
self.mapping.api_version.clone()
}

fn min_spec_version_inner(&self) -> semver::Version {
fn min_spec_version(&self) -> semver::Version {
self.mapping
.block_handlers
.iter()
Expand Down
4 changes: 2 additions & 2 deletions chain/near/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,12 @@ impl PartialAccounts {
}

#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Source {
// A data source that does not have an account or accounts can only have block handlers.
pub(crate) account: Option<String>,
#[serde(rename = "startBlock", default)]
#[serde(default)]
pub(crate) start_block: BlockNumber,
#[serde(rename = "endBlock")]
pub(crate) end_block: Option<BlockNumber>,
pub(crate) accounts: Option<PartialAccounts>,
}
27 changes: 16 additions & 11 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
self.run_inner(break_on_restart).await
}

fn build_filter(&mut self) {
fn build_filter(&self) -> C::TriggerFilter {
let current_ptr = self.inputs.store.block_ptr();
let static_filters = self.inputs.static_filters
|| self.ctx.instance().hosts().len() > ENV_VARS.static_filters_threshold;
Expand Down Expand Up @@ -156,7 +156,7 @@ where
)
};

self.ctx.filter = Some(filter);
filter
}

pub async fn run(self) -> Result<(), Error> {
Expand Down Expand Up @@ -198,7 +198,7 @@ where
let block_stream_canceler = CancelGuard::new();
let block_stream_cancel_handle = block_stream_canceler.handle();
// TriggerFilter needs to be rebuilt eveytime the blockstream is restarted
self.build_filter();
self.ctx.filter = Some(self.build_filter());

let mut block_stream = new_block_stream(
&self.inputs,
Expand Down Expand Up @@ -872,6 +872,7 @@ trait StreamEventHandler<C: Blockchain> {
err: CancelableError<Error>,
cancel_handle: &CancelHandle,
) -> Result<Action, Error>;
fn needs_restart(&self, revert_to_ptr: BlockPtr, subgraph_ptr: BlockPtr) -> bool;
}

#[async_trait]
Expand Down Expand Up @@ -1062,6 +1063,17 @@ where
}
}

/// Determines if the subgraph needs to be restarted.
/// Currently returns true when there are data sources that have reached their end block
/// in the range between `revert_to_ptr` and `subgraph_ptr`.
fn needs_restart(&self, revert_to_ptr: BlockPtr, subgraph_ptr: BlockPtr) -> bool {
self.inputs
.end_blocks
.range(revert_to_ptr.number..=subgraph_ptr.number)
.next()
.is_some()
}

async fn handle_revert(
&mut self,
revert_to_ptr: BlockPtr,
Expand Down Expand Up @@ -1102,14 +1114,7 @@ where

self.revert_state(subgraph_ptr.number)?;

// If any datasource has reached its end block in the range [revert_to_ptr, subgraph_ptr]
// then we need to restart the blockstream to reset the TriggerFilter.
let needs_restart: bool = self
.inputs
.end_blocks
.range(revert_to_ptr.number..=subgraph_ptr.number)
.next()
.is_some();
let needs_restart: bool = self.needs_restart(revert_to_ptr, subgraph_ptr);

let action = if needs_restart {
Action::Restart
Expand Down
15 changes: 1 addition & 14 deletions graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod types;
use crate::{
cheap_clone::CheapClone,
components::store::{DeploymentCursorTracker, DeploymentLocator, StoredDynamicDataSource},
data::subgraph::{UnifiedMappingApiVersion, MIN_SPEC_VERSION, SPEC_VERSION_0_0_9},
data::subgraph::{UnifiedMappingApiVersion, MIN_SPEC_VERSION},
data_source,
prelude::DataSourceContext,
runtime::{gas::GasCounter, AscHeap, HostExportError},
Expand Down Expand Up @@ -263,20 +263,7 @@ pub trait DataSource<C: Blockchain>: 'static + Sized + Send + Sync + Clone {
fn creation_block(&self) -> Option<BlockNumber>;
fn api_version(&self) -> semver::Version;

/// **NOTE: DO NOT OVERRIDE THIS METHOD.**
/// Use `min_spec_version_inner` for custom logic instead.
/// This method is used to determine the minimum spec version required by the data source.
fn min_spec_version(&self) -> semver::Version {
// If the data source has an end block, then min spec version is 0.0.9
let min_version_from_trait = self
.end_block()
.map_or(MIN_SPEC_VERSION, |_| SPEC_VERSION_0_0_9);
let min_version_from_inner = self.min_spec_version_inner();

min_version_from_inner.max(min_version_from_trait)
}

fn min_spec_version_inner(&self) -> semver::Version {
MIN_SPEC_VERSION
}

Expand Down
13 changes: 11 additions & 2 deletions graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,16 +419,16 @@ impl UnresolvedSchema {
}

#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Source {
/// The contract address for the data source. We allow data sources
/// without an address for 'wildcard' triggers that catch all possible
/// events with the given `abi`
#[serde(default, deserialize_with = "deserialize_address")]
pub address: Option<Address>,
pub abi: String,
#[serde(rename = "startBlock", default)]
#[serde(default)]
pub start_block: BlockNumber,
#[serde(rename = "endBlock", default)]
pub end_block: Option<BlockNumber>,
}

Expand Down Expand Up @@ -860,6 +860,15 @@ impl<C: Blockchain> UnresolvedSubgraphManifest<C> {
);
}

if spec_version < SPEC_VERSION_0_0_9
&& data_sources.iter().any(|ds| ds.end_block().is_some())
{
bail!(
"Defining `endBlock` in the manifest is not supported prior to {}",
SPEC_VERSION_0_0_9
);
}

// Check the min_spec_version of each data source against the spec version of the subgraph
let min_spec_version_mismatch = data_sources
.iter()
Expand Down
7 changes: 7 additions & 0 deletions graph/src/data_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ impl<C: Blockchain> DataSource<C> {
}
}

pub fn end_block(&self) -> Option<BlockNumber> {
match self {
Self::Onchain(ds) => ds.end_block(),
Self::Offchain(_) => None,
}
}

pub fn creation_block(&self) -> Option<BlockNumber> {
match self {
Self::Onchain(ds) => ds.creation_block(),
Expand Down

0 comments on commit d572536

Please sign in to comment.