Skip to content

Commit

Permalink
allow skip header
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
  • Loading branch information
BusyJay committed Aug 19, 2021
1 parent f6bd9b0 commit 339c834
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 199 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,6 @@ test_util = { path = "components/test_util", default-features = false }
tokio = { version = "0.2", features = ["macros", "rt-threaded", "time"] }
zipf = "6.1.0"

[build-dependencies]
example_plugin = { path = "components/test_coprocessor_plugin/example_plugin" }

[patch.crates-io]
# TODO: remove this when new raft-rs is published.
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false }
Expand Down
54 changes: 0 additions & 54 deletions build.rs

This file was deleted.

1 change: 1 addition & 0 deletions components/cdc/test.log
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2021/08/19 16:41:41.446 lib.rs:512: [INFO] environment variable is present, GRPC_POLL_STRATEGY: epollex
4 changes: 4 additions & 0 deletions components/raftstore/src/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ pub struct Config {
pub apply_yield_duration: ReadableDuration,
#[config(skip)]
pub disable_kv_wal: bool,
pub enable_propose_batch: bool,
pub skip_header: bool,

// Deprecated! These configuration has been moved to Coprocessor.
// They are preserved for compatibility check.
Expand Down Expand Up @@ -261,6 +263,8 @@ impl Default for Config {
dev_assert: false,
apply_yield_duration: ReadableDuration::millis(500),
disable_kv_wal: false,
enable_propose_batch: true,
skip_header: false,

// They are preserved for compatibility check.
region_max_size: ReadableSize(0),
Expand Down
10 changes: 6 additions & 4 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1351,10 +1351,12 @@ where
ctx: &mut ApplyContext<EK, W>,
req: &RaftCmdRequest,
) -> Result<(RaftCmdResponse, ApplyResult<EK::Snapshot>)> {
// Include region for epoch not match after merge may cause key not in range.
let include_region =
req.get_header().get_region_epoch().get_version() >= self.last_merge_version;
check_region_epoch(req, &self.region, include_region)?;
if req.has_header() {
// Include region for epoch not match after merge may cause key not in range.
let include_region =
req.get_header().get_region_epoch().get_version() >= self.last_merge_version;
check_region_epoch(req, &self.region, include_region)?;
}
if req.has_admin_request() {
self.exec_admin_cmd(ctx, req)
} else {
Expand Down
14 changes: 9 additions & 5 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,18 @@ where
continue;
}

let req_size = cmd.request.compute_size();
if self.fsm.batch_req_builder.can_batch(&cmd.request, req_size) {
self.fsm.batch_req_builder.add(cmd, req_size);
if self.fsm.batch_req_builder.should_finish() {
if self.ctx.cfg.enable_propose_batch {
let req_size = cmd.request.compute_size();
if self.fsm.batch_req_builder.can_batch(&cmd.request, req_size) {
self.fsm.batch_req_builder.add(cmd, req_size);
if self.fsm.batch_req_builder.should_finish() {
self.propose_batch_raft_command();
}
} else {
self.propose_batch_raft_command();
self.propose_raft_command(cmd.request, cmd.callback)
}
} else {
self.propose_batch_raft_command();
self.propose_raft_command(cmd.request, cmd.callback)
}
}
Expand Down
5 changes: 5 additions & 0 deletions components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3030,6 +3030,7 @@ where
}

poll_ctx.raft_metrics.propose.normal += 1;
let mut skip_header_check = false;

if self.has_applied_to_current_term() {
// Only when applied index's term is equal to current leader's term, the information
Expand All @@ -3040,6 +3041,7 @@ where
{
return Ok(Either::Right(index));
}
skip_header_check = poll_ctx.cfg.skip_header;
} else if req.has_admin_request() {
// The admin request is rejected because it may need to update epoch checker which
// introduces an uncertainty and may breaks the correctness of epoch checker.
Expand All @@ -3066,6 +3068,9 @@ where
}
};

if skip_header_check && req.get_header().get_uuid().is_empty() {
req.take_header();
}
let data = req.write_to_bytes()?;

// TODO: use local histogram metrics
Expand Down
132 changes: 0 additions & 132 deletions src/coprocessor_v2/plugin_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,135 +445,3 @@ fn is_library_file<P: AsRef<Path>>(path: P) -> bool {
fn is_library_file<P: AsRef<Path>>(path: P) -> bool {
path.as_ref().extension() == Some(OsStr::new("dll"))
}

#[cfg(test)]
mod tests {
use super::*;
use coprocessor_plugin_api::util::pkgname_to_libname;
use std::sync::Once;

static INIT: Once = Once::new();
static EXAMPLE_PLUGIN: &[u8] = include_bytes!(env!("CARGO_DYLIB_FILE_EXAMPLE_PLUGIN"));

fn initialize_library() -> PathBuf {
let lib_path = std::env::temp_dir().join(&pkgname_to_libname("example-plugin"));
INIT.call_once(|| {
std::fs::write(&lib_path, EXAMPLE_PLUGIN).unwrap();
});
lib_path
}

#[test]
fn load_plugin() {
let library_path = initialize_library();

let loaded_plugin = unsafe { LoadedPlugin::new(&library_path).unwrap() };

assert_eq!(loaded_plugin.name(), "example_plugin");
assert_eq!(loaded_plugin.version(), &Version::parse("0.1.0").unwrap());
}

#[test]
fn registry_load_and_get_plugin() {
let library_path = initialize_library();

let registry = PluginRegistry::new();
let plugin_name = registry.load_plugin(&library_path).unwrap();

let plugin = registry.get_plugin(&plugin_name).unwrap();

assert_eq!(plugin.name(), "example_plugin");
assert_eq!(registry.loaded_plugin_names(), vec!["example_plugin"]);
assert_eq!(
registry.get_path_for_plugin("example_plugin").unwrap(),
library_path.as_os_str()
);
}

#[test]
fn update_plugin_path() {
let library_path = initialize_library();

let library_path_2 = library_path
.parent()
.unwrap()
.join(pkgname_to_libname("example-plugin-2"));

let registry = PluginRegistry::new();
let plugin_name = registry.load_plugin(&library_path).unwrap();

assert_eq!(
registry.get_path_for_plugin(&plugin_name).unwrap(),
library_path.as_os_str()
);

registry.update_plugin_path(&plugin_name, &library_path_2);

assert_eq!(
registry.get_path_for_plugin(&plugin_name).unwrap(),
library_path_2.into_os_string()
);
}

#[test]
fn registry_unload_plugin() {
let library_path = initialize_library();

let registry = PluginRegistry::new();

let plugin_name = registry.load_plugin(&library_path).unwrap();

assert!(registry.get_plugin(&plugin_name).is_some());

registry.unload_plugin(&plugin_name);

assert!(registry.get_plugin(&plugin_name).is_none());
assert_eq!(registry.loaded_plugin_names().len(), 0);
}

#[test]
fn plugin_registry_hot_reloading() {
let original_library_path = initialize_library();

let coprocessor_dir = std::env::temp_dir().join("coprocessors");
let library_path = coprocessor_dir.join(pkgname_to_libname("example-plugin"));
let library_path_2 = coprocessor_dir.join(pkgname_to_libname("example-plugin-2"));
let plugin_name = "example_plugin";

// Make the coprocessor directory is empty.
std::fs::create_dir_all(&coprocessor_dir).unwrap();
std::fs::remove_dir_all(&coprocessor_dir).unwrap();

let mut registry = PluginRegistry::new();
registry.start_hot_reloading(&coprocessor_dir).unwrap();

// trigger loading
std::fs::copy(&original_library_path, &library_path).unwrap();
// fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered.
std::thread::sleep(Duration::from_secs(4));

assert!(registry.get_plugin(plugin_name).is_some());
assert_eq!(
&PathBuf::from(registry.get_path_for_plugin(plugin_name).unwrap()),
&library_path
);

// trigger rename
std::fs::rename(&library_path, &library_path_2).unwrap();
// fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered.
std::thread::sleep(Duration::from_secs(4));

assert!(registry.get_plugin(plugin_name).is_some());
assert_eq!(
&PathBuf::from(registry.get_path_for_plugin(plugin_name).unwrap()),
&library_path_2
);

// trigger unloading
std::fs::remove_file(&library_path_2).unwrap();
// fs watcher detects changes in every 3 seconds, therefore, wait 4 seconds so as to make sure the watcher is triggered.
std::thread::sleep(Duration::from_secs(4));

assert!(registry.get_plugin(plugin_name).is_none());
}
}

0 comments on commit 339c834

Please sign in to comment.