From ec1cb2a6942ba6288ca0e0df4dd09117e7be2041 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti Date: Mon, 2 Sep 2024 10:40:48 +0200 Subject: [PATCH] feat(spooler): Add initialization of stacks (#3967) --- CHANGELOG.md | 1 + relay-server/benches/benches.rs | 8 +- relay-server/src/service.rs | 2 +- relay-server/src/services/buffer/common.rs | 33 +++ .../services/buffer/envelope_buffer/mod.rs | 280 +++++++++++------- .../services/buffer/envelope_stack/memory.rs | 4 +- .../services/buffer/envelope_stack/sqlite.rs | 50 +--- .../services/buffer/envelope_store/sqlite.rs | 62 +--- relay-server/src/services/buffer/mod.rs | 86 ++++-- .../services/buffer/stack_provider/memory.rs | 12 +- .../src/services/buffer/stack_provider/mod.rs | 32 +- .../services/buffer/stack_provider/sqlite.rs | 26 +- relay-server/src/services/buffer/testutils.rs | 51 ++++ relay-server/src/statsd.rs | 3 + 14 files changed, 396 insertions(+), 254 deletions(-) create mode 100644 relay-server/src/services/buffer/common.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index ff8d9cf687..074b7e1c92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - Forward profiles of non-sampled transactions (with no options filtering). ([#3963](https://github.com/getsentry/relay/pull/3963)) - Make EnvelopeBuffer a Service. ([#3965](https://github.com/getsentry/relay/pull/3965)) - No longer send COGS data to dedicated Kafka topic. ([#3953](https://github.com/getsentry/relay/pull/3953)) +- Allow creation of `SqliteEnvelopeBuffer` from config, and load existing stacks from db on startup. ([#3967](https://github.com/getsentry/relay/pull/3967)) ## 24.8.0 diff --git a/relay-server/benches/benches.rs b/relay-server/benches/benches.rs index b79dafcb48..a028eca60f 100644 --- a/relay-server/benches/benches.rs +++ b/relay-server/benches/benches.rs @@ -259,7 +259,9 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { |envelopes| { runtime.block_on(async { let mut buffer = - PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone()); + PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone()) + .await + .unwrap(); for envelope in envelopes.into_iter() { buffer.push(envelope).await.unwrap(); } @@ -289,7 +291,9 @@ fn benchmark_envelope_buffer(c: &mut Criterion) { |envelopes| { runtime.block_on(async { let mut buffer = - PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone()); + PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone()) + .await + .unwrap(); let n = envelopes.len(); for envelope in envelopes.into_iter() { let public_key = envelope.meta().public_key(); diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 9a1dd32e42..a214b23837 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -241,7 +241,7 @@ impl ServiceState { .spawn_handler(processor_rx); let envelope_buffer = EnvelopeBufferService::new( - &config, + config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), project_cache.clone(), ) diff --git a/relay-server/src/services/buffer/common.rs b/relay-server/src/services/buffer/common.rs new file mode 100644 index 0000000000..924f8aa1c8 --- /dev/null +++ b/relay-server/src/services/buffer/common.rs @@ -0,0 +1,33 @@ +use relay_base_schema::project::ProjectKey; + +use crate::Envelope; + +/// Struct that represents two project keys. +#[derive(Debug, Clone, Copy, Eq, Hash, Ord, PartialOrd, PartialEq)] +pub struct ProjectKeyPair { + pub own_key: ProjectKey, + pub sampling_key: ProjectKey, +} + +impl ProjectKeyPair { + pub fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self { + Self { + own_key, + sampling_key, + } + } + + pub fn from_envelope(envelope: &Envelope) -> Self { + let own_key = envelope.meta().public_key(); + let sampling_key = envelope.sampling_key().unwrap_or(own_key); + Self::new(own_key, sampling_key) + } + + pub fn iter(&self) -> impl Iterator { + let Self { + own_key, + sampling_key, + } = self; + std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key)) + } +} diff --git a/relay-server/src/services/buffer/envelope_buffer/mod.rs b/relay-server/src/services/buffer/envelope_buffer/mod.rs index da45769b9d..fda5366d09 100644 --- a/relay-server/src/services/buffer/envelope_buffer/mod.rs +++ b/relay-server/src/services/buffer/envelope_buffer/mod.rs @@ -3,17 +3,19 @@ use std::collections::BTreeSet; use std::convert::Infallible; use std::time::Instant; +use hashbrown::HashSet; use relay_base_schema::project::ProjectKey; use relay_config::Config; use crate::envelope::Envelope; +use crate::services::buffer::common::ProjectKeyPair; use crate::services::buffer::envelope_stack::sqlite::SqliteEnvelopeStackError; use crate::services::buffer::envelope_stack::EnvelopeStack; use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError; use crate::services::buffer::stack_provider::memory::MemoryStackProvider; use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider; use crate::services::buffer::stack_provider::StackProvider; -use crate::statsd::{RelayCounters, RelayGauges}; +use crate::statsd::{RelayCounters, RelayGauges, RelayTimers}; use crate::utils::MemoryChecker; /// Polymorphic envelope buffering interface. @@ -36,12 +38,27 @@ pub enum PolymorphicEnvelopeBuffer { impl PolymorphicEnvelopeBuffer { /// Creates either a memory-based or a disk-based envelope buffer, /// depending on the given configuration. - pub fn from_config(config: &Config, memory_checker: MemoryChecker) -> Self { - if config.spool_envelopes_path().is_some() { - panic!("Disk backend not yet supported for spool V2"); - } + pub async fn from_config( + config: &Config, + memory_checker: MemoryChecker, + ) -> Result { + let buffer = if config.spool_envelopes_path().is_some() { + let buffer = EnvelopeBuffer::::new(config).await?; + Self::Sqlite(buffer) + } else { + let buffer = EnvelopeBuffer::::new(memory_checker); + Self::InMemory(buffer) + }; + + Ok(buffer) + } - Self::InMemory(EnvelopeBuffer::::new(memory_checker)) + /// Initializes the envelope buffer. + pub async fn initialize(&mut self) { + match self { + PolymorphicEnvelopeBuffer::InMemory(buffer) => buffer.initialize().await, + PolymorphicEnvelopeBuffer::Sqlite(buffer) => buffer.initialize().await, + } } /// Adds an envelope to the buffer. @@ -74,7 +91,7 @@ impl PolymorphicEnvelopeBuffer { /// Marks a project as ready or not ready. /// - /// The buffer reprioritizes its envelopes based on this information. + /// The buffer re-prioritizes its envelopes based on this information. /// Returns `true` if at least one priority was changed. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { match self { @@ -88,10 +105,10 @@ impl PolymorphicEnvelopeBuffer { /// Non-ready stacks are deprioritized when they are marked as seen, such that /// the next call to `.peek()` will look at a different stack. This prevents /// head-of-line blocking. - pub fn mark_seen(&mut self, stack_key: &StackKey) { + pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) { match self { - Self::Sqlite(buffer) => buffer.mark_seen(stack_key), - Self::InMemory(buffer) => buffer.mark_seen(stack_key), + Self::Sqlite(buffer) => buffer.mark_seen(project_key_pair), + Self::InMemory(buffer) => buffer.mark_seen(project_key_pair), } } @@ -108,7 +125,10 @@ impl PolymorphicEnvelopeBuffer { #[derive(Debug, thiserror::Error)] pub enum EnvelopeBufferError { #[error("sqlite")] - Sqlite(#[from] SqliteEnvelopeStackError), + SqliteStore(#[from] SqliteEnvelopeStoreError), + + #[error("sqlite")] + SqliteStack(#[from] SqliteEnvelopeStackError), #[error("failed to push envelope to the buffer")] PushFailed, @@ -119,14 +139,14 @@ pub enum EnvelopeBufferError { /// An envelope buffer that holds an individual stack for each project/sampling project combination. /// -/// Envelope stacks are organized in a priority queue, and are reprioritized every time an envelope +/// Envelope stacks are organized in a priority queue, and are re-prioritized every time an envelope /// is pushed, popped, or when a project becomes ready. #[derive(Debug)] struct EnvelopeBuffer { /// The central priority queue. - priority_queue: priority_queue::PriorityQueue, Priority>, + priority_queue: priority_queue::PriorityQueue, Priority>, /// A lookup table to find all stacks involving a project. - stacks_by_project: hashbrown::HashMap>, + stacks_by_project: hashbrown::HashMap>, /// A provider of stacks that provides utilities to create stacks, check their capacity... /// /// This indirection is needed because different stack implementations might need different @@ -148,7 +168,7 @@ impl EnvelopeBuffer { #[allow(dead_code)] impl EnvelopeBuffer { /// Creates an empty sqlite-based buffer. - pub async fn new(config: &Config) -> Result { + pub async fn new(config: &Config) -> Result { Ok(Self { stacks_by_project: Default::default(), priority_queue: Default::default(), @@ -161,28 +181,40 @@ impl EnvelopeBuffer

where EnvelopeBufferError: From<::Error>, { - /// Pushes an envelope to the appropriate envelope stack and reprioritizes the stack. + /// Initializes the [`EnvelopeBuffer`] given the initialization state from the + /// [`StackProvider`]. + pub async fn initialize(&mut self) { + relay_statsd::metric!(timer(RelayTimers::SpoolInitialization), { + let initialization_state = self.stack_provider.initialize().await; + self.load_stacks(initialization_state.project_key_pairs) + .await; + }); + } + + /// Pushes an envelope to the appropriate envelope stack and re-prioritizes the stack. /// /// If the envelope stack does not exist, a new stack is pushed to the priority queue. /// The priority of the stack is updated with the envelope's received_at time. pub async fn push(&mut self, envelope: Box) -> Result<(), EnvelopeBufferError> { let received_at = envelope.meta().start_time(); - let stack_key = StackKey::from_envelope(&envelope); + let project_key_pair = ProjectKeyPair::from_envelope(&envelope); if let Some(( QueueItem { key: _, value: stack, }, _, - )) = self.priority_queue.get_mut(&stack_key) + )) = self.priority_queue.get_mut(&project_key_pair) { stack.push(envelope).await?; } else { - self.push_stack(envelope); + self.push_stack(ProjectKeyPair::from_envelope(&envelope), Some(envelope)) + .await?; } - self.priority_queue.change_priority_by(&stack_key, |prio| { - prio.received_at = received_at; - }); + self.priority_queue + .change_priority_by(&project_key_pair, |prio| { + prio.received_at = received_at; + }); Ok(()) } @@ -217,7 +249,7 @@ where let Some((QueueItem { key, value: stack }, _)) = self.priority_queue.peek_mut() else { return Ok(None); }; - let stack_key = *key; + let project_key_pair = *key; let envelope = stack.pop().await.unwrap().expect("found an empty stack"); let next_received_at = stack @@ -226,43 +258,48 @@ where .map(|next_envelope| next_envelope.meta().start_time()); match next_received_at { None => { - self.pop_stack(stack_key); + self.pop_stack(project_key_pair); } Some(next_received_at) => { - self.priority_queue.change_priority_by(&stack_key, |prio| { - prio.received_at = next_received_at; - }); + self.priority_queue + .change_priority_by(&project_key_pair, |prio| { + prio.received_at = next_received_at; + }); } } Ok(Some(envelope)) } - /// Reprioritizes all stacks that involve the given project key by setting it to "ready". + /// Re-prioritizes all stacks that involve the given project key by setting it to "ready". /// /// Returns `true` if at least one priority was changed. pub fn mark_ready(&mut self, project: &ProjectKey, is_ready: bool) -> bool { let mut changed = false; - if let Some(stack_keys) = self.stacks_by_project.get(project) { - for stack_key in stack_keys { - self.priority_queue.change_priority_by(stack_key, |stack| { - let mut found = false; - for (subkey, readiness) in [ - (stack_key.own_key, &mut stack.readiness.own_project_ready), - ( - stack_key.sampling_key, - &mut stack.readiness.sampling_project_ready, - ), - ] { - if subkey == *project { - found = true; - if *readiness != is_ready { - changed = true; - *readiness = is_ready; + if let Some(project_key_pair) = self.stacks_by_project.get(project) { + for project_key_pair in project_key_pair { + self.priority_queue + .change_priority_by(project_key_pair, |stack| { + let mut found = false; + for (subkey, readiness) in [ + ( + project_key_pair.own_key, + &mut stack.readiness.own_project_ready, + ), + ( + project_key_pair.sampling_key, + &mut stack.readiness.sampling_project_ready, + ), + ] { + if subkey == *project { + found = true; + if *readiness != is_ready { + changed = true; + *readiness = is_ready; + } } } - } - debug_assert!(found); - }); + debug_assert!(found); + }); } } changed @@ -273,32 +310,47 @@ where /// Non-ready stacks are deprioritized when they are marked as seen, such that /// the next call to `.peek()` will look at a different stack. This prevents /// head-of-line blocking. - pub fn mark_seen(&mut self, stack_key: &StackKey) { - self.priority_queue.change_priority_by(stack_key, |stack| { - stack.last_peek = Instant::now(); - }); + pub fn mark_seen(&mut self, project_key_pair: &ProjectKeyPair) { + self.priority_queue + .change_priority_by(project_key_pair, |stack| { + stack.last_peek = Instant::now(); + }); } - fn push_stack(&mut self, envelope: Box) { - let received_at = envelope.meta().start_time(); - let stack_key = StackKey::from_envelope(&envelope); + /// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted. + async fn push_stack( + &mut self, + project_key_pair: ProjectKeyPair, + envelope: Option>, + ) -> Result<(), EnvelopeBufferError> { + let received_at = envelope + .as_ref() + .map_or(Instant::now(), |e| e.meta().start_time()); + + let mut stack = self.stack_provider.create_stack(project_key_pair); + if let Some(envelope) = envelope { + stack.push(envelope).await?; + } + let previous_entry = self.priority_queue.push( QueueItem { - key: stack_key, - value: self.stack_provider.create_stack(envelope), + key: project_key_pair, + value: stack, }, Priority::new(received_at), ); debug_assert!(previous_entry.is_none()); - for project_key in stack_key.iter() { + for project_key in project_key_pair.iter() { self.stacks_by_project .entry(project_key) .or_default() - .insert(stack_key); + .insert(project_key_pair); } relay_statsd::metric!( gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 ); + + Ok(()) } /// Returns `true` if the underlying storage has the capacity to store more envelopes. @@ -306,55 +358,36 @@ where self.stack_provider.has_store_capacity() } - fn pop_stack(&mut self, stack_key: StackKey) { - for project_key in stack_key.iter() { + /// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`]. + fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) { + for project_key in project_key_pair.iter() { self.stacks_by_project .get_mut(&project_key) .expect("project_key is missing from lookup") - .remove(&stack_key); + .remove(&project_key_pair); } - self.priority_queue.remove(&stack_key); + self.priority_queue.remove(&project_key_pair); relay_statsd::metric!( gauge(RelayGauges::BufferStackCount) = self.priority_queue.len() as u64 ); } + + /// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`]. + async fn load_stacks(&mut self, project_key_pairs: HashSet) { + for project_key_pair in project_key_pairs { + self.push_stack(project_key_pair, None) + .await + .expect("Pushing an empty stack raised an error"); + } + } } /// Contains a reference to the first element in the buffer, together with its stack's ready state. pub enum Peek<'a> { Empty, Ready(&'a Envelope), - NotReady(StackKey, &'a Envelope), -} - -#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct StackKey { - own_key: ProjectKey, - sampling_key: ProjectKey, -} - -impl StackKey { - pub fn from_envelope(envelope: &Envelope) -> Self { - let own_key = envelope.meta().public_key(); - let sampling_key = envelope.sampling_key().unwrap_or(own_key); - Self::new(own_key, sampling_key) - } - - pub fn iter(&self) -> impl Iterator { - let Self { - own_key, - sampling_key, - } = self; - std::iter::once(*own_key).chain((own_key != sampling_key).then_some(*sampling_key)) - } - - fn new(own_key: ProjectKey, sampling_key: ProjectKey) -> Self { - Self { - own_key, - sampling_key, - } - } + NotReady(ProjectKeyPair, &'a Envelope), } #[derive(Debug)] @@ -458,17 +491,19 @@ impl Readiness { #[cfg(test)] mod tests { - use std::str::FromStr; - use std::sync::Arc; - use relay_common::Dsn; use relay_event_schema::protocol::EventId; use relay_sampling::DynamicSamplingContext; + use std::str::FromStr; + use std::sync::Arc; use uuid::Uuid; use crate::envelope::{Item, ItemType}; use crate::extractors::RequestMeta; + use crate::services::buffer::common::ProjectKeyPair; + use crate::services::buffer::testutils::utils::mock_envelopes; use crate::utils::MemoryStat; + use crate::SqliteEnvelopeStore; use super::*; @@ -515,18 +550,20 @@ mod tests { envelope } - fn mock_memory_checker() -> MemoryChecker { - let config: Arc<_> = Config::from_json_value(serde_json::json!({ + fn mock_config(path: &str) -> Arc { + Config::from_json_value(serde_json::json!({ "spool": { - "health": { - "max_memory_percent": 1.0 + "envelopes": { + "path": path } } })) .unwrap() - .into(); + .into() + } - MemoryChecker::new(MemoryStat::default(), config.clone()) + fn mock_memory_checker() -> MemoryChecker { + MemoryChecker::new(MemoryStat::default(), mock_config("my/db/path").clone()) } #[tokio::test] @@ -799,10 +836,10 @@ mod tests { let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); - let stack_key1 = StackKey::new(project_key1, project_key2); - let stack_key2 = StackKey::new(project_key2, project_key1); + let project_key_pair1 = ProjectKeyPair::new(project_key1, project_key2); + let project_key_pair2 = ProjectKeyPair::new(project_key2, project_key1); - assert_ne!(stack_key1, stack_key2); + assert_ne!(project_key_pair1, project_key_pair2); let mut buffer = EnvelopeBuffer::::new(mock_memory_checker()); buffer @@ -864,4 +901,39 @@ mod tests { }; assert_eq!(envelope.event_id(), Some(event_id_1)); } + + #[tokio::test] + async fn test_initialize_buffer() { + let path = std::env::temp_dir() + .join(Uuid::new_v4().to_string()) + .into_os_string() + .into_string() + .unwrap(); + let config = mock_config(&path); + let mut store = SqliteEnvelopeStore::prepare(&config).await.unwrap(); + let mut buffer = EnvelopeBuffer::::new(&config) + .await + .unwrap(); + + // We write 5 envelopes to disk so that we can check if they are loaded. These envelopes + // belong to the same project keys, so they belong to the same envelope stack. + let envelopes = mock_envelopes(10); + assert!(store + .insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap())) + .await + .is_ok()); + + // We assume that the buffer is empty. + assert!(buffer.priority_queue.is_empty()); + assert!(buffer.stacks_by_project.is_empty()); + + buffer.initialize().await; + + // We assume that we loaded only 1 envelope stack, because of the project keys combinations + // of the envelopes we inserted above. + assert_eq!(buffer.priority_queue.len(), 1); + // We expect to have an entry per project key, since we have 1 pair, the total entries + // should be 2. + assert_eq!(buffer.stacks_by_project.len(), 2); + } } diff --git a/relay-server/src/services/buffer/envelope_stack/memory.rs b/relay-server/src/services/buffer/envelope_stack/memory.rs index 5e8087010f..d9723e601a 100644 --- a/relay-server/src/services/buffer/envelope_stack/memory.rs +++ b/relay-server/src/services/buffer/envelope_stack/memory.rs @@ -8,8 +8,8 @@ use super::EnvelopeStack; pub struct MemoryEnvelopeStack(#[allow(clippy::vec_box)] Vec>); impl MemoryEnvelopeStack { - pub fn new(envelope: Box) -> Self { - Self(vec![envelope]) + pub fn new() -> Self { + Self(vec![]) } } diff --git a/relay-server/src/services/buffer/envelope_stack/sqlite.rs b/relay-server/src/services/buffer/envelope_stack/sqlite.rs index 2117ae3d53..3dd1aea820 100644 --- a/relay-server/src/services/buffer/envelope_stack/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_stack/sqlite.rs @@ -230,60 +230,12 @@ impl EnvelopeStack for SqliteEnvelopeStack { #[cfg(test)] mod tests { - use std::collections::BTreeMap; use std::time::{Duration, Instant}; - use uuid::Uuid; - use relay_base_schema::project::ProjectKey; - use relay_event_schema::protocol::EventId; - use relay_sampling::DynamicSamplingContext; use super::*; - use crate::envelope::{Envelope, Item, ItemType}; - use crate::extractors::RequestMeta; - use crate::services::buffer::testutils::utils::setup_db; - - fn request_meta() -> RequestMeta { - let dsn = "https://a94ae32be2584e0bbd7a4cbb95971fee:@sentry.io/42" - .parse() - .unwrap(); - - RequestMeta::new(dsn) - } - - fn mock_envelope(instant: Instant) -> Box { - let event_id = EventId::new(); - let mut envelope = Envelope::from_request(Some(event_id), request_meta()); - - let dsc = DynamicSamplingContext { - trace_id: Uuid::new_v4(), - public_key: ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - release: Some("1.1.1".to_string()), - user: Default::default(), - replay_id: None, - environment: None, - transaction: Some("transaction1".into()), - sample_rate: None, - sampled: Some(true), - other: BTreeMap::new(), - }; - - envelope.set_dsc(dsc); - envelope.set_start_time(instant); - - envelope.add_item(Item::new(ItemType::Transaction)); - - envelope - } - - #[allow(clippy::vec_box)] - fn mock_envelopes(count: usize) -> Vec> { - let instant = Instant::now(); - (0..count) - .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64))) - .collect() - } + use crate::services::buffer::testutils::utils::{mock_envelope, mock_envelopes, setup_db}; #[tokio::test] #[should_panic] diff --git a/relay-server/src/services/buffer/envelope_store/sqlite.rs b/relay-server/src/services/buffer/envelope_store/sqlite.rs index 3969d54075..8987e98c49 100644 --- a/relay-server/src/services/buffer/envelope_store/sqlite.rs +++ b/relay-server/src/services/buffer/envelope_store/sqlite.rs @@ -7,6 +7,7 @@ use std::time::Duration; use crate::envelope::EnvelopeError; use crate::extractors::StartTime; +use crate::services::buffer::common::ProjectKeyPair; use crate::statsd::RelayGauges; use crate::Envelope; use futures::stream::StreamExt; @@ -381,7 +382,7 @@ impl SqliteEnvelopeStore { /// `own_key` and `project_key` that are found in the database. pub async fn project_key_pairs( &self, - ) -> Result, SqliteEnvelopeStoreError> { + ) -> Result, SqliteEnvelopeStoreError> { let project_key_pairs = build_get_project_key_pairs() .fetch_all(&self.db) .await @@ -422,9 +423,7 @@ fn extract_envelope(row: SqliteRow) -> Result, SqliteEnvelopeStore } /// Deserializes a pair of [`ProjectKey`] from the database. -fn extract_project_key_pair( - row: SqliteRow, -) -> Result<(ProjectKey, ProjectKey), SqliteEnvelopeStoreError> { +fn extract_project_key_pair(row: SqliteRow) -> Result { let own_key = row .try_get("own_key") .map_err(SqliteEnvelopeStoreError::FetchError) @@ -439,7 +438,7 @@ fn extract_project_key_pair( }); match (own_key, sampling_key) { - (Ok(own_key), Ok(sampling_key)) => Ok((own_key, sampling_key)), + (Ok(own_key), Ok(sampling_key)) => Ok(ProjectKeyPair::new(own_key, sampling_key)), // Report the first found error. (Err(err), _) | (_, Err(err)) => { relay_log::error!("failed to extract a queue key from the spool record: {err}"); @@ -501,62 +500,15 @@ pub fn build_get_project_key_pairs<'a>() -> Query<'a, Sqlite, SqliteArguments<'a #[cfg(test)] mod tests { - use hashbrown::HashSet; - use std::collections::BTreeMap; - use std::time::{Duration, Instant}; + use std::time::Duration; use tokio::time::sleep; - use uuid::Uuid; use relay_base_schema::project::ProjectKey; use relay_event_schema::protocol::EventId; - use relay_sampling::DynamicSamplingContext; use super::*; - use crate::envelope::{Envelope, Item, ItemType}; - use crate::extractors::RequestMeta; - use crate::services::buffer::testutils::utils::setup_db; - - fn request_meta() -> RequestMeta { - let dsn = "https://a94ae32be2584e0bbd7a4cbb95971fee:@sentry.io/42" - .parse() - .unwrap(); - - RequestMeta::new(dsn) - } - - fn mock_envelope(instant: Instant) -> Box { - let event_id = EventId::new(); - let mut envelope = Envelope::from_request(Some(event_id), request_meta()); - - let dsc = DynamicSamplingContext { - trace_id: Uuid::new_v4(), - public_key: ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), - release: Some("1.1.1".to_string()), - user: Default::default(), - replay_id: None, - environment: None, - transaction: Some("transaction1".into()), - sample_rate: None, - sampled: Some(true), - other: BTreeMap::new(), - }; - - envelope.set_dsc(dsc); - envelope.set_start_time(instant); - - envelope.add_item(Item::new(ItemType::Transaction)); - - envelope - } - - #[allow(clippy::vec_box)] - fn mock_envelopes(count: usize) -> Vec> { - let instant = Instant::now(); - (0..count) - .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64))) - .collect() - } + use crate::services::buffer::testutils::utils::{mock_envelopes, setup_db}; #[tokio::test] async fn test_insert_and_delete_envelopes() { @@ -607,7 +559,7 @@ mod tests { assert_eq!(project_key_pairs.len(), 1); assert_eq!( project_key_pairs.into_iter().last().unwrap(), - (own_key, sampling_key) + ProjectKeyPair::new(own_key, sampling_key) ); } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 02dcb68cfc..b2437a9d91 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -18,11 +18,16 @@ use crate::statsd::RelayCounters; use crate::utils::MemoryChecker; pub use envelope_buffer::EnvelopeBufferError; +// pub for benchmarks pub use envelope_buffer::PolymorphicEnvelopeBuffer; -pub use envelope_stack::sqlite::SqliteEnvelopeStack; // pub for benchmarks -pub use envelope_stack::EnvelopeStack; // pub for benchmarks -pub use envelope_store::sqlite::SqliteEnvelopeStore; // pub for benchmarks - +// pub for benchmarks +pub use envelope_stack::sqlite::SqliteEnvelopeStack; +// pub for benchmarks +pub use envelope_stack::EnvelopeStack; +// pub for benchmarks +pub use envelope_store::sqlite::SqliteEnvelopeStore; + +mod common; mod envelope_buffer; mod envelope_stack; mod envelope_store; @@ -32,7 +37,7 @@ mod testutils; /// Message interface for [`EnvelopeBufferService`]. #[derive(Debug)] pub enum EnvelopeBuffer { - /// An fresh envelope that gets pushed into the buffer by the request handler. + /// A fresh envelope that gets pushed into the buffer by the request handler. Push(Box), /// Informs the service that a project has no valid project state and must be marked as not ready. /// @@ -80,7 +85,8 @@ impl ObservableEnvelopeBuffer { /// Spool V2 service which buffers envelopes and forwards them to the project cache when a project /// becomes ready. pub struct EnvelopeBufferService { - buffer: PolymorphicEnvelopeBuffer, + config: Arc, + memory_checker: MemoryChecker, project_cache: Addr, has_capacity: Arc, sleep: Duration, @@ -94,12 +100,13 @@ impl EnvelopeBufferService { /// NOTE: until the V1 spooler implementation is removed, this function returns `None` /// if V2 spooling is not configured. pub fn new( - config: &Config, + config: Arc, memory_checker: MemoryChecker, project_cache: Addr, ) -> Option { config.spool_v2().then(|| Self { - buffer: PolymorphicEnvelopeBuffer::from_config(config, memory_checker), + config, + memory_checker, project_cache, has_capacity: Arc::new(AtomicBool::new(true)), sleep: Duration::ZERO, @@ -116,19 +123,19 @@ impl EnvelopeBufferService { } /// Tries to pop an envelope for a ready project. - /// - /// Returns the amount of time we should wait until next pop - async fn try_pop(&mut self) -> Result<(), EnvelopeBufferError> { + async fn try_pop( + &mut self, + buffer: &mut PolymorphicEnvelopeBuffer, + ) -> Result<(), EnvelopeBufferError> { relay_log::trace!("EnvelopeBufferService peek"); - match self.buffer.peek().await? { + match buffer.peek().await? { Peek::Empty => { relay_log::trace!("EnvelopeBufferService empty"); self.sleep = Duration::MAX; // wait for reset by `handle_message`. } Peek::Ready(_) => { relay_log::trace!("EnvelopeBufferService pop"); - let envelope = self - .buffer + let envelope = buffer .pop() .await? .expect("Element disappeared despite exclusive excess"); @@ -147,14 +154,18 @@ impl EnvelopeBufferService { } } // deprioritize the stack to prevent head-of-line blocking - self.buffer.mark_seen(&stack_key); + buffer.mark_seen(&stack_key); self.sleep = DEFAULT_SLEEP; } } Ok(()) } - async fn handle_message(&mut self, message: EnvelopeBuffer) { + async fn handle_message( + &mut self, + buffer: &mut PolymorphicEnvelopeBuffer, + message: EnvelopeBuffer, + ) { match message { EnvelopeBuffer::Push(envelope) => { // NOTE: This function assumes that a project state update for the relevant @@ -162,24 +173,24 @@ impl EnvelopeBufferService { // For better separation of concerns, this prefetch should be triggered from here // once buffer V1 has been removed. relay_log::trace!("EnvelopeBufferService push"); - self.push(envelope).await; + self.push(buffer, envelope).await; } EnvelopeBuffer::NotReady(project_key, envelope) => { relay_log::trace!("EnvelopeBufferService project not ready"); - self.buffer.mark_ready(&project_key, false); + buffer.mark_ready(&project_key, false); relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1); - self.push(envelope).await; + self.push(buffer, envelope).await; } EnvelopeBuffer::Ready(project_key) => { relay_log::trace!("EnvelopeBufferService project ready {}", &project_key); - self.buffer.mark_ready(&project_key, true); + buffer.mark_ready(&project_key, true); } }; self.sleep = Duration::ZERO; } - async fn push(&mut self, envelope: Box) { - if let Err(e) = self.buffer.push(envelope).await { + async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box) { + if let Err(e) = buffer.push(envelope).await { relay_log::error!( error = &e as &dyn std::error::Error, "failed to push envelope" @@ -187,9 +198,9 @@ impl EnvelopeBufferService { } } - fn update_observable_state(&self) { + fn update_observable_state(&self, buffer: &mut PolymorphicEnvelopeBuffer) { self.has_capacity - .store(self.buffer.has_capacity(), Ordering::Relaxed); + .store(buffer.has_capacity(), Ordering::Relaxed); } } @@ -197,17 +208,34 @@ impl Service for EnvelopeBufferService { type Interface = EnvelopeBuffer; fn spawn_handler(mut self, mut rx: Receiver) { + let config = self.config.clone(); + let memory_checker = self.memory_checker.clone(); tokio::spawn(async move { + let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; + + let mut buffer = match buffer { + Ok(buffer) => buffer, + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to start the envelope buffer service", + ); + std::process::exit(1); + } + }; + buffer.initialize().await; + relay_log::info!("EnvelopeBufferService start"); loop { relay_log::trace!("EnvelopeBufferService loop"); + tokio::select! { // NOTE: we do not select a bias here. // On the one hand, we might want to prioritize dequeing over enqueing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. () = tokio::time::sleep(self.sleep) => { - if let Err(e) = self.try_pop().await { + if let Err(e) = self.try_pop(&mut buffer).await { relay_log::error!( error = &e as &dyn std::error::Error, "failed to pop envelope" @@ -215,13 +243,15 @@ impl Service for EnvelopeBufferService { } } Some(message) = rx.recv() => { - self.handle_message(message).await; + self.handle_message(&mut buffer, message).await; } else => break, } - self.update_observable_state(); + + self.update_observable_state(&mut buffer); } + relay_log::info!("EnvelopeBufferService stop"); }); } @@ -249,7 +279,7 @@ mod tests { .unwrap(), ); let memory_checker = MemoryChecker::new(MemoryStat::default(), config.clone()); - let service = EnvelopeBufferService::new(&config, memory_checker, Addr::dummy()).unwrap(); + let service = EnvelopeBufferService::new(config, memory_checker, Addr::dummy()).unwrap(); // Set capacity to false: service.has_capacity.store(false, Ordering::Relaxed); diff --git a/relay-server/src/services/buffer/stack_provider/memory.rs b/relay-server/src/services/buffer/stack_provider/memory.rs index 66f367f38a..bd47d340dd 100644 --- a/relay-server/src/services/buffer/stack_provider/memory.rs +++ b/relay-server/src/services/buffer/stack_provider/memory.rs @@ -1,7 +1,7 @@ +use crate::services::buffer::common::ProjectKeyPair; use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack; -use crate::services::buffer::stack_provider::StackProvider; +use crate::services::buffer::stack_provider::{InitializationState, StackProvider}; use crate::utils::MemoryChecker; -use crate::Envelope; #[derive(Debug)] pub struct MemoryStackProvider { @@ -19,8 +19,12 @@ impl MemoryStackProvider { impl StackProvider for MemoryStackProvider { type Stack = MemoryEnvelopeStack; - fn create_stack(&self, envelope: Box) -> Self::Stack { - MemoryEnvelopeStack::new(envelope) + async fn initialize(&self) -> InitializationState { + InitializationState::empty() + } + + fn create_stack(&self, _: ProjectKeyPair) -> Self::Stack { + MemoryEnvelopeStack::new() } fn has_store_capacity(&self) -> bool { diff --git a/relay-server/src/services/buffer/stack_provider/mod.rs b/relay-server/src/services/buffer/stack_provider/mod.rs index 79e36c4393..8d878b0b11 100644 --- a/relay-server/src/services/buffer/stack_provider/mod.rs +++ b/relay-server/src/services/buffer/stack_provider/mod.rs @@ -1,15 +1,43 @@ -use crate::{Envelope, EnvelopeStack}; +use crate::services::buffer::common::ProjectKeyPair; +use crate::EnvelopeStack; +use hashbrown::HashSet; +use std::future::Future; pub mod memory; pub mod sqlite; +/// State of the initialization of the [`StackProvider`]. +/// +/// This state is necessary for initializing resources whenever a [`StackProvider`] is used. +#[derive(Debug)] +pub struct InitializationState { + pub project_key_pairs: HashSet, +} + +impl InitializationState { + /// Create a new [`InitializationState`]. + pub fn new(project_key_pairs: HashSet) -> Self { + Self { project_key_pairs } + } + + /// Creates a new empty [`InitializationState`]. + pub fn empty() -> Self { + Self { + project_key_pairs: HashSet::new(), + } + } +} + /// A provider of [`EnvelopeStack`] instances that is responsible for creating them. pub trait StackProvider: std::fmt::Debug { /// The implementation of [`EnvelopeStack`] that this manager creates. type Stack: EnvelopeStack; + /// Initializes the [`StackProvider`]. + fn initialize(&self) -> impl Future; + /// Creates an [`EnvelopeStack`]. - fn create_stack(&self, envelope: Box) -> Self::Stack; + fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack; /// Returns `true` if the store used by this [`StackProvider`] has space to add new /// stacks or items to the stacks. diff --git a/relay-server/src/services/buffer/stack_provider/sqlite.rs b/relay-server/src/services/buffer/stack_provider/sqlite.rs index a2d49c62d1..0ac9816b1d 100644 --- a/relay-server/src/services/buffer/stack_provider/sqlite.rs +++ b/relay-server/src/services/buffer/stack_provider/sqlite.rs @@ -1,10 +1,12 @@ use relay_config::Config; +use std::error::Error; +use crate::services::buffer::common::ProjectKeyPair; use crate::services::buffer::envelope_store::sqlite::{ SqliteEnvelopeStore, SqliteEnvelopeStoreError, }; -use crate::services::buffer::stack_provider::StackProvider; -use crate::{Envelope, SqliteEnvelopeStack}; +use crate::services::buffer::stack_provider::{InitializationState, StackProvider}; +use crate::SqliteEnvelopeStack; #[derive(Debug)] pub struct SqliteStackProvider { @@ -31,16 +33,26 @@ impl SqliteStackProvider { impl StackProvider for SqliteStackProvider { type Stack = SqliteEnvelopeStack; - fn create_stack(&self, envelope: Box) -> Self::Stack { - let own_key = envelope.meta().public_key(); - let sampling_key = envelope.sampling_key().unwrap_or(own_key); + async fn initialize(&self) -> InitializationState { + match self.envelope_store.project_key_pairs().await { + Ok(project_key_pairs) => InitializationState::new(project_key_pairs), + Err(error) => { + relay_log::error!( + error = &error as &dyn Error, + "failed to initialize the sqlite stack provider" + ); + InitializationState::empty() + } + } + } + fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack { SqliteEnvelopeStack::new( self.envelope_store.clone(), self.disk_batch_size, self.max_batches, - own_key, - sampling_key, + project_key_pair.own_key, + project_key_pair.sampling_key, ) } diff --git a/relay-server/src/services/buffer/testutils.rs b/relay-server/src/services/buffer/testutils.rs index 0cb2ed75d6..277bbe58f5 100644 --- a/relay-server/src/services/buffer/testutils.rs +++ b/relay-server/src/services/buffer/testutils.rs @@ -1,10 +1,20 @@ #[cfg(test)] pub mod utils { + use std::collections::BTreeMap; + use std::time::{Duration, Instant}; + + use relay_base_schema::project::ProjectKey; + use relay_event_schema::protocol::EventId; + use relay_sampling::DynamicSamplingContext; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}; use sqlx::{Pool, Sqlite}; use tokio::fs::DirBuilder; use uuid::Uuid; + use crate::envelope::{Item, ItemType}; + use crate::extractors::RequestMeta; + use crate::Envelope; + /// Sets up a temporary SQLite database for testing purposes. pub async fn setup_db(run_migrations: bool) -> Pool { let path = std::env::temp_dir().join(Uuid::new_v4().to_string()); @@ -36,4 +46,45 @@ pub mod utils { db } + + pub fn request_meta() -> RequestMeta { + let dsn = "https://a94ae32be2584e0bbd7a4cbb95971fee:@sentry.io/42" + .parse() + .unwrap(); + + RequestMeta::new(dsn) + } + + pub fn mock_envelope(instant: Instant) -> Box { + let event_id = EventId::new(); + let mut envelope = Envelope::from_request(Some(event_id), request_meta()); + + let dsc = DynamicSamplingContext { + trace_id: Uuid::new_v4(), + public_key: ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(), + release: Some("1.1.1".to_string()), + user: Default::default(), + replay_id: None, + environment: None, + transaction: Some("transaction1".into()), + sample_rate: None, + sampled: Some(true), + other: BTreeMap::new(), + }; + + envelope.set_dsc(dsc); + envelope.set_start_time(instant); + + envelope.add_item(Item::new(ItemType::Transaction)); + + envelope + } + + #[allow(clippy::vec_box)] + pub fn mock_envelopes(count: usize) -> Vec> { + let instant = Instant::now(); + (0..count) + .map(|i| mock_envelope(instant - Duration::from_secs((count - i) as u64))) + .collect() + } } diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 6e3cf007a1..7ef24f64bc 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -512,6 +512,8 @@ pub enum RelayTimers { /// - `message`: The type of message that was processed. #[cfg(feature = "processing")] StoreServiceDuration, + /// Timing in milliseconds for the time it takes for initialize the spooler. + SpoolInitialization, } impl TimerMetric for RelayTimers { @@ -553,6 +555,7 @@ impl TimerMetric for RelayTimers { RelayTimers::MetricRouterServiceDuration => "metrics.router.message.duration", #[cfg(feature = "processing")] RelayTimers::StoreServiceDuration => "store.message.duration", + RelayTimers::SpoolInitialization => "spool.initialization", } } }