Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statement-distribution: prep for re-enabling #4431

Merged
merged 8 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ use polkadot_primitives::{CandidateHash, CompactStatement, Hash, ValidatorIndex}
use crate::LOG_TARGET;
use std::collections::{HashMap, HashSet};

#[derive(Hash, PartialEq, Eq)]
struct ValidStatementManifest {
remote: ValidatorIndex,
originator: ValidatorIndex,
candidate_hash: CandidateHash,
}

// A piece of knowledge about a candidate
#[derive(Hash, Clone, PartialEq, Eq)]
enum Knowledge {
Expand Down
147 changes: 54 additions & 93 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use futures::{
use std::{
collections::{
hash_map::{Entry, HashMap},
BTreeSet, HashSet,
HashSet,
},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -156,6 +156,7 @@ struct PerRelayParentState {
seconding_limit: usize,
session: SessionIndex,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
disabled_validators: HashSet<ValidatorIndex>,
}

impl PerRelayParentState {
Expand All @@ -166,6 +167,17 @@ impl PerRelayParentState {
fn active_validator_state_mut(&mut self) -> Option<&mut ActiveValidatorState> {
self.local_validator.as_mut().and_then(|local| local.active.as_mut())
}

/// Returns `true` if the given validator is disabled in the context of the relay parent.
pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
self.disabled_validators.contains(validator_index)
}

/// A convenience function to generate a disabled bitmask for the given backing group.
/// The output bits are set to `true` for validators that are disabled.
pub fn disabled_bitmask(&self, group: &[ValidatorIndex]) -> BitVec<u8, Lsb0> {
BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)))
}
}

// per-relay-parent local validator state.
Expand Down Expand Up @@ -206,8 +218,6 @@ struct PerSessionState {
// getting the topology from the gossip-support subsystem
grid_view: Option<grid::SessionTopologyView>,
local_validator: Option<LocalValidatorIndex>,
// We store the latest state here based on union of leaves.
disabled_validators: BTreeSet<ValidatorIndex>,
}

impl PerSessionState {
Expand All @@ -224,16 +234,7 @@ impl PerSessionState {
)
.map(|(_, index)| LocalValidatorIndex::Active(index));

let disabled_validators = BTreeSet::new();

PerSessionState {
session_info,
groups,
authority_lookup,
grid_view: None,
local_validator,
disabled_validators,
}
PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator }
}

fn supply_topology(
Expand Down Expand Up @@ -269,33 +270,6 @@ impl PerSessionState {
fn is_not_validator(&self) -> bool {
self.grid_view.is_some() && self.local_validator.is_none()
}

/// A convenience function to generate a disabled bitmask for the given backing group.
/// The output bits are set to `true` for validators that are disabled.
/// Returns `None` if the group index is out of bounds.
pub fn disabled_bitmask(&self, group: GroupIndex) -> Option<BitVec<u8, Lsb0>> {
let group = self.groups.get(group)?;
let mask = BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)));
Some(mask)
}

/// Returns `true` if the given validator is disabled in the current session.
pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
self.disabled_validators.contains(validator_index)
}

/// Extend the list of disabled validators.
pub fn extend_disabled_validators(
&mut self,
disabled: impl IntoIterator<Item = ValidatorIndex>,
) {
self.disabled_validators.extend(disabled);
}

/// Clear the list of disabled validators.
pub fn clear_disabled_validators(&mut self) {
self.disabled_validators.clear();
}
}

pub(crate) struct State {
Expand Down Expand Up @@ -582,19 +556,16 @@ pub(crate) async fn handle_active_leaves_update<Context>(
let new_relay_parents =
state.implicit_view.all_allowed_relay_parents().cloned().collect::<Vec<_>>();

// We clear the list of disabled validators to reset it properly based on union of leaves.
let mut cleared_disabled_validators: BTreeSet<SessionIndex> = BTreeSet::new();

for new_relay_parent in new_relay_parents.iter().cloned() {
// Even if we processed this relay parent before, we need to fetch the list of disabled
// validators based on union of active leaves.
let disabled_validators =
let disabled_validators: HashSet<_> =
polkadot_node_subsystem_util::vstaging::get_disabled_validators_with_fallback(
ctx.sender(),
new_relay_parent,
)
.await
.map_err(JfyiError::FetchDisabledValidators)?;
.map_err(JfyiError::FetchDisabledValidators)?
.into_iter()
.collect();

let session_index = polkadot_node_subsystem_util::request_session_index_for_child(
new_relay_parent,
Expand Down Expand Up @@ -644,10 +615,6 @@ pub(crate) async fn handle_active_leaves_update<Context>(
.get_mut(&session_index)
.expect("either existed or just inserted; qed");

if cleared_disabled_validators.insert(session_index) {
per_session.clear_disabled_validators();
}

if !disabled_validators.is_empty() {
gum::debug!(
target: LOG_TARGET,
Expand All @@ -656,8 +623,6 @@ pub(crate) async fn handle_active_leaves_update<Context>(
?disabled_validators,
"Disabled validators detected"
);

per_session.extend_disabled_validators(disabled_validators);
}

if state.per_relay_parent.contains_key(&new_relay_parent) {
Expand Down Expand Up @@ -723,6 +688,7 @@ pub(crate) async fn handle_active_leaves_update<Context>(
seconding_limit,
session: session_index,
groups_per_para,
disabled_validators,
},
);
}
Expand Down Expand Up @@ -1581,6 +1547,17 @@ async fn handle_incoming_statement<Context>(
};
let session_info = &per_session.session_info;

if per_relay_parent.is_disabled(&statement.unchecked_validator_index()) {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
validator_index = ?statement.unchecked_validator_index(),
"Ignoring a statement from disabled validator."
);
modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general we want it for sure. I if it poses network fragmentation risks on different node version interacting.

Thinking in context of only a fraction of the network updating. Those are node changes mostly so very likely it will go like that.

Ths change means that new nodes will be generally considering themselves disabled later - only when it gets to the relay parent rather then leaf. So consider validator A that was disabled in some leaf. Validator A updated to new node with relay parent disabling. He got disabled in one of the leafs already but A does not yet see it as it hasn't yet been in the relay parent. That means he might be sending statements and old nodes that consider him disabled will rep punish him. Potential fragmentation risk?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might not be enough rep damages to be a problem but just highlighting the possibility

return
}

let local_validator = match per_relay_parent.local_validator.as_mut() {
None => {
// we shouldn't be receiving statements unless we're a validator
Expand Down Expand Up @@ -1614,17 +1591,6 @@ async fn handle_incoming_statement<Context>(
},
};

if per_session.is_disabled(&statement.unchecked_validator_index()) {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
validator_index = ?statement.unchecked_validator_index(),
"Ignoring a statement from disabled validator."
);
modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
return
}

let (active, cluster_sender_index) = {
// This block of code only returns `Some` when both the originator and
// the sending peer are in the cluster.
Expand Down Expand Up @@ -2379,21 +2345,18 @@ async fn handle_incoming_manifest_common<'a, Context>(
Some(s) => s,
};

let local_validator = match relay_parent_state.local_validator.as_mut() {
None => {
if per_session.is_not_validator() {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
)
.await;
}
return None
},
Some(x) => x,
};
if relay_parent_state.local_validator.is_none() {
if per_session.is_not_validator() {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
)
.await;
}
return None
}

let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para_id) else {
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
Expand Down Expand Up @@ -2436,10 +2399,13 @@ async fn handle_incoming_manifest_common<'a, Context>(
let claimed_parent_hash = manifest_summary.claimed_parent_hash;

// Ignore votes from disabled validators when counting towards the threshold.
let disabled_mask = per_session.disabled_bitmask(group_index).unwrap_or_default();
let group = per_session.groups.get(group_index).unwrap_or(&[]);
let disabled_mask = relay_parent_state.disabled_bitmask(group);
manifest_summary.statement_knowledge.mask_seconded(&disabled_mask);
manifest_summary.statement_knowledge.mask_valid(&disabled_mask);

let local_validator = relay_parent_state.local_validator.as_mut().expect("checked above; qed");

let acknowledge = match local_validator.grid_tracker.import_manifest(
grid_topology,
&per_session.groups,
Expand Down Expand Up @@ -3018,9 +2984,7 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
}

// Add disabled validators to the unwanted mask.
let disabled_mask = per_session
.disabled_bitmask(group_index)
.expect("group existence checked above; qed");
let disabled_mask = relay_parent_state.disabled_bitmask(group);
unwanted_mask.seconded_in_group |= &disabled_mask;
unwanted_mask.validated_in_group |= &disabled_mask;

Expand Down Expand Up @@ -3111,9 +3075,7 @@ pub(crate) async fn handle_response<Context>(
Some(g) => g,
};

let disabled_mask = per_session
.disabled_bitmask(group_index)
.expect("group_index checked above; qed");
let disabled_mask = relay_parent_state.disabled_bitmask(group);

let res = response.validate_response(
&mut state.request_manager,
Expand Down Expand Up @@ -3258,7 +3220,7 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
Some(s) => s,
};

let local_validator = match relay_parent_state.local_validator.as_mut() {
let local_validator = match relay_parent_state.local_validator.as_ref() {
None => return,
Some(s) => s,
};
Expand Down Expand Up @@ -3332,16 +3294,15 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {

// Transform mask with 'OR' semantics into one with 'AND' semantics for the API used
// below.
let mut and_mask = StatementFilter {
let and_mask = StatementFilter {
seconded_in_group: !mask.seconded_in_group.clone(),
validated_in_group: !mask.validated_in_group.clone(),
};

// Ignore disabled validators from the latest state when sending the response.
let disabled_mask =
per_session.disabled_bitmask(group_index).expect("group existence checked; qed");
and_mask.mask_seconded(&disabled_mask);
and_mask.mask_valid(&disabled_mask);
let local_validator = match relay_parent_state.local_validator.as_mut() {
None => return,
Some(s) => s,
};

let mut sent_filter = StatementFilter::blank(group_size);
let statements: Vec<_> = relay_parent_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ struct TestLeaf {
parent_hash: Hash,
session: SessionIndex,
availability_cores: Vec<CoreState>,
disabled_validators: Vec<ValidatorIndex>,
pub disabled_validators: Vec<ValidatorIndex>,
para_data: Vec<(ParaId, PerParaData)>,
minimum_backing_votes: u32,
}
Expand Down
Loading
Loading