Skip to content
This repository has been archived by the owner on Jan 28, 2024. It is now read-only.

Commit

Permalink
refactor: use replace remote address with local port
Browse files Browse the repository at this point in the history
  • Loading branch information
Itsusinn committed Jun 8, 2023
1 parent 9201dbb commit bac26e1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use crate::{
};

pub type WsConn = Sender<tungstenite::Message>;
pub type WsConnId = Arc<SocketAddr>;
pub type WsConnId = u16;

pub use websocket::{ws, wss};

pub async fn receive_packets(
data: Vec<u8>,
conn: Sender<tungstenite::Message>,
conn_id: Arc<SocketAddr>,
conn_id: u16,
) -> Result<()> {
let pkt: Packet = ciborium::de::from_reader(&*data)?;
#[cfg(debug_assertions)]
Expand Down
23 changes: 10 additions & 13 deletions src/server/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ pub async fn wss(certs: &(Vec<Certificate>, PrivateKey)) -> Result<()> {
let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
while let Some((stream, _)) = listener.accept().await.log() {
let acceptor = acceptor.clone();
if let Some(peer_address) = stream.peer_addr().log()
if let Some(peer_address) = stream.local_addr().log()
&& let Some(stream) = acceptor.accept(stream).await.log() {
accept_connection(stream, peer_address).await.eyre_log();
accept_connection(stream, peer_address.port()).await.log();
};
}
info!("wss listening stopped");
Expand All @@ -36,29 +36,26 @@ pub async fn wss(certs: &(Vec<Certificate>, PrivateKey)) -> Result<()> {
pub async fn ws() -> Result<()> {
let listener = TcpListener::bind(&ws_server_addr()).await?;
while let Some((stream, _)) = listener.accept().await.log() {
if let Some(peer_address) = stream.peer_addr().log() {
accept_connection(stream, peer_address).await.eyre_log();
if let Some(local_address) = stream.local_addr().log() {
accept_connection(stream, local_address.port()).await.log();
};
}
info!("ws listening stopped");
Ok(())
}

pub async fn accept_connection<S>(stream: S, peer_address: SocketAddr) -> Result<()>
pub async fn accept_connection<S>(stream: S, peer_id: u16) -> Result<()>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
// handshake happens here
let ws_stream = tokio_tungstenite::accept_async(stream).await?;

info!("New WebSocket connection: {}", peer_address);

let conn_id = Arc::new(peer_address);
info!("New WebSocket connection: {}", peer_id);

let (tx, mut rx) = mpsc::channel(128);
let (write, mut read) = ws_stream.split();

let conn_id_clone = conn_id.clone();
tokio::spawn(async move {
let mut write = write;
while let Some(ws_message) = rx.recv().await {
Expand All @@ -74,14 +71,14 @@ where
Ok(_) => {}
};
}
info!("ws disconnected {}", conn_id_clone);
info!("ws disconnected {}", peer_id);
rx.close();
});
tokio::spawn(async move {
let tx = tx;
while let Some(next) = read.next().await {
let tx = tx.clone();
let conn_id = conn_id.clone();

match next {
Err(ws::Error::ConnectionClosed) | Err(ws::Error::AlreadyClosed) => {
break;
Expand All @@ -93,7 +90,7 @@ where
Err(e) => error!("{:?}", e.to_eyre()),
Ok(ws::Message::Binary(data)) => {
tokio::spawn(async move {
receive_packets(data, tx, conn_id).await.log();
receive_packets(data, tx, peer_id).await.log();
});
}
Ok(ws::Message::Ping(data)) => {
Expand All @@ -102,7 +99,7 @@ where
Ok(msg) => warn!("unexpected message {}", msg),
}
}
info!("ws disconnected {}", conn_id)
info!("ws disconnected {}", peer_id)
});
Ok(())
}

0 comments on commit bac26e1

Please sign in to comment.