From b25a972d34383617aefea6d8cd2af48d8835eac6 Mon Sep 17 00:00:00 2001 From: kennytm Date: Sat, 27 Apr 2019 02:32:36 +0800 Subject: [PATCH] import: added test case to ensure the fix in #4566 works Signed-off-by: kennytm --- src/import/import.rs | 40 ++++++++++++++++++++++++++++++++++++-- src/import/test_helpers.rs | 20 ++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/import/import.rs b/src/import/import.rs index 2cf7ff11695..83f85327df9 100644 --- a/src/import/import.rs +++ b/src/import/import.rs @@ -21,7 +21,10 @@ use super::stream::*; use super::{Config, Error, Result}; const MAX_RETRY_TIMES: u64 = 5; -const RETRY_INTERVAL_SECS: u64 = 3; +#[cfg(not(test))] +const RETRY_INTERVAL: Duration = Duration::from_secs(3); +#[cfg(test)] +const RETRY_INTERVAL: Duration = Duration::from_millis(10); const STORE_UNAVAILABLE_WAIT_INTERVAL_MILLIS: u64 = 20000; /// ImportJob is responsible for importing data stored in an engine to a cluster. @@ -290,7 +293,7 @@ impl ImportSSTJob { for i in 0..MAX_RETRY_TIMES { if i != 0 { - thread::sleep(Duration::from_secs(RETRY_INTERVAL_SECS)); + thread::sleep(RETRY_INTERVAL); } let range = self.sst.meta.get_range().clone(); @@ -444,3 +447,36 @@ impl ImportSSTJob { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::import::test_helpers::*; + use kvproto::import_kvpb::{Mutation, Mutation_OP, WriteBatch}; + + #[test] + fn test_import_failure() { + let (_dir, engine) = new_engine("test_import_failure"); + let mut client = MockClient::new(); + client.add_region_range(b"a", b"z"); + + let mut wb = WriteBatch::new(); + let mut mutation = Mutation::new(); + mutation.set_op(Mutation_OP::Put); + mutation.set_key(b"key".to_vec()); + mutation.set_value(b"value".to_vec()); + wb.mut_mutations().push(mutation); + wb.set_commit_ts(1); + engine.write(wb).unwrap(); + engine.flush(true).unwrap(); + + let cfg = Config::default(); + let mut job = ImportJob::new(cfg, client, engine); + + job.client.set_upload_sst_successful(false); + job.run().unwrap_err(); + + job.client.set_upload_sst_successful(true); + job.run().unwrap(); + } +} diff --git a/src/import/test_helpers.rs b/src/import/test_helpers.rs index 93ab5ce96ef..1f295567a6c 100644 --- a/src/import/test_helpers.rs +++ b/src/import/test_helpers.rs @@ -22,7 +22,7 @@ use tikv_util::security::SecurityConfig; use super::client::*; use super::common::*; -use super::Result; +use super::{Error, Result}; pub fn calc_data_crc32(data: &[u8]) -> u32 { let mut digest = crc32::Digest::new(crc32::IEEE); @@ -82,6 +82,7 @@ pub struct MockClient { counter: Arc, regions: Arc>>, scatter_regions: Arc>>, + is_upload_sst_successful: bool, } impl MockClient { @@ -90,6 +91,7 @@ impl MockClient { counter: Arc::new(AtomicUsize::new(1)), regions: Arc::new(Mutex::new(HashMap::new())), scatter_regions: Arc::new(Mutex::new(HashMap::new())), + is_upload_sst_successful: true, } } @@ -114,6 +116,10 @@ impl MockClient { let regions = self.scatter_regions.lock().unwrap(); regions.get(&id).map(|r| RegionInfo::new(r.clone(), None)) } + + pub fn set_upload_sst_successful(&mut self, success: bool) { + self.is_upload_sst_successful = success; + } } impl ImportClient for MockClient { @@ -176,4 +182,16 @@ impl ImportClient for MockClient { fn is_space_enough(&self, _: u64, _: u64) -> Result { Ok(true) } + + fn upload_sst(&self, _: u64, _: UploadStream) -> Result { + if self.is_upload_sst_successful { + Ok(UploadResponse::new()) + } else { + Err(Error::ImportSSTJobFailed("mock failure".to_string())) + } + } + + fn ingest_sst(&self, _: u64, _: IngestRequest) -> Result { + Ok(IngestResponse::new()) + } }