From 8d14119d19813771c533adc658460528559f5b8b Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 2 Sep 2024 16:30:26 +0530 Subject: [PATCH] graph, node: Allow rpc clients when using firehose --- chain/ethereum/src/chain.rs | 10 +++++----- graph/src/blockchain/client.rs | 13 +++++++++---- node/src/chain.rs | 5 ++++- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 11b5fff4868..dbe5c5bda6f 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -389,7 +389,7 @@ impl Chain { // caller can spawn. pub async fn cheapest_adapter(&self) -> Arc { let adapters = match self.client.as_ref() { - ChainClient::Firehose(_) => panic!("no adapter with firehose"), + ChainClient::Firehose(_, _) => panic!("no adapter with firehose"), ChainClient::Rpc(adapter) => adapter, }; adapters.cheapest().await.unwrap() @@ -472,7 +472,7 @@ impl Blockchain for Chain { ) .await } - ChainClient::Firehose(_) => { + ChainClient::Firehose(_, _) => { self.block_stream_builder .build_firehose( self, @@ -498,7 +498,7 @@ impl Blockchain for Chain { number: BlockNumber, ) -> Result { match self.client.as_ref() { - ChainClient::Firehose(endpoints) => endpoints + ChainClient::Firehose(endpoints, _) => endpoints .endpoint() .await? .block_ptr_for_number::(logger, number) @@ -557,7 +557,7 @@ impl Blockchain for Chain { async fn block_ingestor(&self) -> anyhow::Result> { let ingestor: Box = match self.chain_client().as_ref() { - ChainClient::Firehose(_) => { + ChainClient::Firehose(_, _) => { let ingestor = FirehoseBlockIngestor::::new( self.chain_store.cheap_clone(), self.chain_client(), @@ -852,7 +852,7 @@ impl TriggersAdapterTrait for TriggersAdapter { use graph::prelude::LightEthereumBlockExt; let block = match self.chain_client.as_ref() { - ChainClient::Firehose(_) => Some(BlockPtr { + ChainClient::Firehose(_, _) => Some(BlockPtr { hash: BlockHash::from(vec![0xff; 32]), number: block.number.saturating_sub(1), }), diff --git a/graph/src/blockchain/client.rs b/graph/src/blockchain/client.rs index 8d83536b577..666666955f4 100644 --- a/graph/src/blockchain/client.rs +++ b/graph/src/blockchain/client.rs @@ -11,13 +11,17 @@ use anyhow::anyhow; // Substreams only requires the FirehoseEndpoints. #[derive(Debug)] pub enum ChainClient { - Firehose(FirehoseEndpoints), + Firehose(FirehoseEndpoints, Option), Rpc(C::Client), } impl ChainClient { pub fn new_firehose(firehose_endpoints: FirehoseEndpoints) -> Self { - Self::Firehose(firehose_endpoints) + Self::Firehose(firehose_endpoints, None) + } + + pub fn new_firehose_with_rpc(firehose_endpoints: FirehoseEndpoints, rpc: C::Client) -> Self { + Self::Firehose(firehose_endpoints, Some(rpc)) } pub fn new_rpc(rpc: C::Client) -> Self { @@ -26,14 +30,14 @@ impl ChainClient { pub fn is_firehose(&self) -> bool { match self { - ChainClient::Firehose(_) => true, + ChainClient::Firehose(_, _) => true, ChainClient::Rpc(_) => false, } } pub async fn firehose_endpoint(&self) -> anyhow::Result> { match self { - ChainClient::Firehose(endpoints) => endpoints.endpoint().await, + ChainClient::Firehose(endpoints, _) => endpoints.endpoint().await, _ => Err(anyhow!("firehose endpoint requested on rpc chain client")), } } @@ -41,6 +45,7 @@ impl ChainClient { pub fn rpc(&self) -> anyhow::Result<&C::Client> { match self { Self::Rpc(rpc) => Ok(rpc), + Self::Firehose(_, Some(rpc)) => Ok(rpc), _ => Err(anyhow!("rpc endpoint requested on firehose chain client")), } } diff --git a/node/src/chain.rs b/node/src/chain.rs index 3e87ff8295b..57640c8d13b 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -523,7 +523,10 @@ pub async fn networks_as_chains( let eth_adapters = networks.ethereum_rpcs(chain_id.clone()); let cc = if firehose_endpoints.len() > 0 { - ChainClient::::new_firehose(firehose_endpoints) + ChainClient::::new_firehose_with_rpc( + firehose_endpoints, + eth_adapters.clone(), + ) } else { ChainClient::::new_rpc(eth_adapters.clone()) };