Skip to content

Commit

Permalink
Merge branch 'main' into add-min-reqs
Browse files Browse the repository at this point in the history
  • Loading branch information
mpguerra committed Sep 2, 2024
2 parents d364c67 + 6b95d27 commit b710cdc
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 46 deletions.
4 changes: 2 additions & 2 deletions zebra-node-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ rpc-client = [
"serde_json",
]

shielded-scan = ["tokio"]
shielded-scan = []

[dependencies]
zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.39" }
Expand All @@ -48,7 +48,7 @@ jsonrpc-core = { version = "18.0.0", optional = true }
reqwest = { version = "0.11.26", default-features = false, features = ["rustls-tls"], optional = true }
serde = { version = "1.0.204", optional = true }
serde_json = { version = "1.0.122", optional = true }
tokio = { version = "1.39.2", features = ["time"], optional = true }
tokio = { version = "1.39.2", features = ["time", "sync"] }

[dev-dependencies]

Expand Down
9 changes: 4 additions & 5 deletions zebra-node-services/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::collections::HashSet;

use tokio::sync::oneshot;
use zebra_chain::transaction::{self, UnminedTx, UnminedTxId};

#[cfg(feature = "getblocktemplate-rpcs")]
Expand Down Expand Up @@ -114,13 +115,11 @@ pub enum Response {
/// Returns matching cached rejected [`UnminedTxId`]s from the mempool,
RejectedTransactionIds(HashSet<UnminedTxId>),

/// Returns a list of queue results.
///
/// These are the results of the initial queue checks.
/// The transaction may also fail download or verification later.
/// Returns a list of initial queue checks results and a oneshot receiver
/// for awaiting download and/or verification results.
///
/// Each result matches the request at the corresponding vector index.
Queued(Vec<Result<(), BoxError>>),
Queued(Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>>),

/// Confirms that the mempool has checked for recently verified transactions.
CheckedForVerifiedTransactions,
Expand Down
15 changes: 11 additions & 4 deletions zebra-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ where

let response = mempool.oneshot(request).await.map_server_error()?;

let queue_results = match response {
let mut queue_results = match response {
mempool::Response::Queued(results) => results,
_ => unreachable!("incorrect response variant from mempool service"),
};
Expand All @@ -675,10 +675,17 @@ where
"mempool service returned more results than expected"
);

tracing::debug!("sent transaction to mempool: {:?}", &queue_results[0]);
let queue_result = queue_results
.pop()
.expect("there should be exactly one item in Vec")
.inspect_err(|err| tracing::debug!("sent transaction to mempool: {:?}", &err))
.map_server_error()?
.await;

tracing::debug!("sent transaction to mempool: {:?}", &queue_result);

queue_results[0]
.as_ref()
queue_result
.map_server_error()?
.map(|_| SentTransactionHash(transaction_hash))
.map_server_error()
}
Expand Down
43 changes: 38 additions & 5 deletions zebra-rpc/src/methods/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use hex::ToHex;
use jsonrpc_core::{Error, ErrorCode};
use proptest::{collection::vec, prelude::*};
use thiserror::Error;
use tokio::sync::oneshot;
use tower::buffer::Buffer;

use zebra_chain::{
Expand Down Expand Up @@ -61,7 +62,9 @@ proptest! {

let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]);
let response = mempool::Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = mempool::Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down Expand Up @@ -111,10 +114,10 @@ proptest! {
.expect("Transaction serializes successfully");
let transaction_hex = hex::encode(&transaction_bytes);

let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex));
let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex.clone()));

let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);

mempool
.expect_request(expected_request)
Expand All @@ -138,6 +141,32 @@ proptest! {
"Result is not a server error: {result:?}"
);

let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex));

let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);

let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Err("any verification error".into()));
mempool
.expect_request(expected_request)
.await?
.respond(Ok::<_, BoxError>(mempool::Response::Queued(vec![Ok(rsp_rx)])));

let result = send_task
.await
.expect("Sending raw transactions should not panic");

prop_assert!(
matches!(
result,
Err(Error {
code: ErrorCode::ServerError(_),
..
})
),
"Result is not a server error: {result:?}"
);

// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(rpc_tx_queue_task_result.is_none());
Expand Down Expand Up @@ -897,7 +926,9 @@ proptest! {
// now a retry will be sent to the mempool
let expected_request =
mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]);
let response = mempool::Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = mempool::Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down Expand Up @@ -997,7 +1028,9 @@ proptest! {
for tx in txs.clone() {
let expected_request =
mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]);
let response = mempool::Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = mempool::Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down
10 changes: 7 additions & 3 deletions zebra-rpc/src/queue/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::HashSet, env, sync::Arc};
use proptest::prelude::*;

use chrono::Duration;
use tokio::time;
use tokio::{sync::oneshot, time};
use tower::ServiceExt;

use zebra_chain::{
Expand Down Expand Up @@ -196,7 +196,9 @@ proptest! {
let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]);
let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]);
let send_task = tokio::spawn(mempool.clone().oneshot(request));
let response = Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down Expand Up @@ -337,7 +339,9 @@ proptest! {
// retry will queue the transaction to mempool
let gossip = Gossip::Tx(UnminedTx::from(transaction.clone()));
let expected_request = Request::Queue(vec![gossip]);
let response = Response::Queued(vec![Ok(())]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = Response::Queued(vec![Ok(rsp_rx)]);

mempool
.expect_request(expected_request)
Expand Down
36 changes: 22 additions & 14 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{
};

use futures::{future::FutureExt, stream::Stream};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, oneshot};
use tokio_stream::StreamExt;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};

Expand Down Expand Up @@ -560,7 +560,7 @@ impl Service<Request> for Mempool {
for tx in tx_retries {
// This is just an efficiency optimisation, so we don't care if queueing
// transaction requests fails.
let _result = tx_downloads.download_if_needed_and_verify(tx);
let _result = tx_downloads.download_if_needed_and_verify(tx, None);
}
}

Expand Down Expand Up @@ -608,8 +608,8 @@ impl Service<Request> for Mempool {
tracing::trace!("chain grew during tx verification, retrying ..",);

// We don't care if re-queueing the transaction request fails.
let _result =
tx_downloads.download_if_needed_and_verify(tx.transaction.into());
let _result = tx_downloads
.download_if_needed_and_verify(tx.transaction.into(), None);
}
}
Ok(Err((txid, error))) => {
Expand Down Expand Up @@ -758,16 +758,24 @@ impl Service<Request> for Mempool {
Request::Queue(gossiped_txs) => {
trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");

let rsp: Vec<Result<(), BoxError>> = gossiped_txs
.into_iter()
.map(|gossiped_tx| -> Result<(), MempoolError> {
storage.should_download_or_verify(gossiped_tx.id())?;
tx_downloads.download_if_needed_and_verify(gossiped_tx)?;

Ok(())
})
.map(|result| result.map_err(BoxError::from))
.collect();
let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
gossiped_txs
.into_iter()
.map(
|gossiped_tx| -> Result<
oneshot::Receiver<Result<(), BoxError>>,
MempoolError,
> {
let (rsp_tx, rsp_rx) = oneshot::channel();
storage.should_download_or_verify(gossiped_tx.id())?;
tx_downloads
.download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?;

Ok(rsp_rx)
},
)
.map(|result| result.map_err(BoxError::from))
.collect();

// We've added transactions to the queue
self.update_metrics();
Expand Down
14 changes: 11 additions & 3 deletions zebrad/src/components/mempool/crawler/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use proptest::{
collection::{hash_set, vec},
prelude::*,
};
use tokio::time;
use tokio::{sync::oneshot, time};

use zebra_chain::{
chain_sync_status::ChainSyncStatus, parameters::Network, transaction::UnminedTxId,
Expand Down Expand Up @@ -317,9 +317,17 @@ async fn respond_to_queue_request(
expected_transaction_ids: HashSet<UnminedTxId>,
response: impl IntoIterator<Item = Result<(), MempoolError>>,
) -> Result<(), TestCaseError> {
let response = response
let response: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> = response
.into_iter()
.map(|result| result.map_err(BoxError::from))
.map(|result| {
result
.map(|_| {
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
rsp_rx
})
.map_err(BoxError::from)
})
.collect();

mempool
Expand Down
31 changes: 23 additions & 8 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use zebra_chain::{
use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state as zs;
use zebra_state::{self as zs, CloneError};

use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};

Expand Down Expand Up @@ -105,17 +105,17 @@ pub const MAX_INBOUND_CONCURRENCY: usize = 25;
struct CancelDownloadAndVerify;

/// Errors that can occur while downloading and verifying a transaction.
#[derive(Error, Debug)]
#[derive(Error, Debug, Clone)]
#[allow(dead_code)]
pub enum TransactionDownloadVerifyError {
#[error("transaction is already in state")]
InState,

#[error("error in state service")]
StateError(#[source] BoxError),
StateError(#[source] CloneError),

#[error("error downloading transaction")]
DownloadFailed(#[source] BoxError),
DownloadFailed(#[source] CloneError),

#[error("transaction download / verification was cancelled")]
Cancelled,
Expand Down Expand Up @@ -243,6 +243,7 @@ where
pub fn download_if_needed_and_verify(
&mut self,
gossiped_tx: Gossip,
rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
) -> Result<(), MempoolError> {
let txid = gossiped_tx.id();

Expand Down Expand Up @@ -295,7 +296,7 @@ where
Ok((Some(height), next_height))
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
}?;

trace!(?txid, ?next_height, "got next height");
Expand All @@ -307,11 +308,12 @@ where
let tx = match network
.oneshot(req)
.await
.map_err(CloneError::from)
.map_err(TransactionDownloadVerifyError::DownloadFailed)?
{
zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| {
TransactionDownloadVerifyError::DownloadFailed(
"no transactions returned".into(),
BoxError::from("no transactions returned").into(),
)
})?,
_ => unreachable!("wrong response to transaction request"),
Expand Down Expand Up @@ -373,15 +375,27 @@ where

let task = tokio::spawn(async move {
// Prefer the cancel handle if both are ready.
tokio::select! {
let result = tokio::select! {
biased;
_ = &mut cancel_rx => {
trace!("task cancelled prior to completion");
metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
Err((TransactionDownloadVerifyError::Cancelled, txid))
}
verification = fut => verification,
};

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx.send(
result
.as_ref()
.map(|_| ())
.map_err(|(err, _)| err.clone().into()),
);
}

result
});

self.pending.push(task);
Expand Down Expand Up @@ -458,14 +472,15 @@ where
match state
.ready()
.await
.map_err(CloneError::from)
.map_err(TransactionDownloadVerifyError::StateError)?
.call(zs::Request::Transaction(txid.mined_id()))
.await
{
Ok(zs::Response::Transaction(None)) => Ok(()),
Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState),
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
}?;

Ok(())
Expand Down
Loading

0 comments on commit b710cdc

Please sign in to comment.