diff --git a/Cargo.toml b/Cargo.toml index d05b037..2de1c77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] -async-lock = "3.2.0" futures-channel = "0.3.29" futures-util = "0.3.29" -pin-project-lite = "0.2.13" tokio = { version = "1.34.0", features = ["sync"] } [dev-dependencies] diff --git a/examples/basic.rs b/examples/basic.rs index 241261d..46ad676 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -10,6 +10,7 @@ async fn main() { sleep(Duration::from_secs(5)).await; "foo" }) + .then(|output| async move { output }) .concurrency_limit(1) .buffer(2) .load_shed(); diff --git a/examples/retry.rs b/examples/retry.rs index 687fd14..03b299c 100644 --- a/examples/retry.rs +++ b/examples/retry.rs @@ -1,66 +1,59 @@ -use std::{ - future::{ready, Future, Ready}, - sync::atomic::{AtomicUsize, Ordering}, -}; +use std::sync::atomic::{AtomicUsize, Ordering}; use burger::{service_fn, Policy, Service, ServiceExt}; -struct FiniteRetries { - max_retries: usize, +struct FiniteRetries(usize); + +struct Attempts<'a> { + max: &'a usize, + attempted: usize, } -struct Attempts(usize); #[derive(Debug)] struct MaxAttempts; -impl Policy for FiniteRetries +impl Policy for FiniteRetries where - S: Service, - // https://github.com/rust-lang/rust/issues/49601 - // for<'a> S::Future<'a>: Future, - for<'a> S::Future<'a>: Future, - Request: Clone, + for<'a> S: Service<(), Response<'a> = usize>, { - type RequestState<'a> = Attempts; - type Future<'a> = Ready>; - type Error<'a> = MaxAttempts; + type RequestState<'a> = Attempts<'a>; + type Error = MaxAttempts; - fn create(&self, request: &Request) -> (Attempts, Request) { - (Attempts(self.max_retries), request.clone()) + fn create(&self, _request: &()) -> Attempts { + Attempts { + max: &self.0, + attempted: 0, + } } - fn classify( + async fn classify<'a>( &self, - state: &mut Attempts, - request: &Request, - response: &<>::Future<'_> as futures_util::Future>::Output, - ) -> Self::Future<'_> { - let result = if *response != 200 && state.0.checked_sub(1).is_none() { - Ok(()) - } else { - Err(request.clone()) - }; - ready(result) - } + mut state: Self::RequestState<'a>, + response: &>::Response<'_>, + ) -> Result)>, Self::Error> { + if *response == 200 { + return Ok(None); + } - fn error(&self, _state: Attempts) -> Self::Error<'_> { - MaxAttempts + state.attempted += 1; + + if state.attempted >= *state.max { + return Err(MaxAttempts); + } + + Ok(Some(((), state))) } } #[tokio::main] async fn main() { - let x = AtomicUsize::new(198); - let svc = service_fn(move |()| { - // let x = x; - let x = &x; - async move { - let y = x.fetch_add(1, Ordering::SeqCst); - println!("{y}"); - y - } - }); - let svc = svc.retry(FiniteRetries { max_retries: 5 }); - let x = svc.oneshot(()).await.unwrap(); - drop(x); + let ref x = AtomicUsize::new(198); + service_fn(|()| async move { + let y = x.fetch_add(1, Ordering::SeqCst); + y + }) + .retry(FiniteRetries(4)) + .oneshot(()) + .await + .unwrap(); } diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..5d56faf --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "nightly" diff --git a/src/buffer.rs b/src/buffer.rs index ae603eb..172ebf1 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -1,14 +1,6 @@ -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, -}; +use tokio::sync::{Semaphore, SemaphorePermit}; -use async_lock::{Semaphore, SemaphoreGuard}; -use futures_util::future::MaybeDone; -use pin_project_lite::pin_project; - -use crate::{oneshot::Oneshot, Service, ServiceExt}; +use crate::{Service, ServiceExt}; pub struct Buffer { inner: S, @@ -26,58 +18,26 @@ impl Buffer { pub struct BufferPermit<'a, S> { inner: &'a S, - _semaphore_permit: SemaphoreGuard<'a>, -} - -pin_project! { - pub struct BufferAcquire<'a, S> - { - inner: Option<&'a S>, - #[pin] - semaphore_acquire: MaybeDone> - } -} - -impl<'a, S> Future for BufferAcquire<'a, S> { - type Output = BufferPermit<'a, S>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - ready!(this.semaphore_acquire.as_mut().poll(cx)); - Poll::Ready(BufferPermit { - inner: this - .inner - .take() - .expect("futures cannot be polled after completion"), - _semaphore_permit: this - .semaphore_acquire - .as_mut() - .take_output() - .expect("futures cannot be polled after completion"), - }) - } + _semaphore_permit: SemaphorePermit<'a>, } impl Service for Buffer where S: Service, { - type Future<'a> = Oneshot<'a, S, Request> where S: 'a; + type Response<'a> = S::Response<'a>; type Permit<'a> = BufferPermit<'a, S> - where - S: 'a; - type Acquire<'a> = BufferAcquire<'a, S> where S: 'a; - fn acquire(&self) -> Self::Acquire<'_> { - BufferAcquire { - inner: Some(&self.inner), - semaphore_acquire: MaybeDone::Future(self.semaphore.acquire()), + async fn acquire(&self) -> Self::Permit<'_> { + BufferPermit { + inner: &self.inner, + _semaphore_permit: self.semaphore.acquire().await.expect("not closed"), } } - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> { - permit.inner.oneshot(request) + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> { + permit.inner.oneshot(request).await } } diff --git a/src/lib.rs b/src/lib.rs index e6577a3..9a42519 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(return_type_notation)] + mod buffer; mod limit; mod load_shed; @@ -18,31 +20,26 @@ pub use select::*; pub use service_fn::*; pub use then::*; -use std::future::Future; - pub trait Service { - type Future<'a>: Future - where - Self: 'a; - + type Response<'a>; type Permit<'a> where Self: 'a; - type Acquire<'a>: Future> - where - Self: 'a; - fn acquire(&self) -> Self::Acquire<'_>; + async fn acquire(&self) -> Self::Permit<'_>; + + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a>; - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a>; + // `Self::Response` does not need `Self: 'a` + fn _silence_incorrect_lint(_: &Self::Response<'_>) {} } pub trait ServiceExt: Service { - fn oneshot(&self, request: Request) -> Oneshot<'_, Self, Request> + async fn oneshot(&self, request: Request) -> Self::Response<'_> where Self: Sized, { - oneshot(request, self) + oneshot(request, self).await } fn then(self, closure: F) -> Then diff --git a/src/limit/concurrency.rs b/src/limit/concurrency.rs index e30a2f2..31eba9f 100644 --- a/src/limit/concurrency.rs +++ b/src/limit/concurrency.rs @@ -1,12 +1,4 @@ -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use async_lock::{Semaphore, SemaphoreGuard}; -use futures_util::future::MaybeDone; -use pin_project_lite::pin_project; +use tokio::sync::{Semaphore, SemaphorePermit}; use crate::Service; @@ -26,88 +18,26 @@ impl ConcurrencyLimit { pub struct ConcurrencyLimitPermit<'a, Inner> { inner: Inner, - semaphore_permit: SemaphoreGuard<'a>, -} - -pin_project! { - pub struct ConcurencyLimitAcquire<'a, Inner> - where - Inner: Future - { - #[pin] - inner: MaybeDone, - #[pin] - semaphore_acquire: MaybeDone> - } -} - -impl<'a, Inner> Future for ConcurencyLimitAcquire<'a, Inner> -where - Inner: Future, -{ - type Output = ConcurrencyLimitPermit<'a, Inner::Output>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - ready!(this.inner.as_mut().poll(cx)); - ready!(this.semaphore_acquire.as_mut().poll(cx)); - Poll::Ready(ConcurrencyLimitPermit { - inner: this - .inner - .as_mut() - .take_output() - .expect("futures cannot be polled after completion"), - semaphore_permit: this - .semaphore_acquire - .as_mut() - .take_output() - .expect("futures cannot be polled after completion"), - }) - } -} - -pin_project! { - pub struct ConcurrencyLimitFuture<'a, Inner> { - #[pin] - inner: Inner, - _semaphore_permit: SemaphoreGuard<'a>, - } -} - -impl<'a, Inner> Future for ConcurrencyLimitFuture<'a, Inner> -where - Inner: Future, -{ - type Output = Inner::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx) - } + _semaphore_permit: SemaphorePermit<'a>, } impl Service for ConcurrencyLimit where S: Service, { - type Future<'a> = ConcurrencyLimitFuture<'a, S::Future<'a>> where S: 'a; + type Response<'a> = S::Response<'a>; type Permit<'a> = ConcurrencyLimitPermit<'a, S::Permit<'a>> - where - S: 'a; - type Acquire<'a> = ConcurencyLimitAcquire<'a, S::Acquire<'a>> where S: 'a; - fn acquire(&self) -> Self::Acquire<'_> { - ConcurencyLimitAcquire { - inner: MaybeDone::Future(self.inner.acquire()), - semaphore_acquire: MaybeDone::Future(self.semaphore.acquire()), + async fn acquire(&self) -> Self::Permit<'_> { + ConcurrencyLimitPermit { + inner: self.inner.acquire().await, + _semaphore_permit: self.semaphore.acquire().await.expect("not closed"), } } - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> { - ConcurrencyLimitFuture { - inner: S::call(permit.inner, request), - _semaphore_permit: permit.semaphore_permit, - } + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> { + S::call(permit.inner, request).await } } diff --git a/src/limit/mod.rs b/src/limit/mod.rs index 05e6585..3895a5b 100644 --- a/src/limit/mod.rs +++ b/src/limit/mod.rs @@ -1,5 +1,5 @@ mod concurrency; -mod rate; +// mod rate; pub use concurrency::*; -pub use rate::*; +// pub use rate::*; diff --git a/src/load_shed.rs b/src/load_shed.rs index 1e7f13c..11649f3 100644 --- a/src/load_shed.rs +++ b/src/load_shed.rs @@ -1,9 +1,4 @@ -use std::future::{ready, Future, Ready}; - -use futures_util::{ - future::{Either, Map}, - FutureExt, -}; +use futures_util::FutureExt; use crate::Service; @@ -21,37 +16,24 @@ impl LoadShed { #[non_exhaustive] pub struct Shed; -type NewOutput<'a, S, Request> = - Result<<>::Future<'a> as Future>::Output, Shed>; - impl Service for LoadShed where S: Service, { - type Future<'a> = Either< - Map, fn( as Future>::Output) -> NewOutput<'a, S, Request>>, - Ready> - > - where - Self: 'a; - + type Response<'a> = Result, Shed>; type Permit<'a> = Option> where - Self: 'a; - - type Acquire<'a> = Ready> - where - Self: 'a; + S: 'a; - fn acquire(&self) -> Self::Acquire<'_> { - ready(self.inner.acquire().now_or_never()) + async fn acquire(&self) -> Self::Permit<'_> { + self.inner.acquire().now_or_never() } - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> { + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> { if let Some(permit) = permit { - Either::Left(S::call(permit, request).map(Ok)) + Ok(S::call(permit, request).await) } else { - Either::Right(ready(Err(Shed))) + Err(Shed) } } } diff --git a/src/map.rs b/src/map.rs index 8c13a46..ebeccb0 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,11 +1,3 @@ -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use futures_util::{future::Map as MapFuture, FutureExt}; - use crate::Service; pub struct Map { @@ -24,47 +16,22 @@ pub struct MapPermit<'a, Inner, F> { closure: &'a F, } -pin_project_lite::pin_project! { - pub struct MapAcquire<'a, Inner, F> { - #[pin] - inner: Inner, - closure: &'a F - } -} - -impl<'a, Inner, F> Future for MapAcquire<'a, Inner, F> -where - Inner: Future, -{ - type Output = MapPermit<'a, Inner::Output, F>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let inner = ready!(this.inner.poll(cx)); - Poll::Ready(MapPermit { - inner, - closure: this.closure, - }) - } -} - impl Service for Map where S: Service, - for<'a> F: Fn( as Future>::Output) -> Output, + for<'a> F: Fn(S::Response<'a>) -> Output, { - type Future<'a> = MapFuture, &'a F> where S: 'a, F: 'a; + type Response<'a> = Output; type Permit<'a> = MapPermit<'a, S::Permit<'a>, F> where S: 'a, F: 'a; - type Acquire<'a> = MapAcquire<'a, S::Acquire<'a>, F> where F: 'a, S: 'a; - fn acquire(&self) -> Self::Acquire<'_> { - MapAcquire { - inner: self.inner.acquire(), + async fn acquire(&self) -> Self::Permit<'_> { + MapPermit { + inner: self.inner.acquire().await, closure: &self.closure, } } - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> { - S::call(permit.inner, request).map(permit.closure) + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> { + (permit.closure)(S::call(permit.inner, request).await) } } diff --git a/src/oneshot.rs b/src/oneshot.rs index 8313445..faa530d 100644 --- a/src/oneshot.rs +++ b/src/oneshot.rs @@ -1,82 +1,11 @@ -use std::{ - future::{ready, Future, Ready}, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use pin_project_lite::pin_project; - use crate::Service; -pin_project! { - #[project= OneshotInnerProj] - #[project_replace = OneshotInnerProjReplace] - enum OneshotInner - { - // TODO: Can we remove this `Option`? - Acquire { request: Option, #[pin] inner: Acquire }, - Call { #[pin] inner: Call }, - Transition - } -} - -pin_project! { - pub struct Oneshot<'a, S, Request> - where - S: Service, - { - service: &'a S, - #[pin] - state: OneshotInner, S::Future<'a>>, - } -} - -pub fn oneshot(request: Request, service: &S) -> Oneshot<'_, S, Request> +pub async fn oneshot(request: Request, service: &S) -> S::Response<'_> where S: Service, { - Oneshot { - service, - state: OneshotInner::Acquire { - request: Some(request), - inner: service.acquire(), - }, - } -} - -impl<'a, Request, S> Future for Oneshot<'a, S, Request> -where - S: Service, -{ - type Output = as Future>::Output; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - loop { - let state = this.state.as_mut().project(); - let new_state = match state { - OneshotInnerProj::Acquire { inner, request } => match inner.poll(cx) { - Poll::Ready(ready) => OneshotInner::Call { - inner: S::call( - ready, - request.take().expect("this cannot be taken more than once"), - ), - }, - Poll::Pending => { - return Poll::Pending; - } - }, - OneshotInnerProj::Call { inner } => { - let output = ready!(inner.poll(cx)); - return Poll::Ready(output); - } - OneshotInnerProj::Transition => { - unreachable!("this is an ephemeral state and cannot be reached") - } - }; - this.state.as_mut().project_replace(new_state); - } - } + let permit = service.acquire().await; + S::call(permit, request).await } pub(crate) struct Depressurize { @@ -87,23 +16,16 @@ impl Service for Depressurize where S: Service, { - type Future<'a> = Oneshot<'a, S, Request> - where - S: 'a; - + type Response<'a> = S::Response<'a>; type Permit<'a> = &'a S where S: 'a; - type Acquire<'a> = Ready> - where - S: 'a; - - fn acquire(&self) -> Self::Acquire<'_> { - ready(&self.inner) + async fn acquire(&self) -> Self::Permit<'_> { + &self.inner } - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> { - oneshot(request, permit) + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> { + oneshot(request, permit).await } } diff --git a/src/retry.rs b/src/retry.rs index 13d3b3f..008c525 100644 --- a/src/retry.rs +++ b/src/retry.rs @@ -1,35 +1,19 @@ -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use pin_project_lite::pin_project; - -use crate::{oneshot::Oneshot, Service, ServiceExt}; +use crate::{Service, ServiceExt}; pub trait Policy where S: Service, { type RequestState<'a>; - type Future<'a>: Future> - where - Self: 'a; - type Error<'a> - where - Self: 'a; + type Error; - fn create(&self, request: &Request) -> (Self::RequestState<'_>, Request); + fn create(&self, request: &Request) -> Self::RequestState<'_>; - fn classify( + async fn classify<'a>( &self, - state: &mut Self::RequestState<'_>, - request: &Request, - response: & as Future>::Output, - ) -> Self::Future<'_>; - - fn error(&self, state: Self::RequestState<'_>) -> Self::Error<'_>; + state: Self::RequestState<'a>, + response: &S::Response<'_>, + ) -> Result)>, Self::Error>; } pub struct Retry { @@ -38,211 +22,50 @@ pub struct Retry { } impl Retry { - pub fn new(inner: S, policy: P) -> Self { + pub(crate) fn new(inner: S, policy: P) -> Self { Self { inner, policy } } } -pin_project! { - pub struct RetryFuture<'a, S, P, Request> - where - S: Service, - P: Policy - { - service: &'a S, - policy: &'a P, - #[pin] - inner: RetryFutureInner<'a, S, P, Request>, - request: Request - } -} - -pin_project! { - #[project = RetryFutureInnerProj] - #[project_replace = RetryFutureInnerProjReplace] - enum RetryFutureInner<'a, S: 'a, P: 'a, Request> - where - P: Policy, - S: Service, - { - Initial { permit: S::Permit<'a> }, - Calling { - #[pin] call: S::Future<'a>, - state: P::RequestState<'a>, - }, - Retrying { - #[pin] - oneshot: Oneshot<'a, S, Request>, - state: P::RequestState<'a>, - }, - Classifying { - #[pin] classify: P::Future<'a>, - state: P::RequestState<'a>, - response: as Future>::Output - }, - Pending - } -} - -impl<'a, S, P, Request> Future for RetryFuture<'a, S, P, Request> -where - S: Service, - P: Policy, -{ - type Output = Result< as Future>::Output, P::Error<'a>>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - loop { - let next = match this.inner.as_mut().project() { - RetryFutureInnerProj::Initial { .. } => { - let RetryFutureInnerProjReplace::Initial { permit } = this - .inner - .as_mut() - .project_replace(RetryFutureInner::Pending) - else { - unreachable!() - }; - let (state, request) = this.policy.create(&this.request); - RetryFutureInner::Calling { - call: S::call(permit, request), - state, - } - } - RetryFutureInnerProj::Calling { call, .. } => { - let response = ready!(call.poll(cx)); - let RetryFutureInnerProjReplace::Calling { mut state, .. } = this - .inner - .as_mut() - .project_replace(RetryFutureInner::Pending) - else { - unreachable!() - }; - let classify = this.policy.classify(&mut state, &this.request, &response); - RetryFutureInner::Classifying { - classify, - state, - response, - } - } - RetryFutureInnerProj::Retrying { oneshot, .. } => { - let response = ready!(oneshot.poll(cx)); - let RetryFutureInnerProjReplace::Retrying { mut state, .. } = this - .inner - .as_mut() - .project_replace(RetryFutureInner::Pending) - else { - unreachable!() - }; - let classify = this.policy.classify(&mut state, &this.request, &response); - RetryFutureInner::Classifying { - classify, - state, - response, - } - } - RetryFutureInnerProj::Classifying { classify, .. } => { - let result = ready!(classify.poll(cx)); - match result { - Ok(_) => { - let RetryFutureInnerProjReplace::Classifying { response, .. } = this - .inner - .as_mut() - .project_replace(RetryFutureInner::Pending) - else { - unreachable!() - }; - return Poll::Ready(Ok(response)); - } - Err(request) => { - let oneshot = this.service.oneshot(request); - let state = { - let RetryFutureInnerProjReplace::Classifying { state, .. } = this - .inner - .as_mut() - .project_replace(RetryFutureInner::Pending) - else { - unreachable!() - }; - state - }; - RetryFutureInner::Retrying { oneshot, state } - } - } - } - RetryFutureInnerProj::Pending => unreachable!(), - }; - this.inner.as_mut().project_replace(next); - } - } -} - pub struct RetryPermit<'a, S, P, Inner> { service: &'a S, policy: &'a P, inner: Inner, } -pin_project! { - pub struct RetryAcquire<'a, S, P, Inner> { - service: &'a S, - policy: &'a P, - #[pin] - inner: Inner, - } -} - -impl<'a, S, P, Inner> Future for RetryAcquire<'a, S, P, Inner> -where - Inner: Future, -{ - type Output = RetryPermit<'a, S, P, Inner::Output>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - let inner = ready!(this.inner.poll(cx)); - let permit = RetryPermit { - service: self.service, - policy: self.policy, - inner, - }; - Poll::Ready(permit) - } -} - impl Service for Retry where S: Service, P: Policy, { - type Future<'a> = RetryFuture<'a, S, P, Request> - where - Self: 'a; - + type Response<'a> = Result, P::Error>; type Permit<'a> = RetryPermit<'a, S, P, S::Permit<'a>> where Self: 'a; - type Acquire<'a> = RetryAcquire<'a, S, P, S::Acquire<'a>> - where - Self: 'a; - - fn acquire(&self) -> Self::Acquire<'_> { - RetryAcquire { + async fn acquire(&self) -> Self::Permit<'_> { + RetryPermit { service: &self.inner, policy: &self.policy, - inner: self.inner.acquire(), + inner: self.inner.acquire().await, } } - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> { - RetryFuture { - service: permit.service, - policy: permit.policy, - inner: RetryFutureInner::Initial { - permit: permit.inner, - }, - request, + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> { + let RetryPermit { + service, + policy, + inner, + } = permit; + let mut state = policy.create(&request); + let mut response = S::call(inner, request).await; + + loop { + let Some((request, new_state)) = policy.classify(state, &response).await? else { + return Ok(response); + }; + state = new_state; + response = service.oneshot(request).await; } } } diff --git a/src/select.rs b/src/select.rs index 1c3b9ef..238e906 100644 --- a/src/select.rs +++ b/src/select.rs @@ -1,7 +1,4 @@ -use futures_util::{ - future::{select_all, Map, SelectAll}, - FutureExt, -}; +use futures_util::future::select_all; use crate::Service; @@ -13,29 +10,24 @@ impl Service for Select where for<'a> &'a I: IntoIterator, I: 'static, - S: Service + 'static, - for<'a> S::Acquire<'a>: Unpin, + S: Service + 'static, { - type Future<'a> = S::Future<'a> - where - I: 'a; + type Response<'a> = S::Response<'a>; type Permit<'a> = S::Permit<'a> - where - I: 'a; - type Acquire<'a> = Map>, fn((S::Permit<'a>, usize, Vec>)) -> S::Permit<'a>> where I: 'a; - fn acquire<'a>(&'a self) -> Self::Acquire<'a> { + async fn acquire<'a>(&'a self) -> Self::Permit<'a> { let iter = self.services.into_iter().map(|s| s.acquire()); - select_all(iter).map(|(permit, _, _)| permit) + let (permit, _, _) = select_all(iter).await; + permit } - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> { - S::call(permit, request) + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> { + S::call(permit, request).await } } pub fn select(services: I) -> Select { Select { services } -} \ No newline at end of file +} diff --git a/src/service_fn.rs b/src/service_fn.rs index f753438..1f6dc75 100644 --- a/src/service_fn.rs +++ b/src/service_fn.rs @@ -1,4 +1,4 @@ -use std::future::{ready, Future, Ready}; +use std::future::Future; use crate::Service; @@ -11,16 +11,15 @@ where F: Fn(Request) -> Fut, Fut: Future, { - type Future<'a> = Fut where F: 'a; + type Response<'a> = Fut::Output; type Permit<'a> = &'a F where F: 'a; - type Acquire<'a> = Ready<&'a F> where F: 'a; - fn acquire(&self) -> Self::Acquire<'_> { - ready(&self.closure) + async fn acquire(&self) -> Self::Permit<'_> { + &self.closure } - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> { - permit(request) + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> { + permit(request).await } } diff --git a/src/then.rs b/src/then.rs index 6fbb3ee..5040cf5 100644 --- a/src/then.rs +++ b/src/then.rs @@ -1,10 +1,4 @@ -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, -}; - -use futures_util::{future::Then as ThenFuture, FutureExt}; +use std::future::Future; use crate::Service; @@ -24,48 +18,23 @@ pub struct ThenPermit<'a, Inner, F> { closure: &'a F, } -pin_project_lite::pin_project! { - pub struct ThenAcquire<'a, Inner, F> { - #[pin] - inner: Inner, - closure: &'a F - } -} - -impl<'a, Inner, F> Future for ThenAcquire<'a, Inner, F> -where - Inner: Future, -{ - type Output = ThenPermit<'a, Inner::Output, F>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let inner = ready!(this.inner.poll(cx)); - Poll::Ready(ThenPermit { - inner, - closure: this.closure, - }) - } -} - -impl Service for Then +impl Service for Then where S: Service, - for<'a> F: Fn( as Future>::Output) -> Fut2, - Fut2: Future, + for<'a> F: Fn(S::Response<'a>) -> Fut, + Fut: Future, { - type Future<'a> = ThenFuture, Fut2, &'a F> where S: 'a, F: 'a; + type Response<'a> = Fut::Output; type Permit<'a> = ThenPermit<'a, S::Permit<'a>, F> where S: 'a, F: 'a; - type Acquire<'a> = ThenAcquire<'a, S::Acquire<'a>, F> where F: 'a, S: 'a; - fn acquire(&self) -> Self::Acquire<'_> { - ThenAcquire { - inner: self.inner.acquire(), + async fn acquire(&self) -> Self::Permit<'_> { + ThenPermit { + inner: self.inner.acquire().await, closure: &self.closure, } } - fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> { - S::call(permit.inner, request).then(permit.closure) + async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> { + (permit.closure)(S::call(permit.inner, request).await).await } }