From 55f0e98ed6d276baedef7d2e3fa767aefbcb7bc0 Mon Sep 17 00:00:00 2001 From: Douglas Chimento Date: Sat, 27 Apr 2024 17:09:43 +0300 Subject: [PATCH 1/2] Adding Paged Vault accounts --- README.md | 2 ++ src/error.rs | 3 +++ src/lib.rs | 30 ++++++++++++++++++------- src/page_client.rs | 12 ---------- src/paged_client.rs | 55 +++++++++++++++++++++++++++++++++++++++++++++ src/types/page.rs | 5 +++-- 6 files changed, 85 insertions(+), 22 deletions(-) delete mode 100644 src/page_client.rs create mode 100644 src/paged_client.rs diff --git a/README.md b/README.md index c316b02..2e17c15 100644 --- a/README.md +++ b/README.md @@ -62,3 +62,5 @@ Run tests: cargo test ``` --- + +# Supported [Endpoints](./SUPPORTED_ENDPOINTS.md) diff --git a/src/error.rs b/src/error.rs index c6d0129..75df333 100644 --- a/src/error.rs +++ b/src/error.rs @@ -41,6 +41,9 @@ pub enum FireblocksError { #[error(transparent)] UrlError(#[from] ParseError), + #[error(transparent)] + QueryParamError(#[from] ParamError), + #[error("Internal Fireblocks Error. HTTP Code {code} {text} request_id:{request_id}")] InternalError { request_id: String, path: String, code: u16, text: String }, diff --git a/src/lib.rs b/src/lib.rs index a7dc161..c57da25 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,13 +5,14 @@ mod assets; mod client; pub mod error; pub(crate) mod jwt; -mod page_client; +mod paged_client; pub mod types; -pub use crate::error::FireblocksError; +pub use crate::error::*; pub use crate::types::PagingVaultRequestBuilder; pub use assets::{Asset, ASSET_BTC, ASSET_BTC_TEST, ASSET_ETH, ASSET_ETH_TEST, ASSET_SOL, ASSET_SOL_TEST}; pub use client::{Client, ClientBuilder}; +pub use paged_client::{PagedClient, VaultStream}; pub const FIREBLOCKS_API: &str = "https://api.fireblocks.io/v1"; pub const FIREBLOCKS_SANDBOX_API: &str = "https://sandbox-api.fireblocks.io/v1"; @@ -36,7 +37,7 @@ macro_rules! impl_base_query_params { self } - pub fn after(&mut self, t: &Epoch) -> &mut Self { + pub fn after(&mut self, t: &str) -> &mut Self { self.base.after(t); self } @@ -58,10 +59,11 @@ mod tests { use std::{env, time::Duration}; use crate::assets::{ASSET_BTC_TEST, ASSET_SOL_TEST}; + use crate::paged_client::{AsyncIteratorAsyncNext, PagedClient}; use crate::types::*; use crate::{Client, ClientBuilder, ASSET_ETH_TEST}; use bigdecimal::BigDecimal; - use chrono::{TimeZone, Utc}; + use chrono::Utc; use color_eyre::eyre::format_err; use tokio::time; use tracing::warn; @@ -209,11 +211,10 @@ mod tests { #[tokio::test] #[allow(clippy::unwrap_used)] async fn test_transaction_list(config: Config) -> color_eyre::Result<()> { - let after = Utc::now(); let before = Utc::now(); // test all options let options = TransactionListBuilder::new() - .after(&after) + .after("something") .before(&before) .assets(&[ASSET_BTC_TEST, ASSET_SOL_TEST]) .tx_hash("something") @@ -308,9 +309,8 @@ mod tests { let rename = format!("{vault_name}-rename"); c.rename_vault(result.id, &rename).await?; - let after = &Utc.with_ymd_and_hms(2023, 4, 6, 0, 1, 1).unwrap(); let before = &chrono::offset::Utc::now(); - PagingAddressRequestBuilder::new().limit(10).after(after).build()?; + PagingAddressRequestBuilder::new().limit(10).after("after").build()?; //config.client().addresses_paginated(0, ASSET_BTC_TEST, page).await?; PagingAddressRequestBuilder::new().limit(10).before(before).build()?; @@ -461,6 +461,20 @@ mod tests { c.internal_wallets().await?; Ok(()) } + #[rstest::rstest] + #[tokio::test] + async fn test_paged_vaults(config: Config) -> color_eyre::Result<()> { + if !config.is_ok() { + return Ok(()); + } + let c = config.client(); + let pc = PagedClient::new(c); + let mut vault_stream = pc.vaults(100); + while let (Some(result), _) = vault_stream.next().await? { + assert!(!result.accounts.is_empty()); + } + Ok(()) + } #[rstest::rstest] #[test] diff --git a/src/page_client.rs b/src/page_client.rs deleted file mode 100644 index e55b319..0000000 --- a/src/page_client.rs +++ /dev/null @@ -1,12 +0,0 @@ -use crate::{Client, Epoch}; - -#[derive(Clone)] -pub struct PagedClient { - pub client: Client, -} - -#[derive(Clone)] -pub struct TransactionStream { - client: Client, - after: Epoch, -} diff --git a/src/paged_client.rs b/src/paged_client.rs new file mode 100644 index 0000000..c0652c0 --- /dev/null +++ b/src/paged_client.rs @@ -0,0 +1,55 @@ +use crate::types::VaultAccounts; +use crate::{Client, PagingVaultRequestBuilder}; + +#[derive(Clone)] +pub struct PagedClient { + pub client: Client, +} + +#[derive(Clone)] +pub struct VaultStream { + client: Client, + batch: u16, + init: bool, // has the stream started? + after: Option, +} + +pub trait AsyncIteratorAsyncNext { + type Item; + async fn next(&mut self) -> crate::Result>; +} + +impl AsyncIteratorAsyncNext for VaultStream { + type Item = VaultAccounts; + + async fn next(&mut self) -> crate::Result> { + if let Some(ref after) = self.after { + let params = PagingVaultRequestBuilder::new().limit(self.batch).after(after).build()?; + let (va, id) = self.client.vaults(params).await?; + self.after = Option::clone(&va.paging.after); + return Ok((Some(va), id)); + } + // this is the 1st attempt + if !self.init { + self.init = true; + let params = PagingVaultRequestBuilder::new().limit(self.batch).build()?; + let (va, id) = self.client.vaults(params).await?; + self.after = Option::clone(&va.paging.after); + return Ok((Some(va), id)); + } + Ok((None, String::new())) + } +} + +impl PagedClient { + pub const fn new(client: Client) -> Self { + Self { client } + } + + /// Stream the vault accounts based on batch size + /// + /// see + pub fn vaults(&self, batch_size: u16) -> VaultStream { + VaultStream { batch: batch_size, client: self.client.clone(), init: false, after: None } + } +} diff --git a/src/types/page.rs b/src/types/page.rs index 36c0be9..dd119a9 100644 --- a/src/types/page.rs +++ b/src/types/page.rs @@ -29,8 +29,9 @@ impl BasePageParams { self.add_instant("before", t) } - pub(crate) fn after(&mut self, t: &Epoch) -> &mut Self { - self.add_instant("after", t) + pub(crate) fn after(&mut self, t: &str) -> &mut Self { + self.params.push(("after".to_owned(), String::from(t))); + self } fn add_instant(&mut self, param: &str, t: &Epoch) -> &mut Self { From 93d8935767a040b8e357fe083f0edfcc364bde6b Mon Sep 17 00:00:00 2001 From: Douglas Chimento Date: Sun, 28 Apr 2024 13:34:54 +0300 Subject: [PATCH 2/2] vault and transaction paging client --- Cargo.toml | 6 +- src/api/transactions.rs | 5 +- src/lib.rs | 80 ++++++++----- src/paged_client.rs | 242 +++++++++++++++++++++++++++++++++++---- src/types/page.rs | 45 +++++--- src/types/transaction.rs | 36 ++++++ 6 files changed, 338 insertions(+), 76 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1ea989f..c51955e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,8 @@ serde = "1.0.195" serde_derive = { version = "1"} jsonwebtoken = "9" reqwest = { version = "0.12", default-features = true, features = ["json"] } -tokio = { version = "1", features = ["full", "tracing"] } +tokio = { version = "1", default-features = false, features = ["time"] } +tokio-stream = {version = "0.1" } thiserror = "1" bigdecimal = { version = "^0.3", features = ["serde"] } url = "2" @@ -58,12 +59,13 @@ sha2 = "0.10" rand = "0.8" sqlx = { version = "0.7", features = ["postgres"], optional = true } + [target.'cfg(any())'.dependencies] openssl = { version = "0.10.55", optional = true } # needed to allow foo to build with -Zminimal-versions [dev-dependencies] tracing-subscriber = { version = "0.3", features = ["env-filter", "serde"] } -tokio = { version = "1", features = ["macros", "rt"] } +tokio = { version = "1", features = [ "full", "macros", "rt"] } rstest = { version = "0.19" } color-eyre = { version = "0.6" } lazy_static = "1.4.0" diff --git a/src/api/transactions.rs b/src/api/transactions.rs index 1e1c611..5f7d748 100644 --- a/src/api/transactions.rs +++ b/src/api/transactions.rs @@ -13,8 +13,9 @@ use tracing::debug; impl Client { /// Query transactions /// - /// [getTransactions](https://docs.fireblocks.com/api/swagger-ui/#/Transactions/getTransactions) - /// + /// See + /// * [getTransactions](https://docs.fireblocks.com/api/swagger-ui/#/Transactions/getTransactions) + /// * [`crate::types::transaction::TransactionListBuilder`] #[tracing::instrument(level = "debug", skip(self, options))] pub async fn transactions(&self, options: I) -> crate::Result> where diff --git a/src/lib.rs b/src/lib.rs index c57da25..bc2bd27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,16 +32,6 @@ macro_rules! impl_base_query_params { self } - pub fn before(&mut self, t: &Epoch) -> &mut Self { - self.base.before(t); - self - } - - pub fn after(&mut self, t: &str) -> &mut Self { - self.base.after(t); - self - } - pub fn build(&self) -> std::result::Result { let mut p = Vec::clone(&self.params); let b = self.base.build()?; @@ -55,17 +45,18 @@ macro_rules! impl_base_query_params { #[cfg(test)] mod tests { use std::str::FromStr; - use std::sync::{Once, OnceLock}; - use std::{env, time::Duration}; + use std::sync::{Arc, Once, OnceLock}; + use std::time::Duration; use crate::assets::{ASSET_BTC_TEST, ASSET_SOL_TEST}; - use crate::paged_client::{AsyncIteratorAsyncNext, PagedClient}; + use crate::paged_client::{PagedClient, TransactionStream}; use crate::types::*; use crate::{Client, ClientBuilder, ASSET_ETH_TEST}; use bigdecimal::BigDecimal; - use chrono::Utc; + use chrono::{TimeZone, Utc}; use color_eyre::eyre::format_err; use tokio::time; + use tokio_stream::StreamExt; use tracing::warn; use tracing_subscriber::fmt::format::FmtSpan; use tracing_subscriber::EnvFilter; @@ -126,7 +117,7 @@ mod tests { .ok() }, }; - let create_tx = env::var("FIREBLOCKS_CREATE_TX").ok().is_some(); + let create_tx = std::env::var("FIREBLOCKS_CREATE_TX").ok().is_some(); Self { client, create_tx } } @@ -156,7 +147,6 @@ mod tests { assert!(!id.is_empty()); assert!(results.accounts.is_empty()); - let _before = &chrono::offset::Utc::now(); let params = PagingVaultRequestBuilder::new().limit(1).build()?; let (results, id) = config.client().vaults(params).await?; assert!(!id.is_empty()); @@ -166,6 +156,8 @@ mod tests { assert!(!id.is_empty()); assert_eq!(0, result.id); assert!(!result.assets.is_empty()); + + let _ = PagingVaultRequestBuilder::new().before("before").build(); // code coverage Ok(()) } @@ -211,10 +203,11 @@ mod tests { #[tokio::test] #[allow(clippy::unwrap_used)] async fn test_transaction_list(config: Config) -> color_eyre::Result<()> { + let after = Utc::now(); let before = Utc::now(); // test all options let options = TransactionListBuilder::new() - .after("something") + .after(&after) .before(&before) .assets(&[ASSET_BTC_TEST, ASSET_SOL_TEST]) .tx_hash("something") @@ -309,13 +302,7 @@ mod tests { let rename = format!("{vault_name}-rename"); c.rename_vault(result.id, &rename).await?; - let before = &chrono::offset::Utc::now(); - PagingAddressRequestBuilder::new().limit(10).after("after").build()?; - //config.client().addresses_paginated(0, ASSET_BTC_TEST, page).await?; - - PagingAddressRequestBuilder::new().limit(10).before(before).build()?; - //config.client().addresses_paginated(0, ASSET_BTC_TEST, page).await?; - + PagingAddressRequestBuilder::new().limit(10).after("after").before("before").build()?; // code coverage c.vault_hide(result.id, false).await?; c.vault_hide(result.id, true).await?; Ok(()) @@ -461,6 +448,7 @@ mod tests { c.internal_wallets().await?; Ok(()) } + #[rstest::rstest] #[tokio::test] async fn test_paged_vaults(config: Config) -> color_eyre::Result<()> { @@ -468,18 +456,52 @@ mod tests { return Ok(()); } let c = config.client(); - let pc = PagedClient::new(c); - let mut vault_stream = pc.vaults(100); - while let (Some(result), _) = vault_stream.next().await? { - assert!(!result.accounts.is_empty()); + let pc = PagedClient::new(Arc::new(c)); + let mut vs = pc.vaults(100); + + while let Ok(Some(result)) = vs.try_next().await { + tracing::info!("accounts {}", result.0.accounts.len()); + time::sleep(Duration::from_millis(200)).await; } Ok(()) } + async fn transaction_stream(mut ts: TransactionStream) -> color_eyre::Result<()> { + let mut counter = 0; + let mut after = Utc.with_ymd_and_hms(2022, 4, 6, 0, 1, 1).unwrap(); + while let Some(result) = ts.try_next().await? { + tracing::info!("transactions {}", result.0.len()); + counter += 1; + if counter > 5 { + break; + } + if let Some(last) = result.0.last() { + assert!(after < last.created_at); + after = last.created_at; + } + time::sleep(Duration::from_millis(100)).await; + } + Ok(()) + } + + #[rstest::rstest] + #[tokio::test] + async fn test_paged_transactions(config: Config) -> color_eyre::Result<()> { + if !config.is_ok() { + return Ok(()); + } + let c = config.client(); + let pc = PagedClient::new(Arc::new(c)); + let ts = pc.transactions_from_source(0, 100, None); + transaction_stream(ts).await?; + let ts = pc.transactions_from_destination(0, 100, None); + transaction_stream(ts).await + } + #[rstest::rstest] #[test] fn check_ci(config: Config) -> color_eyre::Result<()> { - match env::var("CI") { + match std::env::var("CI") { Err(_) => Ok(()), Ok(_) => match config.client { Some(_) => Ok(()), diff --git a/src/paged_client.rs b/src/paged_client.rs index c0652c0..fa4e281 100644 --- a/src/paged_client.rs +++ b/src/paged_client.rs @@ -1,55 +1,249 @@ -use crate::types::VaultAccounts; -use crate::{Client, PagingVaultRequestBuilder}; +use crate::types::{Transaction, TransactionListBuilder, VaultAccounts}; +use crate::{Client, Epoch, FireblocksError, PagingVaultRequestBuilder, ParamError, QueryParams, Result}; +use chrono::{TimeZone, Utc}; +use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, Stream, StreamExt}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; #[derive(Clone)] pub struct PagedClient { - pub client: Client, + pub client: Arc, } -#[derive(Clone)] pub struct VaultStream { - client: Client, + client: Arc, batch: u16, - init: bool, // has the stream started? after: Option, + init: bool, + fut: FuturesUnordered>>, +} + +impl VaultStream { + fn new(client: Arc, batch: u16) -> Self { + Self { client, batch, init: false, after: None, fut: FuturesUnordered::new() } + } + fn build_params(&self) -> std::result::Result { + PagingVaultRequestBuilder::new().limit(self.batch).after(self.after.as_ref().unwrap_or(&String::new())).build() + } +} + +pub struct TransactionStream { + client: Arc, + batch: u16, + init: bool, // has the stream started? + vault_id: i32, + after: Epoch, + is_source: bool, // are we streaming from source vault account or destination + fut: FuturesUnordered>>>, +} + +impl TransactionStream { + fn from_source(client: Arc, batch: u16, vault_id: i32, after: Epoch) -> Self { + Self { client, batch, init: false, vault_id, after, fut: FuturesUnordered::new(), is_source: true } + } + + fn from_dest(client: Arc, batch: u16, vault_id: i32, after: Epoch) -> Self { + Self { client, batch, init: false, vault_id, after, fut: FuturesUnordered::new(), is_source: false } + } + + fn build_params(&self) -> std::result::Result { + let mut builder = TransactionListBuilder::new(); + let builder = builder.limit(self.batch).sort_asc().order_created_at().after(&self.after); + + if self.is_source { + return builder.source_id(self.vault_id).build(); + } + builder.destination_id(self.vault_id).build() + } } pub trait AsyncIteratorAsyncNext { type Item; - async fn next(&mut self) -> crate::Result>; + async fn next(&mut self) -> Result>; } -impl AsyncIteratorAsyncNext for VaultStream { - type Item = VaultAccounts; +impl Stream for VaultStream { + type Item = Result; + + #[allow(clippy::cognitive_complexity)] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if !self.init { + tracing::debug!("init future"); + self.init = true; + let client = self.client.clone(); + let params = match self.build_params() { + Ok(p) => p, + Err(e) => return Poll::Ready(Some(Err(FireblocksError::from(e)))), + }; + let fut = async move { client.vaults(params).await }.boxed(); + self.fut.push(fut); + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + // Try to resolve any existing futures first + tracing::debug!("check future poll"); + match self.fut.poll_next_unpin(cx) { + Poll::Ready(opt) => { + if let Some(result) = opt { + match result { + Ok((ref va, ref _id)) => { + self.after = Option::clone(&va.paging.after); + }, + Err(e) => { + return Poll::Ready(Some(Err(e))); + }, + } + return Poll::Ready(Some(result)); + } + }, + Poll::Pending => { + tracing::debug!("still pending"); + cx.waker().wake_by_ref(); + return Poll::Pending; + }, + }; - async fn next(&mut self) -> crate::Result> { - if let Some(ref after) = self.after { - let params = PagingVaultRequestBuilder::new().limit(self.batch).after(after).build()?; - let (va, id) = self.client.vaults(params).await?; - self.after = Option::clone(&va.paging.after); - return Ok((Some(va), id)); + tracing::debug!("checking after {:#?}", self.after); + // If there are no more pages to fetch and no pending futures, end the stream + if self.after.is_none() { + return Poll::Ready(None); } - // this is the 1st attempt + + let client = self.client.clone(); + let params = match self.build_params() { + Ok(p) => p, + Err(e) => return Poll::Ready(Some(Err(FireblocksError::from(e)))), + }; + let fut = async move { client.vaults(params).await }.boxed(); + self.fut.push(fut); + cx.waker().wake_by_ref(); + Poll::Pending + } +} + +impl Stream for TransactionStream { + type Item = Result>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if !self.init { + tracing::debug!("init tracing stream"); self.init = true; - let params = PagingVaultRequestBuilder::new().limit(self.batch).build()?; - let (va, id) = self.client.vaults(params).await?; - self.after = Option::clone(&va.paging.after); - return Ok((Some(va), id)); + let client = self.client.clone(); + let params = match self.build_params() { + Ok(p) => p, + Err(e) => return Poll::Ready(Some(Err(FireblocksError::from(e)))), + }; + let fut = async move { client.transactions(params).await }.boxed(); + self.fut.push(fut); + cx.waker().wake_by_ref(); + return Poll::Pending; } - Ok((None, String::new())) + + match self.fut.poll_next_unpin(cx) { + Poll::Ready(opt) => { + if let Some(result) = opt { + match result { + Ok((ref va, ref _id)) => { + if va.is_empty() { + return Poll::Ready(None); + } + if let Some(last) = va.last() { + tracing::debug!("1st after {:#?} last after {:#?}", va[0].created_at, last.created_at); + self.after = last.created_at + chrono::Duration::milliseconds(100); + //TODO possible to miss a transaction + } + }, + Err(e) => { + return Poll::Ready(Some(Err(e))); + }, + } + return Poll::Ready(Some(result)); + } + }, + Poll::Pending => { + cx.waker().wake_by_ref(); + return Poll::Pending; + }, + }; + + let client = self.client.clone(); + let params = match self.build_params() { + Ok(p) => p, + Err(e) => return Poll::Ready(Some(Err(FireblocksError::from(e)))), + }; + let fut = async move { client.transactions(params).await }.boxed(); + self.fut.push(fut); + cx.waker().wake_by_ref(); + Poll::Pending } } impl PagedClient { - pub const fn new(client: Client) -> Self { + pub const fn new(client: Arc) -> Self { Self { client } } /// Stream the vault accounts based on batch size /// - /// see + /// ``` + /// use std::sync::Arc; + /// use futures::TryStreamExt; + /// use fireblocks_sdk::{Client, PagedClient}; + /// + /// async fn vault_accounts(c: Client) -> color_eyre::Result<()> { + /// let pc = PagedClient::new(Arc::new(c)); + /// let mut vault_stream = pc.vaults(100); + /// while let Ok(Some(result)) = vault_stream.try_next().await { + /// tracing::info!("accounts {}", result.0.accounts.len()); + /// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + /// } + /// Ok(()) + /// } + /// ``` + /// see [`Client::vaults`] pub fn vaults(&self, batch_size: u16) -> VaultStream { - VaultStream { batch: batch_size, client: self.client.clone(), init: false, after: None } + VaultStream::new(self.client.clone(), batch_size) + } + + /// Stream all the transactions from source vault account id and after some date + /// + /// Default date is 2022-04-06 if None provided + /// + /// ``` + /// use std::sync::Arc; + /// use futures::TryStreamExt; + /// use fireblocks_sdk::{Client, PagedClient}; + /// + /// async fn transactions_paged(c: Client) -> color_eyre::Result<()> { + /// let pc = PagedClient::new(Arc::new(c)); + /// let mut ts = pc.transactions_from_source(0, 100, None); + /// while let Ok(Some(result)) = ts.try_next().await { + /// tracing::info!("transactions {}", result.0.len()); + /// } + /// Ok(()) + /// } + /// ``` + /// + /// see + /// * [`Client::transactions`] + pub fn transactions_from_source(&self, vault_id: i32, batch_size: u16, after: Option) -> TransactionStream { + let default_after = Utc.with_ymd_and_hms(2022, 4, 6, 0, 1, 1).unwrap(); + TransactionStream::from_source(self.client.clone(), batch_size, vault_id, after.unwrap_or(default_after)) + } + + /// Stream all the transactions from destination vault account id + /// See [`self.transactions_from_source`] + pub fn transactions_from_destination( + &self, + vault_id: i32, + batch_size: u16, + after: Option, + ) -> TransactionStream { + let default_after = Utc.with_ymd_and_hms(2022, 4, 6, 0, 1, 1).unwrap(); + TransactionStream::from_dest(self.client.clone(), batch_size, vault_id, after.unwrap_or(default_after)) } } diff --git a/src/types/page.rs b/src/types/page.rs index dd119a9..3bd0a76 100644 --- a/src/types/page.rs +++ b/src/types/page.rs @@ -1,4 +1,4 @@ -use crate::{impl_base_query_params, Epoch, QueryParams}; +use crate::{impl_base_query_params, QueryParams}; use bigdecimal::BigDecimal; use serde_derive::{Deserialize, Serialize}; @@ -25,23 +25,6 @@ impl BasePageParams { self } - pub(crate) fn before(&mut self, t: &Epoch) -> &mut Self { - self.add_instant("before", t) - } - - pub(crate) fn after(&mut self, t: &str) -> &mut Self { - self.params.push(("after".to_owned(), String::from(t))); - self - } - - fn add_instant(&mut self, param: &str, t: &Epoch) -> &mut Self { - self.params.push((param.to_owned(), Self::epoch(t))); - self - } - fn epoch(before: &Epoch) -> String { - format!("{}", before.timestamp_millis()) - } - #[allow(clippy::unnecessary_wraps)] pub(crate) fn build(&self) -> std::result::Result { Ok(Vec::clone(&self.params)) @@ -50,7 +33,7 @@ impl BasePageParams { #[derive(Debug, Default)] pub struct PagingAddressRequestBuilder { - params: QueryParams, // this is ignored + params: QueryParams, base: BasePageParams, } @@ -64,6 +47,18 @@ pub struct PagingVaultRequestBuilder { impl_base_query_params!(PagingVaultRequestBuilder); +impl PagingAddressRequestBuilder { + pub fn before(&mut self, t: &str) -> &mut Self { + self.params.push(("before".to_owned(), String::from(t))); + self + } + + pub fn after(&mut self, t: &str) -> &mut Self { + self.params.push(("before".to_owned(), String::from(t))); + self + } +} + impl PagingVaultRequestBuilder { pub fn min_threshold(&mut self, min: &BigDecimal) -> &mut Self { self.params.push(("minAmountThreshold".to_owned(), min.to_string())); @@ -79,4 +74,16 @@ impl PagingVaultRequestBuilder { self.params.push(("nameSuffix".to_owned(), String::from(n))); self } + + pub fn before(&mut self, t: &str) -> &mut Self { + self.params.push(("before".to_owned(), String::from(t))); + self + } + + pub fn after(&mut self, t: &str) -> &mut Self { + if !t.is_empty() { + self.params.push(("after".to_owned(), String::from(t))); + } + self + } } diff --git a/src/types/transaction.rs b/src/types/transaction.rs index 98a6a3d..94b4524 100644 --- a/src/types/transaction.rs +++ b/src/types/transaction.rs @@ -92,6 +92,26 @@ impl TransactionListBuilder { self } + pub fn sort_desc(&mut self) -> &mut Self { + self.params.push(("sort".to_string(), String::from("DESC"))); + self + } + + pub fn sort_asc(&mut self) -> &mut Self { + self.params.push(("sort".to_string(), String::from("ASC"))); + self + } + + pub fn order_created_at(&mut self) -> &mut Self { + self.params.push(("orderBy".to_string(), String::from("createdAt"))); + self + } + + pub fn order_last_updated(&mut self) -> &mut Self { + self.params.push(("orderBy".to_string(), String::from("lastUpdated"))); + self + } + pub fn hash(&mut self, s: &str) -> &mut Self { self.params.push(("txHash".to_string(), String::from(s))); self @@ -107,6 +127,22 @@ impl TransactionListBuilder { self } + pub(crate) fn before(&mut self, t: &Epoch) -> &mut Self { + self.add_instant("before", t) + } + + pub(crate) fn after(&mut self, t: &Epoch) -> &mut Self { + self.add_instant("after", t) + } + + fn add_instant(&mut self, param: &str, t: &Epoch) -> &mut Self { + self.params.push((param.to_owned(), Self::epoch(t))); + self + } + fn epoch(before: &Epoch) -> String { + format!("{}", before.timestamp_millis()) + } + /// Alias to [`TransactionListBuilder::hash`] pub fn tx_hash(&mut self, tx: &str) -> &mut Self { self.hash(tx)