Skip to content

Commit

Permalink
migrate to async fn
Browse files Browse the repository at this point in the history
  • Loading branch information
hlbarber committed Dec 14, 2023
1 parent 61df649 commit 1e0995a
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 612 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
async-lock = "3.2.0"
futures-channel = "0.3.29"
futures-util = "0.3.29"
pin-project-lite = "0.2.13"
tokio = { version = "1.34.0", features = ["sync"] }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ async fn main() {
sleep(Duration::from_secs(5)).await;
"foo"
})
.then(|output| async move { output })
.concurrency_limit(1)
.buffer(2)
.load_shed();
Expand Down
83 changes: 38 additions & 45 deletions examples/retry.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,59 @@
use std::{
future::{ready, Future, Ready},
sync::atomic::{AtomicUsize, Ordering},
};
use std::sync::atomic::{AtomicUsize, Ordering};

use burger::{service_fn, Policy, Service, ServiceExt};

struct FiniteRetries {
max_retries: usize,
struct FiniteRetries(usize);

struct Attempts<'a> {
max: &'a usize,
attempted: usize,
}
struct Attempts(usize);

#[derive(Debug)]
struct MaxAttempts;

impl<S, Request> Policy<S, Request> for FiniteRetries
impl<S> Policy<S, ()> for FiniteRetries
where
S: Service<Request>,
// https://github.com/rust-lang/rust/issues/49601
// for<'a> S::Future<'a>: Future<Output = &'a str>,
for<'a> S::Future<'a>: Future<Output = usize>,
Request: Clone,
for<'a> S: Service<(), Response<'a> = usize>,
{
type RequestState<'a> = Attempts;
type Future<'a> = Ready<Result<(), Request>>;
type Error<'a> = MaxAttempts;
type RequestState<'a> = Attempts<'a>;
type Error = MaxAttempts;

fn create(&self, request: &Request) -> (Attempts, Request) {
(Attempts(self.max_retries), request.clone())
fn create(&self, _request: &()) -> Attempts {
Attempts {
max: &self.0,
attempted: 0,
}
}

fn classify(
async fn classify<'a>(
&self,
state: &mut Attempts,
request: &Request,
response: &<<S as Service<Request>>::Future<'_> as futures_util::Future>::Output,
) -> Self::Future<'_> {
let result = if *response != 200 && state.0.checked_sub(1).is_none() {
Ok(())
} else {
Err(request.clone())
};
ready(result)
}
mut state: Self::RequestState<'a>,
response: &<S as Service<()>>::Response<'_>,
) -> Result<Option<((), Self::RequestState<'a>)>, Self::Error> {
if *response == 200 {
return Ok(None);
}

fn error(&self, _state: Attempts) -> Self::Error<'_> {
MaxAttempts
state.attempted += 1;

if state.attempted >= *state.max {
return Err(MaxAttempts);
}

Ok(Some(((), state)))
}
}

#[tokio::main]
async fn main() {
let x = AtomicUsize::new(198);
let svc = service_fn(move |()| {
// let x = x;
let x = &x;
async move {
let y = x.fetch_add(1, Ordering::SeqCst);
println!("{y}");
y
}
});
let svc = svc.retry(FiniteRetries { max_retries: 5 });
let x = svc.oneshot(()).await.unwrap();
drop(x);
let ref x = AtomicUsize::new(198);
service_fn(|()| async move {
let y = x.fetch_add(1, Ordering::SeqCst);
y
})
.retry(FiniteRetries(4))
.oneshot(())
.await
.unwrap();
}
2 changes: 2 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"
60 changes: 10 additions & 50 deletions src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::{Semaphore, SemaphorePermit};

use async_lock::{Semaphore, SemaphoreGuard};
use futures_util::future::MaybeDone;
use pin_project_lite::pin_project;

use crate::{oneshot::Oneshot, Service, ServiceExt};
use crate::{Service, ServiceExt};

pub struct Buffer<S> {
inner: S,
Expand All @@ -26,58 +18,26 @@ impl<S> Buffer<S> {

pub struct BufferPermit<'a, S> {
inner: &'a S,
_semaphore_permit: SemaphoreGuard<'a>,
}

pin_project! {
pub struct BufferAcquire<'a, S>
{
inner: Option<&'a S>,
#[pin]
semaphore_acquire: MaybeDone<async_lock::futures::Acquire<'a>>
}
}

impl<'a, S> Future for BufferAcquire<'a, S> {
type Output = BufferPermit<'a, S>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
ready!(this.semaphore_acquire.as_mut().poll(cx));
Poll::Ready(BufferPermit {
inner: this
.inner
.take()
.expect("futures cannot be polled after completion"),
_semaphore_permit: this
.semaphore_acquire
.as_mut()
.take_output()
.expect("futures cannot be polled after completion"),
})
}
_semaphore_permit: SemaphorePermit<'a>,
}

impl<Request, S> Service<Request> for Buffer<S>
where
S: Service<Request>,
{
type Future<'a> = Oneshot<'a, S, Request> where S: 'a;
type Response<'a> = S::Response<'a>;
type Permit<'a> = BufferPermit<'a, S>
where
S: 'a;
type Acquire<'a> = BufferAcquire<'a, S>
where
S: 'a;

fn acquire(&self) -> Self::Acquire<'_> {
BufferAcquire {
inner: Some(&self.inner),
semaphore_acquire: MaybeDone::Future(self.semaphore.acquire()),
async fn acquire(&self) -> Self::Permit<'_> {
BufferPermit {
inner: &self.inner,
_semaphore_permit: self.semaphore.acquire().await.expect("not closed"),
}
}

fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> {
permit.inner.oneshot(request)
async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> {
permit.inner.oneshot(request).await
}
}
23 changes: 10 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![feature(return_type_notation)]

mod buffer;
mod limit;
mod load_shed;
Expand All @@ -18,31 +20,26 @@ pub use select::*;
pub use service_fn::*;
pub use then::*;

use std::future::Future;

pub trait Service<Request> {
type Future<'a>: Future
where
Self: 'a;

type Response<'a>;
type Permit<'a>
where
Self: 'a;
type Acquire<'a>: Future<Output = Self::Permit<'a>>
where
Self: 'a;

fn acquire(&self) -> Self::Acquire<'_>;
async fn acquire(&self) -> Self::Permit<'_>;

async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a>;

fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a>;
// `Self::Response` does not need `Self: 'a`
fn _silence_incorrect_lint(_: &Self::Response<'_>) {}
}

pub trait ServiceExt<Request>: Service<Request> {
fn oneshot(&self, request: Request) -> Oneshot<'_, Self, Request>
async fn oneshot(&self, request: Request) -> Self::Response<'_>
where
Self: Sized,
{
oneshot(request, self)
oneshot(request, self).await
}

fn then<F>(self, closure: F) -> Then<Self, F>
Expand Down
88 changes: 9 additions & 79 deletions src/limit/concurrency.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};

use async_lock::{Semaphore, SemaphoreGuard};
use futures_util::future::MaybeDone;
use pin_project_lite::pin_project;
use tokio::sync::{Semaphore, SemaphorePermit};

use crate::Service;

Expand All @@ -26,88 +18,26 @@ impl<S> ConcurrencyLimit<S> {

pub struct ConcurrencyLimitPermit<'a, Inner> {
inner: Inner,
semaphore_permit: SemaphoreGuard<'a>,
}

pin_project! {
pub struct ConcurencyLimitAcquire<'a, Inner>
where
Inner: Future
{
#[pin]
inner: MaybeDone<Inner>,
#[pin]
semaphore_acquire: MaybeDone<async_lock::futures::Acquire<'a>>
}
}

impl<'a, Inner> Future for ConcurencyLimitAcquire<'a, Inner>
where
Inner: Future,
{
type Output = ConcurrencyLimitPermit<'a, Inner::Output>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
ready!(this.inner.as_mut().poll(cx));
ready!(this.semaphore_acquire.as_mut().poll(cx));
Poll::Ready(ConcurrencyLimitPermit {
inner: this
.inner
.as_mut()
.take_output()
.expect("futures cannot be polled after completion"),
semaphore_permit: this
.semaphore_acquire
.as_mut()
.take_output()
.expect("futures cannot be polled after completion"),
})
}
}

pin_project! {
pub struct ConcurrencyLimitFuture<'a, Inner> {
#[pin]
inner: Inner,
_semaphore_permit: SemaphoreGuard<'a>,
}
}

impl<'a, Inner> Future for ConcurrencyLimitFuture<'a, Inner>
where
Inner: Future,
{
type Output = Inner::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
_semaphore_permit: SemaphorePermit<'a>,
}

impl<Request, S> Service<Request> for ConcurrencyLimit<S>
where
S: Service<Request>,
{
type Future<'a> = ConcurrencyLimitFuture<'a, S::Future<'a>> where S: 'a;
type Response<'a> = S::Response<'a>;
type Permit<'a> = ConcurrencyLimitPermit<'a, S::Permit<'a>>
where
S: 'a;
type Acquire<'a> = ConcurencyLimitAcquire<'a, S::Acquire<'a>>
where
S: 'a;

fn acquire(&self) -> Self::Acquire<'_> {
ConcurencyLimitAcquire {
inner: MaybeDone::Future(self.inner.acquire()),
semaphore_acquire: MaybeDone::Future(self.semaphore.acquire()),
async fn acquire(&self) -> Self::Permit<'_> {
ConcurrencyLimitPermit {
inner: self.inner.acquire().await,
_semaphore_permit: self.semaphore.acquire().await.expect("not closed"),
}
}

fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Future<'a> {
ConcurrencyLimitFuture {
inner: S::call(permit.inner, request),
_semaphore_permit: permit.semaphore_permit,
}
async fn call<'a>(permit: Self::Permit<'a>, request: Request) -> Self::Response<'a> {
S::call(permit.inner, request).await
}
}
4 changes: 2 additions & 2 deletions src/limit/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod concurrency;
mod rate;
// mod rate;

pub use concurrency::*;
pub use rate::*;
// pub use rate::*;
Loading

0 comments on commit 1e0995a

Please sign in to comment.