Skip to content

Commit

Permalink
graph, node: Allow rpc clients when using firehose
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Sep 12, 2024
1 parent a47e6c1 commit 8d14119
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
10 changes: 5 additions & 5 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl Chain {
// caller can spawn.
pub async fn cheapest_adapter(&self) -> Arc<EthereumAdapter> {
let adapters = match self.client.as_ref() {
ChainClient::Firehose(_) => panic!("no adapter with firehose"),
ChainClient::Firehose(_, _) => panic!("no adapter with firehose"),
ChainClient::Rpc(adapter) => adapter,
};
adapters.cheapest().await.unwrap()
Expand Down Expand Up @@ -472,7 +472,7 @@ impl Blockchain for Chain {
)
.await
}
ChainClient::Firehose(_) => {
ChainClient::Firehose(_, _) => {
self.block_stream_builder
.build_firehose(
self,
Expand All @@ -498,7 +498,7 @@ impl Blockchain for Chain {
number: BlockNumber,
) -> Result<BlockPtr, IngestorError> {
match self.client.as_ref() {
ChainClient::Firehose(endpoints) => endpoints
ChainClient::Firehose(endpoints, _) => endpoints
.endpoint()
.await?
.block_ptr_for_number::<HeaderOnlyBlock>(logger, number)
Expand Down Expand Up @@ -557,7 +557,7 @@ impl Blockchain for Chain {

async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
let ingestor: Box<dyn BlockIngestor> = match self.chain_client().as_ref() {
ChainClient::Firehose(_) => {
ChainClient::Firehose(_, _) => {
let ingestor = FirehoseBlockIngestor::<HeaderOnlyBlock, Self>::new(
self.chain_store.cheap_clone(),
self.chain_client(),
Expand Down Expand Up @@ -852,7 +852,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
use graph::prelude::LightEthereumBlockExt;

let block = match self.chain_client.as_ref() {
ChainClient::Firehose(_) => Some(BlockPtr {
ChainClient::Firehose(_, _) => Some(BlockPtr {
hash: BlockHash::from(vec![0xff; 32]),
number: block.number.saturating_sub(1),
}),
Expand Down
13 changes: 9 additions & 4 deletions graph/src/blockchain/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ use anyhow::anyhow;
// Substreams only requires the FirehoseEndpoints.
#[derive(Debug)]
pub enum ChainClient<C: Blockchain> {
Firehose(FirehoseEndpoints),
Firehose(FirehoseEndpoints, Option<C::Client>),
Rpc(C::Client),
}

impl<C: Blockchain> ChainClient<C> {
pub fn new_firehose(firehose_endpoints: FirehoseEndpoints) -> Self {
Self::Firehose(firehose_endpoints)
Self::Firehose(firehose_endpoints, None)
}

pub fn new_firehose_with_rpc(firehose_endpoints: FirehoseEndpoints, rpc: C::Client) -> Self {
Self::Firehose(firehose_endpoints, Some(rpc))
}

pub fn new_rpc(rpc: C::Client) -> Self {
Expand All @@ -26,21 +30,22 @@ impl<C: Blockchain> ChainClient<C> {

pub fn is_firehose(&self) -> bool {
match self {
ChainClient::Firehose(_) => true,
ChainClient::Firehose(_, _) => true,
ChainClient::Rpc(_) => false,
}
}

pub async fn firehose_endpoint(&self) -> anyhow::Result<Arc<FirehoseEndpoint>> {
match self {
ChainClient::Firehose(endpoints) => endpoints.endpoint().await,
ChainClient::Firehose(endpoints, _) => endpoints.endpoint().await,
_ => Err(anyhow!("firehose endpoint requested on rpc chain client")),
}
}

pub fn rpc(&self) -> anyhow::Result<&C::Client> {
match self {
Self::Rpc(rpc) => Ok(rpc),
Self::Firehose(_, Some(rpc)) => Ok(rpc),
_ => Err(anyhow!("rpc endpoint requested on firehose chain client")),
}
}
Expand Down
5 changes: 4 additions & 1 deletion node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,10 @@ pub async fn networks_as_chains(
let eth_adapters = networks.ethereum_rpcs(chain_id.clone());

let cc = if firehose_endpoints.len() > 0 {
ChainClient::<graph_chain_ethereum::Chain>::new_firehose(firehose_endpoints)
ChainClient::<graph_chain_ethereum::Chain>::new_firehose_with_rpc(
firehose_endpoints,
eth_adapters.clone(),
)
} else {
ChainClient::<graph_chain_ethereum::Chain>::new_rpc(eth_adapters.clone())
};
Expand Down

0 comments on commit 8d14119

Please sign in to comment.