Skip to content
This repository has been archived by the owner on Jan 28, 2024. It is now read-only.

Commit

Permalink
feat(tls): allow disable websocket tls
Browse files Browse the repository at this point in the history
  • Loading branch information
Itsusinn committed Aug 13, 2022
1 parent f4f7bd4 commit bc777df
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 37 deletions.
49 changes: 30 additions & 19 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
#![feature(let_chains)]

mod config;
mod data;
mod ext;
mod log;
mod room;
pub mod server;
mod tls;

use std::{net::SocketAddr, sync::Arc, time::Duration};
mod tls;

use std::net::SocketAddr;
use color_eyre::eyre::Result;
use quinn::TransportConfig;
use config::Config;


use crate::config::CONFIG;

#[macro_use]
extern crate educe;
#[macro_use]
extern crate automatic_config;
// #[macro_use]
// extern crate singleton;
#[macro_use]
extern crate tracing;

Expand All @@ -35,29 +45,30 @@ async fn run() -> Result<()> {
.install()?;
}
log::init().await?;

let certs = tls::read_certs_from_file()?;
let mut server_config = quinn::ServerConfig::with_single_cert(certs.0.to_owned(), certs.1)?;
let mut trans_config = TransportConfig::default();
trans_config.keep_alive_interval(Some(Duration::from_secs(5)));
server_config.transport = Arc::new(trans_config);
let mut cert_store = rustls::RootCertStore::empty();
for cert in certs.0 {
cert_store.add(&cert)?
Config::reload().await?;
if !CONFIG.enable {
warn!("MesagistoCenter is not enabled, about to exit the program.");
warn!("To enable, please modify the configuration file.");
return Ok(());
}
let certs = tls::read_certs_from_file().await?;

server::quic(&certs).await?;

server::quic(server_config).await.unwrap();
if CONFIG.tls.enable_for_ws {
server::wss(&certs).await?;
} else {
server::ws().await?;
}

tokio::spawn(async {
server::ws().await.unwrap();
});
info!("Start successfully");
tokio::signal::ctrl_c().await?;
Ok(())
}

fn server_addr() -> SocketAddr {
"127.0.0.1:6996".parse::<SocketAddr>().unwrap()
fn quic_server_addr() -> SocketAddr {
CONFIG.server.quic.as_str().parse::<SocketAddr>().unwrap()
}
fn ws_server_addr() -> SocketAddr {
server_addr()
CONFIG.server.ws.as_str().parse::<SocketAddr>().unwrap()
}
23 changes: 19 additions & 4 deletions src/server/quic.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,34 @@
use std::{sync::Arc, time::Duration};

use color_eyre::eyre::Result;
use futures_util::StreamExt;
use quinn::{Endpoint, NewConnection};
use quinn::{Endpoint, NewConnection, TransportConfig, IdleTimeout};
use rustls::{Certificate, PrivateKey};

use super::receive_packets;
use crate::{
ext::{EitherExt, ResultExt},
server_addr,
quic_server_addr,
};

pub async fn quic(server_config: quinn::ServerConfig) -> Result<()> {
let (_, incoming) = Endpoint::server(server_config, server_addr())?;
pub async fn quic(certs: &(Vec<Certificate>, PrivateKey)) -> Result<()> {
let mut server_config =
quinn::ServerConfig::with_single_cert(certs.0.to_owned(), certs.1.to_owned())?;
let mut trans_config = TransportConfig::default();
trans_config.keep_alive_interval(Some(Duration::from_secs(5)));
trans_config.max_idle_timeout(Some(IdleTimeout::try_from(Duration::from_secs(8))?));
server_config.transport = Arc::new(trans_config);
let mut cert_store = rustls::RootCertStore::empty();
for cert in &certs.0 {
cert_store.add(cert)?
}
let (_, incoming) = Endpoint::server(server_config, quic_server_addr())?;
tokio::spawn(async move {
handle_incoming(incoming).await.unwrap();
});
Ok(())
}

pub async fn handle_incoming(mut incoming: quinn::Incoming) -> Result<()> {
while let Some(conn) = incoming.next().await {
let mut nconn: NewConnection = match conn.await {
Expand All @@ -36,6 +50,7 @@ pub async fn handle_incoming(mut incoming: quinn::Incoming) -> Result<()> {
}
});
}
info!("quic connection close")
});
}
Ok(())
Expand Down
64 changes: 50 additions & 14 deletions src/server/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,87 @@
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};

use color_eyre::eyre::Result;
use futures_util::{SinkExt, StreamExt};
use rustls::{Certificate, PrivateKey};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
sync::mpsc
sync::mpsc,
};
use tokio_tungstenite::tungstenite as ws;

use crate::{
ext::{EitherExt, ResultExt, EyreExt},
ext::{EitherExt, EyreExt, ResultExt},
server::receive_packets,
ws_server_addr,
};

pub async fn wss(certs: &(Vec<Certificate>, PrivateKey)) -> Result<()> {
let listener = TcpListener::bind(&ws_server_addr()).await?;
let config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs.0.to_owned(), certs.1.to_owned())?;
let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
tokio::spawn(async move {
let acceptor = acceptor;
while let Ok((stream, _)) = listener.accept().await {
let acceptor = acceptor.clone();
if let Some(peer_address) = stream.peer_addr().eyre_log()
&& let Some(stream) = acceptor.accept(stream).await.eyre_log() {
tokio::spawn(async move {
accept_connection(stream, peer_address).await.log()
});
};
}
unimplemented!()
});
Ok(())
}
pub async fn ws() -> Result<()> {
// Create the event loop and TCP listener we'll accept connections on.
let listener = TcpListener::bind(&ws_server_addr()).await?;
tokio::spawn(async move {
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(accept_connection(stream));
if let Some(peer_address) = stream.peer_addr().eyre_log() {
tokio::spawn(async move {
accept_connection(stream, peer_address).await.log()
});
};
}
unimplemented!()
});
Ok(())
}

pub async fn accept_connection(stream: tokio::net::TcpStream) -> Result<()> {
let addr = stream.peer_addr()?;
pub async fn accept_connection<S>(stream: S, peer_address: SocketAddr) -> Result<()>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
// handshake happens here
let ws_stream = tokio_tungstenite::accept_async(stream).await?;

info!("New WebSocket connection: {}", addr);
info!("New WebSocket connection: {}", peer_address);

let conn_id = Arc::new(addr);
let conn_id = Arc::new(peer_address);

let (tx, mut rx) = mpsc::channel(64);
let (write, mut read) = ws_stream.split();

let conn_id_clone = conn_id.clone();
tokio::spawn(async move {
let mut write = write;
while let Some(ws_message) = rx.recv().await {
match write.send(ws_message).await {
Err(ws::Error::ConnectionClosed) | Err(ws::Error::AlreadyClosed) => {
break;
}
Err(ws::Error::Protocol(ws::error::ProtocolError::ResetWithoutClosingHandshake)) => {
Err(ws::Error::Protocol(ws::error::ProtocolError::ResetWithoutClosingHandshake)) | Err(ws::Error::Io(_)) => {
break;
}
Err(e) => error!("{:?}", e),
_ => {}
Ok(_) => {}
};
}
info!("ws disconnected {}",conn_id_clone);
rx.close();
});
tokio::spawn(async move {
Expand All @@ -61,13 +93,17 @@ pub async fn accept_connection(stream: tokio::net::TcpStream) -> Result<()> {
Err(ws::Error::ConnectionClosed) | Err(ws::Error::AlreadyClosed) => {
break;
}
Err(ws::Error::Protocol(ws::error::ProtocolError::ResetWithoutClosingHandshake)) => {
Err(ws::Error::Protocol(ws::error::ProtocolError::ResetWithoutClosingHandshake)) | Err(ws::Error::Io(_)) => {
break;
}
Err(e) => error!("{:?}", e.to_eyre()),
Ok(ws::Message::Pong(_)) => {
debug!("pong from {}", conn_id);
}
Ok(ws::Message::Ping(ping)) => {
debug!("ping from {}", conn_id);
tx.send(ws::Message::Pong(ping)).await.log();
}
Ok(ws::Message::Binary(data)) => {
tokio::spawn(async move {
receive_packets(data, tx.tr(), conn_id.tr()).await.log();
Expand All @@ -76,7 +112,7 @@ pub async fn accept_connection(stream: tokio::net::TcpStream) -> Result<()> {
Ok(msg) => warn!("unexpected message {}", msg),
}
}
info!("ws disconnected {}",conn_id)
});
Ok(())
}

0 comments on commit bc777df

Please sign in to comment.