Skip to content

Commit

Permalink
Return valid POI for deterministically failed SGs (#4774)
Browse files Browse the repository at this point in the history
Requesting a POI for a deterministically failed
Subgraph on a block past the failure will now
return the latest valid POI.
  • Loading branch information
mangas committed Jul 24, 2023
1 parent 97e0adc commit ba3a680
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 19 deletions.
4 changes: 4 additions & 0 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ where
// There's no point in calling it if we have no current or parent block
// pointers, because there would be: no block to revert to or to search
// errors from (first execution).
//
// We attempt to unfail deterministic errors to mitigate deterministic
// errors caused by wrong data being consumed from the providers. It has
// been a frequent case in the past so this helps recover on a larger scale.
let _outcome = self
.inputs
.store
Expand Down
41 changes: 24 additions & 17 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,25 +918,24 @@ impl DeploymentStore {
block: BlockPtr,
) -> Result<Option<[u8; 32]>, StoreError> {
let indexer = *indexer;
let site3 = site.cheap_clone();
let site4 = site.cheap_clone();
let site5 = site.cheap_clone();
let site2 = site.cheap_clone();
let store = self.cheap_clone();
let block2 = block.cheap_clone();

let entities = self
let entities: Option<(Vec<Entity>, BlockPtr)> = self
.with_conn(move |conn, cancel| {
let site = site.clone();
cancel.check_cancel()?;

let layout = store.layout(conn, site4.cheap_clone())?;
let layout = store.layout(conn, site.cheap_clone())?;

if !layout.supports_proof_of_indexing() {
return Ok(None);
}

conn.transaction::<_, CancelableError<anyhow::Error>, _>(move || {
let mut block_ptr = block.cheap_clone();
let latest_block_ptr =
match Self::block_ptr_with_conn(conn, site4.cheap_clone())? {
match Self::block_ptr_with_conn(conn, site.cheap_clone())? {
Some(inner) => inner,
None => return Ok(None),
};
Expand All @@ -951,30 +950,38 @@ impl DeploymentStore {
// The best we can do right now is just to make sure that the block number
// is high enough.
if latest_block_ptr.number < block.number {
return Ok(None);
}
// If a subgraph has failed deterministically then any blocks past head
// should return the same POI
let fatal_error = ErrorDetail::fatal(conn, &site.deployment)?;
block_ptr = match fatal_error {
Some(se) => TryInto::<SubgraphError>::try_into(se)?
.block_ptr
.unwrap_or(block_ptr),
None => return Ok(None),
};
};

let query = EntityQuery::new(
site4.deployment.cheap_clone(),
block.number,
site.deployment.cheap_clone(),
block_ptr.number,
EntityCollection::All(vec![(
POI_OBJECT.cheap_clone(),
AttributeNames::All,
)]),
);
let entities = store
.execute_query::<Entity>(conn, site4, query)
.execute_query::<Entity>(conn, site, query)
.map(|(entities, _)| entities)
.map_err(anyhow::Error::from)?;

Ok(Some(entities))
Ok(Some((entities, block_ptr)))
})
.map_err(Into::into)
})
.await?;

let entities = if let Some(entities) = entities {
entities
let (entities, block_ptr) = if let Some((entities, bp)) = entities {
(entities, bp)
} else {
return Ok(None);
};
Expand All @@ -995,10 +1002,10 @@ impl DeploymentStore {
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()?;

let info = self.subgraph_info(&site5).map_err(anyhow::Error::from)?;
let info = self.subgraph_info(&site2).map_err(anyhow::Error::from)?;

let mut finisher =
ProofOfIndexingFinisher::new(&block2, &site3.deployment, &indexer, info.poi_version);
ProofOfIndexingFinisher::new(&block_ptr, &site2.deployment, &indexer, info.poi_version);
for (name, region) in by_causality_region.drain() {
finisher.add_causality_region(&name, &region);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/src/fixture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl Drop for TestContext {
pub struct Stores {
network_name: String,
chain_head_listener: Arc<ChainHeadUpdateListener>,
network_store: Arc<Store>,
pub network_store: Arc<Store>,
chain_store: Arc<ChainStore>,
}

Expand Down
64 changes: 63 additions & 1 deletion tests/tests/runner_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use graph_tests::helpers::run_cmd;
use slog::{o, Discard, Logger};

struct RunnerTestRecipe {
stores: Stores,
pub stores: Stores,
subgraph_name: SubgraphName,
hash: DeploymentHash,
}
Expand Down Expand Up @@ -503,6 +503,68 @@ async fn fatal_error() -> anyhow::Result<()> {
Ok(())
}

#[tokio::test]
async fn poi_for_deterministically_failed_sg() -> anyhow::Result<()> {
let RunnerTestRecipe {
stores,
subgraph_name,
hash,
} = RunnerTestRecipe::new("fatal-error").await;

let blocks = {
let block_0 = genesis();
let block_1 = empty_block(block_0.ptr(), test_ptr(1));
let block_2 = empty_block(block_1.ptr(), test_ptr(2));
let block_3 = empty_block(block_2.ptr(), test_ptr(3));
// let block_4 = empty_block(block_3.ptr(), test_ptr(4));
vec![block_0, block_1, block_2, block_3]
};

let stop_block = blocks.last().unwrap().block.ptr();

let chain = chain(blocks.clone(), &stores, None).await;
let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, &chain, None, None).await;

ctx.start_and_sync_to_error(stop_block).await;

// Go through the indexing status API to also test it.
let status = ctx.indexing_status().await;
assert!(status.health == SubgraphHealth::Failed);
assert!(status.entity_count == 1.into()); // Only PoI
let err = status.fatal_error.unwrap();
assert!(err.block.number == 3.into());
assert!(err.deterministic);

let sg_store = stores.network_store.subgraph_store();

let poi2 = sg_store
.get_proof_of_indexing(&hash, &None, test_ptr(2))
.await
.unwrap();

// All POIs past this point should be the same
let poi3 = sg_store
.get_proof_of_indexing(&hash, &None, test_ptr(3))
.await
.unwrap();
assert!(poi2 != poi3);

let poi4 = sg_store
.get_proof_of_indexing(&hash, &None, test_ptr(4))
.await
.unwrap();
assert_eq!(poi3, poi4);
assert!(poi2 != poi4);

let poi100 = sg_store
.get_proof_of_indexing(&hash, &None, test_ptr(100))
.await
.unwrap();
assert_eq!(poi4, poi100);
assert!(poi2 != poi100);

Ok(())
}
async fn build_subgraph(dir: &str) -> DeploymentHash {
build_subgraph_with_yarn_cmd(dir, "deploy:test").await
}
Expand Down

0 comments on commit ba3a680

Please sign in to comment.