diff --git a/Cargo.lock b/Cargo.lock index d653deedd88..60cfb4ea1c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5445,7 +5445,6 @@ dependencies = [ "engine_traits", "engine_traits_tests", "error_code", - "example_plugin", "fail", "file_system", "fs2", diff --git a/Cargo.toml b/Cargo.toml index 00c47cc20a6..5004cb6f227 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -233,9 +233,6 @@ test_util = { path = "components/test_util", default-features = false } tokio = { version = "0.2", features = ["macros", "rt-threaded", "time"] } zipf = "6.1.0" -[build-dependencies] -example_plugin = { path = "components/test_coprocessor_plugin/example_plugin" } - [patch.crates-io] # TODO: remove this when new raft-rs is published. raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false } diff --git a/build.rs b/build.rs deleted file mode 100644 index e0cdfda7cfa..00000000000 --- a/build.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. - -use std::env; -use std::path::Path; - -fn main() { - locate_coprocessor_plugins(&["example-plugin"]); -} - -/// Locates compiled coprocessor plugins and makes their paths available via environment variables. -/// -/// This is a bit hacky, but can be removed as soon as RFC -/// is available. -/// -/// Why do we need this? -/// This is because we want to use some coprocessor tests during tests. We can't just add them as a -/// `dev-dependency`, because of how the CI works. Thus, we want to include them in our tests with -/// `include_bytes!()`. However, we don't really know where the artifacts are located, so we need to -/// find them with this function and expose the locations via environment variables. -/// -/// Coprocessor plugins need to be added as a `build-dependency` in `Cargo.toml` for TiKV. -fn locate_coprocessor_plugins(plugin_names: &[&str]) { - let out_dir = env::var_os("OUT_DIR").unwrap(); - let build_dir = Path::new(&out_dir) - .parent() - .unwrap() - .parent() - .unwrap() - .parent() - .unwrap(); - - for plugin_name in plugin_names { - let plugin_path = build_dir.join(pkgname_to_libname(plugin_name)); - println!("cargo:rerun-if-changed={}", plugin_path.display()); - println!( - "cargo:rustc-env=CARGO_DYLIB_FILE_{}={}", - plugin_name.to_string().replace("-", "_").to_uppercase(), - plugin_path.display() - ); - } -} - -/// Converts a Rust `dylib` crate name to the platform specific library name for the compiled -/// artifact. -pub fn pkgname_to_libname(pkgname: &str) -> String { - let pkgname = pkgname.to_string().replace("-", "_"); - if cfg!(target_os = "windows") { - format!("{}.dll", pkgname) - } else if cfg!(target_os = "macos") { - format!("lib{}.dylib", pkgname) - } else { - format!("lib{}.so", pkgname) - } -} diff --git a/components/cdc/test.log b/components/cdc/test.log new file mode 100644 index 00000000000..e0b318cb27b --- /dev/null +++ b/components/cdc/test.log @@ -0,0 +1 @@ +2021/08/19 16:41:41.446 lib.rs:512: [INFO] environment variable is present, GRPC_POLL_STRATEGY: epollex diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs index 75b30b79e43..6469fe70754 100644 --- a/components/raftstore/src/store/config.rs +++ b/components/raftstore/src/store/config.rs @@ -176,6 +176,8 @@ pub struct Config { pub apply_yield_duration: ReadableDuration, #[config(skip)] pub disable_kv_wal: bool, + pub enable_propose_batch: bool, + pub skip_header: bool, // Deprecated! These configuration has been moved to Coprocessor. // They are preserved for compatibility check. @@ -261,6 +263,8 @@ impl Default for Config { dev_assert: false, apply_yield_duration: ReadableDuration::millis(500), disable_kv_wal: false, + enable_propose_batch: true, + skip_header: false, // They are preserved for compatibility check. region_max_size: ReadableSize(0), diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index b19766a86aa..ba897b774e3 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -1351,10 +1351,12 @@ where ctx: &mut ApplyContext, req: &RaftCmdRequest, ) -> Result<(RaftCmdResponse, ApplyResult)> { - // Include region for epoch not match after merge may cause key not in range. - let include_region = - req.get_header().get_region_epoch().get_version() >= self.last_merge_version; - check_region_epoch(req, &self.region, include_region)?; + if req.has_header() { + // Include region for epoch not match after merge may cause key not in range. + let include_region = + req.get_header().get_region_epoch().get_version() >= self.last_merge_version; + check_region_epoch(req, &self.region, include_region)?; + } if req.has_admin_request() { self.exec_admin_cmd(ctx, req) } else { diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 1cf927cdd02..f3e7bea1822 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -550,14 +550,18 @@ where continue; } - let req_size = cmd.request.compute_size(); - if self.fsm.batch_req_builder.can_batch(&cmd.request, req_size) { - self.fsm.batch_req_builder.add(cmd, req_size); - if self.fsm.batch_req_builder.should_finish() { + if self.ctx.cfg.enable_propose_batch { + let req_size = cmd.request.compute_size(); + if self.fsm.batch_req_builder.can_batch(&cmd.request, req_size) { + self.fsm.batch_req_builder.add(cmd, req_size); + if self.fsm.batch_req_builder.should_finish() { + self.propose_batch_raft_command(); + } + } else { self.propose_batch_raft_command(); + self.propose_raft_command(cmd.request, cmd.callback) } } else { - self.propose_batch_raft_command(); self.propose_raft_command(cmd.request, cmd.callback) } } diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 6dbb913660f..cfb1bafcd53 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -3030,6 +3030,7 @@ where } poll_ctx.raft_metrics.propose.normal += 1; + let mut skip_header_check = false; if self.has_applied_to_current_term() { // Only when applied index's term is equal to current leader's term, the information @@ -3040,6 +3041,7 @@ where { return Ok(Either::Right(index)); } + skip_header_check = poll_ctx.cfg.skip_header; } else if req.has_admin_request() { // The admin request is rejected because it may need to update epoch checker which // introduces an uncertainty and may breaks the correctness of epoch checker. @@ -3066,6 +3068,9 @@ where } }; + if skip_header_check && req.get_header().get_uuid().is_empty() { + req.take_header(); + } let data = req.write_to_bytes()?; // TODO: use local histogram metrics diff --git a/src/coprocessor_v2/plugin_registry.rs b/src/coprocessor_v2/plugin_registry.rs index de34d57a89c..66b075f8151 100644 --- a/src/coprocessor_v2/plugin_registry.rs +++ b/src/coprocessor_v2/plugin_registry.rs @@ -445,135 +445,3 @@ fn is_library_file>(path: P) -> bool { fn is_library_file>(path: P) -> bool { path.as_ref().extension() == Some(OsStr::new("dll")) } - -#[cfg(test)] -mod tests { - use super::*; - use coprocessor_plugin_api::util::pkgname_to_libname; - use std::sync::Once; - - static INIT: Once = Once::new(); - static EXAMPLE_PLUGIN: &[u8] = include_bytes!(env!("CARGO_DYLIB_FILE_EXAMPLE_PLUGIN")); - - fn initialize_library() -> PathBuf { - let lib_path = std::env::temp_dir().join(&pkgname_to_libname("example-plugin")); - INIT.call_once(|| { - std::fs::write(&lib_path, EXAMPLE_PLUGIN).unwrap(); - }); - lib_path - } - - #[test] - fn load_plugin() { - let library_path = initialize_library(); - - let loaded_plugin = unsafe { LoadedPlugin::new(&library_path).unwrap() }; - - assert_eq!(loaded_plugin.name(), "example_plugin"); - assert_eq!(loaded_plugin.version(), &Version::parse("0.1.0").unwrap()); - } - - #[test] - fn registry_load_and_get_plugin() { - let library_path = initialize_library(); - - let registry = PluginRegistry::new(); - let plugin_name = registry.load_plugin(&library_path).unwrap(); - - let plugin = registry.get_plugin(&plugin_name).unwrap(); - - assert_eq!(plugin.name(), "example_plugin"); - assert_eq!(registry.loaded_plugin_names(), vec!["example_plugin"]); - assert_eq!( - registry.get_path_for_plugin("example_plugin").unwrap(), - library_path.as_os_str() - ); - } - - #[test] - fn update_plugin_path() { - let library_path = initialize_library(); - - let library_path_2 = library_path - .parent() - .unwrap() - .join(pkgname_to_libname("example-plugin-2")); - - let registry = PluginRegistry::new(); - let plugin_name = registry.load_plugin(&library_path).unwrap(); - - assert_eq!( - registry.get_path_for_plugin(&plugin_name).unwrap(), - library_path.as_os_str() - ); - - registry.update_plugin_path(&plugin_name, &library_path_2); - - assert_eq!( - registry.get_path_for_plugin(&plugin_name).unwrap(), - library_path_2.into_os_string() - ); - } - - #[test] - fn registry_unload_plugin() { - let library_path = initialize_library(); - - let registry = PluginRegistry::new(); - - let plugin_name = registry.load_plugin(&library_path).unwrap(); - - assert!(registry.get_plugin(&plugin_name).is_some()); - - registry.unload_plugin(&plugin_name); - - assert!(registry.get_plugin(&plugin_name).is_none()); - assert_eq!(registry.loaded_plugin_names().len(), 0); - } - - #[test] - fn plugin_registry_hot_reloading() { - let original_library_path = initialize_library(); - - let coprocessor_dir = std::env::temp_dir().join("coprocessors"); - let library_path = coprocessor_dir.join(pkgname_to_libname("example-plugin")); - let library_path_2 = coprocessor_dir.join(pkgname_to_libname("example-plugin-2")); - let plugin_name = "example_plugin"; - - // Make the coprocessor directory is empty. - std::fs::create_dir_all(&coprocessor_dir).unwrap(); - std::fs::remove_dir_all(&coprocessor_dir).unwrap(); - - let mut registry = PluginRegistry::new(); - registry.start_hot_reloading(&coprocessor_dir).unwrap(); - - // trigger loading - std::fs::copy(&original_library_path, &library_path).unwrap(); - // fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered. - std::thread::sleep(Duration::from_secs(4)); - - assert!(registry.get_plugin(plugin_name).is_some()); - assert_eq!( - &PathBuf::from(registry.get_path_for_plugin(plugin_name).unwrap()), - &library_path - ); - - // trigger rename - std::fs::rename(&library_path, &library_path_2).unwrap(); - // fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered. - std::thread::sleep(Duration::from_secs(4)); - - assert!(registry.get_plugin(plugin_name).is_some()); - assert_eq!( - &PathBuf::from(registry.get_path_for_plugin(plugin_name).unwrap()), - &library_path_2 - ); - - // trigger unloading - std::fs::remove_file(&library_path_2).unwrap(); - // fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered. - std::thread::sleep(Duration::from_secs(4)); - - assert!(registry.get_plugin(plugin_name).is_none()); - } -}