diff --git a/Cargo.lock b/Cargo.lock index d653deedd88..af03469ecae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2332,7 +2332,7 @@ dependencies = [ [[package]] name = "librocksdb_sys" version = "0.1.0" -source = "git+https://github.com/tikv/rust-rocksdb.git?branch=tikv-5.0#651a5c0d17662a0d95c76c18f2bb46036f6c501d" +source = "git+https://github.com/tonyxuqqi/rust-rocksdb.git?branch=tikv-5.0#854677c2ccb09f0dfa889e9d1bf4195f3d5ddcb5" dependencies = [ "bindgen", "bzip2-sys", @@ -2351,7 +2351,7 @@ dependencies = [ [[package]] name = "libtitan_sys" version = "0.0.1" -source = "git+https://github.com/tikv/rust-rocksdb.git?branch=tikv-5.0#651a5c0d17662a0d95c76c18f2bb46036f6c501d" +source = "git+https://github.com/tonyxuqqi/rust-rocksdb.git?branch=tikv-5.0#854677c2ccb09f0dfa889e9d1bf4195f3d5ddcb5" dependencies = [ "bzip2-sys", "cc", @@ -4068,7 +4068,7 @@ checksum = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac" [[package]] name = "rocksdb" version = "0.3.0" -source = "git+https://github.com/tikv/rust-rocksdb.git?branch=tikv-5.0#651a5c0d17662a0d95c76c18f2bb46036f6c501d" +source = "git+https://github.com/tonyxuqqi/rust-rocksdb.git?branch=tikv-5.0#854677c2ccb09f0dfa889e9d1bf4195f3d5ddcb5" dependencies = [ "libc 0.2.86", "librocksdb_sys", @@ -5445,7 +5445,6 @@ dependencies = [ "engine_traits", "engine_traits_tests", "error_code", - "example_plugin", "fail", "file_system", "fs2", diff --git a/Cargo.toml b/Cargo.toml index 00c47cc20a6..5004cb6f227 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -233,9 +233,6 @@ test_util = { path = "components/test_util", default-features = false } tokio = { version = "0.2", features = ["macros", "rt-threaded", "time"] } zipf = "6.1.0" -[build-dependencies] -example_plugin = { path = "components/test_coprocessor_plugin/example_plugin" } - [patch.crates-io] # TODO: remove this when new raft-rs is published. raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false } diff --git a/build.rs b/build.rs deleted file mode 100644 index e0cdfda7cfa..00000000000 --- a/build.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. - -use std::env; -use std::path::Path; - -fn main() { - locate_coprocessor_plugins(&["example-plugin"]); -} - -/// Locates compiled coprocessor plugins and makes their paths available via environment variables. -/// -/// This is a bit hacky, but can be removed as soon as RFC -/// is available. -/// -/// Why do we need this? -/// This is because we want to use some coprocessor tests during tests. We can't just add them as a -/// `dev-dependency`, because of how the CI works. Thus, we want to include them in our tests with -/// `include_bytes!()`. However, we don't really know where the artifacts are located, so we need to -/// find them with this function and expose the locations via environment variables. -/// -/// Coprocessor plugins need to be added as a `build-dependency` in `Cargo.toml` for TiKV. 
-fn locate_coprocessor_plugins(plugin_names: &[&str]) { - let out_dir = env::var_os("OUT_DIR").unwrap(); - let build_dir = Path::new(&out_dir) - .parent() - .unwrap() - .parent() - .unwrap() - .parent() - .unwrap(); - - for plugin_name in plugin_names { - let plugin_path = build_dir.join(pkgname_to_libname(plugin_name)); - println!("cargo:rerun-if-changed={}", plugin_path.display()); - println!( - "cargo:rustc-env=CARGO_DYLIB_FILE_{}={}", - plugin_name.to_string().replace("-", "_").to_uppercase(), - plugin_path.display() - ); - } -} - -/// Converts a Rust `dylib` crate name to the platform specific library name for the compiled -/// artifact. -pub fn pkgname_to_libname(pkgname: &str) -> String { - let pkgname = pkgname.to_string().replace("-", "_"); - if cfg!(target_os = "windows") { - format!("{}.dll", pkgname) - } else if cfg!(target_os = "macos") { - format!("lib{}.dylib", pkgname) - } else { - format!("lib{}.so", pkgname) - } -} diff --git a/components/cdc/test.log b/components/cdc/test.log new file mode 100644 index 00000000000..e0b318cb27b --- /dev/null +++ b/components/cdc/test.log @@ -0,0 +1 @@ +2021/08/19 16:41:41.446 lib.rs:512: [INFO] environment variable is present, GRPC_POLL_STRATEGY: epollex diff --git a/components/engine_panic/src/write_batch.rs b/components/engine_panic/src/write_batch.rs index 6d37b7ad4a0..f0c422cb5dc 100644 --- a/components/engine_panic/src/write_batch.rs +++ b/components/engine_panic/src/write_batch.rs @@ -19,6 +19,14 @@ impl WriteBatchExt for PanicEngine { fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch { panic!() } + + fn write_batch_with_cap_and_max_keys( + &self, + cap: usize, + max_batch_key_size: usize, + ) -> Self::WriteBatch { + panic!() + } } pub struct PanicWriteBatch; @@ -28,6 +36,10 @@ impl WriteBatch for PanicWriteBatch { panic!() } + fn with_capacity_and_max_entries(_: &PanicEngine, _: usize, _: usize) -> Self { + panic!() + } + fn write_opt(&self, _: &WriteOptions) -> Result<()> { panic!() } diff --git a/components/engine_rocks/Cargo.toml b/components/engine_rocks/Cargo.toml index 32cdfcbddf1..6e5058b5f09 100644 --- a/components/engine_rocks/Cargo.toml +++ b/components/engine_rocks/Cargo.toml @@ -70,7 +70,7 @@ protobuf = "2" fail = "0.4" [dependencies.rocksdb] -git = "https://github.com/tikv/rust-rocksdb.git" +git = "https://github.com/tonyxuqqi/rust-rocksdb.git" package = "rocksdb" branch = "tikv-5.0" features = ["encryption", "static_libcpp"] diff --git a/components/engine_rocks/src/options.rs b/components/engine_rocks/src/options.rs index 2fdc62742cd..3f783325ef1 100644 --- a/components/engine_rocks/src/options.rs +++ b/components/engine_rocks/src/options.rs @@ -17,6 +17,7 @@ impl From for RocksReadOptions { fn from(opts: engine_traits::ReadOptions) -> Self { let mut r = RawReadOptions::default(); r.fill_cache(opts.fill_cache()); + r.set_read_tier(opts.read_tier() as i32); RocksReadOptions(r) } } diff --git a/components/engine_rocks/src/raft_engine.rs b/components/engine_rocks/src/raft_engine.rs index 231e50e8690..98c4691ec80 100644 --- a/components/engine_rocks/src/raft_engine.rs +++ b/components/engine_rocks/src/raft_engine.rs @@ -104,7 +104,11 @@ impl RaftEngine for RocksEngine { type LogBatch = RocksWriteBatch; fn log_batch(&self, capacity: usize) -> Self::LogBatch { - RocksWriteBatch::with_capacity(self.as_inner().clone(), capacity) + RocksWriteBatch::with_capacity( + self.as_inner().clone(), + capacity, + Self::WRITE_BATCH_MAX_KEYS, + ) } fn sync(&self) -> Result<()> { @@ -161,7 +165,7 @@ impl RaftEngine for 
RocksEngine { } fn append(&self, raft_group_id: u64, entries: Vec) -> Result { - let mut wb = RocksWriteBatch::new(self.as_inner().clone()); + let mut wb = RocksWriteBatch::new(self.as_inner().clone(), Self::WRITE_BATCH_MAX_KEYS); let buf = Vec::with_capacity(1024); wb.append_impl(raft_group_id, &entries, buf)?; self.consume(&mut wb, false) diff --git a/components/engine_rocks/src/write_batch.rs b/components/engine_rocks/src/write_batch.rs index d04e03d42df..f7100925fd0 100644 --- a/components/engine_rocks/src/write_batch.rs +++ b/components/engine_rocks/src/write_batch.rs @@ -7,6 +7,7 @@ use crate::options::RocksWriteOptions; use crate::util::get_cf_handle; use engine_traits::{self, Error, Mutable, Result, WriteBatchExt, WriteOptions}; use rocksdb::{Writable, WriteBatch as RawWriteBatch, DB}; +use tikv_util::debug; const WRITE_BATCH_MAX_BATCH: usize = 16; const WRITE_BATCH_LIMIT: usize = 16; @@ -23,24 +24,38 @@ impl WriteBatchExt for RocksEngine { } fn write_batch(&self) -> Self::WriteBatch { - Self::WriteBatch::new(Arc::clone(&self.as_inner())) + Self::WriteBatch::new(Arc::clone(&self.as_inner()), Self::WRITE_BATCH_MAX_KEYS) } fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch { - Self::WriteBatch::with_capacity(Arc::clone(&self.as_inner()), cap) + Self::WriteBatch::with_capacity( + Arc::clone(&self.as_inner()), + cap, + Self::WRITE_BATCH_MAX_KEYS, + ) + } + + fn write_batch_with_cap_and_max_keys( + &self, + cap: usize, + max_batch_key_size: usize, + ) -> Self::WriteBatch { + Self::WriteBatch::with_capacity(Arc::clone(&self.as_inner()), cap, max_batch_key_size) } } pub struct RocksWriteBatch { db: Arc, wb: RawWriteBatch, + max_batch_key_size: usize, } impl RocksWriteBatch { - pub fn new(db: Arc) -> RocksWriteBatch { + pub fn new(db: Arc, max_batch_key_size: usize) -> RocksWriteBatch { RocksWriteBatch { db, wb: RawWriteBatch::default(), + max_batch_key_size, } } @@ -48,17 +63,25 @@ impl RocksWriteBatch { &self.wb } - pub fn with_capacity(db: Arc, cap: usize) -> RocksWriteBatch { + pub fn with_capacity(db: Arc, cap: usize, max_batch_key_size: usize) -> RocksWriteBatch { let wb = if cap == 0 { RawWriteBatch::default() } else { RawWriteBatch::with_capacity(cap) }; - RocksWriteBatch { db, wb } + RocksWriteBatch { + db, + wb, + max_batch_key_size, + } } - pub fn from_raw(db: Arc, wb: RawWriteBatch) -> RocksWriteBatch { - RocksWriteBatch { db, wb } + pub fn from_raw(db: Arc, wb: RawWriteBatch, max_batch_key_size: usize) -> RocksWriteBatch { + RocksWriteBatch { + db, + wb, + max_batch_key_size, + } } pub fn get_db(&self) -> &DB { @@ -71,6 +94,14 @@ impl engine_traits::WriteBatch for RocksWriteBatch { e.write_batch_with_cap(cap) } + fn with_capacity_and_max_entries( + e: &RocksEngine, + cap: usize, + max_entries: usize, + ) -> RocksWriteBatch { + e.write_batch_with_cap_and_max_keys(cap, max_entries) + } + fn write_opt(&self, opts: &WriteOptions) -> Result<()> { let opt: RocksWriteOptions = opts.into(); self.get_db() @@ -91,7 +122,14 @@ impl engine_traits::WriteBatch for RocksWriteBatch { } fn should_write_to_engine(&self) -> bool { - self.wb.count() > RocksEngine::WRITE_BATCH_MAX_KEYS + if self.wb.count() > self.max_batch_key_size { + debug!( + "should_write_to_engine return true"; + "max_batch_key_size" => self.max_batch_key_size, + ); + return true; + } + return false; } fn clear(&mut self) { @@ -156,11 +194,17 @@ pub struct RocksWriteBatchVec { save_points: Vec, index: usize, cur_batch_size: usize, - batch_size_limit: usize, + batch_size_limit: usize, // the max size of the single 
batch + max_batch_count: usize, // the max total batch count } impl RocksWriteBatchVec { - pub fn new(db: Arc, batch_size_limit: usize, cap: usize) -> RocksWriteBatchVec { + pub fn new( + db: Arc, + batch_size_limit: usize, + max_batch_count: usize, + cap: usize, + ) -> RocksWriteBatchVec { let wb = RawWriteBatch::with_capacity(cap); RocksWriteBatchVec { db, @@ -169,6 +213,7 @@ impl RocksWriteBatchVec { index: 0, cur_batch_size: 0, batch_size_limit, + max_batch_count, } } @@ -200,7 +245,25 @@ impl RocksWriteBatchVec { impl engine_traits::WriteBatch for RocksWriteBatchVec { fn with_capacity(e: &RocksEngine, cap: usize) -> RocksWriteBatchVec { - RocksWriteBatchVec::new(e.as_inner().clone(), WRITE_BATCH_LIMIT, cap) + RocksWriteBatchVec::new( + e.as_inner().clone(), + WRITE_BATCH_LIMIT, + WRITE_BATCH_MAX_BATCH, + cap, + ) + } + + fn with_capacity_and_max_entries( + e: &RocksEngine, + cap: usize, + max_batch_count: usize, + ) -> RocksWriteBatchVec { + RocksWriteBatchVec::new( + e.as_inner().clone(), + WRITE_BATCH_LIMIT, + max_batch_count, + cap, + ) } fn write_opt(&self, opts: &WriteOptions) -> Result<()> { @@ -229,7 +292,7 @@ impl engine_traits::WriteBatch for RocksWriteBatchVec { } fn should_write_to_engine(&self) -> bool { - self.index >= WRITE_BATCH_MAX_BATCH + self.index >= self.max_batch_count } fn clear(&mut self) { diff --git a/components/engine_traits/src/options.rs b/components/engine_traits/src/options.rs index d8d2c864ec6..6ca72bb477b 100644 --- a/components/engine_traits/src/options.rs +++ b/components/engine_traits/src/options.rs @@ -2,9 +2,18 @@ use std::ops::Bound; use tikv_util::keybuilder::KeyBuilder; +#[repr(i32)] +#[derive(Clone, Copy)] +pub enum ReadTier { + ReadAllTier = 0, + BlockCacheTier = 1, + PersistedTier = 2, +} + #[derive(Clone)] pub struct ReadOptions { fill_cache: bool, + read_tier: ReadTier, } impl ReadOptions { @@ -21,11 +30,24 @@ impl ReadOptions { pub fn set_fill_cache(&mut self, v: bool) { self.fill_cache = v; } + + #[inline] + pub fn read_tier(&self) -> ReadTier { + self.read_tier + } + + #[inline] + pub fn set_read_tier(&mut self, v: ReadTier) { + self.read_tier = v; + } } impl Default for ReadOptions { fn default() -> ReadOptions { - ReadOptions { fill_cache: true } + ReadOptions { + fill_cache: true, + read_tier: ReadTier::ReadAllTier, // all tier + } } } diff --git a/components/engine_traits/src/write_batch.rs b/components/engine_traits/src/write_batch.rs index 0a49c0d7951..8f8db08116c 100644 --- a/components/engine_traits/src/write_batch.rs +++ b/components/engine_traits/src/write_batch.rs @@ -27,6 +27,11 @@ pub trait WriteBatchExt: Sized { fn write_batch(&self) -> Self::WriteBatch; fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch; + fn write_batch_with_cap_and_max_keys( + &self, + cap: usize, + max_batch_key_size: usize, + ) -> Self::WriteBatch; } /// A trait implemented by WriteBatch @@ -83,6 +88,9 @@ pub trait WriteBatch: Mutable { /// Create a WriteBatch with a given command capacity fn with_capacity(e: &E, cap: usize) -> Self; + /// Create a WriteBatch with a given command capacity and max entries + fn with_capacity_and_max_entries(e: &E, cap: usize, max_entries: usize) -> Self; + /// Commit the WriteBatch to disk with the given options fn write_opt(&self, opts: &WriteOptions) -> Result<()>; diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index 75b30b79e43..c34a4f7db2a 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -176,6 +176,12 
@@ pub struct Config { pub apply_yield_duration: ReadableDuration, #[config(skip)] pub disable_kv_wal: bool, + #[config(skip)] + pub max_batch_key_size: usize, + #[config(skip)] + pub max_batch_count: usize, + pub enable_propose_batch: bool, + pub skip_header: bool, // Deprecated! These configuration has been moved to Coprocessor. // They are preserved for compatibility check. @@ -261,6 +267,10 @@ impl Default for Config { dev_assert: false, apply_yield_duration: ReadableDuration::millis(500), disable_kv_wal: false, + max_batch_key_size: 256, + max_batch_count: 16, + enable_propose_batch: true, + skip_header: false, // They are preserved for compatibility check. region_max_size: ReadableSize(0), diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index b19766a86aa..4f2c484340b 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -375,12 +375,14 @@ where kv_wb_last_keys: u64, committed_count: usize, + write_count: usize, // Whether synchronize WAL is preferred. sync_log_hint: bool, // Whether to use the delete range API instead of deleting one by one. use_delete_range: bool, disable_kv_wal: bool, + max_batch_limit: usize, perf_context: EK::PerfContext, @@ -428,7 +430,13 @@ where ) -> ApplyContext { // If `enable_multi_batch_write` was set true, we create `RocksWriteBatchVec`. // Otherwise create `RocksWriteBatch`. - let kv_wb = W::with_capacity(&engine, DEFAULT_APPLY_WB_SIZE); + let max_batch_limit = if engine.support_write_batch_vec() { + cfg.max_batch_count + } else { + cfg.max_batch_key_size + }; + let kv_wb = + W::with_capacity_and_max_entries(&engine, DEFAULT_APPLY_WB_SIZE, max_batch_limit); ApplyContext { tag, @@ -447,6 +455,7 @@ where committed_count: 0, sync_log_hint: false, disable_kv_wal: cfg.disable_kv_wal, + max_batch_limit, exec_ctx: None, use_delete_range: cfg.use_delete_range, perf_context: engine.get_perf_context(cfg.perf_level, PerfContextKind::RaftstoreApply), @@ -457,6 +466,7 @@ where priority, yield_high_latency_operation: cfg.apply_batch_system.low_priority_pool_size > 0, pending_ssts: vec![], + write_count: 0, } } @@ -518,13 +528,18 @@ where self.kv_wb().write_opt(&write_opts).unwrap_or_else(|e| { panic!("failed to write to engine: {:?}", e); }); + self.write_count = self.write_count + 1; self.perf_context.report_metrics(); self.sync_log_hint = false; let data_size = self.kv_wb().data_size(); if data_size > APPLY_WB_SHRINK_SIZE { // Control the memory usage for the WriteBatch. Whether it's `RocksWriteBatch` or // `RocksWriteBatchVec` depends on the `enable_multi_batch_write` configuration. - self.kv_wb = W::with_capacity(&self.engine, DEFAULT_APPLY_WB_SIZE); + self.kv_wb = W::with_capacity_and_max_entries( + &self.engine, + DEFAULT_APPLY_WB_SIZE, + self.max_batch_limit, + ); } else { // Clear data, reuse the WriteBatch, this can reduce memory allocations and deallocations. self.kv_wb_mut().clear(); @@ -616,6 +631,7 @@ where let elapsed = t.saturating_elapsed(); STORE_APPLY_LOG_HISTOGRAM.observe(duration_to_sec(elapsed) as f64); + STORE_APPLY_LOG_WRITE_COUNT_HISTOGRAM.observe(self.write_count as f64); slow_log!( elapsed, @@ -624,6 +640,7 @@ where self.committed_count ); self.committed_count = 0; + self.write_count = 0; is_synced } } @@ -1351,10 +1368,12 @@ where ctx: &mut ApplyContext, req: &RaftCmdRequest, ) -> Result<(RaftCmdResponse, ApplyResult)> { - // Include region for epoch not match after merge may cause key not in range. 
- let include_region = - req.get_header().get_region_epoch().get_version() >= self.last_merge_version; - check_region_epoch(req, &self.region, include_region)?; + if req.has_header() { + // Include region for epoch not match after merge may cause key not in range. + let include_region = + req.get_header().get_region_epoch().get_version() >= self.last_merge_version; + check_region_epoch(req, &self.region, include_region)?; + } if req.has_admin_request() { self.exec_admin_cmd(ctx, req) } else { @@ -3148,6 +3167,7 @@ where ) { if apply_ctx.timer.is_none() { apply_ctx.timer = Some(Instant::now_coarse()); + apply_ctx.write_count = 0; } fail_point!("on_handle_apply_1003", self.delegate.id() == 1003, |_| {}); @@ -3281,6 +3301,7 @@ where if ctx.timer.is_none() { ctx.timer = Some(Instant::now_coarse()); + ctx.write_count = 0; } if !state.pending_entries.is_empty() { self.delegate @@ -3355,6 +3376,7 @@ where if need_sync { if apply_ctx.timer.is_none() { apply_ctx.timer = Some(Instant::now_coarse()); + apply_ctx.write_count = 0; } self.delegate.write_apply_state(apply_ctx.kv_wb_mut()); fail_point!( diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 1cf927cdd02..76e2e2400d6 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -11,7 +11,7 @@ use batch_system::{BasicMailbox, Fsm}; use collections::HashMap; use engine_traits::CF_RAFT; use engine_traits::{ - Engines, KvEngine, RaftEngine, RaftLogBatch, SSTMetaInfo, WriteBatch, WriteBatchExt, + Engines, KvEngine, RaftEngine, RaftLogBatch, ReadOptions, ReadTier, SSTMetaInfo, WriteBatch, }; use error_code::ErrorCodeExt; use fail::fail_point; @@ -131,6 +131,9 @@ where max_inflight_msgs: usize, trace: PeerMemoryTrace, + + // read applied idx from sst for GC + check_truncated_idx_for_gc: bool, } pub struct BatchRaftCmdRequestBuilder @@ -231,6 +234,7 @@ where ), max_inflight_msgs: cfg.raft_max_inflight_msgs, trace: PeerMemoryTrace::default(), + check_truncated_idx_for_gc: cfg.disable_kv_wal, }), )) } @@ -276,6 +280,7 @@ where ), max_inflight_msgs: cfg.raft_max_inflight_msgs, trace: PeerMemoryTrace::default(), + check_truncated_idx_for_gc: cfg.disable_kv_wal, }), )) } @@ -387,14 +392,14 @@ where self.batch_req_size += req_size; } - fn should_finish(&self) -> bool { + fn should_finish(&self, max_batch_keys: usize) -> bool { if let Some(batch_req) = self.request.as_ref() { // Limit the size of batch request so that it will not exceed raft_entry_max_size after // adding header. 
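            // A batch is proposed as soon as either bound is hit: its encoded size exceeds
            // 40% of raft_entry_max_size, or its request count exceeds `max_batch_keys`
            // (cfg.max_batch_key_size, 256 by default), replacing the previous hard-coded
            // `WRITE_BATCH_MAX_KEYS` limit.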
if f64::from(self.batch_req_size) > self.raft_entry_max_size * 0.4 { return true; } - if batch_req.get_requests().len() > ::WRITE_BATCH_MAX_KEYS { + if batch_req.get_requests().len() > max_batch_keys { return true; } } @@ -550,14 +555,22 @@ where continue; } - let req_size = cmd.request.compute_size(); - if self.fsm.batch_req_builder.can_batch(&cmd.request, req_size) { - self.fsm.batch_req_builder.add(cmd, req_size); - if self.fsm.batch_req_builder.should_finish() { + if self.ctx.cfg.enable_propose_batch { + let req_size = cmd.request.compute_size(); + if self.fsm.batch_req_builder.can_batch(&cmd.request, req_size) { + self.fsm.batch_req_builder.add(cmd, req_size); + if self + .fsm + .batch_req_builder + .should_finish(self.ctx.cfg.max_batch_key_size) + { + self.propose_batch_raft_command(); + } + } else { self.propose_batch_raft_command(); + self.propose_raft_command(cmd.request, cmd.callback) } } else { - self.propose_batch_raft_command(); self.propose_raft_command(cmd.request, cmd.callback) } } @@ -2208,18 +2221,50 @@ where } } + fn get_flushed_truncated_idx(&mut self, region_id: u64) -> u64 { + let state_key = keys::apply_state_key(region_id); + let mut opts = ReadOptions::new(); + opts.set_read_tier(ReadTier::PersistedTier); + let value = self + .ctx + .engines + .kv + .get_value_cf_opt(&opts, CF_RAFT, &state_key) + .unwrap(); + if value.is_none() { + return 0; + } + let mut m = RaftApplyState::default(); + m.merge_from_bytes(&value.unwrap()).unwrap(); + cmp::min(m.get_truncated_state().get_index(), m.applied_index) + } + fn on_ready_compact_log(&mut self, first_index: u64, state: RaftTruncatedState) { let total_cnt = self.fsm.peer.last_applying_idx - first_index; + + let mut index = state.get_index(); + if self.fsm.check_truncated_idx_for_gc { + let last_truncated_idx = + self.get_flushed_truncated_idx(self.fsm.peer.get_store().get_region_id()); + if last_truncated_idx != 0 && last_truncated_idx - 1 < index { + debug!("on_ready_compact_log update index."; + "region" => self.fsm.peer.get_store().get_region_id(), + "original" => index, + "new value" => last_truncated_idx-1); + index = last_truncated_idx - 1; + } + } // the size of current CompactLog command can be ignored. let remain_cnt = self.fsm.peer.last_applying_idx - state.get_index() - 1; self.fsm.peer.raft_log_size_hint = self.fsm.peer.raft_log_size_hint * remain_cnt / total_cnt; - let compact_to = state.get_index() + 1; + let compact_to = index + 1; let task = RaftlogGcTask::gc( self.fsm.peer.get_store().get_region_id(), self.fsm.peer.last_compacted_idx, compact_to, ); + let start_idx = self.fsm.peer.last_compacted_idx; self.fsm.peer.last_compacted_idx = compact_to; self.fsm.peer.mut_store().compact_to(compact_to); if let Err(e) = self.ctx.raftlog_gc_scheduler.schedule(task) { @@ -2229,6 +2274,14 @@ where "peer_id" => self.fsm.peer_id(), "err" => %e, ); + } else { + debug!( + "successfully to schedule compact task"; + "region_id" => self.fsm.region_id(), + "peer_id" => self.fsm.peer_id(), + "start_idx" => start_idx, + "end_idx" => compact_to, + ); } } diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index 3882087b6e6..b6f2813e610 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -253,6 +253,13 @@ lazy_static! 
{ exponential_buckets(0.0005, 2.0, 20).unwrap() ).unwrap(); + pub static ref STORE_APPLY_LOG_WRITE_COUNT_HISTOGRAM: Histogram = + register_histogram!( + "tikv_raftstore_apply_log_write_count", + "Bucketed histogram of peer applying log write count duration.", + exponential_buckets(0.0005, 2.0, 20).unwrap() + ).unwrap(); + pub static ref APPLY_TASK_WAIT_TIME_HISTOGRAM: Histogram = register_histogram!( "tikv_raftstore_apply_wait_time_duration_secs", diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 6dbb913660f..cfb1bafcd53 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -3030,6 +3030,7 @@ where } poll_ctx.raft_metrics.propose.normal += 1; + let mut skip_header_check = false; if self.has_applied_to_current_term() { // Only when applied index's term is equal to current leader's term, the information @@ -3040,6 +3041,7 @@ where { return Ok(Either::Right(index)); } + skip_header_check = poll_ctx.cfg.skip_header; } else if req.has_admin_request() { // The admin request is rejected because it may need to update epoch checker which // introduces an uncertainty and may breaks the correctness of epoch checker. @@ -3066,6 +3068,9 @@ where } }; + if skip_header_check && req.get_header().get_uuid().is_empty() { + req.take_header(); + } let data = req.write_to_bytes()?; // TODO: use local histogram metrics diff --git a/metrics/grafana/tikv_details.json b/metrics/grafana/tikv_details.json index d36370fa2a8..21a5daa0ae9 100644 --- a/metrics/grafana/tikv_details.json +++ b/metrics/grafana/tikv_details.json @@ -8975,6 +8975,168 @@ "align": false, "alignLevel": null } + }, + { + "cards": { + "cardPadding": null, + "cardRound": null + }, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateSpectral", + "exponent": 0.5, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": "${DS_TEST-CLUSTER}", + "description": "The rocksdb write count when Raft applies log", + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 29 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 51, + "legend": { + "show": false + }, + "links": [], + "reverseYBuckets": false, + "targets": [ + { + "expr": "sum(delta(tikv_raftstore_apply_log_write_count_bucket{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le)", + "format": "heatmap", + "intervalFactor": 2, + "legendFormat": "{{le}}", + "metric": "", + "refId": "A", + "step": 4 + } + ], + "timeFrom": null, + "timeShift": null, + "title": "Apply log RocksDB write count", + "tooltip": { + "show": true, + "showHistogram": false + }, + "type": "heatmap", + "xAxis": { + "show": true + }, + "xBucketNumber": null, + "xBucketSize": null, + "yAxis": { + "decimals": 0, + "format": "s", + "logBase": 1, + "max": null, + "min": null, + "show": true, + "splitFactor": null + }, + "yBucketBound": "upper", + "yBucketNumber": null, + "yBucketSize": null + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The rocksdb write count for Raft to apply logs per TiKV instance", + "editable": true, + "error": false, + "fill": 1, + "grid": {}, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 29 + }, + "id": 50, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true 
+ }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.99, sum(rate(tikv_raftstore_apply_log_write_count_bucket{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le, instance))", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": " {{instance}}", + "refId": "A", + "step": 4 + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "99% Apply log write count per server", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "repeat": null, diff --git a/src/coprocessor_v2/plugin_registry.rs b/src/coprocessor_v2/plugin_registry.rs index de34d57a89c..66b075f8151 100644 --- a/src/coprocessor_v2/plugin_registry.rs +++ b/src/coprocessor_v2/plugin_registry.rs @@ -445,135 +445,3 @@ fn is_library_file>(path: P) -> bool { fn is_library_file>(path: P) -> bool { path.as_ref().extension() == Some(OsStr::new("dll")) } - -#[cfg(test)] -mod tests { - use super::*; - use coprocessor_plugin_api::util::pkgname_to_libname; - use std::sync::Once; - - static INIT: Once = Once::new(); - static EXAMPLE_PLUGIN: &[u8] = include_bytes!(env!("CARGO_DYLIB_FILE_EXAMPLE_PLUGIN")); - - fn initialize_library() -> PathBuf { - let lib_path = std::env::temp_dir().join(&pkgname_to_libname("example-plugin")); - INIT.call_once(|| { - std::fs::write(&lib_path, EXAMPLE_PLUGIN).unwrap(); - }); - lib_path - } - - #[test] - fn load_plugin() { - let library_path = initialize_library(); - - let loaded_plugin = unsafe { LoadedPlugin::new(&library_path).unwrap() }; - - assert_eq!(loaded_plugin.name(), "example_plugin"); - assert_eq!(loaded_plugin.version(), &Version::parse("0.1.0").unwrap()); - } - - #[test] - fn registry_load_and_get_plugin() { - let library_path = initialize_library(); - - let registry = PluginRegistry::new(); - let plugin_name = registry.load_plugin(&library_path).unwrap(); - - let plugin = registry.get_plugin(&plugin_name).unwrap(); - - assert_eq!(plugin.name(), "example_plugin"); - assert_eq!(registry.loaded_plugin_names(), vec!["example_plugin"]); - assert_eq!( - registry.get_path_for_plugin("example_plugin").unwrap(), - library_path.as_os_str() - ); - } - - #[test] - fn update_plugin_path() { - let library_path = initialize_library(); - - let library_path_2 = library_path - .parent() - .unwrap() - .join(pkgname_to_libname("example-plugin-2")); - - let registry = PluginRegistry::new(); - let plugin_name = registry.load_plugin(&library_path).unwrap(); - - assert_eq!( - registry.get_path_for_plugin(&plugin_name).unwrap(), - library_path.as_os_str() - ); - - registry.update_plugin_path(&plugin_name, &library_path_2); - - assert_eq!( - registry.get_path_for_plugin(&plugin_name).unwrap(), - library_path_2.into_os_string() - ); - } - - #[test] - fn registry_unload_plugin() { - let library_path = initialize_library(); - - let registry = 
PluginRegistry::new(); - - let plugin_name = registry.load_plugin(&library_path).unwrap(); - - assert!(registry.get_plugin(&plugin_name).is_some()); - - registry.unload_plugin(&plugin_name); - - assert!(registry.get_plugin(&plugin_name).is_none()); - assert_eq!(registry.loaded_plugin_names().len(), 0); - } - - #[test] - fn plugin_registry_hot_reloading() { - let original_library_path = initialize_library(); - - let coprocessor_dir = std::env::temp_dir().join("coprocessors"); - let library_path = coprocessor_dir.join(pkgname_to_libname("example-plugin")); - let library_path_2 = coprocessor_dir.join(pkgname_to_libname("example-plugin-2")); - let plugin_name = "example_plugin"; - - // Make the coprocessor directory is empty. - std::fs::create_dir_all(&coprocessor_dir).unwrap(); - std::fs::remove_dir_all(&coprocessor_dir).unwrap(); - - let mut registry = PluginRegistry::new(); - registry.start_hot_reloading(&coprocessor_dir).unwrap(); - - // trigger loading - std::fs::copy(&original_library_path, &library_path).unwrap(); - // fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered. - std::thread::sleep(Duration::from_secs(4)); - - assert!(registry.get_plugin(plugin_name).is_some()); - assert_eq!( - &PathBuf::from(registry.get_path_for_plugin(plugin_name).unwrap()), - &library_path - ); - - // trigger rename - std::fs::rename(&library_path, &library_path_2).unwrap(); - // fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered. - std::thread::sleep(Duration::from_secs(4)); - - assert!(registry.get_plugin(plugin_name).is_some()); - assert_eq!( - &PathBuf::from(registry.get_path_for_plugin(plugin_name).unwrap()), - &library_path_2 - ); - - // trigger unloading - std::fs::remove_file(&library_path_2).unwrap(); - // fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered. - std::thread::sleep(Duration::from_secs(4)); - - assert!(registry.get_plugin(plugin_name).is_none()); - } -} diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 439f2506c69..b3d26c5793f 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -209,6 +209,10 @@ fn test_serde_custom_tikv_config() { dev_assert: true, apply_yield_duration: ReadableDuration::millis(333), perf_level: PerfLevel::EnableTime, + enable_propose_batch: true, + max_batch_key_size: 256, + skip_header: false, + max_batch_count: 16, }; value.pd = PdConfig::new(vec!["example.com:443".to_owned()]); let titan_cf_config = TitanCfConfig {
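// Illustrative sketch only (not part of this diff): how the engine_traits APIs added
// above might be exercised. It assumes the signatures introduced in this patch
// (write_batch_with_cap_and_max_keys, should_write_to_engine, ReadTier / set_read_tier,
// get_value_cf_opt); the helper names and the 4096-byte batch capacity are hypothetical.

use engine_traits::{
    KvEngine, Mutable, Peekable, ReadOptions, ReadTier, Result, WriteBatch, WriteBatchExt, CF_RAFT,
};

// Write key-value pairs through a batch that is flushed early once it holds more than
// `max_keys` entries, mirroring how ApplyContext derives `max_batch_limit` from
// cfg.max_batch_count / cfg.max_batch_key_size.
fn write_with_key_limit<E: KvEngine>(
    engine: &E,
    kvs: &[(Vec<u8>, Vec<u8>)],
    max_keys: usize,
) -> Result<()> {
    let mut wb = engine.write_batch_with_cap_and_max_keys(4096, max_keys);
    for (k, v) in kvs {
        wb.put(k, v)?;
        if wb.should_write_to_engine() {
            wb.write()?;
            wb.clear();
        }
    }
    wb.write()
}

// Check a key against flushed SST data only, bypassing the memtables, as
// get_flushed_truncated_idx does when deciding how far Raft logs may be GC-ed
// while the KV WAL is disabled.
fn is_persisted<E: KvEngine>(kv: &E, key: &[u8]) -> Result<bool> {
    let mut opts = ReadOptions::new();
    opts.set_read_tier(ReadTier::PersistedTier);
    Ok(kv.get_value_cf_opt(&opts, CF_RAFT, key)?.is_some())
}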