From 5c2be32e143f23b5c0af1efe60060119e9eb1330 Mon Sep 17 00:00:00 2001
From: Eshan <60269431+Eshanatnight@users.noreply.github.com>
Date: Tue, 2 Apr 2024 09:22:33 +0530
Subject: [PATCH] feat: implement distributed system with ingest and query
modes (#730)
---
Cargo.lock | 22 +
server/Cargo.toml | 1 +
server/src/about.rs | 8 +-
server/src/analytics.rs | 2 +-
server/src/banner.rs | 32 +-
server/src/catalog.rs | 72 ++-
server/src/cli.rs | 451 ++++++++++++++++
server/src/handlers/http.rs | 418 +++------------
server/src/handlers/http/about.rs | 42 +-
server/src/handlers/http/cluster/mod.rs | 403 ++++++++++++++
server/src/handlers/http/cluster/utils.rs | 265 +++++++++
server/src/handlers/http/ingest.rs | 30 +-
server/src/handlers/http/logstream.rs | 125 +++--
.../src/handlers/http/modal/ingest_server.rs | 332 ++++++++++++
server/src/handlers/http/modal/mod.rs | 133 +++++
.../src/handlers/http/modal/query_server.rs | 203 +++++++
server/src/handlers/http/modal/server.rs | 487 +++++++++++++++++
.../src/handlers/http/modal/ssl_acceptor.rs | 54 ++
server/src/handlers/http/query.rs | 98 +++-
server/src/main.rs | 172 +-----
server/src/metadata.rs | 4 +-
server/src/metrics/mod.rs | 3 +-
server/src/metrics/prom_utils.rs | 87 +++
server/src/migration.rs | 128 ++++-
server/src/migration/metadata_migration.rs | 75 ++-
server/src/option.rs | 507 ++----------------
server/src/query.rs | 135 ++++-
server/src/query/filter_optimizer.rs | 2 +-
server/src/rbac/role.rs | 7 +
server/src/response.rs | 13 +-
server/src/storage.rs | 13 +-
server/src/storage/localfs.rs | 211 +++++++-
server/src/storage/object_storage.rs | 137 ++++-
server/src/storage/s3.rs | 178 +++++-
server/src/storage/staging.rs | 20 +
server/src/storage/store_metadata.rs | 115 +++-
server/src/sync.rs | 112 ++++
server/src/utils.rs | 10 +
server/src/utils/arrow/merged_reader.rs | 3 +-
39 files changed, 4012 insertions(+), 1098 deletions(-)
create mode 100644 server/src/cli.rs
create mode 100644 server/src/handlers/http/cluster/mod.rs
create mode 100644 server/src/handlers/http/cluster/utils.rs
create mode 100644 server/src/handlers/http/modal/ingest_server.rs
create mode 100644 server/src/handlers/http/modal/mod.rs
create mode 100644 server/src/handlers/http/modal/query_server.rs
create mode 100644 server/src/handlers/http/modal/server.rs
create mode 100644 server/src/handlers/http/modal/ssl_acceptor.rs
create mode 100644 server/src/metrics/prom_utils.rs
create mode 100644 server/src/sync.rs
diff --git a/Cargo.lock b/Cargo.lock
index 0327f42c..daa85d56 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2107,6 +2107,15 @@ dependencies = [
"either",
]
+[[package]]
+name = "itertools"
+version = "0.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
+dependencies = [
+ "either",
+]
+
[[package]]
name = "itoa"
version = "0.4.8"
@@ -2734,6 +2743,7 @@ dependencies = [
"parquet",
"path-clean",
"prometheus",
+ "prometheus-parse",
"prost",
"prost-build",
"rand",
@@ -2982,6 +2992,18 @@ dependencies = [
"thiserror",
]
+[[package]]
+name = "prometheus-parse"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "811031bea65e5a401fb2e1f37d802cca6601e204ac463809a3189352d13b78a5"
+dependencies = [
+ "chrono",
+ "itertools 0.12.1",
+ "once_cell",
+ "regex",
+]
+
[[package]]
name = "prost"
version = "0.12.3"
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 0af2b43c..a456f699 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -104,6 +104,7 @@ serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"
prost = "0.12.3"
+prometheus-parse = "0.2.5"
[build-dependencies]
cargo_toml = "0.15"
diff --git a/server/src/about.rs b/server/src/about.rs
index f33ac213..9aea9ff2 100644
--- a/server/src/about.rs
+++ b/server/src/about.rs
@@ -90,10 +90,10 @@ pub fn print_about(
eprint!(
"
{}
- Version: \"v{}\"",
+ Version:\t\t\t\t\t\"v{}\"",
"About:".to_string().bold(),
current_version,
- );
+ ); // " " " "
if let Some(latest_release) = latest_release {
if latest_release.version > current_version {
@@ -103,8 +103,8 @@ pub fn print_about(
eprintln!(
"
- Commit: \"{commit_hash}\"
- Docs: \"https://logg.ing/docs\""
+ Commit:\t\t\t\t\t\t\"{commit_hash}\"
+ Docs:\t\t\t\t\t\t\"https://logg.ing/docs\""
);
}
diff --git a/server/src/analytics.rs b/server/src/analytics.rs
index ca8d172d..e1031146 100644
--- a/server/src/analytics.rs
+++ b/server/src/analytics.rs
@@ -90,7 +90,7 @@ impl Report {
cpu_count,
memory_total_bytes: mem_total,
platform: platform().to_string(),
- mode: CONFIG.mode_string().to_string(),
+ mode: CONFIG.get_storage_mode_string().to_string(),
version: current().released_version.to_string(),
commit_hash: current().commit_hash,
metrics: build_metrics(),
diff --git a/server/src/banner.rs b/server/src/banner.rs
index 0f1dc512..d9f3cc60 100644
--- a/server/src/banner.rs
+++ b/server/src/banner.rs
@@ -35,13 +35,13 @@ pub async fn print(config: &Config, meta: &StorageMetadata) {
fn print_ascii_art() {
let ascii_name = r#"
- `7MM"""Mq. *MM `7MM
- MM `MM. MM MM
- MM ,M9 ,6"Yb. `7Mb,od8 ,pP"Ybd .gP"Ya ,6"Yb. MM,dMMb. MM .gP"Ya
- MMmmdM9 8) MM MM' "' 8I `" ,M' Yb 8) MM MM `Mb MM ,M' Yb
- MM ,pm9MM MM `YMMMa. 8M"""""" ,pm9MM MM M8 MM 8M""""""
- MM 8M MM MM L. I8 YM. , 8M MM MM. ,M9 MM YM. ,
- .JMML. `Moo9^Yo..JMML. M9mmmP' `Mbmmd' `Moo9^Yo. P^YbmdP' .JMML. `Mbmmd'
+ `7MM"""Mq. *MM `7MM
+ MM `MM. MM MM
+ MM ,M9 ,6"Yb. `7Mb,od8 ,pP"Ybd .gP"Ya ,6"Yb. MM,dMMb. MM .gP"Ya
+ MMmmdM9 8) MM MM' "' 8I `" ,M' Yb 8) MM MM `Mb MM ,M' Yb
+ MM ,pm9MM MM `YMMMa. 8M"""""" ,pm9MM MM M8 MM 8M""""""
+ MM 8M MM MM L. I8 YM. , 8M MM MM. ,M9 MM YM. ,
+ .JMML. `Moo9^Yo..JMML. M9mmmP' `Mbmmd' `Moo9^Yo. P^YbmdP' .JMML. `Mbmmd'
"#;
eprint!("{ascii_name}");
@@ -77,12 +77,14 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
eprintln!(
"
{}
- Address: {}
- Credentials: {}
- LLM Status: \"{}\"",
+ Address:\t\t\t\t\t{}
+ Credentials:\t\t\t\t\t{}
+ Server Mode:\t\t\t\t\t\"{}\"
+ LLM Status:\t\t\t\t\t\"{}\"",
"Server:".to_string().bold(),
address,
credentials,
+ config.parseable.mode.to_str(),
llm_status
);
}
@@ -99,10 +101,10 @@ async fn storage_info(config: &Config) {
eprintln!(
"
{}
- Mode: \"{}\"
- Staging: \"{}\"",
+ Storage Mode:\t\t\t\t\t\"{}\"
+ Staging Path:\t\t\t\t\t\"{}\"",
"Storage:".to_string().bold(),
- config.mode_string(),
+ config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
);
@@ -114,7 +116,7 @@ async fn storage_info(config: &Config) {
eprintln!(
"\
- {:8}Cache: \"{}\", (size: {})",
+ {:8}Cache:\t\t\t\t\t\"{}\", (size: {})",
"",
path.display(),
size
@@ -123,7 +125,7 @@ async fn storage_info(config: &Config) {
eprintln!(
"\
- {:8}Store: \"{}\", (latency: {:?})",
+ {:8}Store:\t\t\t\t\t\t\"{}\", (latency: {:?})",
"",
storage.get_endpoint(),
latency
diff --git a/server/src/catalog.rs b/server/src/catalog.rs
index 2c841b10..0e8716dc 100644
--- a/server/src/catalog.rs
+++ b/server/src/catalog.rs
@@ -24,7 +24,8 @@ use relative_path::RelativePathBuf;
use crate::{
catalog::manifest::Manifest,
query::PartialTimeFilter,
- storage::{ObjectStorage, ObjectStorageError},
+ storage::{ObjectStorage, ObjectStorageError, MANIFEST_FILE},
+ utils::get_address,
};
use self::{column::Column, snapshot::ManifestItem};
@@ -105,20 +106,67 @@ pub async fn update_snapshot(
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});
+    // NOTE(review): in ingest mode a per-node manifest may still need to be created even
+    // when `pos` already indexes an existing entry in stream.json — hence the check below.
+
// We update the manifest referenced by this position
// This updates an existing file so there is no need to create a snapshot entry.
if let Some(pos) = pos {
let info = &mut manifests[pos];
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
- let Some(mut manifest) = storage.get_manifest(&path).await? else {
- return Err(ObjectStorageError::UnhandledError(
- "Manifest found in snapshot but not in object-storage"
- .to_string()
- .into(),
- ));
- };
- manifest.apply_change(change);
- storage.put_manifest(&path, manifest).await?;
+
+ let mut ch = false;
+ for m in manifests.iter() {
+ let s = get_address();
+ let p = format!("{}.{}.{}", s.0, s.1, MANIFEST_FILE);
+ if m.manifest_path.contains(&p) {
+ ch = true;
+ }
+ }
+ if ch {
+ let Some(mut manifest) = storage.get_manifest(&path).await? else {
+ return Err(ObjectStorageError::UnhandledError(
+ "Manifest found in snapshot but not in object-storage"
+ .to_string()
+ .into(),
+ ));
+ };
+ manifest.apply_change(change);
+ storage.put_manifest(&path, manifest).await?;
+ } else {
+ let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
+ let upper_bound = lower_bound
+ .date_naive()
+ .and_time(
+ NaiveTime::from_num_seconds_from_midnight_opt(
+ 23 * 3600 + 59 * 60 + 59,
+ 999_999_999,
+ )
+ .unwrap(),
+ )
+ .and_utc();
+
+ let manifest = Manifest {
+ files: vec![change],
+ ..Manifest::default()
+ };
+
+ let addr = get_address();
+            let manifest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
+            let path =
+                partition_path(stream_name, lower_bound, upper_bound).join(&manifest_file_name);
+ storage
+ .put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
+ .await?;
+ let path = storage.absolute_url(&path);
+            let new_snapshot_entry = snapshot::ManifestItem {
+                manifest_path: path.to_string(),
+                time_lower_bound: lower_bound,
+                time_upper_bound: upper_bound,
+            };
+            manifests.push(new_snapshot_entry);
+ storage.put_snapshot(stream_name, meta.snapshot).await?;
+ }
} else {
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
let upper_bound = lower_bound
@@ -137,7 +185,9 @@ pub async fn update_snapshot(
..Manifest::default()
};
- let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json");
+ let addr = get_address();
+        let manifest_file_name = format!("{}.{}.{}", addr.0, addr.1, MANIFEST_FILE);
+        let path = partition_path(stream_name, lower_bound, upper_bound).join(&manifest_file_name);
storage
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
.await?;
diff --git a/server/src/cli.rs b/server/src/cli.rs
new file mode 100644
index 00000000..691547f8
--- /dev/null
+++ b/server/src/cli.rs
@@ -0,0 +1,451 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use clap::{value_parser, Arg, ArgGroup, Command, FromArgMatches};
+use std::path::PathBuf;
+
+use url::Url;
+
+use crate::{
+ oidc::{self, OpenidConfig},
+ option::{validation, Compression, Mode},
+};
+
+#[derive(Debug, Default)]
+pub struct Cli {
+ /// The location of TLS Cert file
+ pub tls_cert_path: Option,
+
+ /// The location of TLS Private Key file
+ pub tls_key_path: Option,
+
+ /// The address on which the http server will listen.
+ pub address: String,
+
+ /// Base domain under which server is hosted.
+ /// This information is used by OIDC to refer redirects
+ pub domain_address: Option,
+
+ /// The local staging path is used as a temporary landing point
+ /// for incoming events and local cache
+ pub local_staging_path: PathBuf,
+
+ /// The local cache path is used for speeding up query on latest data
+ pub local_cache_path: Option,
+
+ /// Size for local cache
+ pub local_cache_size: u64,
+
+ /// Username for the basic authentication on the server
+ pub username: String,
+
+ /// Password for the basic authentication on the server
+ pub password: String,
+
+ /// OpenId configuration
+ pub openid: Option,
+
+ /// Server should check for update or not
+ pub check_update: bool,
+
+ /// Server should send anonymous analytics or not
+ pub send_analytics: bool,
+
+ /// Open AI access key
+ pub open_ai_key: Option,
+
+ /// Livetail port
+ pub grpc_port: u16,
+
+ /// Livetail channel capacity
+ pub livetail_channel_capacity: usize,
+
+ /// Rows in Parquet Rowgroup
+ pub row_group_size: usize,
+
+ /// Query memory limit in bytes
+ pub query_memory_pool_size: Option,
+
+ /// Parquet compression algorithm
+ pub parquet_compression: Compression,
+
+ /// Mode of operation
+ pub mode: Mode,
+}
+
+impl Cli {
+ // identifiers for arguments
+ pub const TLS_CERT: &'static str = "tls-cert-path";
+ pub const TLS_KEY: &'static str = "tls-key-path";
+ pub const ADDRESS: &'static str = "address";
+ pub const DOMAIN_URI: &'static str = "origin";
+ pub const STAGING: &'static str = "local-staging-path";
+ pub const CACHE: &'static str = "cache-path";
+ pub const CACHE_SIZE: &'static str = "cache-size";
+ pub const USERNAME: &'static str = "username";
+ pub const PASSWORD: &'static str = "password";
+ pub const CHECK_UPDATE: &'static str = "check-update";
+ pub const SEND_ANALYTICS: &'static str = "send-analytics";
+ pub const OPEN_AI_KEY: &'static str = "open-ai-key";
+ pub const OPENID_CLIENT_ID: &'static str = "oidc-client";
+ pub const OPENID_CLIENT_SECRET: &'static str = "oidc-client-secret";
+ pub const OPENID_ISSUER: &'static str = "oidc-issuer";
+ pub const GRPC_PORT: &'static str = "grpc-port";
+ pub const LIVETAIL_CAPACITY: &'static str = "livetail-capacity";
+ // todo : what should this flag be
+ pub const QUERY_MEM_POOL_SIZE: &'static str = "query-mempool-size";
+ pub const ROW_GROUP_SIZE: &'static str = "row-group-size";
+ pub const PARQUET_COMPRESSION_ALGO: &'static str = "compression-algo";
+ pub const MODE: &'static str = "mode";
+ pub const DEFAULT_USERNAME: &'static str = "admin";
+ pub const DEFAULT_PASSWORD: &'static str = "admin";
+
+ pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
+ self.local_staging_path.join(stream_name)
+ }
+
+ pub fn get_scheme(&self) -> String {
+ if self.tls_cert_path.is_some() && self.tls_key_path.is_some() {
+ return "https".to_string();
+ }
+ "http".to_string()
+ }
+
+ pub fn create_cli_command_with_clap(name: &'static str) -> Command {
+ Command::new(name).next_line_help(false)
+ .arg(
+ Arg::new(Self::TLS_CERT)
+ .long(Self::TLS_CERT)
+ .env("P_TLS_CERT_PATH")
+ .value_name("PATH")
+ .value_parser(validation::file_path)
+ .help("Local path on this device where certificate file is located. Required to enable TLS"),
+ )
+ .arg(
+ Arg::new(Self::TLS_KEY)
+ .long(Self::TLS_KEY)
+ .env("P_TLS_KEY_PATH")
+ .value_name("PATH")
+ .value_parser(validation::file_path)
+ .help("Local path on this device where private key file is located. Required to enable TLS"),
+ )
+ .arg(
+ Arg::new(Self::ADDRESS)
+ .long(Self::ADDRESS)
+ .env("P_ADDR")
+ .value_name("ADDR:PORT")
+ .default_value("0.0.0.0:8000")
+ .value_parser(validation::socket_addr)
+ .help("Address and port for Parseable HTTP(s) server"),
+ )
+ .arg(
+ Arg::new(Self::STAGING)
+ .long(Self::STAGING)
+ .env("P_STAGING_DIR")
+ .value_name("DIR")
+ .default_value("./staging")
+ .value_parser(validation::canonicalize_path)
+ .help("Local path on this device to be used as landing point for incoming events")
+ .next_line_help(true),
+ )
+ .arg(
+ Arg::new(Self::CACHE)
+ .long(Self::CACHE)
+ .env("P_CACHE_DIR")
+ .value_name("DIR")
+ .value_parser(validation::canonicalize_path)
+ .help("Local path on this device to be used for caching data")
+ .next_line_help(true),
+ )
+ .arg(
+ Arg::new(Self::CACHE_SIZE)
+ .long(Self::CACHE_SIZE)
+ .env("P_CACHE_SIZE")
+ .value_name("size")
+ .default_value("1GiB")
+ .value_parser(validation::cache_size)
+ .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
+ .next_line_help(true),
+ )
+
+ .arg(
+ Arg::new(Self::USERNAME)
+ .long(Self::USERNAME)
+ .env("P_USERNAME")
+ .value_name("STRING")
+ .required(true)
+ .help("Admin username to be set for this Parseable server"),
+ )
+ .arg(
+ Arg::new(Self::PASSWORD)
+ .long(Self::PASSWORD)
+ .env("P_PASSWORD")
+ .value_name("STRING")
+ .required(true)
+ .help("Admin password to be set for this Parseable server"),
+ )
+ .arg(
+ Arg::new(Self::CHECK_UPDATE)
+ .long(Self::CHECK_UPDATE)
+ .env("P_CHECK_UPDATE")
+ .value_name("BOOL")
+ .required(false)
+ .default_value("true")
+ .value_parser(value_parser!(bool))
+ .help("Enable/Disable checking for new Parseable release"),
+ )
+ .arg(
+ Arg::new(Self::SEND_ANALYTICS)
+ .long(Self::SEND_ANALYTICS)
+ .env("P_SEND_ANONYMOUS_USAGE_DATA")
+ .value_name("BOOL")
+ .required(false)
+ .default_value("true")
+ .value_parser(value_parser!(bool))
+ .help("Enable/Disable anonymous telemetry data collection"),
+ )
+ .arg(
+ Arg::new(Self::OPEN_AI_KEY)
+ .long(Self::OPEN_AI_KEY)
+ .env("P_OPENAI_API_KEY")
+ .value_name("STRING")
+ .required(false)
+ .help("OpenAI key to enable llm features"),
+ )
+ .arg(
+ Arg::new(Self::OPENID_CLIENT_ID)
+ .long(Self::OPENID_CLIENT_ID)
+ .env("P_OIDC_CLIENT_ID")
+ .value_name("STRING")
+ .required(false)
+ .help("Client id for OIDC provider"),
+ )
+ .arg(
+ Arg::new(Self::OPENID_CLIENT_SECRET)
+ .long(Self::OPENID_CLIENT_SECRET)
+ .env("P_OIDC_CLIENT_SECRET")
+ .value_name("STRING")
+ .required(false)
+ .help("Client secret for OIDC provider"),
+ )
+ .arg(
+ Arg::new(Self::OPENID_ISSUER)
+ .long(Self::OPENID_ISSUER)
+ .env("P_OIDC_ISSUER")
+                    .value_name("URL")
+ .required(false)
+ .value_parser(validation::url)
+ .help("OIDC provider's host address"),
+ )
+ .arg(
+ Arg::new(Self::DOMAIN_URI)
+ .long(Self::DOMAIN_URI)
+ .env("P_ORIGIN_URI")
+ .value_name("URL")
+ .required(false)
+ .value_parser(validation::url)
+ .help("Parseable server global domain address"),
+ )
+ .arg(
+ Arg::new(Self::GRPC_PORT)
+ .long(Self::GRPC_PORT)
+ .env("P_GRPC_PORT")
+ .value_name("PORT")
+ .default_value("8001")
+ .required(false)
+ .value_parser(value_parser!(u16))
+ .help("Port for gRPC server"),
+ )
+ .arg(
+ Arg::new(Self::LIVETAIL_CAPACITY)
+ .long(Self::LIVETAIL_CAPACITY)
+ .env("P_LIVETAIL_CAPACITY")
+ .value_name("NUMBER")
+ .default_value("1000")
+ .required(false)
+ .value_parser(value_parser!(usize))
+ .help("Number of rows in livetail channel"),
+ )
+ .arg(
+ Arg::new(Self::QUERY_MEM_POOL_SIZE)
+ .long(Self::QUERY_MEM_POOL_SIZE)
+ .env("P_QUERY_MEMORY_LIMIT")
+ .value_name("Gib")
+ .required(false)
+ .value_parser(value_parser!(u8))
+ .help("Set a fixed memory limit for query"),
+ )
+ .arg(
+ Arg::new(Self::ROW_GROUP_SIZE)
+ .long(Self::ROW_GROUP_SIZE)
+ .env("P_PARQUET_ROW_GROUP_SIZE")
+ .value_name("NUMBER")
+ .required(false)
+ .default_value("16384")
+ .value_parser(value_parser!(usize))
+ .help("Number of rows in a row group"),
+ ).arg(
+ Arg::new(Self::MODE)
+ .long(Self::MODE)
+ .env("P_MODE")
+ .value_name("STRING")
+ .required(false)
+ .default_value("all")
+ .value_parser([
+ "query",
+ "ingest",
+ "all"])
+ .help("Mode of operation"),
+ )
+ .arg(
+ Arg::new(Self::PARQUET_COMPRESSION_ALGO)
+ .long(Self::PARQUET_COMPRESSION_ALGO)
+ .env("P_PARQUET_COMPRESSION_ALGO")
+ .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]")
+ .required(false)
+ .default_value("lz4")
+ .value_parser([
+ "uncompressed",
+ "snappy",
+ "gzip",
+ "lzo",
+ "brotli",
+ "lz4",
+ "zstd"])
+ .help("Parquet compression algorithm"),
+ ).group(
+ ArgGroup::new("oidc")
+ .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
+ .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER])
+ .multiple(true)
+ )
+ }
+}
+
+impl FromArgMatches for Cli {
+ fn from_arg_matches(m: &clap::ArgMatches) -> Result {
+ let mut s: Self = Self::default();
+ s.update_from_arg_matches(m)?;
+ Ok(s)
+ }
+
+ fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
+ self.local_cache_path = m.get_one::(Self::CACHE).cloned();
+ self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned();
+ self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned();
+ self.domain_address = m.get_one::(Self::DOMAIN_URI).cloned();
+
+ self.address = m
+ .get_one::(Self::ADDRESS)
+ .cloned()
+ .expect("default value for address");
+ self.local_staging_path = m
+ .get_one::(Self::STAGING)
+ .cloned()
+ .expect("default value for staging");
+ self.local_cache_size = m
+ .get_one::(Self::CACHE_SIZE)
+ .cloned()
+ .expect("default value for cache size");
+ self.username = m
+ .get_one::(Self::USERNAME)
+ .cloned()
+ .expect("default for username");
+ self.password = m
+ .get_one::(Self::PASSWORD)
+ .cloned()
+ .expect("default for password");
+ self.check_update = m
+ .get_one::(Self::CHECK_UPDATE)
+ .cloned()
+ .expect("default for check update");
+ self.send_analytics = m
+ .get_one::(Self::SEND_ANALYTICS)
+ .cloned()
+ .expect("default for send analytics");
+ self.open_ai_key = m.get_one::(Self::OPEN_AI_KEY).cloned();
+ self.grpc_port = m
+ .get_one::(Self::GRPC_PORT)
+ .cloned()
+ .expect("default for livetail port");
+ self.livetail_channel_capacity = m
+ .get_one::(Self::LIVETAIL_CAPACITY)
+ .cloned()
+ .expect("default for livetail capacity");
+ // converts Gib to bytes before assigning
+ self.query_memory_pool_size = m
+ .get_one::(Self::QUERY_MEM_POOL_SIZE)
+ .cloned()
+ .map(|gib| gib as usize * 1024usize.pow(3));
+ self.row_group_size = m
+ .get_one::(Self::ROW_GROUP_SIZE)
+ .cloned()
+ .expect("default for row_group size");
+ self.parquet_compression = match m
+ .get_one::(Self::PARQUET_COMPRESSION_ALGO)
+ .expect("default for compression algo")
+ .as_str()
+ {
+ "uncompressed" => Compression::UNCOMPRESSED,
+ "snappy" => Compression::SNAPPY,
+ "gzip" => Compression::GZIP,
+ "lzo" => Compression::LZO,
+ "brotli" => Compression::BROTLI,
+ "lz4" => Compression::LZ4,
+ "zstd" => Compression::ZSTD,
+ _ => unreachable!(),
+ };
+
+ let openid_client_id = m.get_one::(Self::OPENID_CLIENT_ID).cloned();
+ let openid_client_secret = m.get_one::(Self::OPENID_CLIENT_SECRET).cloned();
+ let openid_issuer = m.get_one::(Self::OPENID_ISSUER).cloned();
+
+ self.openid = match (openid_client_id, openid_client_secret, openid_issuer) {
+ (Some(id), Some(secret), Some(issuer)) => {
+ let origin = if let Some(url) = self.domain_address.clone() {
+ oidc::Origin::Production(url)
+ } else {
+ oidc::Origin::Local {
+ socket_addr: self.address.clone(),
+ https: self.tls_cert_path.is_some() && self.tls_key_path.is_some(),
+ }
+ };
+ Some(OpenidConfig {
+ id,
+ secret,
+ issuer,
+ origin,
+ })
+ }
+ _ => None,
+ };
+
+ self.mode = match m
+ .get_one::(Self::MODE)
+ .expect("Mode not set")
+ .as_str()
+ {
+ "query" => Mode::Query,
+ "ingest" => Mode::Ingest,
+ "all" => Mode::All,
+ _ => unreachable!(),
+ };
+
+ Ok(())
+ }
+}
diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs
index 771eaac7..959eb2ed 100644
--- a/server/src/handlers/http.rs
+++ b/server/src/handlers/http.rs
@@ -16,360 +16,112 @@
*
*/
-use std::fs::File;
-use std::io::BufReader;
-use std::sync::Arc;
-
use actix_cors::Cors;
-use actix_web::{
- web::{self, resource},
- App, HttpServer,
-};
-use actix_web_prometheus::PrometheusMetrics;
-use actix_web_static_files::ResourceFiles;
-use log::info;
-use openid::Discovered;
-use rustls::{Certificate, PrivateKey, ServerConfig};
-use rustls_pemfile::{certs, pkcs8_private_keys};
-
-use crate::option::CONFIG;
-use crate::rbac::role::Action;
+use arrow_schema::Schema;
+use serde_json::Value;
-use self::middleware::{DisAllowRootUser, ModeFilter, RouteExt};
+use self::{cluster::get_ingester_info, query::Query};
-mod about;
-mod health_check;
-mod ingest;
+pub(crate) mod about;
+pub mod cluster;
+pub(crate) mod health_check;
+pub(crate) mod ingest;
mod kinesis;
-mod llm;
-mod logstream;
-mod middleware;
-mod oidc;
+pub(crate) mod llm;
+pub(crate) mod logstream;
+pub(crate) mod middleware;
+pub mod modal;
+pub(crate) mod oidc;
mod otel;
-mod query;
-mod rbac;
-mod role;
-
-include!(concat!(env!("OUT_DIR"), "/generated.rs"));
-
-const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
-const API_BASE_PATH: &str = "/api";
-const API_VERSION: &str = "v1";
-
-pub async fn run_http(
- prometheus: PrometheusMetrics,
- oidc_client: Option,
-) -> anyhow::Result<()> {
- let oidc_client = match oidc_client {
- Some(config) => {
- let client = config
- .connect(&format!("{API_BASE_PATH}/{API_VERSION}/o/code"))
- .await?;
- Some(Arc::new(client))
- }
- None => None,
- };
+pub(crate) mod query;
+pub(crate) mod rbac;
+pub(crate) mod role;
- let create_app = move || {
- App::new()
- .wrap(prometheus.clone())
- .configure(|cfg| configure_routes(cfg, oidc_client.clone()))
- .wrap(actix_web::middleware::Logger::default())
- .wrap(actix_web::middleware::Compress::default())
- .wrap(cross_origin_config())
- .wrap(ModeFilter)
- };
-
- let ssl_acceptor = match (
- &CONFIG.parseable.tls_cert_path,
- &CONFIG.parseable.tls_key_path,
- ) {
- (Some(cert), Some(key)) => {
- // init server config builder with safe defaults
- let config = ServerConfig::builder()
- .with_safe_defaults()
- .with_no_client_auth();
-
- // load TLS key/cert files
- let cert_file = &mut BufReader::new(File::open(cert)?);
- let key_file = &mut BufReader::new(File::open(key)?);
-
- // convert files to key/cert objects
- let cert_chain = certs(cert_file)?.into_iter().map(Certificate).collect();
-
- let mut keys: Vec = pkcs8_private_keys(key_file)?
- .into_iter()
- .map(PrivateKey)
- .collect();
-
- // exit if no keys could be parsed
- if keys.is_empty() {
- anyhow::bail!("Could not locate PKCS 8 private keys.");
- }
+pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
+pub const API_BASE_PATH: &str = "api";
+pub const API_VERSION: &str = "v1";
- let server_config = config.with_single_cert(cert_chain, keys.remove(0))?;
+pub(crate) fn base_path() -> String {
+ format!("/{API_BASE_PATH}/{API_VERSION}")
+}
- Some(server_config)
- }
- (_, _) => None,
- };
+pub fn metrics_path() -> String {
+ format!("{}/metrics", base_path())
+}
- // concurrent workers equal to number of cores on the cpu
- let http_server = HttpServer::new(create_app).workers(num_cpus::get());
- if let Some(config) = ssl_acceptor {
- http_server
- .bind_rustls(&CONFIG.parseable.address, config)?
- .run()
- .await?;
+pub(crate) fn cross_origin_config() -> Cors {
+ if cfg!(feature = "debug") {
+ Cors::permissive().block_on_origin_mismatch(false)
} else {
- http_server.bind(&CONFIG.parseable.address)?.run().await?;
+ Cors::default().block_on_origin_mismatch(false)
}
-
- Ok(())
}
-pub fn configure_routes(
- cfg: &mut web::ServiceConfig,
- oidc_client: Option>>,
-) {
- let generated = generate();
-
- //log stream API
- let logstream_api = web::scope("/{logstream}")
- .service(
- web::resource("")
- // PUT "/logstream/{logstream}" ==> Create log stream
- .route(
- web::put()
- .to(logstream::put_stream)
- .authorize_for_stream(Action::CreateStream),
- )
- // POST "/logstream/{logstream}" ==> Post logs to given log stream
- .route(
- web::post()
- .to(ingest::post_event)
- .authorize_for_stream(Action::Ingest),
- )
- // DELETE "/logstream/{logstream}" ==> Delete log stream
- .route(
- web::delete()
- .to(logstream::delete)
- .authorize_for_stream(Action::DeleteStream),
- )
- .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
- )
- .service(
- // GET "/logstream/{logstream}/info" ==> Get info for given log stream
- web::resource("/info").route(
- web::get()
- .to(logstream::get_stream_info)
- .authorize_for_stream(Action::GetStream),
- ),
- )
- .service(
- web::resource("/alert")
- // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream
- .route(
- web::put()
- .to(logstream::put_alert)
- .authorize_for_stream(Action::PutAlert),
- )
- // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream
- .route(
- web::get()
- .to(logstream::get_alert)
- .authorize_for_stream(Action::GetAlert),
- ),
- )
- .service(
- // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream
- web::resource("/schema").route(
- web::get()
- .to(logstream::schema)
- .authorize_for_stream(Action::GetSchema),
- ),
- )
- .service(
- // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
- web::resource("/stats").route(
- web::get()
- .to(logstream::get_stats)
- .authorize_for_stream(Action::GetStats),
- ),
- )
- .service(
- web::resource("/retention")
- // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream
- .route(
- web::put()
- .to(logstream::put_retention)
- .authorize_for_stream(Action::PutRetention),
- )
- // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream
- .route(
- web::get()
- .to(logstream::get_retention)
- .authorize_for_stream(Action::GetRetention),
- ),
- )
- .service(
- web::resource("/cache")
- // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream
- .route(
- web::put()
- .to(logstream::put_enable_cache)
- .authorize_for_stream(Action::PutCacheEnabled),
- )
- // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream
- .route(
- web::get()
- .to(logstream::get_cache_enabled)
- .authorize_for_stream(Action::GetCacheEnabled),
- ),
- );
-
- // User API
- let user_api = web::scope("/user")
- .service(
- web::resource("")
- // GET /user => List all users
- .route(web::get().to(rbac::list_users).authorize(Action::ListUser)),
- )
- .service(
- web::resource("/{username}")
- // PUT /user/{username} => Create a new user
- .route(web::post().to(rbac::post_user).authorize(Action::PutUser))
- // DELETE /user/{username} => Delete a user
- .route(
- web::delete()
- .to(rbac::delete_user)
- .authorize(Action::DeleteUser),
- )
- .wrap(DisAllowRootUser),
- )
- .service(
- web::resource("/{username}/role")
- // PUT /user/{username}/roles => Put roles for user
- .route(
- web::put()
- .to(rbac::put_role)
- .authorize(Action::PutUserRoles)
- .wrap(DisAllowRootUser),
- )
- .route(
- web::get()
- .to(rbac::get_role)
- .authorize_for_user(Action::GetUserRoles),
- ),
- )
- .service(
- web::resource("/{username}/generate-new-password")
- // POST /user/{username}/generate-new-password => reset password for this user
- .route(
- web::post()
- .to(rbac::post_gen_password)
- .authorize(Action::PutUser)
- .wrap(DisAllowRootUser),
- ),
- );
+pub fn base_path_without_preceding_slash() -> String {
+ format!("{API_BASE_PATH}/{API_VERSION}")
+}
- let llm_query_api = web::scope("/llm").service(
- web::resource("").route(
- web::post()
- .to(llm::make_llm_request)
- .authorize(Action::QueryLLM),
- ),
- );
+pub async fn fetch_schema(stream_name: &str) -> anyhow::Result {
+ let mut res = vec![];
+ let ima = get_ingester_info().await.unwrap();
- let role_api = web::scope("/role")
- // GET Role List
- .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole)))
- .service(
- // PUT and GET Default Role
- resource("/default")
- .route(web::put().to(role::put_default).authorize(Action::PutRole))
- .route(web::get().to(role::get_default).authorize(Action::GetRole)),
- )
- .service(
- // PUT, GET, DELETE Roles
- resource("/{name}")
- .route(web::put().to(role::put).authorize(Action::PutRole))
- .route(web::delete().to(role::delete).authorize(Action::DeleteRole))
- .route(web::get().to(role::get).authorize(Action::GetRole)),
+ for im in ima {
+ let uri = format!(
+ "{}{}/logstream/{}/schema",
+ im.domain_name,
+ base_path_without_preceding_slash(),
+ stream_name
);
+ let reqw = reqwest::Client::new()
+ .get(uri)
+ .header(http::header::AUTHORIZATION, im.token.clone())
+ .header(http::header::CONTENT_TYPE, "application/json")
+ .send()
+ .await?;
- let mut oauth_api = web::scope("/o")
- .service(resource("/login").route(web::get().to(oidc::login)))
- .service(resource("/logout").route(web::get().to(oidc::logout)))
- .service(resource("/code").route(web::get().to(oidc::reply_login)));
-
- if let Some(client) = oidc_client {
- info!("Registered oidc client");
- oauth_api = oauth_api.app_data(web::Data::from(client))
+ if reqw.status().is_success() {
+ let v = serde_json::from_slice(&reqw.bytes().await?)?;
+ res.push(v);
+ }
}
- // Deny request if username is same as the env variable P_USERNAME.
- cfg.service(
- // Base path "{url}/api/v1"
- web::scope(&base_path())
- // .wrap(PathFilter)
- // POST "/query" ==> Get results of the SQL query passed in request body
- .service(
- web::resource("/query")
- .route(web::post().to(query::query).authorize(Action::Query)),
- )
- // POST "/ingest" ==> Post logs to given log stream based on header
- .service(
- web::resource("/ingest")
- .route(
- web::post()
- .to(ingest::ingest)
- .authorize_for_stream(Action::Ingest),
- )
- .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
- )
- // GET "/liveness" ==> Liveness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command
- .service(web::resource("/liveness").route(web::get().to(health_check::liveness)))
- // GET "/readiness" ==> Readiness check as per https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes
- .service(web::resource("/readiness").route(web::get().to(health_check::readiness)))
- // GET "/about" ==> Returns information about instance
- .service(
- web::resource("/about")
- .route(web::get().to(about::about).authorize(Action::GetAbout)),
- )
- .service(
- web::scope("/logstream")
- .service(
- // GET "/logstream" ==> Get list of all Log Streams on the server
- web::resource("")
- .route(web::get().to(logstream::list).authorize(Action::ListStream)),
- )
- .service(
- // logstream API
- logstream_api,
- ),
- )
- .service(user_api)
- .service(llm_query_api)
- .service(oauth_api)
- .service(role_api),
- )
- // GET "/" ==> Serve the static frontend directory
- .service(ResourceFiles::new("/", generated).resolve_not_found_to_root());
-}
-
-fn base_path() -> String {
- format!("{API_BASE_PATH}/{API_VERSION}")
-}
+ let new_schema = Schema::try_merge(res)?;
-pub fn metrics_path() -> String {
- format!("{}/metrics", base_path())
+ Ok(new_schema)
}
-fn cross_origin_config() -> Cors {
- if cfg!(feature = "debug") {
- Cors::permissive().block_on_origin_mismatch(false)
- } else {
- Cors::default().block_on_origin_mismatch(false)
+pub async fn send_query_request_to_ingester(query: &Query) -> anyhow::Result> {
+ // send the query request to the ingester
+ let mut res = vec![];
+ let ima = get_ingester_info().await.unwrap();
+
+ for im in ima.iter() {
+ let uri = format!(
+ "{}{}/{}",
+ im.domain_name,
+ base_path_without_preceding_slash(),
+ "query"
+ );
+ let reqw = reqwest::Client::new()
+ .post(uri)
+ .json(query)
+ .header(http::header::AUTHORIZATION, im.token.clone())
+ .header(http::header::CONTENT_TYPE, "application/json")
+ .send()
+ .await;
+
+ if let Ok(reqw) = reqw {
+        // NOTE(review): response status is not checked here; consider verifying
+        // `reqw.status().is_success()` before deserializing the body.
+ let v: Value = serde_json::from_slice(&reqw.bytes().await?)?;
+ // the value returned is an array of json objects
+ // so it needs to be flattened
+ if let Some(arr) = v.as_array() {
+ for val in arr {
+ res.push(val.to_owned())
+ }
+ }
+ }
}
+
+ Ok(res)
}
diff --git a/server/src/handlers/http/about.rs b/server/src/handlers/http/about.rs
index 3f42ccc4..347cd0d3 100644
--- a/server/src/handlers/http/about.rs
+++ b/server/src/handlers/http/about.rs
@@ -20,9 +20,34 @@ use actix_web::web::Json;
use human_size::SpecificSize;
use serde_json::json;
-use crate::{about, option::CONFIG, storage::StorageMetadata, utils::update};
+use crate::{
+ about,
+ option::{Mode, CONFIG},
+ storage::StorageMetadata,
+ utils::update,
+};
use std::path::PathBuf;
+/// {
+/// "version": current_version,
+/// "uiVersion": ui_version,
+/// "commit": commit,
+/// "deploymentId": deployment_id,
+/// "updateAvailable": update_available,
+/// "latestVersion": latest_release,
+/// "llmActive": is_llm_active,
+/// "llmProvider": llm_provider,
+/// "oidcActive": is_oidc_active,
+/// "license": "AGPL-3.0-only",
+/// "mode": mode,
+/// "staging": staging,
+/// "cache": cache_details,
+/// "grpcPort": grpc_port,
+/// "store": {
+/// "type": CONFIG.get_storage_mode_string(),
+/// "path": store_endpoint
+/// }
+/// }
pub async fn about() -> Json {
let meta = StorageMetadata::global();
@@ -40,11 +65,15 @@ pub async fn about() -> Json {
let current_version = format!("v{}", current_release.released_version);
let commit = current_release.commit_hash;
let deployment_id = meta.deployment_id.to_string();
- let mode = CONFIG.mode_string();
- let staging = CONFIG.staging_dir();
+ let mode = CONFIG.parseable.mode.to_str();
+ let staging = if CONFIG.parseable.mode == Mode::Query {
+ "".to_string()
+ } else {
+ CONFIG.staging_dir().display().to_string()
+ };
let grpc_port = CONFIG.parseable.grpc_port;
- let store = CONFIG.storage().get_endpoint();
+ let store_endpoint = CONFIG.storage().get_endpoint();
let is_llm_active = &CONFIG.parseable.open_ai_key.is_some();
let llm_provider = is_llm_active.then_some("OpenAI");
let is_oidc_active = CONFIG.parseable.openid.is_some();
@@ -80,6 +109,9 @@ pub async fn about() -> Json {
"staging": staging,
"cache": cache_details,
"grpcPort": grpc_port,
- "store": store
+ "store": {
+ "type": CONFIG.get_storage_mode_string(),
+ "path": store_endpoint
+ }
}))
}
diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs
new file mode 100644
index 00000000..a5a8c864
--- /dev/null
+++ b/server/src/handlers/http/cluster/mod.rs
@@ -0,0 +1,403 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+pub mod utils;
+
+use crate::handlers::http::cluster::utils::{
+ check_liveness, ingester_meta_filename, to_url_string,
+};
+use crate::handlers::http::ingest::PostError;
+use crate::handlers::http::logstream::error::StreamError;
+use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
+use crate::option::CONFIG;
+
+use crate::metrics::prom_utils::Metrics;
+use crate::storage::ObjectStorageError;
+use crate::storage::PARSEABLE_ROOT_DIRECTORY;
+use actix_web::http::header;
+use actix_web::{HttpRequest, Responder};
+use bytes::Bytes;
+use http::StatusCode;
+use itertools::Itertools;
+use relative_path::RelativePathBuf;
+use serde_json::Value as JsonValue;
+use url::Url;
+
+// collection of per-ingester metadata entries read from object storage
+type IngesterMetadataArr = Vec<IngesterMetadata>;
+
+use super::base_path_without_preceding_slash;
+
+use super::modal::IngesterMetadata;
+
+/// Forward a "create stream" request to every ingester so all nodes
+/// agree on the set of logstreams.
+///
+/// If any single ingester fails, stream creation is rolled back on all
+/// ingesters and an INTERNAL_SERVER_ERROR is returned to the caller.
+pub async fn sync_streams_with_ingesters(
+    stream_name: &str,
+    time_partition: &str,
+    static_schema: &str,
+    schema: Bytes,
+) -> Result<(), StreamError> {
+    let ingester_infos = get_ingester_info().await.map_err(|err| {
+        log::error!("Fatal: failed to get ingester info: {:?}", err);
+        StreamError::Anyhow(err)
+    })?;
+
+    // stop at the first failure; the rollback loop below undoes any
+    // streams that were already created on other nodes
+    let mut errored = false;
+    for ingester in ingester_infos.iter() {
+        let url = format!(
+            "{}{}/logstream/{}",
+            ingester.domain_name,
+            base_path_without_preceding_slash(),
+            stream_name
+        );
+
+        match send_stream_sync_request(
+            &url,
+            ingester.clone(),
+            time_partition,
+            static_schema,
+            schema.clone(),
+        )
+        .await
+        {
+            Ok(_) => continue,
+            Err(_) => {
+                errored = true;
+                break;
+            }
+        }
+    }
+
+    if errored {
+        // best-effort rollback on every ingester, including ones the
+        // create request never reached (DELETE there is a no-op)
+        for ingester in ingester_infos {
+            let url = format!(
+                "{}{}/logstream/{}",
+                ingester.domain_name,
+                base_path_without_preceding_slash(),
+                stream_name
+            );
+
+            // roll back the stream creation
+            send_stream_rollback_request(&url, ingester.clone()).await?;
+        }
+
+        // this might be a bit too much
+        return Err(StreamError::Custom {
+            msg: "Failed to sync stream with ingesters".to_string(),
+            status: StatusCode::INTERNAL_SERVER_ERROR,
+        });
+    }
+
+    Ok(())
+}
+
+/// Collect per-stream stats from every ingester.
+///
+/// Unreachable ingesters and unparsable responses are logged and
+/// skipped; a transport failure on an otherwise live node aborts the
+/// whole fetch with an error.
+pub async fn fetch_stats_from_ingesters(
+    stream_name: &str,
+) -> Result<Vec<utils::QueriedStats>, StreamError> {
+    let mut stats = Vec::new();
+
+    let ingester_infos = get_ingester_info().await.map_err(|err| {
+        log::error!("Fatal: failed to get ingester info: {:?}", err);
+        StreamError::Anyhow(err)
+    })?;
+
+    for ingester in ingester_infos {
+        let url = format!(
+            "{}{}/logstream/{}/stats",
+            ingester.domain_name,
+            base_path_without_preceding_slash(),
+            stream_name
+        );
+
+        match utils::send_stats_request(&url, ingester.clone()).await {
+            Ok(Some(res)) => {
+                // propagate body-read failures instead of panicking
+                let text = res.text().await.map_err(StreamError::Network)?;
+                match serde_json::from_str::<utils::QueriedStats>(&text) {
+                    Ok(stat) => stats.push(stat),
+                    Err(err) => {
+                        log::error!(
+                            "Could not parse stats from ingester: {}\n Error: {:?}",
+                            ingester.domain_name,
+                            err
+                        );
+                        continue;
+                    }
+                }
+            }
+            Ok(None) => {
+                log::error!("Ingester at {} is not reachable", &ingester.domain_name);
+                continue;
+            }
+            Err(err) => {
+                log::error!(
+                    "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}",
+                    ingester.domain_name,
+                    err
+                );
+                return Err(err);
+            }
+        }
+    }
+
+    Ok(stats)
+}
+
+/// PUT the stream-creation request (schema body plus partition/schema
+/// headers) to one ingester.
+///
+/// A node that fails its liveness probe is skipped and treated as
+/// success — it will pick the stream up later.
+async fn send_stream_sync_request(
+    url: &str,
+    ingester: IngesterMetadata,
+    time_partition: &str,
+    static_schema: &str,
+    schema: Bytes,
+) -> Result<(), StreamError> {
+    if !utils::check_liveness(&ingester.domain_name).await {
+        return Ok(());
+    }
+
+    let client = reqwest::Client::new();
+    let res = client
+        .put(url)
+        .header(header::CONTENT_TYPE, "application/json")
+        .header(TIME_PARTITION_KEY, time_partition)
+        .header(STATIC_SCHEMA_FLAG, static_schema)
+        .header(header::AUTHORIZATION, ingester.token)
+        .body(schema)
+        .send()
+        .await
+        .map_err(|err| {
+            log::error!(
+                "Fatal: failed to forward create stream request to ingester: {}\n Error: {:?}",
+                ingester.domain_name,
+                err
+            );
+            StreamError::Network(err)
+        })?;
+
+    // a non-2xx reply is treated the same as a transport failure so the
+    // caller triggers a rollback
+    if !res.status().is_success() {
+        log::error!(
+            "failed to forward create stream request to ingester: {}\nResponse Returned: {:?}",
+            ingester.domain_name,
+            res
+        );
+        return Err(StreamError::Network(res.error_for_status().unwrap_err()));
+    }
+
+    Ok(())
+}
+
+/// DELETE the just-created stream on one ingester to undo a partially
+/// failed sync.
+///
+/// A node that fails its liveness probe is skipped and treated as
+/// success (there is nothing to roll back on an offline node).
+async fn send_stream_rollback_request(
+    url: &str,
+    ingester: IngesterMetadata,
+) -> Result<(), StreamError> {
+    if !utils::check_liveness(&ingester.domain_name).await {
+        return Ok(());
+    }
+
+    let client = reqwest::Client::new();
+    let resp = client
+        .delete(url)
+        .header(header::CONTENT_TYPE, "application/json")
+        .header(header::AUTHORIZATION, ingester.token)
+        .send()
+        .await
+        .map_err(|err| {
+            // log the error and return a custom error
+            log::error!(
+                "Fatal: failed to rollback stream creation: {}\n Error: {:?}",
+                ingester.domain_name,
+                err
+            );
+            StreamError::Network(err)
+        })?;
+
+    // if the response is not successful, log the error and return a custom error
+    // this could be a bit too much, but we need to be sure it covers all cases
+    if !resp.status().is_success() {
+        log::error!(
+            "failed to rollback stream creation: {}\nResponse Returned: {:?}",
+            ingester.domain_name,
+            resp
+        );
+        return Err(StreamError::Custom {
+            msg: format!(
+                "failed to rollback stream creation: {}\nResponse Returned: {:?}",
+                ingester.domain_name,
+                resp.text().await.unwrap_or_default()
+            ),
+            status: StatusCode::INTERNAL_SERVER_ERROR,
+        });
+    }
+
+    Ok(())
+}
+
+/// Build a [`utils::ClusterInfo`] entry for every known ingester by
+/// calling its `/about` endpoint, and return the list as JSON.
+pub async fn get_cluster_info() -> Result<impl Responder, StreamError> {
+    let ingester_infos = get_ingester_info().await.map_err(|err| {
+        log::error!("Fatal: failed to get ingester info: {:?}", err);
+        StreamError::Anyhow(err)
+    })?;
+
+    let mut infos = vec![];
+
+    for ingester in ingester_infos {
+        let uri = Url::parse(&format!(
+            "{}{}/about",
+            ingester.domain_name,
+            base_path_without_preceding_slash()
+        ))
+        .expect("should always be a valid url");
+
+        let resp = reqwest::Client::new()
+            .get(uri)
+            .header(header::AUTHORIZATION, ingester.token.clone())
+            .header(header::CONTENT_TYPE, "application/json")
+            .send()
+            .await;
+
+        let (reachable, staging_path, error, status) = if let Ok(resp) = resp {
+            let status = Some(resp.status().to_string());
+
+            let resp_data = resp.bytes().await.map_err(|err| {
+                log::error!("Fatal: failed to parse ingester info to bytes: {:?}", err);
+                StreamError::Network(err)
+            })?;
+
+            let about = serde_json::from_slice::<JsonValue>(&resp_data).map_err(|err| {
+                log::error!("Fatal: failed to parse ingester info: {:?}", err);
+                StreamError::SerdeError(err)
+            })?;
+
+            // tolerate a missing or non-string "staging" field instead
+            // of panicking on a malformed /about response
+            let sp = about
+                .get("staging")
+                .and_then(JsonValue::as_str)
+                .unwrap_or_default()
+                .to_string();
+
+            (true, sp, None, status)
+        } else {
+            // node unreachable: record the transport error and any
+            // status reqwest managed to extract
+            (
+                false,
+                "".to_owned(),
+                resp.as_ref().err().map(|e| e.to_string()),
+                resp.unwrap_err().status().map(|s| s.to_string()),
+            )
+        };
+
+        infos.push(utils::ClusterInfo::new(
+            &ingester.domain_name,
+            reachable,
+            staging_path,
+            CONFIG.storage().get_endpoint(),
+            error,
+            status,
+        ));
+    }
+
+    Ok(actix_web::HttpResponse::Ok().json(infos))
+}
+
+/// Scrape the Prometheus `/metrics` endpoint of every ingester and
+/// return the parsed samples as a JSON array of [`Metrics`].
+///
+/// Unreachable ingesters are logged and skipped.
+pub async fn get_cluster_metrics() -> Result<impl Responder, PostError> {
+    let ingester_metadata = get_ingester_info().await.map_err(|err| {
+        log::error!("Fatal: failed to get ingester info: {:?}", err);
+        PostError::Invalid(err)
+    })?;
+
+    let mut dresses = vec![];
+
+    for ingester in ingester_metadata {
+        let uri = Url::parse(&format!(
+            "{}{}/metrics",
+            &ingester.domain_name,
+            base_path_without_preceding_slash()
+        ))
+        .unwrap();
+
+        let res = reqwest::Client::new()
+            .get(uri)
+            .header(header::CONTENT_TYPE, "application/json")
+            .send()
+            .await;
+
+        if let Ok(res) = res {
+            let text = res.text().await.map_err(PostError::NetworkError)?;
+            // prometheus_parse::Scrape::parse expects io::Result<String>
+            // items, so wrap each scraped line in Ok
+            let lines: Vec<Result<String, std::io::Error>> =
+                text.lines().map(|line| Ok(line.to_owned())).collect_vec();
+
+            let sample = prometheus_parse::Scrape::parse(lines.into_iter())
+                .map_err(|err| PostError::CustomError(err.to_string()))?
+                .samples;
+
+            dresses.push(Metrics::from_prometheus_samples(
+                sample,
+                ingester.domain_name,
+            ));
+        } else {
+            log::warn!(
+                "Failed to fetch metrics from ingester: {}\n",
+                ingester.domain_name,
+            );
+        }
+    }
+
+    Ok(actix_web::HttpResponse::Ok().json(dresses))
+}
+
+/// Read every object under the Parseable root directory in object
+/// storage and deserialize each into an [`IngesterMetadata`] entry.
+pub async fn get_ingester_info() -> anyhow::Result<IngesterMetadataArr> {
+    let store = CONFIG.storage().get_object_store();
+
+    let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
+    let arr = store
+        .get_objects(Some(&root_path))
+        .await?
+        .iter()
+        // fall back to a default entry rather than panicking on a
+        // malformed metadata file
+        .map(|x| serde_json::from_slice::<IngesterMetadata>(x).unwrap_or_default())
+        .collect_vec();
+
+    Ok(arr)
+}
+
+/// Remove an offline ingester's metadata file from object storage.
+///
+/// Refuses to remove a node that still answers its liveness probe.
+pub async fn remove_ingester(req: HttpRequest) -> Result<impl Responder, PostError> {
+    // the route should guarantee the "ingester" segment, but fail with
+    // a client error instead of panicking if it is ever missing
+    let domain_name = req
+        .match_info()
+        .get("ingester")
+        .ok_or_else(|| PostError::Invalid(anyhow::anyhow!("ingester domain name missing")))?
+        .to_string();
+    let domain_name = to_url_string(domain_name);
+
+    if check_liveness(&domain_name).await {
+        return Err(PostError::Invalid(anyhow::anyhow!("Node Online")));
+    }
+
+    let ingester_meta_filename = ingester_meta_filename(&domain_name);
+    let object_store = CONFIG.storage().get_object_store();
+    let msg = match object_store
+        .try_delete_ingester_meta(ingester_meta_filename)
+        .await
+    {
+        Ok(_) => {
+            format!("Node {} Removed Successfully", domain_name)
+        }
+        Err(err) => {
+            // an IoError means the metadata file was absent
+            if matches!(err, ObjectStorageError::IoError(_)) {
+                format!("Node {} Not Found", domain_name)
+            } else {
+                format!("Error Removing Node {}\n Reason: {}", domain_name, err)
+            }
+        }
+    };
+
+    log::info!("{}", &msg);
+    Ok((msg, StatusCode::OK))
+}
diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs
new file mode 100644
index 00000000..1dea1c9e
--- /dev/null
+++ b/server/src/handlers/http/cluster/utils.rs
@@ -0,0 +1,265 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+use crate::handlers::http::{logstream::error::StreamError, modal::IngesterMetadata};
+use actix_web::http::header;
+use chrono::{DateTime, Utc};
+use http::StatusCode;
+use itertools::Itertools;
+use reqwest::Response;
+use serde::{Deserialize, Serialize};
+use url::Url;
+
+/// Per-stream stats as reported by one node, stamped with the moment
+/// they were captured.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct QueriedStats {
+    pub stream: String,
+    pub time: DateTime<Utc>,
+    pub ingestion: IngestionStats,
+    pub storage: StorageStats,
+}
+
+impl QueriedStats {
+    pub fn new(
+        stream: &str,
+        time: DateTime<Utc>,
+        ingestion: IngestionStats,
+        storage: StorageStats,
+    ) -> Self {
+        Self {
+            stream: stream.to_string(),
+            time,
+            ingestion,
+            storage,
+        }
+    }
+}
+
+/// Health snapshot of a single ingester as returned by the
+/// cluster-info endpoint.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct ClusterInfo {
+    domain_name: String,
+    reachable: bool,
+    staging_path: String,
+    storage_path: String,
+    error: Option<String>,  // error message if the ingester is not reachable
+    status: Option<String>, // status message if the ingester is reachable
+}
+
+impl ClusterInfo {
+    pub fn new(
+        domain_name: &str,
+        reachable: bool,
+        staging_path: String,
+        storage_path: String,
+        error: Option<String>,
+        status: Option<String>,
+    ) -> Self {
+        Self {
+            domain_name: domain_name.to_string(),
+            reachable,
+            staging_path,
+            storage_path,
+            error,
+            status,
+        }
+    }
+}
+
+/// Cumulative ingestion counters for one stream.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct IngestionStats {
+    // number of ingested events
+    pub count: u64,
+    // human-readable size string, e.g. "123 Bytes"
+    pub size: String,
+    // label describing the ingestion format
+    pub format: String,
+}
+
+impl IngestionStats {
+    pub fn new(count: u64, size: String, format: &str) -> Self {
+        Self {
+            count,
+            size,
+            format: format.to_string(),
+        }
+    }
+}
+
+/// On-disk storage totals for one stream.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct StorageStats {
+    // human-readable size string, e.g. "123 Bytes"
+    size: String,
+    // label describing the storage format
+    format: String,
+}
+
+impl StorageStats {
+    pub fn new(size: String, format: &str) -> Self {
+        Self {
+            size,
+            format: format.to_string(),
+        }
+    }
+}
+
+/// Merge per-ingester [`QueriedStats`] into one cumulative entry: the
+/// earliest capture time wins, byte sizes are summed, and the stream
+/// name is taken from the first entry.
+///
+/// (NOTE: the misspelled name is kept because callers depend on it.)
+pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
+    // guard the empty case instead of panicking on stats[0]
+    if stats.is_empty() {
+        return QueriedStats::default();
+    }
+
+    // all entries describe the same stream, so take the name from the first
+    let stream_name = stats[0].stream.clone();
+
+    let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);
+
+    // sizes arrive as strings like "123 Bytes"; parse the numeric part
+    // (defaulting to 0 on parse failure) and sum across nodes
+    let cumulative_ingestion =
+        stats
+            .iter()
+            .map(|x| &x.ingestion)
+            .fold(IngestionStats::default(), |acc, x| IngestionStats {
+                count: acc.count + x.count,
+                size: format!(
+                    "{} Bytes",
+                    acc.size.split(' ').collect_vec()[0]
+                        .parse::<u64>()
+                        .unwrap_or_default()
+                        + x.size.split(' ').collect_vec()[0]
+                            .parse::<u64>()
+                            .unwrap_or_default()
+                ),
+                format: x.format.clone(),
+            });
+
+    let cumulative_storage =
+        stats
+            .iter()
+            .map(|x| &x.storage)
+            .fold(StorageStats::default(), |acc, x| StorageStats {
+                size: format!(
+                    "{} Bytes",
+                    acc.size.split(' ').collect_vec()[0]
+                        .parse::<u64>()
+                        .unwrap_or_default()
+                        + x.size.split(' ').collect_vec()[0]
+                            .parse::<u64>()
+                            .unwrap_or_default()
+                ),
+                format: x.format.clone(),
+            });
+
+    QueriedStats::new(
+        &stream_name,
+        min_time,
+        cumulative_ingestion,
+        cumulative_storage,
+    )
+}
+
+/// Probe `{domain_name}liveness` and report whether the node answered
+/// with a success status.
+pub async fn check_liveness(domain_name: &str) -> bool {
+    let uri = match Url::parse(&format!("{}liveness", domain_name)) {
+        Ok(uri) => uri,
+        Err(err) => {
+            log::error!("Node Identifier Failed To Parse: {}", err);
+            return false;
+        }
+    };
+
+    let reqw = reqwest::Client::new()
+        .get(uri)
+        .header(header::CONTENT_TYPE, "application/json")
+        .send()
+        .await;
+
+    // a node that answers with an error status (e.g. 500) is not live;
+    // plain `is_ok()` would have treated any response as alive
+    matches!(reqw, Ok(res) if res.status().is_success())
+}
+
+/// send a request to the ingester to fetch its stats
+pub async fn send_stats_request(
+ url: &str,
+ ingester: IngesterMetadata,
+) -> Result