From 941d1ac528da34631380f2ef1d6eb666a9b8b253 Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Thu, 20 Jul 2023 15:07:18 +0100 Subject: [PATCH] Return valid POI for deterministically failed SGs Requesting a POI for a deterministically failed Subgraph on a block past the failure will now return the latest valid POI. --- core/src/subgraph/runner.rs | 4 ++ store/postgres/src/deployment_store.rs | 60 +++++++++++++++++------- tests/src/fixture/mod.rs | 2 +- tests/tests/runner_tests.rs | 64 +++++++++++++++++++++++++- 4 files changed, 111 insertions(+), 19 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 3c28663604e..ce01d79490d 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -127,6 +127,10 @@ where // There's no point in calling it if we have no current or parent block // pointers, because there would be: no block to revert to or to search // errors from (first execution). + // + // We attempt to unfail deterministic errors to mitigate deterministic + // errors caused by wrong data being consumed from the providers. It has + // been a frequent case in the past so this helps recover on a larger scale. 
let _outcome = self .inputs .store diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 7eb66502cd8..dee235cfcf3 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -37,7 +37,9 @@ use std::time::{Duration, Instant}; use graph::components::store::EntityCollection; use graph::components::subgraph::{ProofOfIndexingFinisher, ProofOfIndexingVersion}; use graph::constraint_violation; -use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError, POI_DIGEST, POI_OBJECT}; +use graph::data::subgraph::schema::{ + DeploymentCreate, SubgraphError, SubgraphHealth, POI_DIGEST, POI_OBJECT, +}; use graph::prelude::{ anyhow, debug, info, o, warn, web3, AttributeNames, BlockNumber, BlockPtr, CheapClone, DeploymentHash, DeploymentState, Entity, EntityQuery, Error, Logger, QueryExecutionError, @@ -918,25 +920,24 @@ impl DeploymentStore { block: BlockPtr, ) -> Result<Option<[u8; 32]>, StoreError> { let indexer = *indexer; - let site3 = site.cheap_clone(); - let site4 = site.cheap_clone(); - let site5 = site.cheap_clone(); + let site2 = site.cheap_clone(); let store = self.cheap_clone(); - let block2 = block.cheap_clone(); - let entities = self + let entities: Option<(Vec<Entity>, BlockPtr)> = self .with_conn(move |conn, cancel| { + let site = site.clone(); cancel.check_cancel()?; - let layout = store.layout(conn, site4.cheap_clone())?; + let layout = store.layout(conn, site.cheap_clone())?; if !layout.supports_proof_of_indexing() { return Ok(None); } conn.transaction::<_, CancelableError<anyhow::Error>, _>(move || { + let mut block_ptr = block.cheap_clone(); let latest_block_ptr = - match Self::block_ptr_with_conn(conn, site4.cheap_clone())? { + match Self::block_ptr_with_conn(conn, site.cheap_clone())? { Some(inner) => inner, None => return Ok(None), }; @@ -951,30 +952,55 @@ impl DeploymentStore { // The best we can do right now is just to make sure that the block number // is high enough. 
if latest_block_ptr.number < block.number { - return Ok(None); + // If the subgraph failed deterministically then the POI from the failure block + // should be used. + let status = store.deployment_statuses(&[site.cheap_clone()])?; + let status: Option<SubgraphError> = status + .into_iter() + .filter(|e| e.health.eq(&SubgraphHealth::Failed)) + // There should only ever be a deterministic error in this stack. + .find_map(|e| match e.fatal_error { + Some(se) if se.deterministic => Some(se), + _ => None, + }); + + match status { + Some(se) => { + let bp = match se.block_ptr { + Some(bp) => bp, + // Keep previous behaviour if we can't find a block_ptr where + // the subgraph failed deterministically. + None => return Ok(None), + }; + // Override the block_number so we can get the entities from the + // relevant block. + block_ptr = bp; + } + None => return Ok(None), + }; } let query = EntityQuery::new( - site4.deployment.cheap_clone(), - block.number, + site.deployment.cheap_clone(), + block_ptr.number, EntityCollection::All(vec![( POI_OBJECT.cheap_clone(), AttributeNames::All, )]), ); let entities = store - .execute_query::<Entity>(conn, site4, query) + .execute_query::<Entity>(conn, site, query) .map(|(entities, _)| entities) .map_err(anyhow::Error::from)?; - Ok(Some(entities)) + Ok(Some((entities, block_ptr))) }) .map_err(Into::into) }) .await?; - let entities = if let Some(entities) = entities { - entities + let (entities, block_ptr) = if let Some((entities, bp)) = entities { + (entities, bp) } else { return Ok(None); }; @@ -995,10 +1021,10 @@ impl DeploymentStore { }) .collect::<Result<HashMap<_, _>, anyhow::Error>>()?; - let info = self.subgraph_info(&site5).map_err(anyhow::Error::from)?; + let info = self.subgraph_info(&site2).map_err(anyhow::Error::from)?; let mut finisher = - ProofOfIndexingFinisher::new(&block2, &site3.deployment, &indexer, info.poi_version); + ProofOfIndexingFinisher::new(&block_ptr, &site2.deployment, &indexer, info.poi_version); for (name, region) in by_causality_region.drain() { 
finisher.add_causality_region(&name, &region); } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 3c040e8a36f..b249d7b79d0 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -262,7 +262,7 @@ impl Drop for TestContext { pub struct Stores { network_name: String, chain_head_listener: Arc, - network_store: Arc, + pub network_store: Arc, chain_store: Arc, } diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index 5b513f49280..246a73c5f44 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -24,7 +24,7 @@ use graph_tests::helpers::run_cmd; use slog::{o, Discard, Logger}; struct RunnerTestRecipe { - stores: Stores, + pub stores: Stores, subgraph_name: SubgraphName, hash: DeploymentHash, } @@ -503,6 +503,68 @@ async fn fatal_error() -> anyhow::Result<()> { Ok(()) } +#[tokio::test] +async fn poi_for_deterministically_failed_sg() -> anyhow::Result<()> { + let RunnerTestRecipe { + stores, + subgraph_name, + hash, + } = RunnerTestRecipe::new("fatal-error").await; + + let blocks = { + let block_0 = genesis(); + let block_1 = empty_block(block_0.ptr(), test_ptr(1)); + let block_2 = empty_block(block_1.ptr(), test_ptr(2)); + let block_3 = empty_block(block_2.ptr(), test_ptr(3)); + // let block_4 = empty_block(block_3.ptr(), test_ptr(4)); + vec![block_0, block_1, block_2, block_3] + }; + + let stop_block = blocks.last().unwrap().block.ptr(); + + let chain = chain(blocks.clone(), &stores, None).await; + let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, &chain, None, None).await; + + ctx.start_and_sync_to_error(stop_block).await; + + // Go through the indexing status API to also test it. 
+ let status = ctx.indexing_status().await; + assert!(status.health == SubgraphHealth::Failed); + assert!(status.entity_count == 1.into()); // Only PoI + let err = status.fatal_error.unwrap(); + assert!(err.block.number == 3.into()); + assert!(err.deterministic); + + let sg_store = stores.network_store.subgraph_store(); + + let poi2 = sg_store + .get_proof_of_indexing(&hash, &None, test_ptr(2)) + .await + .unwrap(); + + // POIs requested at or past the failure block (3) should all be the same + let poi3 = sg_store + .get_proof_of_indexing(&hash, &None, test_ptr(3)) + .await + .unwrap(); + assert!(poi2 != poi3); + + let poi4 = sg_store + .get_proof_of_indexing(&hash, &None, test_ptr(4)) + .await + .unwrap(); + assert_eq!(poi3, poi4); + assert!(poi2 != poi4); + + let poi5 = sg_store + .get_proof_of_indexing(&hash, &None, test_ptr(5)) + .await + .unwrap(); + assert_eq!(poi4, poi5); + assert!(poi2 != poi5); + + Ok(()) +} async fn build_subgraph(dir: &str) -> DeploymentHash { build_subgraph_with_yarn_cmd(dir, "deploy:test").await }