Skip to content

Commit

Permalink
cdc: limit scan speed (tikv#9983)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Jay Lee and ti-chi-bot committed May 10, 2021
1 parent 14e97c5 commit 563bcf3
Show file tree
Hide file tree
Showing 13 changed files with 274 additions and 105 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/cdc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ engine_rocks = { path = "../engine_rocks" }
engine_traits = { path = "../engine_traits" }
failure = "0.1"
futures = "0.1"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
grpcio = { version = "0.5", default-features = false, features = ["openssl-vendored"] }
kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-4.0", default-features = false }
pd_client = { path = "../pd_client" }
Expand Down
31 changes: 16 additions & 15 deletions components/cdc/src/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ impl Downstream {
pub fn sink_event(&self, mut event: Event) {
event.set_request_id(self.req_id);
if self.sink.is_none() {
info!("drop event, no sink";
"conn_id" => ?self.conn_id, "downstream_id" => ?self.id);
info!("cdc drop event, no sink";
"conn_id" => ?self.conn_id, "downstream_id" => ?self.id, "req_id" => self.req_id);
return;
}
let sink = self.sink.as_ref().unwrap();
if let Err(e) = sink.try_send(CdcEvent::Event(event)) {
match e {
crossbeam::channel::TrySendError::Disconnected(_) => {
debug!("send event failed, disconnected";
"conn_id" => ?self.conn_id, "downstream_id" => ?self.id);
debug!("cdc send event failed, disconnected";
"conn_id" => ?self.conn_id, "downstream_id" => ?self.id, "req_id" => self.req_id);
}
crossbeam::channel::TrySendError::Full(_) => {
info!("send event failed, full";
"conn_id" => ?self.conn_id, "downstream_id" => ?self.id);
info!("cdc send event failed, full";
"conn_id" => ?self.conn_id, "downstream_id" => ?self.id, "req_id" => self.req_id);
}
}
}
Expand Down Expand Up @@ -253,7 +253,7 @@ impl Delegate {
true, /* check_ver */
true, /* include_region */
) {
info!("fail to subscribe downstream";
info!("cdc fail to subscribe downstream";
"region_id" => region.get_id(),
"downstream_id" => ?downstream.get_id(),
"conn_id" => ?downstream.get_conn_id(),
Expand Down Expand Up @@ -348,7 +348,7 @@ impl Delegate {
// Stop observe further events.
self.enabled.store(false, Ordering::SeqCst);

info!("region met error";
info!("cdc met region error";
"region_id" => self.region_id, "error" => ?err);
let change_data_err = self.error_event(err);
for d in &self.downstreams {
Expand Down Expand Up @@ -402,24 +402,24 @@ impl Delegate {
}
}
self.resolver = Some(resolver);
info!("region is ready"; "region_id" => self.region_id);
info!("cdc region is ready"; "region_id" => self.region_id);
pending.take_downstreams()
}

/// Try advance and broadcast resolved ts.
pub fn on_min_ts(&mut self, min_ts: TimeStamp) -> Option<TimeStamp> {
if self.resolver.is_none() {
debug!("region resolver not ready";
debug!("cdc region resolver not ready";
"region_id" => self.region_id, "min_ts" => min_ts);
return None;
}
debug!("try to advance ts"; "region_id" => self.region_id, "min_ts" => min_ts);
debug!("cdc try to advance ts"; "region_id" => self.region_id, "min_ts" => min_ts);
let resolver = self.resolver.as_mut().unwrap();
let resolved_ts = match resolver.resolve(min_ts) {
Some(rts) => rts,
None => return None,
};
debug!("resolved ts updated";
debug!("cdc resolved ts updated";
"region_id" => self.region_id, "resolved_ts" => resolved_ts);
CDC_RESOLVED_TS_GAP_HISTOGRAM
.observe((min_ts.physical() - resolved_ts.physical()) as f64 / 1000f64);
Expand Down Expand Up @@ -465,7 +465,8 @@ impl Delegate {
let downstream = if let Some(d) = downstreams.iter().find(|d| d.id == downstream_id) {
d
} else {
warn!("downstream not found"; "downstream_id" => ?downstream_id, "region_id" => self.region_id);
warn!("cdc downstream not found";
"downstream_id" => ?downstream_id, "region_id" => self.region_id);
return;
};

Expand Down Expand Up @@ -735,7 +736,7 @@ fn decode_write(key: Vec<u8>, value: &[u8], row: &mut EventRow) -> bool {
WriteType::Delete => (EventRowOpType::Delete, EventLogType::Commit),
WriteType::Rollback => (EventRowOpType::Unknown, EventLogType::Rollback),
other => {
debug!("skip write record"; "write" => ?other, "key" => &log_wrappers::Value::key(&key));
debug!("cdc skip write record"; "write" => ?other, "key" => &log_wrappers::Value::key(&key));
return true;
}
};
Expand All @@ -762,7 +763,7 @@ fn decode_lock(key: Vec<u8>, lock: Lock, row: &mut EventRow) -> bool {
LockType::Put => EventRowOpType::Put,
LockType::Delete => EventRowOpType::Delete,
other => {
debug!("skip lock record";
debug!("cdc skip lock record";
"type" => ?other,
"start_ts" => ?lock.ts,
"key" => &log_wrappers::Value::key(&key),
Expand Down
Loading

0 comments on commit 563bcf3

Please sign in to comment.