Skip to content

Commit

Permalink
stop panic in MetadataLoader on invalid data (#6367)
Browse files Browse the repository at this point in the history
* stop panic in MetadataLoader on invalid data

* better check for invalid prefect

* limit hint instead of erroring

* import FOOTER_SIZE
  • Loading branch information
samuelcolvin committed Sep 16, 2024
1 parent b4de692 commit 341ec35
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::FOOTER_SIZE;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
Expand Down Expand Up @@ -53,7 +54,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
///
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
if file_size < 8 {
if file_size < FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
"file size of {file_size} is less than footer"
)));
Expand All @@ -62,20 +63,22 @@ impl<F: MetadataFetch> MetadataLoader<F> {
// If a size hint is provided, read more than the minimum size
// to try and avoid a second fetch.
let footer_start = if let Some(size_hint) = prefetch {
// check for hint smaller than footer
let size_hint = std::cmp::max(size_hint, FOOTER_SIZE);
file_size.saturating_sub(size_hint)
} else {
file_size - 8
file_size - FOOTER_SIZE
};

let suffix = fetch.fetch(footer_start..file_size).await?;
let suffix_len = suffix.len();

let mut footer = [0; 8];
footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
let mut footer = [0; FOOTER_SIZE];
footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]);

let length = decode_footer(&footer)?;

if file_size < length + 8 {
if file_size < length + FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
"file size of {} is less than footer + metadata {}",
file_size,
Expand All @@ -84,14 +87,14 @@ impl<F: MetadataFetch> MetadataLoader<F> {
}

// Did not fetch the entire file metadata in the initial read, need to make a second request
let (metadata, remainder) = if length > suffix_len - 8 {
let metadata_start = file_size - length - 8;
let meta = fetch.fetch(metadata_start..file_size - 8).await?;
let (metadata, remainder) = if length > suffix_len - FOOTER_SIZE {
let metadata_start = file_size - length - FOOTER_SIZE;
let meta = fetch.fetch(metadata_start..file_size - FOOTER_SIZE).await?;
(decode_metadata(&meta)?, None)
} else {
let metadata_start = file_size - length - 8 - footer_start;
let metadata_start = file_size - length - FOOTER_SIZE - footer_start;

let slice = &suffix[metadata_start..suffix_len - 8];
let slice = &suffix[metadata_start..suffix_len - FOOTER_SIZE];
(
decode_metadata(slice)?,
Some((footer_start, suffix.slice(..metadata_start))),
Expand Down Expand Up @@ -273,6 +276,14 @@ mod tests {
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small - below footer size
fetch_count.store(0, Ordering::SeqCst);
let actual = fetch_parquet_metadata(&mut fetch, len, Some(7))
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
assert_eq!(fetch_count.load(Ordering::SeqCst), 2);

// Metadata hint too small
fetch_count.store(0, Ordering::SeqCst);
let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
Expand Down

0 comments on commit 341ec35

Please sign in to comment.