diff --git a/Cargo.toml b/Cargo.toml index f46579c..8742fb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,4 +29,5 @@ tracing-subscriber = "0.3.17" [features] doc = [] +local = [] diff --git a/src/v037/server.rs b/src/v037/server.rs index 9eb8403..8fa7cd6 100644 --- a/src/v037/server.rs +++ b/src/v037/server.rs @@ -3,7 +3,8 @@ use std::convert::{TryFrom, TryInto}; use futures::future::{FutureExt, TryFutureExt}; use futures::sink::SinkExt; use futures::stream::{FuturesOrdered, StreamExt}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use futures::{Sink, Stream}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::{ net::{TcpListener, ToSocketAddrs}, select, @@ -24,6 +25,7 @@ use tendermint::v0_37::abci::{ ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest, MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse, }; +use tendermint_proto::v0_37::abci as pb; /// An ABCI server which listens for connections and forwards requests to four /// component ABCI [`Service`]s. @@ -185,28 +187,50 @@ where } } - pub async fn listen_channel(self, mut channel: tokio::sync::mpsc::Receiver<(R, W)>) - where - R: AsyncReadExt + std::marker::Unpin + Send + 'static, - W: AsyncWriteExt + std::marker::Unpin + Send + 'static, - { - tracing::info!("ABCI server starting on tokio channel"); + /// Connects to this server, using an in-memory connection. + #[cfg(feature = "local")] + pub fn connect_local( + &self, + max_buf_size: usize, + ) -> ( + tokio::task::JoinHandle>, + impl Stream>, + impl Sink, + ) { + let (client, server) = tokio::io::duplex(max_buf_size); - match channel.recv().await { - Some((read, write)) => { - tracing::debug!("accepted new connection"); - let conn = Connection { - consensus: self.consensus.clone(), - mempool: self.mempool.clone(), - info: self.info.clone(), - snapshot: self.snapshot.clone(), - }; - tokio::spawn(async move { conn.run(read, write).await.unwrap() }); + tracing::debug!("ABCI server listening on in-memory connection"); + let (reader, writer) = tokio::io::split(server); + let conn = tokio::spawn( + Connection { + consensus: self.consensus.clone(), + mempool: self.mempool.clone(), + info: self.info.clone(), + snapshot: self.snapshot.clone(), } - None => { - tracing::trace!("channel closed"); - } - } + .run(reader, writer), + ); + + let (reader, writer) = tokio::io::split(client); + let (response_stream, request_sink) = Self::client_io(reader, writer); + + (conn, response_stream, request_sink) + } + + /// Returns the client's reader and writer: a response stream and a request sink. + #[cfg(feature = "local")] + fn client_io( + read: impl AsyncRead, + write: impl AsyncWrite, + ) -> ( + impl Stream>, + impl Sink, + ) { + use crate::v037::codec::{Decode, Encode}; + ( + FramedRead::new(read, Decode::::default()), + FramedWrite::new(write, Encode::::default()), + ) } } @@ -237,8 +261,6 @@ where ) -> Result<(), BoxError> { tracing::info!("listening for requests"); - use tendermint_proto::v0_37::abci as pb; - let (mut request_stream, mut response_sink) = { use crate::v037::codec::{Decode, Encode}; (