Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Add options to infer_schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Aug 9, 2023
1 parent b09e580 commit 81ce186
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 28 deletions.
101 changes: 76 additions & 25 deletions src/io/parquet/read/schema/convert.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This module has a single entry point, [`parquet_to_arrow_schema`].
//! This module has entry points, [`parquet_to_arrow_schema`] and the more configurable [`parquet_to_arrow_schema_with_options`].
use parquet2::schema::{
types::{
FieldInfo, GroupConvertedType, GroupLogicalType, IntegerType, ParquetType, PhysicalType,
Expand All @@ -8,11 +8,17 @@ use parquet2::schema::{
};

use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
use crate::io::parquet::read::schema::SchemaInferenceOptions;

/// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain
/// any physical column.
pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec<Field> {

Check warning on line 15 in src/io/parquet/read/schema/convert.rs

View workflow job for this annotation

GitHub Actions / cross (wasm32-unknown-unknown)

function `parquet_to_arrow_schema` is never used

Check warning on line 15 in src/io/parquet/read/schema/convert.rs

View workflow job for this annotation

GitHub Actions / cross (i686-unknown-linux-gnu)

function `parquet_to_arrow_schema` is never used

Check warning on line 15 in src/io/parquet/read/schema/convert.rs

View workflow job for this annotation

GitHub Actions / cross (mips-unknown-linux-gnu)

function `parquet_to_arrow_schema` is never used

Check warning on line 15 in src/io/parquet/read/schema/convert.rs

View workflow job for this annotation

GitHub Actions / cross (powerpc64-unknown-linux-gnu)

function `parquet_to_arrow_schema` is never used

Check warning on line 15 in src/io/parquet/read/schema/convert.rs

View workflow job for this annotation

GitHub Actions / cross (powerpc-unknown-linux-gnu)

function `parquet_to_arrow_schema` is never used

Check warning on line 15 in src/io/parquet/read/schema/convert.rs

View workflow job for this annotation

GitHub Actions / cross (arm-linux-androideabi)

function `parquet_to_arrow_schema` is never used
fields.iter().filter_map(to_field).collect::<Vec<_>>()
parquet_to_arrow_schema_with_options(fields, &None)
}

/// Like [`parquet_to_arrow_schema`] but with configurable options which affect the behavior of schema inference
pub fn parquet_to_arrow_schema_with_options(fields: &[ParquetType], options: &Option<SchemaInferenceOptions>) -> Vec<Field> {
fields.iter().filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default()))).collect::<Vec<_>>()
}

fn from_int32(
Expand Down Expand Up @@ -169,7 +175,7 @@ fn from_fixed_len_byte_array(
}

/// Maps a [`PhysicalType`] with optional metadata to a [`DataType`]
fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType {
fn to_primitive_type_inner(primitive_type: &PrimitiveType, options: &SchemaInferenceOptions) -> DataType {
match primitive_type.physical_type {
PhysicalType::Boolean => DataType::Boolean,
PhysicalType::Int32 => {
Expand All @@ -178,7 +184,7 @@ fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType {
PhysicalType::Int64 => {
from_int64(primitive_type.logical_type, primitive_type.converted_type)
}
PhysicalType::Int96 => DataType::Timestamp(TimeUnit::Nanosecond, None),
PhysicalType::Int96 => DataType::Timestamp(options.int96_coerce_to_timeunit, None),
PhysicalType::Float => DataType::Float32,
PhysicalType::Double => DataType::Float64,
PhysicalType::ByteArray => {
Expand All @@ -195,8 +201,8 @@ fn to_primitive_type_inner(primitive_type: &PrimitiveType) -> DataType {
/// Entry point for converting parquet primitive type to arrow type.
///
/// This function takes care of repetition.
fn to_primitive_type(primitive_type: &PrimitiveType) -> DataType {
let base_type = to_primitive_type_inner(primitive_type);
fn to_primitive_type(primitive_type: &PrimitiveType, options: &SchemaInferenceOptions) -> DataType {
let base_type = to_primitive_type_inner(primitive_type, options);

if primitive_type.field_info.repetition == Repetition::Repeated {
DataType::List(Box::new(Field::new(
Expand All @@ -214,23 +220,24 @@ fn non_repeated_group(
converted_type: &Option<GroupConvertedType>,
fields: &[ParquetType],
parent_name: &str,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
debug_assert!(!fields.is_empty());
match (logical_type, converted_type) {
(Some(GroupLogicalType::List), _) => to_list(fields, parent_name),
(None, Some(GroupConvertedType::List)) => to_list(fields, parent_name),
(Some(GroupLogicalType::Map), _) => to_list(fields, parent_name),
(Some(GroupLogicalType::List), _) => to_list(fields, parent_name, options),
(None, Some(GroupConvertedType::List)) => to_list(fields, parent_name, options),
(Some(GroupLogicalType::Map), _) => to_list(fields, parent_name, options),
(None, Some(GroupConvertedType::Map) | Some(GroupConvertedType::MapKeyValue)) => {
to_map(fields)
to_map(fields, options)
}
_ => to_struct(fields),
_ => to_struct(fields, options),
}
}

/// Converts a parquet group type to an arrow [`DataType::Struct`].
/// Returns [`None`] if all its fields are empty
fn to_struct(fields: &[ParquetType]) -> Option<DataType> {
let fields = fields.iter().filter_map(to_field).collect::<Vec<Field>>();
fn to_struct(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<DataType> {
let fields = fields.iter().filter_map(|f| to_field(f, options)).collect::<Vec<Field>>();
if fields.is_empty() {
None
} else {
Expand All @@ -240,8 +247,8 @@ fn to_struct(fields: &[ParquetType]) -> Option<DataType> {

/// Converts a parquet group type to an arrow [`DataType::Struct`].
/// Returns [`None`] if all its fields are empty
fn to_map(fields: &[ParquetType]) -> Option<DataType> {
let inner = to_field(&fields[0])?;
fn to_map(fields: &[ParquetType], options: &SchemaInferenceOptions) -> Option<DataType> {
let inner = to_field(&fields[0], options)?;
Some(DataType::Map(Box::new(inner), false))
}

Expand All @@ -254,16 +261,17 @@ fn to_group_type(
converted_type: &Option<GroupConvertedType>,
fields: &[ParquetType],
parent_name: &str,
options: &SchemaInferenceOptions,
) -> Option<DataType> {
debug_assert!(!fields.is_empty());
if field_info.repetition == Repetition::Repeated {
Some(DataType::List(Box::new(Field::new(
&field_info.name,
to_struct(fields)?,
to_struct(fields, options)?,
is_nullable(field_info),
))))
} else {
non_repeated_group(logical_type, converted_type, fields, parent_name)
non_repeated_group(logical_type, converted_type, fields, parent_name, options)
}
}

Expand All @@ -279,10 +287,10 @@ pub(crate) fn is_nullable(field_info: &FieldInfo) -> bool {
/// Converts parquet schema to arrow field.
/// Returns `None` iff the parquet type has no associated primitive types,
/// i.e. if it is a column-less group type.
fn to_field(type_: &ParquetType) -> Option<Field> {
fn to_field(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option<Field> {
Some(Field::new(
&type_.get_field_info().name,
to_data_type(type_)?,
to_data_type(type_, options)?,
is_nullable(type_.get_field_info()),
))
}
Expand All @@ -291,21 +299,21 @@ fn to_field(type_: &ParquetType) -> Option<Field> {
///
/// To fully understand this algorithm, please refer to
/// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
fn to_list(fields: &[ParquetType], parent_name: &str) -> Option<DataType> {
fn to_list(fields: &[ParquetType], parent_name: &str, options: &SchemaInferenceOptions) -> Option<DataType> {
let item = fields.first().unwrap();

let item_type = match item {
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive)),
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type_inner(primitive, options)),
ParquetType::GroupType { fields, .. } => {
if fields.len() == 1
&& item.name() != "array"
&& item.name() != format!("{parent_name}_tuple")
{
// extract the repetition field
let nested_item = fields.first().unwrap();
to_data_type(nested_item)
to_data_type(nested_item, options)
} else {
to_struct(fields)
to_struct(fields, options)
}
}
}?;
Expand Down Expand Up @@ -346,9 +354,9 @@ fn to_list(fields: &[ParquetType], parent_name: &str) -> Option<DataType> {
///
/// If this schema is a group type and none of its children is reserved in the
/// conversion, the result is Ok(None).
pub(crate) fn to_data_type(type_: &ParquetType) -> Option<DataType> {
pub(crate) fn to_data_type(type_: &ParquetType, options: &SchemaInferenceOptions) -> Option<DataType> {
match type_ {
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive)),
ParquetType::PrimitiveType(primitive) => Some(to_primitive_type(primitive, options)),
ParquetType::GroupType {
field_info,
logical_type,
Expand All @@ -364,6 +372,7 @@ pub(crate) fn to_data_type(type_: &ParquetType) -> Option<DataType> {
converted_type,
fields,
&field_info.name,
options,
)
}
}
Expand Down Expand Up @@ -973,4 +982,46 @@ mod tests {
assert_eq!(arrow_fields, fields);
Ok(())
}

#[test]
fn test_int96_options() -> Result<()> {
for tu in [TimeUnit::Second, TimeUnit::Microsecond, TimeUnit::Millisecond, TimeUnit::Nanosecond] {
let message_type = "
message arrow_schema {
REQUIRED INT96 int96_field;
OPTIONAL GROUP int96_list (LIST) {
REPEATED GROUP list {
OPTIONAL INT96 element;
}
}
REQUIRED GROUP int96_struct {
REQUIRED INT96 int96_field;
}
}
";
let coerced_to = DataType::Timestamp(tu, None);
let arrow_fields = vec![
Field::new("int96_field", coerced_to.clone(), false),
Field::new(
"int96_list",
DataType::List(Box::new(Field::new("element", coerced_to.clone(), true))),
true,
),
Field::new(
"int96_struct",
DataType::Struct(vec![
Field::new("int96_field", coerced_to.clone(), false),
]),
false,
),
];

let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
let fields = parquet_to_arrow_schema_with_options(parquet_schema.fields(), &Some(SchemaInferenceOptions{
int96_coerce_to_timeunit: tu,
}));
assert_eq!(arrow_fields, fields);
}
Ok(())
}
}
29 changes: 26 additions & 3 deletions src/io/parquet/read/schema/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! APIs to handle Parquet <-> Arrow schemas.
use crate::datatypes::Schema;
use crate::datatypes::{Schema, TimeUnit};
use crate::error::Result;

mod convert;
mod metadata;

pub use convert::parquet_to_arrow_schema;
pub use convert::parquet_to_arrow_schema_with_options;
pub use metadata::read_schema_from_metadata;
pub use parquet2::metadata::{FileMetaData, KeyValue, SchemaDescriptor};
pub use parquet2::schema::types::ParquetType;
Expand All @@ -14,18 +14,41 @@ pub(crate) use convert::*;

use self::metadata::parse_key_value_metadata;

/// Options when inferring schemas from Parquet
pub struct SchemaInferenceOptions {
/// When inferring schemas from the Parquet INT96 timestamp type, this is the corresponding TimeUnit
/// in the inferred Arrow Timestamp type.
///
/// This defaults to `TimeUnit::Nanosecond`, but INT96 timestamps outside of the range of years 1678-2262,
/// will overflow when parsed as `Timestamp(TimeUnit::Nanosecond)`. Setting this to a lower resolution
/// (e.g. TimeUnit::Milliseconds) will result in loss of precision, but support a larger range of dates
/// without overflowing when parsing the data.
pub int96_coerce_to_timeunit: TimeUnit,
}

impl Default for SchemaInferenceOptions {
fn default() -> Self {
SchemaInferenceOptions { int96_coerce_to_timeunit: TimeUnit::Nanosecond }
}
}

/// Infers a [`Schema`] from parquet's [`FileMetaData`]. This first looks for the metadata key
/// `"ARROW:schema"`; if it does not exist, it converts the parquet types declared in the
/// file's parquet schema to Arrow's equivalent.
/// # Error
/// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded,
/// indicating that that the file's arrow metadata was incorrectly written.
pub fn infer_schema(file_metadata: &FileMetaData) -> Result<Schema> {
infer_schema_with_options(file_metadata, &None)
}

/// Like [`infer_schema`] but with configurable options which affects the behavior of inference
pub fn infer_schema_with_options(file_metadata: &FileMetaData, options: &Option<SchemaInferenceOptions>) -> Result<Schema> {
let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata());

let schema = read_schema_from_metadata(&mut metadata)?;
Ok(schema.unwrap_or_else(|| {
let fields = parquet_to_arrow_schema(file_metadata.schema().fields());
let fields = parquet_to_arrow_schema_with_options(file_metadata.schema().fields(), options);
Schema { fields, metadata }
}))
}

0 comments on commit 81ce186

Please sign in to comment.