Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return valid POI for deterministically failed SGs #4774

Merged
merged 1 commit into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ where
// There's no point in calling it if we have no current or parent block
// pointers, because there would be: no block to revert to or to search
// errors from (first execution).
//
// We attempt to unfail deterministic errors because, in practice, they are
// often caused by wrong data being consumed from the providers. This has
// been a frequent case in the past, so retrying helps recover on a larger scale.
let _outcome = self
.inputs
.store
Expand Down
41 changes: 24 additions & 17 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -918,25 +918,24 @@ impl DeploymentStore {
block: BlockPtr,
) -> Result<Option<[u8; 32]>, StoreError> {
let indexer = *indexer;
let site3 = site.cheap_clone();
let site4 = site.cheap_clone();
let site5 = site.cheap_clone();
let site2 = site.cheap_clone();
let store = self.cheap_clone();
let block2 = block.cheap_clone();

let entities = self
let entities: Option<(Vec<Entity>, BlockPtr)> = self
.with_conn(move |conn, cancel| {
let site = site.clone();
cancel.check_cancel()?;

let layout = store.layout(conn, site4.cheap_clone())?;
let layout = store.layout(conn, site.cheap_clone())?;

if !layout.supports_proof_of_indexing() {
return Ok(None);
}

conn.transaction::<_, CancelableError<anyhow::Error>, _>(move || {
let mut block_ptr = block.cheap_clone();
let latest_block_ptr =
match Self::block_ptr_with_conn(conn, site4.cheap_clone())? {
match Self::block_ptr_with_conn(conn, site.cheap_clone())? {
Some(inner) => inner,
None => return Ok(None),
};
Expand All @@ -951,30 +950,38 @@ impl DeploymentStore {
// The best we can do right now is just to make sure that the block number
// is high enough.
if latest_block_ptr.number < block.number {
return Ok(None);
}
// If a subgraph has failed deterministically, then every block past the
// subgraph's head should return the same POI.
let fatal_error = ErrorDetail::fatal(conn, &site.deployment)?;
block_ptr = match fatal_error {
Some(se) => TryInto::<SubgraphError>::try_into(se)?
.block_ptr
.unwrap_or(block_ptr),
None => return Ok(None),
};
};

let query = EntityQuery::new(
site4.deployment.cheap_clone(),
block.number,
site.deployment.cheap_clone(),
block_ptr.number,
EntityCollection::All(vec![(
POI_OBJECT.cheap_clone(),
AttributeNames::All,
)]),
);
let entities = store
.execute_query::<Entity>(conn, site4, query)
.execute_query::<Entity>(conn, site, query)
.map(|(entities, _)| entities)
.map_err(anyhow::Error::from)?;

Ok(Some(entities))
Ok(Some((entities, block_ptr)))
})
.map_err(Into::into)
})
.await?;

let entities = if let Some(entities) = entities {
entities
let (entities, block_ptr) = if let Some((entities, bp)) = entities {
(entities, bp)
} else {
return Ok(None);
};
Expand All @@ -995,10 +1002,10 @@ impl DeploymentStore {
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()?;

let info = self.subgraph_info(&site5).map_err(anyhow::Error::from)?;
let info = self.subgraph_info(&site2).map_err(anyhow::Error::from)?;

let mut finisher =
ProofOfIndexingFinisher::new(&block2, &site3.deployment, &indexer, info.poi_version);
ProofOfIndexingFinisher::new(&block_ptr, &site2.deployment, &indexer, info.poi_version);
for (name, region) in by_causality_region.drain() {
finisher.add_causality_region(&name, &region);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/src/fixture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl Drop for TestContext {
pub struct Stores {
network_name: String,
chain_head_listener: Arc<ChainHeadUpdateListener>,
network_store: Arc<Store>,
pub network_store: Arc<Store>,
chain_store: Arc<ChainStore>,
}

Expand Down
64 changes: 63 additions & 1 deletion tests/tests/runner_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use graph_tests::helpers::run_cmd;
use slog::{o, Discard, Logger};

struct RunnerTestRecipe {
stores: Stores,
pub stores: Stores,
subgraph_name: SubgraphName,
hash: DeploymentHash,
}
Expand Down Expand Up @@ -503,6 +503,68 @@ async fn fatal_error() -> anyhow::Result<()> {
Ok(())
}

/// Verifies that a deterministically failed subgraph keeps returning a valid,
/// stable proof of indexing (POI) for blocks at and beyond its failure block.
///
/// The test syncs the `fatal-error` fixture over a chain made of blocks 0..=3
/// until it errors, and then asserts that:
/// - the indexing status reports a failed subgraph with a deterministic fatal
///   error at block 3 and a single entity;
/// - the POI for block 2 differs from the POI for block 3;
/// - the POIs requested for blocks 3, 4 and 100 are all identical, even though
///   blocks 4 and 100 are not part of the chain built here.
#[tokio::test]
async fn poi_for_deterministically_failed_sg() -> anyhow::Result<()> {
    let RunnerTestRecipe {
        stores,
        subgraph_name,
        hash,
    } = RunnerTestRecipe::new("fatal-error").await;

    // A linear chain of empty blocks: genesis followed by blocks 1, 2 and 3.
    let blocks = {
        let block_0 = genesis();
        let block_1 = empty_block(block_0.ptr(), test_ptr(1));
        let block_2 = empty_block(block_1.ptr(), test_ptr(2));
        let block_3 = empty_block(block_2.ptr(), test_ptr(3));
        vec![block_0, block_1, block_2, block_3]
    };

    // The last block of the chain is where we expect the sync to stop with an error.
    let stop_block = blocks.last().unwrap().block.ptr();

    let chain = chain(blocks.clone(), &stores, None).await;
    let ctx = fixture::setup(subgraph_name.clone(), &hash, &stores, &chain, None, None).await;

    ctx.start_and_sync_to_error(stop_block).await;

    // Go through the indexing status API to also test it.
    let status = ctx.indexing_status().await;
    assert!(status.health == SubgraphHealth::Failed);
    assert!(status.entity_count == 1.into()); // Only PoI
    let err = status.fatal_error.unwrap();
    assert!(err.block.number == 3.into());
    assert!(err.deterministic);

    let sg_store = stores.network_store.subgraph_store();

    // POI for the last block before the failure block.
    let poi2 = sg_store
        .get_proof_of_indexing(&hash, &None, test_ptr(2))
        .await
        .unwrap();

    // POI for the failure block itself. All POIs from this point on should be
    // the same.
    let poi3 = sg_store
        .get_proof_of_indexing(&hash, &None, test_ptr(3))
        .await
        .unwrap();
    assert_ne!(poi2, poi3);

    // Block 4 is past the end of the chain built above.
    let poi4 = sg_store
        .get_proof_of_indexing(&hash, &None, test_ptr(4))
        .await
        .unwrap();
    assert_eq!(poi3, poi4);
    assert_ne!(poi2, poi4);

    // A block far past the end of the chain must still yield the same POI.
    let poi100 = sg_store
        .get_proof_of_indexing(&hash, &None, test_ptr(100))
        .await
        .unwrap();
    assert_eq!(poi4, poi100);
    assert_ne!(poi2, poi100);

    Ok(())
}
/// Convenience wrapper around `build_subgraph_with_yarn_cmd` that always
/// passes `"deploy:test"` as the command for the subgraph in `dir`.
///
/// Returns the `DeploymentHash` produced by that call.
// NOTE(review): `"deploy:test"` is presumably the name of a yarn script in the
// subgraph's package manifest — confirm against `build_subgraph_with_yarn_cmd`.
async fn build_subgraph(dir: &str) -> DeploymentHash {
build_subgraph_with_yarn_cmd(dir, "deploy:test").await
}
Expand Down
Loading