Skip to content

Commit

Permalink
merge pingcap/release-3.1 until (tikv#6737)
Browse files Browse the repository at this point in the history
* raftstore/hibernate: wake up for more cases (tikv#6672)
* raftstore/hibernate: wake up on proposal (tikv#6736)
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
  • Loading branch information
BusyJay authored and solotzg committed Feb 29, 2020
1 parent 715e73d commit 014edf3
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 7 deletions.
18 changes: 18 additions & 0 deletions components/test_raftstore/src/transport_simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,21 @@ impl Filter for LeaseReadFilter {
Ok(())
}
}

#[derive(Clone)]
pub struct DropMessageFilter {
ty: MessageType,
}

impl DropMessageFilter {
pub fn new(ty: MessageType) -> DropMessageFilter {
DropMessageFilter { ty }
}
}

impl Filter for DropMessageFilter {
fn before(&self, msgs: &mut Vec<RaftMessage>) -> Result<()> {
msgs.retain(|m| m.get_message().get_msg_type() != self.ty);
Ok(())
}
}
7 changes: 7 additions & 0 deletions components/test_raftstore/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,13 @@ pub fn create_test_engine(
(engines, path)
}

pub fn configure_for_hibernate<T: Simulator>(cluster: &mut Cluster<T>) {
// Uses long check interval to make leader keep sleeping during tests.
cluster.cfg.raft_store.abnormal_leader_missing_duration = ReadableDuration::secs(20);
cluster.cfg.raft_store.max_leader_missing_duration = ReadableDuration::secs(40);
cluster.cfg.raft_store.peer_stale_state_check_interval = ReadableDuration::secs(10);
}

pub fn configure_for_snapshot<T: Simulator>(cluster: &mut Cluster<T>) {
// Truncate the log quickly so that we can force sending snapshot.
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(20);
Expand Down
12 changes: 9 additions & 3 deletions src/raftstore/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,10 +839,13 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
let from_peer_id = msg.get_from_peer().get_id();
self.fsm.peer.insert_peer_cache(msg.take_from_peer());
self.fsm.peer.step(self.ctx, msg.take_message())?;
if self.fsm.peer.should_wake_up {
self.reset_raft_tick(GroupState::Ordered);
}

if self.fsm.peer.any_new_peer_catch_up(from_peer_id) {
self.fsm.peer.heartbeat_pd(self.ctx);
self.register_raft_base_tick();
self.reset_raft_tick(GroupState::Ordered);
}

self.fsm.has_ready = true;
Expand All @@ -863,6 +866,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
fn reset_raft_tick(&mut self, state: GroupState) {
self.fsm.group_state = state;
self.fsm.missing_ticks = 0;
self.fsm.peer.should_wake_up = false;
self.register_raft_base_tick();
}

Expand Down Expand Up @@ -2289,8 +2293,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
bind_term(&mut resp, term);
if self.fsm.peer.propose(self.ctx, cb, msg, resp) {
self.fsm.has_ready = true;
self.fsm.group_state = GroupState::Ordered;
self.register_raft_base_tick();
}

if self.fsm.peer.should_wake_up {
self.reset_raft_tick(GroupState::Ordered);
}

self.register_pd_heartbeat_tick();
Expand Down
28 changes: 24 additions & 4 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ pub struct Peer {

/// If it fails to send messages to leader.
pub leader_unreachable: bool,
/// Indicates whether the peer should be woken up.
pub should_wake_up: bool,
/// Whether this peer is destroyed asynchronously.
/// If it's true when merging, its data in storeMeta will be removed early by the target peer
pub pending_remove: bool,
Expand Down Expand Up @@ -291,6 +293,7 @@ impl Peer {
compaction_declined_bytes: 0,
leader_unreachable: false,
pending_remove: false,
should_wake_up: false,
pending_merge_state: None,
last_committed_prepare_merge_idx: 0,
leader_missing_time: Some(Instant::now()),
Expand Down Expand Up @@ -524,14 +527,20 @@ impl Peer {
return res;
}
}
if self.raft_group.raft.pending_read_count() > 0 {
return res;
}
if self.raft_group.raft.lead_transferee.is_some() {
return res;
}
// Unapplied entries can change the configuration of the group.
res.up_to_date = self.get_store().applied_index() == last_index;
res
}

pub fn check_after_tick(&self, state: GroupState, res: CheckTickResult) -> bool {
if res.leader {
res.up_to_date && self.is_leader() && self.raft_group.raft.pending_read_count() == 0
res.up_to_date && self.is_leader()
} else {
// If follower keeps receiving data from leader, then it's safe to stop
// ticking, as leader will make sure it has the latest logs.
Expand All @@ -541,6 +550,8 @@ impl Peer {
&& self.raft_group.raft.leader_id != raft::INVALID_ID
&& self.raft_group.raft.raft_log.last_term() == self.raft_group.raft.term
&& !self.has_unresolved_reads()
// If it becomes leader, the stats is not valid anymore.
&& !self.is_leader()
}
}

Expand Down Expand Up @@ -717,7 +728,8 @@ impl Peer {
if msg_type == MessageType::MsgReadIndex && expected_term == self.raft_group.raft.term {
// If the leader hasn't committed any entries in its term, it can't response read only
// requests. Please also take a look at raft-rs.
if let LeaseState::Valid = self.inspect_lease() {
let state = self.inspect_lease();
if let LeaseState::Valid = state {
let mut resp = eraftpb::Message::default();
resp.set_msg_type(MessageType::MsgReadIndexResp);
resp.to = m.from;
Expand All @@ -727,6 +739,7 @@ impl Peer {
self.pending_messages.push(resp);
return Ok(());
}
self.should_wake_up = state == LeaseState::Expired;
}

if msg_type == MessageType::MsgTransferLeader {
Expand Down Expand Up @@ -1605,6 +1618,7 @@ impl Peer {
// possible.
self.raft_group.skip_bcast_commit(false);
}
self.should_wake_up = true;
let meta = ProposalMeta {
index: idx,
term: self.term(),
Expand Down Expand Up @@ -1669,7 +1683,7 @@ impl Peer {
/// need to be up to date for now. If 'allow_remove_leader' is false then
/// the peer to be removed should not be the leader.
fn check_conf_change<T, C>(
&self,
&mut self,
ctx: &mut PollContext<T, C>,
cmd: &RaftCmdRequest,
) -> Result<()> {
Expand Down Expand Up @@ -1744,6 +1758,8 @@ impl Peer {
"healthy" => healthy,
"quorum_after_change" => quorum_after_change,
);
// Waking it up to replicate logs to candidate.
self.should_wake_up = true;
Err(box_err!(
"unsafe to perform conf change {:?}, total {}, healthy {}, quorum after \
change {}",
Expand Down Expand Up @@ -1981,6 +1997,7 @@ impl Peer {

let read = ReadIndexRequest::with_command(id, req, cb, renew_lease_time);
self.pending_reads.push_back(read, self.is_leader());
self.should_wake_up = true;

debug!(
"request to get a read index";
Expand Down Expand Up @@ -2183,7 +2200,10 @@ impl Peer {
"last_index" => self.get_store().last_index(),
);
}
None => self.transfer_leader(&from),
None => {
self.transfer_leader(&from);
self.should_wake_up = true;
}
}
return;
}
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/raftstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod test_compact_after_delete;
mod test_compact_lock_cf;
mod test_compact_log;
mod test_conf_change;
mod test_hibernate;
mod test_lease_read;
mod test_merge;
mod test_multi;
Expand Down
Loading

0 comments on commit 014edf3

Please sign in to comment.