Skip to content

Commit

Permalink
import: added test case to ensure the fix in tikv#4566 works
Browse files Browse the repository at this point in the history
Signed-off-by: kennytm <kennytm@gmail.com>
  • Loading branch information
kennytm committed Apr 26, 2019
1 parent 48c0995 commit 01d3cbe
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
40 changes: 38 additions & 2 deletions src/import/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ use super::stream::*;
use super::{Config, Error, Result};

const MAX_RETRY_TIMES: u64 = 5;
const RETRY_INTERVAL_SECS: u64 = 3;
#[cfg(not(test))]
const RETRY_INTERVAL: Duration = Duration::from_secs(3);
#[cfg(test)]
const RETRY_INTERVAL: Duration = Duration::from_millis(10);
const STORE_UNAVAILABLE_WAIT_INTERVAL_MILLIS: u64 = 20000;

/// ImportJob is responsible for importing data stored in an engine to a cluster.
Expand Down Expand Up @@ -290,7 +293,7 @@ impl<Client: ImportClient> ImportSSTJob<Client> {

for i in 0..MAX_RETRY_TIMES {
if i != 0 {
thread::sleep(Duration::from_secs(RETRY_INTERVAL_SECS));
thread::sleep(RETRY_INTERVAL);
}

let range = self.sst.meta.get_range().clone();
Expand Down Expand Up @@ -444,3 +447,36 @@ impl<Client: ImportClient> ImportSSTJob<Client> {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use kvproto::import_kvpb::{WriteBatch, Mutation, Mutation_OP};
use crate::import::test_helpers::*;

#[test]
fn test_import_failure() {
let (_dir, engine) = new_engine("test_import_failure");
let mut client = MockClient::new();
client.add_region_range(b"a", b"z");

let mut wb = WriteBatch::new();
let mut mutation = Mutation::new();
mutation.set_op(Mutation_OP::Put);
mutation.set_key(b"key".to_vec());
mutation.set_value(b"value".to_vec());
wb.mut_mutations().push(mutation);
wb.set_commit_ts(1);
engine.write(wb).unwrap();
engine.flush(true).unwrap();

let cfg = Config::default();
let mut job = ImportJob::new(cfg, client, engine);

job.client.set_upload_sst_successful(false);
job.run().unwrap_err();

job.client.set_upload_sst_successful(true);
job.run().unwrap();
}
}
20 changes: 19 additions & 1 deletion src/import/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tikv_util::security::SecurityConfig;

use super::client::*;
use super::common::*;
use super::Result;
use super::{Error, Result};

pub fn calc_data_crc32(data: &[u8]) -> u32 {
let mut digest = crc32::Digest::new(crc32::IEEE);
Expand Down Expand Up @@ -82,6 +82,7 @@ pub struct MockClient {
counter: Arc<AtomicUsize>,
regions: Arc<Mutex<HashMap<u64, Region>>>,
scatter_regions: Arc<Mutex<HashMap<u64, Region>>>,
is_upload_sst_successful: bool,
}

impl MockClient {
Expand All @@ -90,6 +91,7 @@ impl MockClient {
counter: Arc::new(AtomicUsize::new(1)),
regions: Arc::new(Mutex::new(HashMap::new())),
scatter_regions: Arc::new(Mutex::new(HashMap::new())),
is_upload_sst_successful: true,
}
}

Expand All @@ -114,6 +116,10 @@ impl MockClient {
let regions = self.scatter_regions.lock().unwrap();
regions.get(&id).map(|r| RegionInfo::new(r.clone(), None))
}

pub fn set_upload_sst_successful(&mut self, success: bool) {
self.is_upload_sst_successful = success;
}
}

impl ImportClient for MockClient {
Expand Down Expand Up @@ -176,4 +182,16 @@ impl ImportClient for MockClient {
fn is_space_enough(&self, _: u64, _: u64) -> Result<bool> {
Ok(true)
}

fn upload_sst(&self, _: u64, _: UploadStream) -> Result<UploadResponse> {
if self.is_upload_sst_successful {
Ok(UploadResponse::new())
} else {
Err(Error::ImportSSTJobFailed("mock failure".to_string()))
}
}

fn ingest_sst(&self, _: u64, _: IngestRequest) -> Result<IngestResponse> {
Ok(IngestResponse::new())
}
}

0 comments on commit 01d3cbe

Please sign in to comment.