Skip to content

Commit

Permalink
🌾 connect_local returns connection, stream, and sink
Browse files Browse the repository at this point in the history
pushing this, as experimentation continues. this really does seem like
a very promising avenue for abci testing!
  • Loading branch information
cratelyn committed Feb 12, 2024
1 parent cdc6028 commit 5d5f502
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ tracing-subscriber = "0.3.17"

[features]
doc = []
local = []

68 changes: 45 additions & 23 deletions src/v037/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::convert::{TryFrom, TryInto};
use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use futures::{Sink, Stream};

Check failure on line 6 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused imports: `Sink`, `Stream`

Check failure on line 6 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused imports: `Sink`, `Stream`

Check warning on line 6 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `Sink`, `Stream`

Check warning on line 6 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `Sink`, `Stream`

Check warning on line 6 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused imports: `Sink`, `Stream`

Check warning on line 6 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused imports: `Sink`, `Stream`
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

Check failure on line 7 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused imports: `AsyncRead`, `AsyncWrite`

Check failure on line 7 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused imports: `AsyncRead`, `AsyncWrite`

Check warning on line 7 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `AsyncRead`, `AsyncWrite`

Check warning on line 7 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `AsyncRead`, `AsyncWrite`

Check warning on line 7 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused imports: `AsyncRead`, `AsyncWrite`

Check warning on line 7 in src/v037/server.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused imports: `AsyncRead`, `AsyncWrite`
use tokio::{
net::{TcpListener, ToSocketAddrs},
select,
Expand All @@ -24,6 +25,7 @@ use tendermint::v0_37::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
};
use tendermint_proto::v0_37::abci as pb;

/// An ABCI server which listens for connections and forwards requests to four
/// component ABCI [`Service`]s.
Expand Down Expand Up @@ -185,28 +187,50 @@ where
}
}

pub async fn listen_channel<R, W>(self, mut channel: tokio::sync::mpsc::Receiver<(R, W)>)
where
R: AsyncReadExt + std::marker::Unpin + Send + 'static,
W: AsyncWriteExt + std::marker::Unpin + Send + 'static,
{
tracing::info!("ABCI server starting on tokio channel");
/// Connects to this server, using an in-memory connection.
#[cfg(feature = "local")]
pub fn connect_local(
&self,
max_buf_size: usize,
) -> (
tokio::task::JoinHandle<Result<(), BoxError>>,
impl Stream<Item = Result<pb::Response, BoxError>>,
impl Sink<pb::Request>,
) {
let (client, server) = tokio::io::duplex(max_buf_size);

match channel.recv().await {
Some((read, write)) => {
tracing::debug!("accepted new connection");
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
tracing::debug!("ABCI server listening on in-memory connection");
let (reader, writer) = tokio::io::split(server);
let conn = tokio::spawn(
Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
}
None => {
tracing::trace!("channel closed");
}
}
.run(reader, writer),
);

let (reader, writer) = tokio::io::split(client);
let (response_stream, request_sink) = Self::client_io(reader, writer);

(conn, response_stream, request_sink)
}

/// Returns the client's reader and writer: a response stream and a request sink.
#[cfg(feature = "local")]
fn client_io(
read: impl AsyncRead,
write: impl AsyncWrite,
) -> (
impl Stream<Item = Result<pb::Response, BoxError>>,
impl Sink<pb::Request>,
) {
use crate::v037::codec::{Decode, Encode};
(
FramedRead::new(read, Decode::<pb::Response>::default()),
FramedWrite::new(write, Encode::<pb::Request>::default()),
)
}
}

Expand Down Expand Up @@ -237,8 +261,6 @@ where
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

use tendermint_proto::v0_37::abci as pb;

let (mut request_stream, mut response_sink) = {
use crate::v037::codec::{Decode, Encode};
(
Expand Down

0 comments on commit 5d5f502

Please sign in to comment.