feat: replay WAL entries respect index #4565

Merged · 14 commits · Aug 28, 2024
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

3 changes: 3 additions & 0 deletions src/common/meta/src/instruction.rs
@@ -153,6 +153,9 @@ pub struct UpgradeRegion {
/// it's helpful to verify whether the leader region is ready.
#[serde(with = "humantime_serde")]
pub wait_for_replay_timeout: Option<Duration>,
/// The hint for replaying the memtable.
#[serde(default)]
pub location_id: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
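Because the new field is `Option<u64>` and marked `#[serde(default)]`, instructions serialized before this change still deserialize cleanly. A minimal sketch of that behavior, using a stand-in struct rather than the actual GreptimeDB type:

```rust
use serde::Deserialize;

// Stand-in for the relevant fields of `UpgradeRegion`; the real struct
// has more fields and a RegionId type, so this is illustrative only.
#[derive(Debug, Deserialize)]
struct UpgradeRegionSketch {
    region_id: u64,
    #[serde(default)]
    location_id: Option<u64>,
}

fn main() {
    // A payload from an older sender carries no `location_id`; the
    // optional, defaulted field deserializes to `None` instead of erroring.
    let v: UpgradeRegionSketch = serde_json::from_str(r#"{"region_id": 42}"#).unwrap();
    assert_eq!(v.location_id, None);
}
```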
1 change: 1 addition & 0 deletions src/datanode/src/heartbeat/handler.rs
@@ -206,6 +206,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
location_id: None,
});
assert!(
heartbeat_handler.is_acceptable(&heartbeat_env.create_handler_ctx((meta, instruction)))
11 changes: 10 additions & 1 deletion src/datanode/src/heartbeat/handler/upgrade_region.rs
@@ -27,6 +27,7 @@ impl HandlerContext {
region_id,
last_entry_id,
wait_for_replay_timeout,
location_id,
}: UpgradeRegion,
) -> BoxFuture<'static, InstructionReply> {
Box::pin(async move {
@@ -62,6 +63,7 @@
RegionRequest::Catchup(RegionCatchupRequest {
set_writable: true,
entry_id: last_entry_id,
location_id,
}),
)
.await?;
@@ -151,6 +153,7 @@ mod tests {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -191,6 +194,7 @@
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -232,6 +236,7 @@
region_id,
last_entry_id: None,
wait_for_replay_timeout,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -274,8 +279,9 @@
.clone()
.handle_upgrade_region_instruction(UpgradeRegion {
region_id,
last_entry_id: None,
wait_for_replay_timeout,
last_entry_id: None,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -293,6 +299,7 @@
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(500)),
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -337,6 +344,7 @@
region_id,
last_entry_id: None,
wait_for_replay_timeout: None,
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
@@ -354,6 +362,7 @@
region_id,
last_entry_id: None,
wait_for_replay_timeout: Some(Duration::from_millis(200)),
location_id: None,
})
.await;
assert_matches!(reply, InstructionReply::UpgradeRegion(_));
1 change: 1 addition & 0 deletions src/log-store/Cargo.toml
@@ -26,6 +26,7 @@ common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
delta-encoding = "0.4"
derive_builder.workspace = true
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true
9 changes: 9 additions & 0 deletions src/log-store/src/error.rs
@@ -304,6 +304,15 @@ pub enum Error {
error: object_store::Error,
},

#[snafu(display("Failed to read index, path: {path}"))]
ReadIndex {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: object_store::Error,
path: String,
},

#[snafu(display(
"The length of meta if exceeded the limit: {}, actual: {}",
limit,
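As a reference for how the new `ReadIndex` variant gets raised, here is a hedged, self-contained sketch of snafu's generated context selector; the standalone types below stand in for the crate's own (the real variant wraps `object_store::Error`):

```rust
use snafu::{Location, ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to read index, path: {path}"))]
    ReadIndex {
        #[snafu(implicit)]
        location: Location,
        #[snafu(source)]
        error: std::io::Error, // stand-in for object_store::Error
        path: String,
    },
}

fn read_index(path: &str) -> Result<Vec<u8>, Error> {
    // `.context(...)` fills `path`; snafu captures the source error and
    // the implicit `location` automatically.
    std::fs::read(path).context(ReadIndexSnafu { path })
}
```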
5 changes: 0 additions & 5 deletions src/log-store/src/kafka.rs
@@ -14,15 +14,10 @@

pub(crate) mod client_manager;
pub(crate) mod consumer;
/// TODO(weny): remove it.
#[allow(dead_code)]
#[allow(unused_imports)]
pub(crate) mod index;
pub mod log_store;
pub(crate) mod producer;
pub(crate) mod util;
/// TODO(weny): remove it.
#[allow(dead_code)]
pub(crate) mod worker;

pub use index::{default_index_file, GlobalIndexCollector};
24 changes: 23 additions & 1 deletion src/log-store/src/kafka/consumer.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use common_telemetry::debug;
use derive_builder::Builder;
use futures::future::{BoxFuture, Fuse, FusedFuture};
use futures::{FutureExt, Stream};
use pin_project::pin_project;
@@ -60,40 +61,61 @@ struct FetchResult {
used_offset: i64,
}

const MAX_BATCH_SIZE: usize = 52428800; // 50 MiB
const AVG_RECORD_SIZE: usize = 256 * 1024; // 256 KiB

/// The [`Consumer`] struct represents a Kafka consumer that fetches messages from
/// a Kafka cluster, yielding records according to the [`RegionWalIndexIterator`].
#[pin_project]
#[derive(Builder)]
#[builder(pattern = "owned")]
pub struct Consumer {
#[builder(default = "-1")]
last_high_watermark: i64,

/// The client is used to fetch records from kafka topic.
client: Arc<dyn FetchClient>,

/// The max batch size in a single fetch request.
#[builder(default = "MAX_BATCH_SIZE")]
max_batch_size: usize,

/// The max wait time in milliseconds.
#[builder(default = "500")]
max_wait_ms: u32,

/// The average record size, in bytes.
#[builder(default = "AVG_RECORD_SIZE")]
avg_record_size: usize,

/// Termination flag
#[builder(default = "false")]
terminated: bool,

/// The buffer of records.
buffer: RecordsBuffer,

/// The fetch future.
#[builder(default = "Fuse::terminated()")]
fetch_fut: Fuse<BoxFuture<'static, rskafka::client::error::Result<FetchResult>>>,
}

struct RecordsBuffer {
pub(crate) struct RecordsBuffer {
buffer: VecDeque<RecordAndOffset>,

index: Box<dyn RegionWalIndexIterator>,
}

impl RecordsBuffer {
/// Creates an empty [`RecordsBuffer`]
pub fn new(index: Box<dyn RegionWalIndexIterator>) -> Self {
RecordsBuffer {
buffer: VecDeque::new(),
index,
}
}
}

impl RecordsBuffer {
fn pop_front(&mut self) -> Option<RecordAndOffset> {
while let Some(index) = self.index.peek() {
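Taken together, the `derive_builder` attributes mean a `Consumer` is assembled through the generated `ConsumerBuilder` (owned pattern: each setter consumes and returns the builder, and `build()` returns a `Result`). A hedged sketch of a call site inside the crate; only the two fields without a `#[builder(default)]` must be supplied:

```rust
fn build_consumer(
    client: Arc<dyn FetchClient>,
    index: Box<dyn RegionWalIndexIterator>,
) -> Consumer {
    ConsumerBuilder::default()
        .client(client)
        .buffer(RecordsBuffer::new(index))
        // Remaining fields use the defaults from the struct definition:
        // last_high_watermark = -1, max_batch_size = 50 MiB, max_wait_ms = 500,
        // avg_record_size = 256 KiB, terminated = false, fetch_fut = terminated.
        .build()
        .expect("client and buffer are the only required fields")
}
```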
9 changes: 5 additions & 4 deletions src/log-store/src/kafka/index.rs
@@ -20,10 +20,11 @@ pub use collector::GlobalIndexCollector;
pub(crate) use collector::{IndexCollector, NoopCollector};
pub(crate) use encoder::{IndexEncoder, JsonIndexEncoder};
pub(crate) use iterator::{
MultipleRegionWalIndexIterator, NextBatchHint, RegionWalIndexIterator, RegionWalRange,
RegionWalVecIndex,
build_region_wal_index_iterator, NextBatchHint, RegionWalIndexIterator, MIN_BATCH_WINDOW_SIZE,
};
#[cfg(test)]
pub(crate) use iterator::{MultipleRegionWalIndexIterator, RegionWalRange, RegionWalVecIndex};

pub fn default_index_file(datanode_id: u64) -> String {
format!("__datanode/{datanode_id}/index.json")
pub fn default_index_file(location_id: u64) -> String {
format!("__datanode/{location_id}/index.json")
}
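The parameter rename from `datanode_id` to `location_id` generalizes who owns the index file without changing the path format; for instance:

```rust
assert_eq!(default_index_file(42), "__datanode/42/index.json");
```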