Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

rpc server with HTTP/WS on the same socket #12663

Merged
merged 15 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
506 changes: 332 additions & 174 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion bin/node-template/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ frame-system = { version = "4.0.0-dev", path = "../../../frame/system" }
pallet-transaction-payment = { version = "4.0.0-dev", default-features = false, path = "../../../frame/transaction-payment" }

# These dependencies are used for the node template's RPCs
jsonrpsee = { version = "0.15.1", features = ["server"] }
jsonrpsee = { version = "0.16.1", features = ["server"] }
sc-rpc = { version = "4.0.0-dev", path = "../../../client/rpc" }
sp-api = { version = "4.0.0-dev", path = "../../../primitives/api" }
sc-rpc-api = { version = "0.10.0-dev", path = "../../../client/rpc-api" }
Expand Down
2 changes: 1 addition & 1 deletion bin/node/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ array-bytes = "4.1"
clap = { version = "4.0.9", features = ["derive"], optional = true }
codec = { package = "parity-scale-codec", version = "3.0.0" }
serde = { version = "1.0.136", features = ["derive"] }
jsonrpsee = { version = "0.15.1", features = ["server"] }
jsonrpsee = { version = "0.16.1", features = ["server"] }
futures = "0.3.21"
log = "0.4.17"
rand = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion bin/node/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ repository = "https://github.com/paritytech/substrate/"
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.15.1", features = ["server"] }
jsonrpsee = { version = "0.16.1", features = ["server"] }
node-primitives = { version = "2.0.0", path = "../primitives" }
pallet-mmr-rpc = { version = "3.0.0", path = "../../../frame/merkle-mountain-range/rpc/" }
pallet-transaction-payment-rpc = { version = "4.0.0-dev", path = "../../../frame/transaction-payment/rpc/" }
Expand Down
2 changes: 1 addition & 1 deletion client/beefy/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ homepage = "https://substrate.io"
[dependencies]
codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] }
futures = "0.3.21"
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
jsonrpsee = { version = "0.16.1", features = ["server", "macros"] }
log = "0.4"
parking_lot = "0.12.1"
serde = { version = "1.0.136", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion client/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ mod tests {
};
use beefy_primitives::{known_payloads, Payload, SignedCommitment};
use codec::{Decode, Encode};
use jsonrpsee::{types::EmptyParams, RpcModule};
use jsonrpsee::{types::EmptyServerParams as EmptyParams, RpcModule};
use sp_runtime::traits::{BlakeTwo256, Hash};
use substrate_test_runtime_client::runtime::Block;

Expand Down
2 changes: 1 addition & 1 deletion client/consensus/babe/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
jsonrpsee = { version = "0.16.1", features = ["server", "macros"] }
futures = "0.3.21"
serde = { version = "1.0.136", features = ["derive"] }
thiserror = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/manual-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
jsonrpsee = { version = "0.16.1", features = ["server", "macros"] }
assert_matches = "1.3.0"
async-trait = "0.1.57"
codec = { package = "parity-scale-codec", version = "3.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ homepage = "https://substrate.io"
[dependencies]
finality-grandpa = { version = "0.16.0", features = ["derive-codec"] }
futures = "0.3.16"
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
jsonrpsee = { version = "0.16.1", features = ["server", "macros"] }
log = "0.4.8"
parity-scale-codec = { version = "3.0.0", features = ["derive"] }
serde = { version = "1.0.105", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ mod tests {
use std::{collections::HashSet, convert::TryInto, sync::Arc};

use jsonrpsee::{
types::{EmptyParams, SubscriptionId},
types::{EmptyServerParams as EmptyParams, SubscriptionId},
RpcModule,
};
use parity_scale_codec::{Decode, Encode};
Expand Down
2 changes: 1 addition & 1 deletion client/rpc-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ sp-rpc = { version = "6.0.0", path = "../../primitives/rpc" }
sp-runtime = { version = "7.0.0", path = "../../primitives/runtime" }
sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" }
sp-version = { version = "5.0.0", path = "../../primitives/version" }
jsonrpsee = { version = "0.15.1", features = ["server", "macros"] }
jsonrpsee = { version = "0.16.1", features = ["server", "client-core", "macros"] }
5 changes: 4 additions & 1 deletion client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
futures = "0.3.21"
jsonrpsee = { version = "0.15.1", features = ["server"] }
jsonrpsee = { version = "0.16.1", features = ["server"] }
log = "0.4.17"
serde_json = "1.0.85"
tokio = { version = "1.17.0", features = ["parking_lot"] }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" }
tower-http = { version = "0.3.4", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
http = "0.2.8"
136 changes: 77 additions & 59 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
#![warn(missing_docs)]

use jsonrpsee::{
http_server::{AccessControlBuilder, HttpServerBuilder, HttpServerHandle},
ws_server::{WsServerBuilder, WsServerHandle},
server::{
middleware::proxy_get_request::ProxyGetRequestLayer, AllowHosts, ServerBuilder,
ServerHandle,
},
RpcModule,
};
use std::{error::Error as StdError, net::SocketAddr};

pub use crate::middleware::{RpcMetrics, RpcMiddleware};
pub use crate::middleware::RpcMetrics;
use http::header::HeaderValue;
pub use jsonrpsee::core::{
id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
traits::IdProvider,
};
use tower_http::cors::{AllowOrigin, CorsLayer};

const MEGABYTE: usize = 1024 * 1024;

Expand All @@ -46,12 +50,11 @@ const WS_MAX_SUBS_PER_CONN: usize = 1024;

pub mod middleware;

/// Type alias for http server
pub type HttpServer = HttpServerHandle;
/// Type alias for ws server
pub type WsServer = WsServerHandle;
/// Type alias JSON-RPC server
pub type Server = ServerHandle;

/// WebSocket specific settings on the server.
/// Server config.
#[derive(Debug, Clone)]
pub struct WsConfig {
/// Maximum connections.
pub max_connections: Option<usize>,
Expand All @@ -67,8 +70,8 @@ impl WsConfig {
// Deconstructs the config to get the finalized inner values.
//
// `Payload size` or `max subs per connection` bigger than u32::MAX will be truncated.
fn deconstruct(self) -> (u32, u32, u64, u32) {
let max_conns = self.max_connections.unwrap_or(WS_MAX_CONNECTIONS) as u64;
fn deconstruct(self) -> (u32, u32, u32, u32) {
let max_conns = self.max_connections.unwrap_or(WS_MAX_CONNECTIONS) as u32;
let max_payload_in_mb = payload_size_or_default(self.max_payload_in_mb) as u32;
let max_payload_out_mb = payload_size_or_default(self.max_payload_out_mb) as u32;
let max_subs_per_conn = self.max_subs_per_conn.unwrap_or(WS_MAX_SUBS_PER_CONN) as u32;
Expand All @@ -86,31 +89,28 @@ pub async fn start_http<M: Send + Sync + 'static>(
metrics: Option<RpcMetrics>,
rpc_api: RpcModule<M>,
rt: tokio::runtime::Handle,
) -> Result<HttpServerHandle, Box<dyn StdError + Send + Sync>> {
let max_payload_in = payload_size_or_default(max_payload_in_mb);
let max_payload_out = payload_size_or_default(max_payload_out_mb);

let mut acl = AccessControlBuilder::new();

if let Some(cors) = cors {
// Whitelist listening address.
// NOTE: set_allowed_hosts will whitelist both ports but only one will used.
acl = acl.set_allowed_hosts(format_allowed_hosts(&addrs[..]))?;
acl = acl.set_allowed_origins(cors)?;
};

let builder = HttpServerBuilder::new()
.max_request_body_size(max_payload_in as u32)
.max_response_body_size(max_payload_out as u32)
.set_access_control(acl.build())
.health_api("/health", "system_health")?
.custom_tokio_runtime(rt);
) -> Result<ServerHandle, Box<dyn StdError + Send + Sync>> {
let max_payload_in = payload_size_or_default(max_payload_in_mb) as u32;
let max_payload_out = payload_size_or_default(max_payload_out_mb) as u32;
let host_filter = hosts_filter(cors.is_some(), &addrs);
let cors = try_into_cors(cors)?;

let middleware = tower::ServiceBuilder::new()
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
.layer(cors.clone());

let builder = ServerBuilder::new()
.max_request_body_size(max_payload_in)
.max_response_body_size(max_payload_out)
.set_host_filtering(host_filter)
.set_middleware(middleware)
.custom_tokio_runtime(rt)
.http_only();
Copy link
Member Author

@niklasad1 niklasad1 Nov 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this runs only on HTTP and will be deprecated/removed in a few releases.

/cc @PierreBesson @jam206 would be great if you can migrate the health checks to the --ws-port when this is deployed that should work

I think the most reasonable is to keep --rpc-port and similar flags and just deprecate the ws-related flags.
Then make the 9944 the default port.


let rpc_api = build_rpc_api(rpc_api);
let (handle, addr) = if let Some(metrics) = metrics {
let middleware = RpcMiddleware::new(metrics, "http".into());
let builder = builder.set_middleware(middleware);
let server = builder.build(&addrs[..]).await?;
let server = builder.set_logger(metrics).build(&addrs[..]).await?;
let addr = server.local_addr();
(server.start(rpc_api)?, addr)
} else {
Expand All @@ -120,44 +120,44 @@ pub async fn start_http<M: Send + Sync + 'static>(
};

log::info!(
"Running JSON-RPC HTTP server: addr={}, allowed origins={:?}",
"Running JSON-RPC HTTP server: addr={}, cors={:?}",
addr.map_or_else(|_| "unknown".to_string(), |a| a.to_string()),
cors
);

Ok(handle)
}

/// Start WS server listening on given address.
pub async fn start_ws<M: Send + Sync + 'static>(
/// Start a JSON-RPC server listening on given address that supports both HTTP and WS.
pub async fn start<M: Send + Sync + 'static>(
Copy link
Member Author

@niklasad1 niklasad1 Nov 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reckon that it would be useful for folks to inject their own tower middleware here but not possible currently. We could do it in another PR but it's a bit tricky to control the CORS then....

addrs: [SocketAddr; 2],
cors: Option<&Vec<String>>,
ws_config: WsConfig,
metrics: Option<RpcMetrics>,
rpc_api: RpcModule<M>,
rt: tokio::runtime::Handle,
id_provider: Option<Box<dyn IdProvider>>,
) -> Result<WsServerHandle, Box<dyn StdError + Send + Sync>> {
) -> Result<ServerHandle, Box<dyn StdError + Send + Sync>> {
let (max_payload_in, max_payload_out, max_connections, max_subs_per_conn) =
ws_config.deconstruct();

let mut acl = AccessControlBuilder::new();
let host_filter = hosts_filter(cors.is_some(), &addrs);
let cors = try_into_cors(cors)?;

if let Some(cors) = cors {
// Whitelist listening address.
// NOTE: set_allowed_hosts will whitelist both ports but only one will used.
acl = acl.set_allowed_hosts(format_allowed_hosts(&addrs[..]))?;
acl = acl.set_allowed_origins(cors)?;
};
let middleware = tower::ServiceBuilder::new()
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DQ: previously we exposed the /health API path for HTTP only. If we upgrade the HTTP connection to WS, would the /health API still be reachable for the "old-HTTP port, now WS port"?

Copy link
Member Author

@niklasad1 niklasad1 Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/health is enabled for both i.e, the new mixed WS/HTTP server and the old HTTP server.

.layer(cors.clone());

let mut builder = WsServerBuilder::new()
let mut builder = ServerBuilder::new()
.max_request_body_size(max_payload_in)
.max_response_body_size(max_payload_out)
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subs_per_conn)
.ping_interval(std::time::Duration::from_secs(30))
.custom_tokio_runtime(rt)
.set_access_control(acl.build());
.set_host_filtering(host_filter)
.set_middleware(middleware)
.custom_tokio_runtime(rt);

if let Some(provider) = id_provider {
builder = builder.set_id_provider(provider);
Expand All @@ -167,9 +167,7 @@ pub async fn start_ws<M: Send + Sync + 'static>(

let rpc_api = build_rpc_api(rpc_api);
let (handle, addr) = if let Some(metrics) = metrics {
let middleware = RpcMiddleware::new(metrics, "ws".into());
let builder = builder.set_middleware(middleware);
let server = builder.build(&addrs[..]).await?;
let server = builder.set_logger(metrics).build(&addrs[..]).await?;
let addr = server.local_addr();
(server.start(rpc_api)?, addr)
} else {
Expand All @@ -179,23 +177,14 @@ pub async fn start_ws<M: Send + Sync + 'static>(
};

log::info!(
"Running JSON-RPC WS server: addr={}, allowed origins={:?}",
"Running JSON-RPC WS server: addr={}, cors={:?}",
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
addr.map_or_else(|_| "unknown".to_string(), |a| a.to_string()),
cors
);

Ok(handle)
}

fn format_allowed_hosts(addrs: &[SocketAddr]) -> Vec<String> {
let mut hosts = Vec::with_capacity(addrs.len() * 2);
for addr in addrs {
hosts.push(format!("localhost:{}", addr.port()));
hosts.push(format!("127.0.0.1:{}", addr.port()));
}
hosts
}

fn build_rpc_api<M: Send + Sync + 'static>(mut rpc_api: RpcModule<M>) -> RpcModule<M> {
let mut available_methods = rpc_api.method_names().collect::<Vec<_>>();
available_methods.sort();
Expand All @@ -214,3 +203,32 @@ fn build_rpc_api<M: Send + Sync + 'static>(mut rpc_api: RpcModule<M>) -> RpcModu
fn payload_size_or_default(size_mb: Option<usize>) -> usize {
size_mb.map_or(RPC_MAX_PAYLOAD_DEFAULT, |mb| mb.saturating_mul(MEGABYTE))
}

fn hosts_filter(enabled: bool, addrs: &[SocketAddr]) -> AllowHosts {
if enabled {
// NOTE The listening addresses are whitelisted by default.
let mut hosts = Vec::with_capacity(addrs.len() * 2);
for addr in addrs {
hosts.push(format!("localhost:{}", addr.port()).into());
hosts.push(format!("127.0.0.1:{}", addr.port()).into());
}
AllowHosts::Only(hosts)
} else {
AllowHosts::Any
}
}

fn try_into_cors(
maybe_cors: Option<&Vec<String>>,
) -> Result<CorsLayer, Box<dyn StdError + Send + Sync>> {
if let Some(cors) = maybe_cors {
let mut list = Vec::new();
for origin in cors {
list.push(HeaderValue::from_str(origin)?);
}
Ok(CorsLayer::new().allow_origin(AllowOrigin::list(list)))
} else {
// allow all cors
Ok(CorsLayer::permissive())
}
}
Loading