diff --git a/Cargo.toml b/Cargo.toml index 1ea989f..c51955e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,8 @@ serde = "1.0.195" serde_derive = { version = "1"} jsonwebtoken = "9" reqwest = { version = "0.12", default-features = true, features = ["json"] } -tokio = { version = "1", features = ["full", "tracing"] } +tokio = { version = "1", default-features = false, features = ["time"] } +tokio-stream = {version = "0.1" } thiserror = "1" bigdecimal = { version = "^0.3", features = ["serde"] } url = "2" @@ -58,12 +59,13 @@ sha2 = "0.10" rand = "0.8" sqlx = { version = "0.7", features = ["postgres"], optional = true } + [target.'cfg(any())'.dependencies] openssl = { version = "0.10.55", optional = true } # needed to allow foo to build with -Zminimal-versions [dev-dependencies] tracing-subscriber = { version = "0.3", features = ["env-filter", "serde"] } -tokio = { version = "1", features = ["macros", "rt"] } +tokio = { version = "1", features = [ "full", "macros", "rt"] } rstest = { version = "0.19" } color-eyre = { version = "0.6" } lazy_static = "1.4.0" diff --git a/README.md b/README.md index c316b02..2e17c15 100644 --- a/README.md +++ b/README.md @@ -62,3 +62,5 @@ Run tests: cargo test ``` --- + +# Supported [Endpoints](./SUPPORTED_ENDPOINTS.md) diff --git a/src/api/transactions.rs b/src/api/transactions.rs index 1e1c611..5f7d748 100644 --- a/src/api/transactions.rs +++ b/src/api/transactions.rs @@ -13,8 +13,9 @@ use tracing::debug; impl Client { /// Query transactions /// - /// [getTransactions](https://docs.fireblocks.com/api/swagger-ui/#/Transactions/getTransactions) - /// + /// See + /// * [getTransactions](https://docs.fireblocks.com/api/swagger-ui/#/Transactions/getTransactions) + /// * [`crate::types::transaction::TransactionListBuilder`] #[tracing::instrument(level = "debug", skip(self, options))] pub async fn transactions(&self, options: I) -> crate::Result> where diff --git a/src/error.rs b/src/error.rs index c6d0129..75df333 100644 --- a/src/error.rs +++ b/src/error.rs @@ -41,6 +41,9 @@ pub enum FireblocksError { #[error(transparent)] UrlError(#[from] ParseError), + #[error(transparent)] + QueryParamError(#[from] ParamError), + #[error("Internal Fireblocks Error. HTTP Code {code} {text} request_id:{request_id}")] InternalError { request_id: String, path: String, code: u16, text: String }, diff --git a/src/lib.rs b/src/lib.rs index a7dc161..bc2bd27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,13 +5,14 @@ mod assets; mod client; pub mod error; pub(crate) mod jwt; -mod page_client; +mod paged_client; pub mod types; -pub use crate::error::FireblocksError; +pub use crate::error::*; pub use crate::types::PagingVaultRequestBuilder; pub use assets::{Asset, ASSET_BTC, ASSET_BTC_TEST, ASSET_ETH, ASSET_ETH_TEST, ASSET_SOL, ASSET_SOL_TEST}; pub use client::{Client, ClientBuilder}; +pub use paged_client::{PagedClient, VaultStream}; pub const FIREBLOCKS_API: &str = "https://api.fireblocks.io/v1"; pub const FIREBLOCKS_SANDBOX_API: &str = "https://sandbox-api.fireblocks.io/v1"; @@ -31,16 +32,6 @@ macro_rules! impl_base_query_params { self } - pub fn before(&mut self, t: &Epoch) -> &mut Self { - self.base.before(t); - self - } - - pub fn after(&mut self, t: &Epoch) -> &mut Self { - self.base.after(t); - self - } - pub fn build(&self) -> std::result::Result { let mut p = Vec::clone(&self.params); let b = self.base.build()?; @@ -54,16 +45,18 @@ macro_rules! impl_base_query_params { #[cfg(test)] mod tests { use std::str::FromStr; - use std::sync::{Once, OnceLock}; - use std::{env, time::Duration}; + use std::sync::{Arc, Once, OnceLock}; + use std::time::Duration; use crate::assets::{ASSET_BTC_TEST, ASSET_SOL_TEST}; + use crate::paged_client::{PagedClient, TransactionStream}; use crate::types::*; use crate::{Client, ClientBuilder, ASSET_ETH_TEST}; use bigdecimal::BigDecimal; use chrono::{TimeZone, Utc}; use color_eyre::eyre::format_err; use tokio::time; + use tokio_stream::StreamExt; use tracing::warn; use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::EnvFilter; @@ -124,7 +117,7 @@ mod tests { .ok() }, }; - let create_tx = env::var("FIREBLOCKS_CREATE_TX").ok().is_some(); + let create_tx = std::env::var("FIREBLOCKS_CREATE_TX").ok().is_some(); Self { client, create_tx } } @@ -154,7 +147,6 @@ mod tests { assert!(!id.is_empty()); assert!(results.accounts.is_empty()); - let _before = &chrono::offset::Utc::now(); let params = PagingVaultRequestBuilder::new().limit(1).build()?; let (results, id) = config.client().vaults(params).await?; assert!(!id.is_empty()); @@ -164,6 +156,8 @@ mod tests { assert!(!id.is_empty()); assert_eq!(0, result.id); assert!(!result.assets.is_empty()); + + let _ = PagingVaultRequestBuilder::new().before("before").build(); // code coverage Ok(()) } @@ -308,14 +302,7 @@ mod tests { let rename = format!("{vault_name}-rename"); c.rename_vault(result.id, &rename).await?; - let after = &Utc.with_ymd_and_hms(2023, 4, 6, 0, 1, 1).unwrap(); - let before = &chrono::offset::Utc::now(); - PagingAddressRequestBuilder::new().limit(10).after(after).build()?; - //config.client().addresses_paginated(0, ASSET_BTC_TEST, page).await?; - - PagingAddressRequestBuilder::new().limit(10).before(before).build()?; - //config.client().addresses_paginated(0, ASSET_BTC_TEST, page).await?; - + PagingAddressRequestBuilder::new().limit(10).after("after").before("before").build()?; // code coverage c.vault_hide(result.id, false).await?; c.vault_hide(result.id, true).await?; Ok(()) @@ -462,10 +449,59 @@ mod tests { Ok(()) } + #[rstest::rstest] + #[tokio::test] + async fn test_paged_vaults(config: Config) -> color_eyre::Result<()> { + if !config.is_ok() { + return Ok(()); + } + let c = config.client(); + let pc = PagedClient::new(Arc::new(c)); + let mut vs = pc.vaults(100); + + while let Ok(Some(result)) = vs.try_next().await { + tracing::info!("accounts {}", result.0.accounts.len()); + time::sleep(Duration::from_millis(200)).await; + } + Ok(()) + } + + async fn transaction_stream(mut ts: TransactionStream) -> color_eyre::Result<()> { + let mut counter = 0; + let mut after = Utc.with_ymd_and_hms(2022, 4, 6, 0, 1, 1).unwrap(); + while let Some(result) = ts.try_next().await? { + tracing::info!("transactions {}", result.0.len()); + counter += 1; + if counter > 5 { + break; + } + if let Some(last) = result.0.last() { + assert!(after < last.created_at); + after = last.created_at; + } + time::sleep(Duration::from_millis(100)).await; + } + Ok(()) + } + + #[rstest::rstest] + #[tokio::test] + async fn test_paged_transactions(config: Config) -> color_eyre::Result<()> { + if !config.is_ok() { + return Ok(()); + } + let c = config.client(); + let pc = PagedClient::new(Arc::new(c)); + let ts = pc.transactions_from_source(0, 100, None); + transaction_stream(ts).await?; + let ts = pc.transactions_from_destination(0, 100, None); + transaction_stream(ts).await + } + #[rstest::rstest] #[test] fn check_ci(config: Config) -> color_eyre::Result<()> { - match env::var("CI") { + match std::env::var("CI") { Err(_) => Ok(()), Ok(_) => match config.client { Some(_) => Ok(()), diff --git a/src/page_client.rs b/src/page_client.rs deleted file mode 100644 index e55b319..0000000 --- a/src/page_client.rs +++ /dev/null @@ -1,12 +0,0 @@ -use crate::{Client, Epoch}; - -#[derive(Clone)] -pub struct PagedClient { - pub client: Client, -} - -#[derive(Clone)] -pub struct TransactionStream { - client: Client, - after: Epoch, -} diff --git a/src/paged_client.rs b/src/paged_client.rs new file mode 100644 index 0000000..fa4e281 --- /dev/null +++ b/src/paged_client.rs @@ -0,0 +1,249 @@ +use crate::types::{Transaction, TransactionListBuilder, VaultAccounts}; +use crate::{Client, Epoch, FireblocksError, PagingVaultRequestBuilder, ParamError, QueryParams, Result}; +use chrono::{TimeZone, Utc}; +use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, Stream, StreamExt}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +#[derive(Clone)] +pub struct PagedClient { + pub client: Arc, +} + +pub struct VaultStream { + client: Arc, + batch: u16, + after: Option, + init: bool, + fut: FuturesUnordered>>, +} + +impl VaultStream { + fn new(client: Arc, batch: u16) -> Self { + Self { client, batch, init: false, after: None, fut: FuturesUnordered::new() } + } + fn build_params(&self) -> std::result::Result { + PagingVaultRequestBuilder::new().limit(self.batch).after(self.after.as_ref().unwrap_or(&String::new())).build() + } +} + +pub struct TransactionStream { + client: Arc, + batch: u16, + init: bool, // has the stream started? + vault_id: i32, + after: Epoch, + is_source: bool, // are we streaming from source vault account or destination + fut: FuturesUnordered>>>, +} + +impl TransactionStream { + fn from_source(client: Arc, batch: u16, vault_id: i32, after: Epoch) -> Self { + Self { client, batch, init: false, vault_id, after, fut: FuturesUnordered::new(), is_source: true } + } + + fn from_dest(client: Arc, batch: u16, vault_id: i32, after: Epoch) -> Self { + Self { client, batch, init: false, vault_id, after, fut: FuturesUnordered::new(), is_source: false } + } + + fn build_params(&self) -> std::result::Result { + let mut builder = TransactionListBuilder::new(); + let builder = builder.limit(self.batch).sort_asc().order_created_at().after(&self.after); + + if self.is_source { + return builder.source_id(self.vault_id).build(); + } + builder.destination_id(self.vault_id).build() + } +} + +pub trait AsyncIteratorAsyncNext { + type Item; + async fn next(&mut self) -> Result>; +} + +impl Stream for VaultStream { + type Item = Result; + + #[allow(clippy::cognitive_complexity)] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if !self.init { + tracing::debug!("init future"); + self.init = true; + let client = self.client.clone(); + let params = match self.build_params() { + Ok(p) => p, + Err(e) => return Poll::Ready(Some(Err(FireblocksError::from(e)))), + }; + let fut = async move { client.vaults(params).await }.boxed(); + self.fut.push(fut); + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + // Try to resolve any existing futures first + tracing::debug!("check future poll"); + match self.fut.poll_next_unpin(cx) { + Poll::Ready(opt) => { + if let Some(result) = opt { + match result { + Ok((ref va, ref _id)) => { + self.after = Option::clone(&va.paging.after); + }, + Err(e) => { + return Poll::Ready(Some(Err(e))); + }, + } + return Poll::Ready(Some(result)); + } + }, + Poll::Pending => { + tracing::debug!("still pending"); + cx.waker().wake_by_ref(); + return Poll::Pending; + }, + }; + + tracing::debug!("checking after {:#?}", self.after); + // If there are no more pages to fetch and no pending futures, end the stream + if self.after.is_none() { + return Poll::Ready(None); + } + + let client = self.client.clone(); + let params = match self.build_params() { + Ok(p) => p, + Err(e) => return Poll::Ready(Some(Err(FireblocksError::from(e)))), + }; + let fut = async move { client.vaults(params).await }.boxed(); + self.fut.push(fut); + cx.waker().wake_by_ref(); + Poll::Pending + } +} + +impl Stream for TransactionStream { + type Item = Result>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if !self.init { + tracing::debug!("init tracing stream"); + self.init = true; + let client = self.client.clone(); + let params = match self.build_params() { + Ok(p) => p, + Err(e) => return Poll::Ready(Some(Err(FireblocksError::from(e)))), + }; + let fut = async move { client.transactions(params).await }.boxed(); + self.fut.push(fut); + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + match self.fut.poll_next_unpin(cx) { + Poll::Ready(opt) => { + if let Some(result) = opt { + match result { + Ok((ref va, ref _id)) => { + if va.is_empty() { + return Poll::Ready(None); + } + if let Some(last) = va.last() { + tracing::debug!("1st after {:#?} last after {:#?}", va[0].created_at, last.created_at); + self.after = last.created_at + chrono::Duration::milliseconds(100); + //TODO possible to miss a transaction + } + }, + Err(e) => { + return Poll::Ready(Some(Err(e))); + }, + } + return Poll::Ready(Some(result)); + } + }, + Poll::Pending => { + cx.waker().wake_by_ref(); + return Poll::Pending; + }, + }; + + let client = self.client.clone(); + let params = match self.build_params() { + Ok(p) => p, + Err(e) => return Poll::Ready(Some(Err(FireblocksError::from(e)))), + }; + let fut = async move { client.transactions(params).await }.boxed(); + self.fut.push(fut); + cx.waker().wake_by_ref(); + Poll::Pending + } +} + +impl PagedClient { + pub const fn new(client: Arc) -> Self { + Self { client } + } + + /// Stream the vault accounts based on batch size + /// + /// ``` + /// use std::sync::Arc; + /// use futures::TryStreamExt; + /// use fireblocks_sdk::{Client, PagedClient}; + /// + /// async fn vault_accounts(c: Client) -> color_eyre::Result<()> { + /// let pc = PagedClient::new(Arc::new(c)); + /// let mut vault_stream = pc.vaults(100); + /// while let Ok(Some(result)) = vault_stream.try_next().await { + /// tracing::info!("accounts {}", result.0.accounts.len()); + /// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + /// } + /// Ok(()) + /// } + /// ``` + /// see [`Client::vaults`] + pub fn vaults(&self, batch_size: u16) -> VaultStream { + VaultStream::new(self.client.clone(), batch_size) + } + + /// Stream all the transactions from source vault account id and after some date + /// + /// Default date is 2022-04-06 if None provided + /// + /// ``` + /// use std::sync::Arc; + /// use futures::TryStreamExt; + /// use fireblocks_sdk::{Client, PagedClient}; + /// + /// async fn transactions_paged(c: Client) -> color_eyre::Result<()> { + /// let pc = PagedClient::new(Arc::new(c)); + /// let mut ts = pc.transactions_from_source(0, 100, None); + /// while let Ok(Some(result)) = ts.try_next().await { + /// tracing::info!("transactions {}", result.0.len()); + /// } + /// Ok(()) + /// } + /// ``` + /// + /// see + /// * [`Client::transactions`] + pub fn transactions_from_source(&self, vault_id: i32, batch_size: u16, after: Option) -> TransactionStream { + let default_after = Utc.with_ymd_and_hms(2022, 4, 6, 0, 1, 1).unwrap(); + TransactionStream::from_source(self.client.clone(), batch_size, vault_id, after.unwrap_or(default_after)) + } + + /// Stream all the transactions from destination vault account id + /// See [`self.transactions_from_source`] + pub fn transactions_from_destination( + &self, + vault_id: i32, + batch_size: u16, + after: Option, + ) -> TransactionStream { + let default_after = Utc.with_ymd_and_hms(2022, 4, 6, 0, 1, 1).unwrap(); + TransactionStream::from_dest(self.client.clone(), batch_size, vault_id, after.unwrap_or(default_after)) + } +} diff --git a/src/types/page.rs b/src/types/page.rs index 36c0be9..3bd0a76 100644 --- a/src/types/page.rs +++ b/src/types/page.rs @@ -1,4 +1,4 @@ -use crate::{impl_base_query_params, Epoch, QueryParams}; +use crate::{impl_base_query_params, QueryParams}; use bigdecimal::BigDecimal; use serde_derive::{Deserialize, Serialize}; @@ -25,22 +25,6 @@ impl BasePageParams { self } - pub(crate) fn before(&mut self, t: &Epoch) -> &mut Self { - self.add_instant("before", t) - } - - pub(crate) fn after(&mut self, t: &Epoch) -> &mut Self { - self.add_instant("after", t) - } - - fn add_instant(&mut self, param: &str, t: &Epoch) -> &mut Self { - self.params.push((param.to_owned(), Self::epoch(t))); - self - } - fn epoch(before: &Epoch) -> String { - format!("{}", before.timestamp_millis()) - } - #[allow(clippy::unnecessary_wraps)] pub(crate) fn build(&self) -> std::result::Result { Ok(Vec::clone(&self.params)) @@ -49,7 +33,7 @@ impl BasePageParams { #[derive(Debug, Default)] pub struct PagingAddressRequestBuilder { - params: QueryParams, // this is ignored + params: QueryParams, base: BasePageParams, } @@ -63,6 +47,18 @@ pub struct PagingVaultRequestBuilder { impl_base_query_params!(PagingVaultRequestBuilder); +impl PagingAddressRequestBuilder { + pub fn before(&mut self, t: &str) -> &mut Self { + self.params.push(("before".to_owned(), String::from(t))); + self + } + + pub fn after(&mut self, t: &str) -> &mut Self { + self.params.push(("before".to_owned(), String::from(t))); + self + } +} + impl PagingVaultRequestBuilder { pub fn min_threshold(&mut self, min: &BigDecimal) -> &mut Self { self.params.push(("minAmountThreshold".to_owned(), min.to_string())); @@ -78,4 +74,16 @@ impl PagingVaultRequestBuilder { self.params.push(("nameSuffix".to_owned(), String::from(n))); self } + + pub fn before(&mut self, t: &str) -> &mut Self { + self.params.push(("before".to_owned(), String::from(t))); + self + } + + pub fn after(&mut self, t: &str) -> &mut Self { + if !t.is_empty() { + self.params.push(("after".to_owned(), String::from(t))); + } + self + } } diff --git a/src/types/transaction.rs b/src/types/transaction.rs index 98a6a3d..94b4524 100644 --- a/src/types/transaction.rs +++ b/src/types/transaction.rs @@ -92,6 +92,26 @@ impl TransactionListBuilder { self } + pub fn sort_desc(&mut self) -> &mut Self { + self.params.push(("sort".to_string(), String::from("DESC"))); + self + } + + pub fn sort_asc(&mut self) -> &mut Self { + self.params.push(("sort".to_string(), String::from("ASC"))); + self + } + + pub fn order_created_at(&mut self) -> &mut Self { + self.params.push(("orderBy".to_string(), String::from("createdAt"))); + self + } + + pub fn order_last_updated(&mut self) -> &mut Self { + self.params.push(("orderBy".to_string(), String::from("lastUpdated"))); + self + } + pub fn hash(&mut self, s: &str) -> &mut Self { self.params.push(("txHash".to_string(), String::from(s))); self @@ -107,6 +127,22 @@ impl TransactionListBuilder { self } + pub(crate) fn before(&mut self, t: &Epoch) -> &mut Self { + self.add_instant("before", t) + } + + pub(crate) fn after(&mut self, t: &Epoch) -> &mut Self { + self.add_instant("after", t) + } + + fn add_instant(&mut self, param: &str, t: &Epoch) -> &mut Self { + self.params.push((param.to_owned(), Self::epoch(t))); + self + } + fn epoch(before: &Epoch) -> String { + format!("{}", before.timestamp_millis()) + } + /// Alias to [`TransactionListBuilder::hash`] pub fn tx_hash(&mut self, tx: &str) -> &mut Self { self.hash(tx)