Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update proto version and enable grpc gzip #18

Merged
merged 5 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ dashmap = "5.4"
enum_dispatch = "0.3"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", tag = "v0.4.1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", tag = "v0.7.0" }
parking_lot = "0.12"
prost = "0.12"
rand = "0.8"
snafu = "0.7"
tokio = { version = "1", features = ["rt", "time"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = { version = "0.10", features = ["tls", "tls-roots"] }
tonic = { version = "0.11", features = ["tls", "tls-roots", "gzip", "zstd"] }
tower = "0.4"
derive_builder = "0.20"

[build-dependencies]
tonic-build = "0.9"
Expand Down
9 changes: 6 additions & 3 deletions examples/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::helpers::schema::*;
use greptimedb_ingester::helpers::values::*;
use greptimedb_ingester::{
ChannelConfig, ChannelManager, Client, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
ChannelConfig, ChannelManager, ClientBuilder, ClientTlsOption, Database, DEFAULT_SCHEMA_NAME,
};

#[tokio::main]
Expand All @@ -31,14 +31,17 @@ async fn main() {
.map(|s| s == "1")
.unwrap_or(false);

let builder = ClientBuilder::default()
.peers(vec![&greptimedb_endpoint])
.compression(greptimedb_ingester::Compression::Gzip);
let grpc_client = if greptimedb_secure {
let channel_config = ChannelConfig::default().client_tls_config(ClientTlsOption::default());

let channel_manager = ChannelManager::with_tls_config(channel_config)
.expect("Failed to create channel manager");
Client::with_manager_and_urls(channel_manager, vec![&greptimedb_endpoint])
builder.channel_manager(channel_manager).build()
} else {
Client::with_urls(vec![&greptimedb_endpoint])
builder.build()
};

let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);
Expand Down
6 changes: 4 additions & 2 deletions examples/stream_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use derive_new::new;

use greptimedb_ingester::api::v1::*;
use greptimedb_ingester::{Client, Database, DEFAULT_SCHEMA_NAME};
use greptimedb_ingester::{ClientBuilder, Database, DEFAULT_SCHEMA_NAME};

#[tokio::main]
async fn main() {
Expand All @@ -25,7 +25,9 @@ async fn main() {
let greptimedb_dbname =
std::env::var("GREPTIMEDB_DBNAME").unwrap_or_else(|_| DEFAULT_SCHEMA_NAME.to_owned());

let grpc_client = Client::with_urls(vec![&greptimedb_endpoint]);
let grpc_client = ClientBuilder::default()
.peers(vec![&greptimedb_endpoint])
.build();

let client = Database::new_with_dbname(greptimedb_dbname, grpc_client);

Expand Down
135 changes: 110 additions & 25 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ use crate::api::v1::HealthCheckRequest;
use crate::channel_manager::ChannelManager;
use parking_lot::RwLock;
use snafu::OptionExt;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

use crate::load_balance::{LoadBalance, Loadbalancer};
use crate::{error, Result};
use derive_builder::Builder;

const MAX_MESSAGE_SIZE: usize = 512 * 1024 * 1024;

Expand All @@ -36,22 +38,78 @@ pub struct Client {
inner: Arc<Inner>,
}

#[derive(Debug, Default)]
#[derive(Default)]
pub struct ClientBuilder {
paomian marked this conversation as resolved.
Show resolved Hide resolved
channel_manager: ChannelManager,
load_balance: Loadbalancer,
compression: Compression,
peers: Vec<String>,
}

impl ClientBuilder {
pub fn channel_manager(mut self, channel_manager: ChannelManager) -> Self {
self.channel_manager = channel_manager;
self
}

pub fn load_balance(mut self, load_balance: Loadbalancer) -> Self {
self.load_balance = load_balance;
self
}

pub fn compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}

pub fn peers<U, A>(mut self, peers: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
self.peers = normalize_urls(peers);
self
}

pub fn build(self) -> Client {
let inner = InnerBuilder::default()
.channel_manager(self.channel_manager)
.load_balance(self.load_balance)
.compression(self.compression)
.peers(self.peers)
.build()
.unwrap();
Client {
inner: Arc::new(inner),
}
}
}

#[derive(Debug, Clone, Default)]
pub enum Compression {
#[default]
Gzip,
Zstd,
None,
}

#[derive(Debug, Default, Builder)]
struct Inner {
channel_manager: ChannelManager,
#[builder(setter(custom))]
peers: Arc<RwLock<Vec<String>>>,
load_balance: Loadbalancer,
compression: Compression,
}

impl Inner {
fn with_manager(channel_manager: ChannelManager) -> Self {
Self {
channel_manager,
peers: Default::default(),
load_balance: Default::default(),
}
impl InnerBuilder {
pub fn peers(&mut self, peers: Vec<String>) -> &mut Self {
self.peers = Some(Arc::new(RwLock::new(peers)));
self
}
}

impl Inner {
fn set_peers(&self, peers: Vec<String>) {
let mut guard = self.peers.write();
*guard = peers;
Expand All @@ -64,50 +122,55 @@ impl Inner {
}

impl Client {
#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn new() -> Self {
Default::default()
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn with_manager(channel_manager: ChannelManager) -> Self {
let inner = Arc::new(Inner::with_manager(channel_manager));
Self { inner }
let inner = InnerBuilder::default()
.channel_manager(channel_manager)
.build()
.unwrap();
Self {
inner: Arc::new(inner),
}
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn with_urls<U, A>(urls: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
Self::with_manager_and_urls(ChannelManager::new(), urls)
ClientBuilder::default().peers(urls).build()
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn with_manager_and_urls<U, A>(channel_manager: ChannelManager, urls: A) -> Self
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let inner = Inner::with_manager(channel_manager);
let urls: Vec<String> = urls
.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect();
inner.set_peers(urls);
let inner = InnerBuilder::default()
.channel_manager(channel_manager)
.peers(normalize_urls(urls))
.build()
.unwrap();

Self {
inner: Arc::new(inner),
}
}

#[deprecated(since = "0.1.0", note = "use `ClientBuilder` instead of this method")]
pub fn start<U, A>(&self, urls: A)
where
U: AsRef<str>,
A: AsRef<[U]>,
{
let urls: Vec<String> = urls
.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect();
let urls: Vec<String> = normalize_urls(urls);

self.inner.set_peers(urls);
}
Expand All @@ -127,8 +190,19 @@ impl Client {

pub(crate) fn make_database_client(&self) -> Result<DatabaseClient> {
let (_, channel) = self.find_channel()?;
let client =
GreptimeDatabaseClient::new(channel).max_decoding_message_size(MAX_MESSAGE_SIZE);
let mut client = GreptimeDatabaseClient::new(channel)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.accept_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd);
match self.inner.compression {
Compression::Gzip => {
client = client.send_compressed(CompressionEncoding::Gzip);
}
Compression::Zstd => {
client = client.send_compressed(CompressionEncoding::Zstd);
}
Compression::None => {}
}
Ok(DatabaseClient { inner: client })
}

Expand All @@ -140,6 +214,17 @@ impl Client {
}
}

fn normalize_urls<U, A>(urls: A) -> Vec<String>
where
U: AsRef<str>,
A: AsRef<[U]>,
{
urls.as_ref()
.iter()
.map(|peer| peer.as_ref().to_string())
.collect()
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
Expand Down
16 changes: 6 additions & 10 deletions src/helpers/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,8 @@ define_value_fn!(time_second_value, i64, TimeSecondValue);
define_value_fn!(time_millisecond_value, i64, TimeMillisecondValue);
define_value_fn!(time_microsecond_value, i64, TimeMicrosecondValue);
define_value_fn!(time_nanosecond_value, i64, TimeNanosecondValue);
define_value_fn!(interval_year_month_value, i32, IntervalYearMonthValues);
define_value_fn!(interval_day_time_value, i64, IntervalDayTimeValues);
define_value_fn!(duration_second_value, i64, DurationSecondValue);
define_value_fn!(duration_millisecond_value, i64, DurationMillisecondValue);
define_value_fn!(duration_microsecond_value, i64, DurationMicrosecondValue);
define_value_fn!(duration_nanosecond_value, i64, DurationNanosecondValue);
define_value_fn!(interval_year_month_value, i32, IntervalYearMonthValue);
define_value_fn!(interval_day_time_value, i64, IntervalDayTimeValue);

#[inline]
pub fn interval_month_day_nano_value(
Expand All @@ -81,13 +77,13 @@ pub fn interval_month_day_nano_value(
nanoseconds: i64,
) -> crate::api::v1::Value {
crate::api::v1::Value {
value_data: Some(
crate::api::v1::value::ValueData::IntervalMonthDayNanoValues(IntervalMonthDayNano {
value_data: Some(crate::api::v1::value::ValueData::IntervalMonthDayNanoValue(
IntervalMonthDayNano {
months,
days,
nanoseconds,
}),
),
},
)),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod load_balance;
mod stream_insert;

pub use self::channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption};
pub use self::client::Client;
pub use self::client::{Client, ClientBuilder, Compression};
pub use self::database::Database;
pub use self::error::{Error, Result};
pub use self::stream_insert::StreamInserter;
Expand Down
4 changes: 2 additions & 2 deletions src/load_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub trait LoadBalance {
}

#[enum_dispatch(LoadBalance)]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Loadbalancer {
Random,
}
Expand All @@ -32,7 +32,7 @@ impl Default for Loadbalancer {
}
}

#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub struct Random;

impl LoadBalance for Random {
Expand Down