Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Rocksdb Secondary Instance Api #384

Merged
merged 32 commits into from
May 1, 2020
Merged
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
8ee1dad
kvdb-rocksdb: update to new set_upper_bound API
ordian Apr 22, 2020
2d92d86
kvdb-rocksdb: update rocksdb to crates.io version
ordian Apr 22, 2020
0e0bca8
kvdb-rocksdb: update the changelog
ordian Apr 22, 2020
1731b14
Fix build? Set VM template.
dvdplm Apr 23, 2020
27d6768
Fix build? correct image name
dvdplm Apr 23, 2020
b5aa3ce
Fix build? Maybe it's 2019?
dvdplm Apr 23, 2020
ace87ee
appveyor: try release build
ordian Apr 23, 2020
080de4a
Revert "appveyor: try release build"
ordian Apr 23, 2020
f168c09
checkout https://github.com/rust-rocksdb/rust-rocksdb/pull/412
ordian Apr 23, 2020
c64a021
revert patch
ordian Apr 23, 2020
f1fead0
revert unrelated changes
ordian Apr 23, 2020
36f799f
Merge branch 'master' into ao-upgrade-rocksdb
ordian Apr 23, 2020
f1d94ea
add open as secondary rocksdb api
insipx Apr 28, 2020
b1e1a38
Update kvdb-rocksdb/src/lib.rs
insipx Apr 28, 2020
06df3fd
add more information to secondary mode comment
insipx Apr 28, 2020
faa9f6f
add function to catch up a secondary instance with a primary instance
insipx Apr 29, 2020
54f5cb8
one more doc comment for more clarity
insipx Apr 29, 2020
8c032f3
style fixes
insipx Apr 29, 2020
ccbd7d5
Update kvdb-rocksdb/src/lib.rs
insipx Apr 29, 2020
234baaa
Update kvdb-rocksdb/src/lib.rs
insipx Apr 29, 2020
66f2258
change name of `secondary_mode` option to `secondary`
insipx Apr 29, 2020
d31bdc9
Merge branch 'master' of github.com:paritytech/parity-common into ins…
insipx Apr 29, 2020
4c0aaee
Update kvdb-rocksdb/src/lib.rs
insipx Apr 29, 2020
1019484
fix some punctuation
insipx Apr 29, 2020
6a402f8
specify a different directory for secondary instance to store its logs
insipx Apr 30, 2020
d43daf6
Update kvdb-rocksdb/src/lib.rs
insipx Apr 30, 2020
78c1a05
remove catching up on primary db in test
insipx Apr 30, 2020
be980bf
doc comment fixes
insipx Apr 30, 2020
3636b97
remove wrong info about blocking primary instance
insipx Apr 30, 2020
d301420
more docs for catch-up-with-primary
insipx Apr 30, 2020
b5719da
grammar
insipx May 1, 2020
99bddd7
make `max_open_files` comment clearer
insipx May 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 138 additions & 15 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ pub struct DatabaseConfig {
/// It can have a negative performance impact up to 10% according to
/// https://github.com/facebook/rocksdb/wiki/Statistics.
pub enable_statistics: bool,
/// Open the database as a secondary instance.
/// Specify a path for the secondary instance of the database.
/// Secondary instances are read-only and kept updated by tailing the rocksdb MANIFEST.
/// It is up to the user to call `catch_up_with_primary()` manually to update the secondary db.
/// Disabled by default.
///
/// `max_open_files` is overridden to always equal `-1`.
/// May have a negative performance impact on the secondary instance
/// if the secondary instance reads and applies state changes before the primary instance compacts them.
/// More info: https://github.com/facebook/rocksdb/wiki/Secondary-instance
pub secondary: Option<String>,
}

impl DatabaseConfig {
Expand Down Expand Up @@ -215,6 +226,7 @@ impl Default for DatabaseConfig {
columns: 1,
keep_log_file_num: 1,
enable_statistics: false,
secondary: None,
}
}
}
Expand Down Expand Up @@ -305,7 +317,11 @@ fn generate_options(config: &DatabaseConfig) -> Options {
}
opts.set_use_fsync(false);
opts.create_if_missing(true);
opts.set_max_open_files(config.max_open_files);
if config.secondary.is_some() {
opts.set_max_open_files(-1)
} else {
opts.set_max_open_files(config.max_open_files);
}
opts.set_bytes_per_sync(1 * MB as u64);
opts.set_keep_log_file_num(1);
opts.increase_parallelism(cmp::max(1, num_cpus::get() as i32 / 2));
Expand Down Expand Up @@ -364,12 +380,38 @@ impl Database {
}

let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();

let write_opts = WriteOptions::default();
let read_opts = generate_read_options();

let db = if let Some(secondary_path) = &config.secondary {
Self::open_secondary(&opts, path, secondary_path.as_str(), column_names.as_slice())?
} else {
let column_names: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
Self::open_primary(&opts, path, config, column_names.as_slice(), &block_opts)?
};

Ok(Database {
db: RwLock::new(Some(DBAndColumns { db, column_names })),
config: config.clone(),
path: path.to_owned(),
opts,
read_opts,
write_opts,
block_opts,
stats: stats::RunningDbStats::new(),
})
}

/// Internal api to open a database in primary mode.
fn open_primary(
opts: &Options,
path: &str,
config: &DatabaseConfig,
column_names: &[&str],
block_opts: &BlockBasedOptions,
) -> io::Result<rocksdb::DB> {
let cf_descriptors: Vec<_> = (0..config.columns)
.map(|i| ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i)))
.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
.collect();

let db = match DB::open_cf_descriptors(&opts, path, cf_descriptors) {
Expand All @@ -390,31 +432,42 @@ impl Database {
ok => ok,
};

let db = match db {
Ok(match db {
Ok(db) => db,
Err(ref s) if is_corrupted(s) => {
warn!("DB corrupted: {}, attempting repair", s);
DB::repair(&opts, path).map_err(other_io_err)?;

let cf_descriptors: Vec<_> = (0..config.columns)
.map(|i| {
ColumnFamilyDescriptor::new(&column_names[i as usize], config.column_config(&block_opts, i))
ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i))
})
.collect();

DB::open_cf_descriptors(&opts, path, cf_descriptors).map_err(other_io_err)?
}
Err(s) => return Err(other_io_err(s)),
};
Ok(Database {
db: RwLock::new(Some(DBAndColumns { db, column_names })),
config: config.clone(),
path: path.to_owned(),
opts,
read_opts,
write_opts,
block_opts,
stats: stats::RunningDbStats::new(),
})
}

/// Internal api to open a database in secondary mode.
/// Secondary database needs a seperate path to store its own logs.
fn open_secondary(
opts: &Options,
path: &str,
secondary_path: &str,
column_names: &[String],
) -> io::Result<rocksdb::DB> {
let db = DB::open_cf_as_secondary(&opts, path, secondary_path, column_names);

Ok(match db {
Ok(db) => db,
Err(ref s) if is_corrupted(s) => {
warn!("DB corrupted: {}, attempting repair", s);
DB::repair(&opts, path).map_err(other_io_err)?;
DB::open_cf_as_secondary(&opts, path, secondary_path, column_names).map_err(other_io_err)?
}
Err(s) => return Err(other_io_err(s)),
})
}

Expand Down Expand Up @@ -635,6 +688,33 @@ impl Database {
HashMap::new()
}
}

/// Try to catch up a secondary instance with
/// the primary by reading as much from the logs as possible.
///
/// Guaranteed to have changes up to the the time that `try_catch_up_with_primary` is called
/// if it finishes succesfully.
///
/// Blocks until the MANIFEST file and any state changes in the corresponding Write-Ahead-Logs
/// are applied to the secondary instance. If the manifest files are very large
/// this method could take a long time.
///
/// If Write-Ahead-Logs have been purged by the primary instance before the secondary
/// is able to open them, the secondary will not be caught up
/// until this function is called again and new Write-Ahead-Logs are identified.
///
/// If called while the primary is writing, the catch-up may fail.
///
/// If the secondary is unable to catch up because of missing logs,
/// this method fails silently and no error is returned.
///
/// Calling this as primary will return an error.
pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
insipx marked this conversation as resolved.
Show resolved Hide resolved
match self.db.read().as_ref() {
Some(DBAndColumns { db, .. }) => db.try_catch_up_with_primary().map_err(other_io_err),
None => Ok(()),
}
}
}

// duplicate declaration of methods here to avoid trait import in certain existing cases
Expand Down Expand Up @@ -755,6 +835,48 @@ mod tests {
st::test_io_stats(&db)
}

#[test]
fn secondary_db_get() -> io::Result<()> {
let primary = TempDir::new("")?;
let config = DatabaseConfig::with_columns(1);
let db = Database::open(&config, primary.path().to_str().expect("tempdir path is valid unicode"))?;

let key1 = b"key1";
let mut transaction = db.transaction();
transaction.put(0, key1, b"horse");
db.write(transaction)?;

let config = DatabaseConfig {
secondary: TempDir::new("")?.path().to_str().map(|s| s.to_string()),
..DatabaseConfig::with_columns(1)
};
let second_db = Database::open(&config, primary.path().to_str().expect("tempdir path is valid unicode"))?;
assert_eq!(&*second_db.get(0, key1)?.unwrap(), b"horse");
Ok(())
}

#[test]
fn secondary_db_catch_up() -> io::Result<()> {
let primary = TempDir::new("")?;
let config = DatabaseConfig::with_columns(1);
let db = Database::open(&config, primary.path().to_str().expect("tempdir path is valid unicode"))?;

let config = DatabaseConfig {
secondary: TempDir::new("")?.path().to_str().map(|s| s.to_string()),
..DatabaseConfig::with_columns(1)
};
let second_db = Database::open(&config, primary.path().to_str().expect("tempdir path is valid unicode"))?;

let mut transaction = db.transaction();
transaction.put(0, b"key1", b"mule");
transaction.put(0, b"key2", b"cat");
db.write(transaction)?;

second_db.try_catch_up_with_primary()?;
assert_eq!(&*second_db.get(0, b"key2")?.unwrap(), b"cat");
Ok(())
}

#[test]
fn mem_tables_size() {
let tempdir = TempDir::new("").unwrap();
Expand All @@ -766,6 +888,7 @@ mod tests {
columns: 11,
keep_log_file_num: 1,
enable_statistics: false,
secondary: None,
};

let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
Expand Down