diff --git a/substrate/bin/node/cli/benches/block_production.rs b/substrate/bin/node/cli/benches/block_production.rs
index c17c12dfef13e..d28cfbddfd222 100644
--- a/substrate/bin/node/cli/benches/block_production.rs
+++ b/substrate/bin/node/cli/benches/block_production.rs
@@ -84,6 +84,7 @@ fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase {
 		rpc_max_subs_per_conn: Default::default(),
 		rpc_port: 9944,
 		rpc_message_buffer_capacity: Default::default(),
+		rpc_rate_limit: None,
 		prometheus_config: None,
 		telemetry_endpoints: None,
 		default_heap_pages: None,
diff --git a/substrate/bin/node/cli/benches/transaction_pool.rs b/substrate/bin/node/cli/benches/transaction_pool.rs
index 0d0d3a072d89d..1e25b7ce6fd87 100644
--- a/substrate/bin/node/cli/benches/transaction_pool.rs
+++ b/substrate/bin/node/cli/benches/transaction_pool.rs
@@ -80,6 +80,7 @@ fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase {
 		rpc_max_subs_per_conn: Default::default(),
 		rpc_port: 9944,
 		rpc_message_buffer_capacity: Default::default(),
+		rpc_rate_limit: None,
 		prometheus_config: None,
 		telemetry_endpoints: None,
 		default_heap_pages: None,
diff --git a/substrate/client/cli/src/commands/run_cmd.rs b/substrate/client/cli/src/commands/run_cmd.rs
index f7b0fc5104910..ecff7eead7b67 100644
--- a/substrate/client/cli/src/commands/run_cmd.rs
+++ b/substrate/client/cli/src/commands/run_cmd.rs
@@ -34,7 +34,10 @@ use sc_service::{
 	ChainSpec, Role,
 };
 use sc_telemetry::TelemetryEndpoints;
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::{
+	net::{IpAddr, Ipv4Addr, SocketAddr},
+	num::NonZeroU32,
+};
 
 /// The `run` command used to run a node.
 #[derive(Debug, Clone, Parser)]
@@ -82,6 +85,15 @@ pub struct RunCmd {
 	)]
 	pub rpc_methods: RpcMethods,
 
+	/// RPC rate limiting (calls/minute) for each connection.
+	///
+	/// This is disabled by default.
+	///
+	/// For example, `--rpc-rate-limit 10` allows a maximum of
+	/// 10 calls per minute per connection.
+	#[arg(long)]
+	pub rpc_rate_limit: Option<NonZeroU32>,
+
 	/// Set the maximum RPC request payload size for both HTTP and WS in megabytes.
 	#[arg(long, default_value_t = RPC_DEFAULT_MAX_REQUEST_SIZE_MB)]
 	pub rpc_max_request_size: u32,
@@ -399,6 +411,10 @@ impl CliConfiguration for RunCmd {
 		Ok(self.rpc_max_subscriptions_per_connection)
 	}
 
+	fn rpc_rate_limit(&self) -> Result<Option<NonZeroU32>> {
+		Ok(self.rpc_rate_limit)
+	}
+
 	fn transaction_pool(&self, is_dev: bool) -> Result<TransactionPoolOptions> {
 		Ok(self.pool_config.transaction_pool(is_dev))
 	}
diff --git a/substrate/client/cli/src/config.rs b/substrate/client/cli/src/config.rs
index defcc4a8a6907..78015cf8373d1 100644
--- a/substrate/client/cli/src/config.rs
+++ b/substrate/client/cli/src/config.rs
@@ -33,7 +33,7 @@ use sc_service::{
 	BlocksPruning, ChainSpec, TracingReceiver,
 };
 use sc_tracing::logging::LoggerBuilder;
-use std::{net::SocketAddr, path::PathBuf};
+use std::{net::SocketAddr, num::NonZeroU32, path::PathBuf};
 
 /// The maximum number of characters for a node name.
 pub(crate) const NODE_NAME_MAX_LENGTH: usize = 64;
@@ -338,6 +338,11 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
 		Ok(RPC_DEFAULT_MESSAGE_CAPACITY_PER_CONN)
 	}
 
+	/// Rate limit calls per minute.
+	fn rpc_rate_limit(&self) -> Result<Option<NonZeroU32>> {
+		Ok(None)
+	}
+
 	/// Get the prometheus configuration (`None` if disabled)
 	///
 	/// By default this is `None`.
@@ -510,6 +515,7 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
 			rpc_max_subs_per_conn: self.rpc_max_subscriptions_per_connection()?,
 			rpc_port: DCV::rpc_listen_port(),
 			rpc_message_buffer_capacity: self.rpc_buffer_capacity_per_connection()?,
+			rpc_rate_limit: self.rpc_rate_limit()?,
 			prometheus_config: self
 				.prometheus_config(DCV::prometheus_listen_port(), &chain_spec)?,
 			telemetry_endpoints,
diff --git a/substrate/client/cli/src/runner.rs b/substrate/client/cli/src/runner.rs
index e37c8ab0e5516..b4937db71e690 100644
--- a/substrate/client/cli/src/runner.rs
+++ b/substrate/client/cli/src/runner.rs
@@ -271,6 +271,7 @@ mod tests {
 			rpc_max_subs_per_conn: Default::default(),
 			rpc_message_buffer_capacity: Default::default(),
 			rpc_port: 9944,
+			rpc_rate_limit: None,
 			prometheus_config: None,
 			telemetry_endpoints: None,
 			default_heap_pages: None,
diff --git a/substrate/client/rpc-servers/Cargo.toml b/substrate/client/rpc-servers/Cargo.toml
index 7dd525ada6533..3e04b927a222f 100644
--- a/substrate/client/rpc-servers/Cargo.toml
+++ b/substrate/client/rpc-servers/Cargo.toml
@@ -27,3 +27,4 @@ http = "0.2.8"
 hyper = "0.14.27"
 futures = "0.3.29"
 pin-project = "1.1.3"
+governor = "0.6.0"
diff --git a/substrate/client/rpc-servers/src/lib.rs b/substrate/client/rpc-servers/src/lib.rs
index 29b34b2945b13..a22b7309ac192 100644
--- a/substrate/client/rpc-servers/src/lib.rs
+++ b/substrate/client/rpc-servers/src/lib.rs
@@ -22,7 +22,9 @@
 
 pub mod middleware;
 
-use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
+use std::{
+	convert::Infallible, error::Error as StdError, net::SocketAddr, num::NonZeroU32, time::Duration,
+};
 
 use http::header::HeaderValue;
 use hyper::{
@@ -31,10 +33,7 @@ use hyper::{
 };
 use jsonrpsee::{
 	server::{
-		middleware::{
-			http::{HostFilterLayer, ProxyGetRequestLayer},
-			rpc::RpcServiceBuilder,
-		},
+		middleware::http::{HostFilterLayer, ProxyGetRequestLayer},
 		stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder,
 	},
 	Methods, RpcModule,
@@ -43,11 +42,14 @@ use tokio::net::TcpListener;
 use tower::Service;
 use tower_http::cors::{AllowOrigin, CorsLayer};
 
-pub use jsonrpsee::core::{
-	id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
-	traits::IdProvider,
+pub use jsonrpsee::{
+	core::{
+		id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
+		traits::IdProvider,
+	},
+	server::middleware::rpc::RpcServiceBuilder,
 };
-pub use middleware::{MetricsLayer, RpcMetrics};
+pub use middleware::{MetricsLayer, RateLimitLayer, RpcMetrics};
 
 const MEGABYTE: u32 = 1024 * 1024;
 
@@ -79,12 +81,26 @@ pub struct Config<'a, M: Send + Sync + 'static> {
 	pub id_provider: Option<Box<dyn IdProvider>>,
 	/// Tokio runtime handle.
 	pub tokio_handle: tokio::runtime::Handle,
+	/// Rate limit calls per minute.
+	pub rate_limit: Option<NonZeroU32>,
+}
+
+#[derive(Debug, Clone)]
+struct PerConnection<RpcMiddleware, HttpMiddleware> {
+	methods: Methods,
+	stop_handle: StopHandle,
+	metrics: Option<RpcMetrics>,
+	tokio_handle: tokio::runtime::Handle,
+	service_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
 }
 
 /// Start RPC server listening on given address.
-pub async fn start_server<M: Send + Sync + 'static>(
+pub async fn start_server<M>(
 	config: Config<'_, M>,
-) -> Result> {
+) -> Result>
+where
+	M: Send + Sync,
+{
 	let Config {
 		addrs,
 		cors,
@@ -97,6 +113,7 @@ pub async fn start_server<M: Send + Sync + 'static>(
 		id_provider,
 		tokio_handle,
 		rpc_api,
+		rate_limit,
 	} = config;
 
 	let std_listener = TcpListener::bind(addrs.as_slice()).await?.into_std()?;
@@ -153,7 +170,13 @@ pub async fn start_server<M: Send + Sync + 'static>(
 
 		let transport_label = if is_websocket { "ws" } else { "http" };
 		let metrics = metrics.map(|m| MetricsLayer::new(m, transport_label));
-		let rpc_middleware = RpcServiceBuilder::new().option_layer(metrics.clone());
+		let rate_limit = rate_limit.map(|r| RateLimitLayer::per_minute(r));
+
+		// NOTE: The metrics middleware needs to run first to include rate-limited calls in the
+		// metrics.
+		let rpc_middleware =
+			RpcServiceBuilder::new().option_layer(metrics.clone()).option_layer(rate_limit);
+
 		let mut svc =
 			service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle);
 
@@ -245,12 +268,3 @@ fn format_cors(maybe_cors: Option<&Vec<String>>) -> String {
 		format!("{:?}", ["*"])
 	}
 }
-
-#[derive(Clone)]
-struct PerConnection<RpcMiddleware, HttpMiddleware> {
-	methods: Methods,
-	stop_handle: StopHandle,
-	metrics: Option<RpcMetrics>,
-	tokio_handle: tokio::runtime::Handle,
-	service_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
-}
diff --git a/substrate/client/rpc-servers/src/middleware/mod.rs b/substrate/client/rpc-servers/src/middleware/mod.rs
index 1c1930582441b..cac516913d327 100644
--- a/substrate/client/rpc-servers/src/middleware/mod.rs
+++ b/substrate/client/rpc-servers/src/middleware/mod.rs
@@ -18,6 +18,10 @@
 
 //! JSON-RPC specific middleware.
 
+/// Grafana metrics middleware.
 pub mod metrics;
+/// Rate limit middleware.
+pub mod rate_limit;
 
 pub use metrics::*;
+pub use rate_limit::*;
diff --git a/substrate/client/rpc-servers/src/middleware/rate_limit.rs b/substrate/client/rpc-servers/src/middleware/rate_limit.rs
new file mode 100644
index 0000000000000..cdcc3ebf66f7d
--- /dev/null
+++ b/substrate/client/rpc-servers/src/middleware/rate_limit.rs
@@ -0,0 +1,107 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! RPC rate limiting middleware.
+
+use std::{num::NonZeroU32, sync::Arc, time::Duration};
+
+use futures::future::{BoxFuture, FutureExt};
+use governor::{
+	clock::{Clock, DefaultClock, QuantaClock},
+	middleware::NoOpMiddleware,
+	state::{InMemoryState, NotKeyed},
+	Jitter,
+};
+use jsonrpsee::{
+	server::middleware::rpc::RpcServiceT,
+	types::{ErrorObject, Id, Request},
+	MethodResponse,
+};
+
+type RateLimitInner = governor::RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>;
+
+const MAX_JITTER: Duration = Duration::from_millis(50);
+const MAX_RETRIES: usize = 10;
+
+/// JSON-RPC rate limit middleware layer.
+#[derive(Debug, Clone)]
+pub struct RateLimitLayer(governor::Quota);
+
+impl RateLimitLayer {
+	/// Create a new rate limit enforced per minute.
+	pub fn per_minute(n: NonZeroU32) -> Self {
+		Self(governor::Quota::per_minute(n))
+	}
+}
+
+/// JSON-RPC rate limit middleware.
+pub struct RateLimit<S> {
+	service: S,
+	rate_limit: Arc<RateLimitInner>,
+	clock: QuantaClock,
+}
+
+impl<S> tower::Layer<S> for RateLimitLayer {
+	type Service = RateLimit<S>;
+
+	fn layer(&self, service: S) -> Self::Service {
+		let clock = QuantaClock::default();
+		RateLimit {
+			service,
+			rate_limit: Arc::new(RateLimitInner::direct_with_clock(self.0, &clock)),
+			clock,
+		}
+	}
+}
+
+impl<'a, S> RpcServiceT<'a> for RateLimit<S>
+where
+	S: Send + Sync + RpcServiceT<'a> + Clone + 'static,
+{
+	type Future = BoxFuture<'a, MethodResponse>;
+
+	fn call(&self, req: Request<'a>) -> Self::Future {
+		let service = self.service.clone();
+		let rate_limit = self.rate_limit.clone();
+		let clock = self.clock.clone();
+
+		async move {
+			let mut attempts = 0;
+			let jitter = Jitter::up_to(MAX_JITTER);
+
+			loop {
+				if attempts >= MAX_RETRIES {
+					break reject_too_many_calls(req.id);
+				}
+
+				if let Err(rejected) = rate_limit.check() {
+					tokio::time::sleep(jitter + rejected.wait_time_from(clock.now())).await;
+				} else {
+					break service.call(req).await;
+				}
+
+				attempts += 1;
+			}
+		}
+		.boxed()
+	}
+}
+
+fn reject_too_many_calls(id: Id) -> MethodResponse {
+	MethodResponse::error(id, ErrorObject::owned(-32999, "RPC rate limit exceeded", None::<()>))
+}
diff --git a/substrate/client/service/src/config.rs b/substrate/client/service/src/config.rs
index 3e68f5b58defc..74c5dd775b0ee 100644
--- a/substrate/client/service/src/config.rs
+++ b/substrate/client/service/src/config.rs
@@ -39,6 +39,7 @@ use sp_core::crypto::SecretString;
 use std::{
 	io, iter,
 	net::SocketAddr,
+	num::NonZeroU32,
 	path::{Path, PathBuf},
 };
 use tempfile::TempDir;
@@ -102,6 +103,8 @@ pub struct Configuration {
 	pub rpc_port: u16,
 	/// The number of messages the JSON-RPC server is allowed to keep in memory.
 	pub rpc_message_buffer_capacity: u32,
+	/// RPC rate limit per minute.
+	pub rpc_rate_limit: Option<NonZeroU32>,
 	/// Prometheus endpoint configuration. `None` if disabled.
 	pub prometheus_config: Option<PrometheusConfig>,
 	/// Telemetry service URL. `None` if disabled.
diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs
index 1fbfd14c3beba..d0adf4f7d5c56 100644
--- a/substrate/client/service/src/lib.rs
+++ b/substrate/client/service/src/lib.rs
@@ -403,6 +403,7 @@ where
 		id_provider: rpc_id_provider,
 		cors: config.rpc_cors.as_ref(),
 		tokio_handle: config.tokio_handle.clone(),
+		rate_limit: config.rpc_rate_limit,
 	};
 
 	// TODO: https://github.com/paritytech/substrate/issues/13773
diff --git a/substrate/client/service/test/src/lib.rs b/substrate/client/service/test/src/lib.rs
index 9b88300bf5304..6148bb05fcfbf 100644
--- a/substrate/client/service/test/src/lib.rs
+++ b/substrate/client/service/test/src/lib.rs
@@ -254,6 +254,7 @@ fn node_config<
 		rpc_max_subs_per_conn: Default::default(),
 		rpc_port: 9944,
 		rpc_message_buffer_capacity: Default::default(),
+		rpc_rate_limit: None,
 		prometheus_config: None,
 		telemetry_endpoints: None,
 		default_heap_pages: None,
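
For reference, below is a minimal, self-contained sketch of the `governor` token-bucket primitive that `RateLimitLayer` builds on, using the `governor = "0.6.0"` dependency added in the Cargo.toml hunk above. It is not part of the patch: the `main` function, the call loop, and the printed messages are illustrative assumptions only; the quota simply mirrors what `--rpc-rate-limit 10` configures per connection.

use std::num::NonZeroU32;

use governor::{Quota, RateLimiter};

fn main() {
	// At most 10 calls per minute, the same quota `--rpc-rate-limit 10` sets
	// up for each connection.
	let quota = Quota::per_minute(NonZeroU32::new(10).expect("10 is non-zero; qed"));
	// Direct (unkeyed) in-memory limiter with the default clock, i.e. a
	// single token bucket that refills over time.
	let limiter = RateLimiter::direct(quota);

	for call in 0..12 {
		match limiter.check() {
			// Within the quota: the middleware would forward the call to the
			// inner RPC service here.
			Ok(()) => println!("call {call}: allowed"),
			// Over the quota: the middleware sleeps with jitter and retries up
			// to MAX_RETRIES times before rejecting the call with error -32999.
			Err(not_until) => println!("call {call}: rate limited ({not_until})"),
		}
	}
}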