This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Flood protection for large statements. #2984

Merged
merged 4 commits on May 6, 2021
230 changes: 230 additions & 0 deletions node/network/statement-distribution/src/lib.rs
@@ -93,6 +93,9 @@ const VC_THRESHOLD: usize = 2;

const LOG_TARGET: &str = "parachain::statement-distribution";

/// Large statements should be rare.
const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20;

/// The statement distribution subsystem.
pub struct StatementDistribution {
/// Pointer to a keystore, which is required for determining this node's validator index.
@@ -194,6 +197,29 @@ struct PeerRelayParentKnowledge {
seconded_counts: HashMap<ValidatorIndex, VcPerPeerTracker>,
/// How many statements we've received for each candidate that we're aware of.
received_message_count: HashMap<CandidateHash, usize>,

/// How many large statements this peer has already sent us.
///
/// Flood protection for large statements is rather hard and will no longer be necessary once
/// https://github.com/paritytech/polkadot/issues/2979 is implemented.
/// Reason: we keep messages around until we have fetched the payload, but if a node makes up
/// statements and never provides the data, we will keep them around for the slot duration. Not
/// even signature checking would help, as the sender, if a validator, can just sign arbitrary
/// invalid statements and will not face any consequences as long as it never provides the
/// payload.
///
/// As a quick and temporary fix, we only accept `MAX_LARGE_STATEMENTS_PER_SENDER` large
/// statements per connected node.
///
/// Large statements should be rare; if they were not, we would run into problems anyway, as
/// we would not be able to distribute them in a timely manner. Therefore
/// `MAX_LARGE_STATEMENTS_PER_SENDER` can be set to a relatively small number. It is also a
/// total count per peer rather than a count per candidate hash, as candidate hashes can be
/// made up, as illustrated above.
///
/// An attacker could still try to fill up our memory by repeatedly disconnecting and
/// reconnecting with new peer ids, but we assume that the resulting effective bandwidth
/// for such an attack would be too low.
large_statement_count: usize,
}
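Below is a minimal, self-contained sketch of the rate-limiting behaviour this counter gives each peer, as described in the comment above. The `FloodGuard` type and the `main` driver are illustrative only and are not part of this PR; in the subsystem the counter lives in `PeerRelayParentKnowledge`, and a rejection is translated into a `COST_APPARENT_FLOOD` reputation change.

```rust
/// Per-peer budget for large statement announcements (mirrors the constant above).
const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20;

/// Illustrative stand-in for the per-peer state kept in `PeerRelayParentKnowledge`.
#[derive(Default)]
struct FloodGuard {
    large_statement_count: usize,
}

impl FloodGuard {
    /// Accept one large statement announcement, or signal apparent flooding once
    /// the per-peer budget is exhausted.
    fn note_large_statement(&mut self) -> Result<(), &'static str> {
        if self.large_statement_count >= MAX_LARGE_STATEMENTS_PER_SENDER {
            return Err("apparent flood");
        }
        self.large_statement_count += 1;
        Ok(())
    }
}

fn main() {
    let mut guard = FloodGuard::default();
    // The first MAX_LARGE_STATEMENTS_PER_SENDER announcements are accepted...
    for _ in 0..MAX_LARGE_STATEMENTS_PER_SENDER {
        assert!(guard.note_large_statement().is_ok());
    }
    // ...and one more is rejected; at that point the subsystem reports the peer.
    assert!(guard.note_large_statement().is_err());
}
```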

impl PeerRelayParentKnowledge {
@@ -318,6 +344,15 @@ impl PeerRelayParentKnowledge {
Ok(self.received_candidates.insert(candidate_hash.clone()))
}

/// Note that we received metadata for a large statement from this peer.
fn receive_large_statement(&mut self) -> std::result::Result<(), Rep> {
if self.large_statement_count >= MAX_LARGE_STATEMENTS_PER_SENDER {
return Err(COST_APPARENT_FLOOD);
}
self.large_statement_count += 1;
Ok(())
}

/// This method does the same checks as `receive` without modifying the internal state.
/// Returns an error if the peer should not have sent us this message according to protocol
/// rules for flood protection.
@@ -458,6 +493,17 @@ impl PeerData {
.ok_or(COST_UNEXPECTED_STATEMENT)?
.check_can_receive(fingerprint, max_message_count)
}

/// Basic flood protection for large statements.
fn receive_large_statement(
&mut self,
relay_parent: &Hash,
) -> std::result::Result<(), Rep> {
self.view_knowledge
.get_mut(relay_parent)
.ok_or(COST_UNEXPECTED_STATEMENT)?
.receive_large_statement()
}
}

// A statement stored while a relay chain head is active.
@@ -1278,6 +1324,20 @@ async fn handle_incoming_message<'a>(
}
};

if let protocol_v1::StatementDistributionMessage::LargeStatement(_) = message {
if let Err(rep) = peer_data.receive_large_statement(&relay_parent) {
tracing::debug!(
target: LOG_TARGET,
?peer,
?message,
?rep,
"Unexpected large statement.",
);
report_peer(ctx, peer, rep).await;
return None;
}
}

let fingerprint = message.get_fingerprint();
let candidate_hash = fingerprint.0.candidate_hash().clone();
let handle_incoming_span = active_head.span.child("handle-incoming")
@@ -3471,6 +3531,176 @@ mod tests {
executor::block_on(future::join(test_fut, bg));
}

#[test]
fn peer_cant_flood_with_large_statements() {
sp_tracing::try_init_simple();
let hash_a = Hash::repeat_byte(1);

let candidate = {
let mut c = CommittedCandidateReceipt::default();
c.descriptor.relay_parent = hash_a;
c.descriptor.para_id = 1.into();
c.commitments.new_validation_code = Some(ValidationCode(vec![1,2,3]));
c
};

let peer_a = PeerId::random(); // Alice

let validators = vec![
Sr25519Keyring::Alice.pair(),
Sr25519Keyring::Bob.pair(),
Sr25519Keyring::Charlie.pair(),
// other group
Sr25519Keyring::Dave.pair(),
// We:
Sr25519Keyring::Ferdie.pair(),
];

let first_group = vec![0,1,2,4];
let session_info = make_session_info(
validators,
vec![first_group, vec![3]]
);

let session_index = 1;

let pool = sp_core::testing::TaskExecutor::new();
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);

let bg = async move {
let s = StatementDistribution { metrics: Default::default(), keystore: make_ferdie_keystore()};
s.run(ctx).await.unwrap();
};

let (_, rx_reqs) = mpsc::channel(1);

let test_fut = async move {
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::StatementFetchingReceiver(rx_reqs)
}).await;

// register our active heads.
handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: vec![ActivatedLeaf {
hash: hash_a,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: vec![].into(),
}))).await;

assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(session_index));
}
);

assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
)
if r == hash_a && sess_index == session_index
=> {
let _ = tx.send(Ok(Some(session_info)));
}
);

// notify of peers and view
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_a.clone(),
ObservedRole::Full,
Some(Sr25519Keyring::Alice.public().into())
)
)
}).await;

handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a])
)
}).await;

// receive a seconded statement from peer A.
let statement = {
let signing_context = SigningContext {
parent_hash: hash_a,
session_index,
};

let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = CryptoStore::sr25519_generate_new(
&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
).await.unwrap();

SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
).await.ok().flatten().expect("should be signed")
};

let metadata =
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone().into()).get_metadata();

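// Announce one more large statement than the per-peer budget allows, to trigger the flood protection.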
for _ in 0..MAX_LARGE_STATEMENTS_PER_SENDER + 1 {
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
)
)
}).await;
}

// We should try to fetch the data:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
// Just drop request - should trigger error.
}
);

// Then we should punish the peer:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == COST_APPARENT_FLOOD => {}
);

handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};

futures::pin_mut!(test_fut);
futures::pin_mut!(bg);

executor::block_on(future::join(test_fut, bg));
}

fn make_session_info(validators: Vec<Pair>, groups: Vec<Vec<u32>>) -> SessionInfo {

let validator_groups: Vec<Vec<ValidatorIndex>> = groups