feat(parquet)!: coerce_types flag for date64 #6313

Open · wants to merge 1 commit into base: master
120 changes: 111 additions & 9 deletions parquet/src/arrow/array_reader/primitive_array.rs
@@ -208,14 +208,8 @@ where
// As there is not always a 1:1 mapping between Arrow and Parquet, there
// are datatypes which we must convert explicitly.
// These are:
- // - date64: we should cast int32 to date32, then date32 to date64.
- // - decimal: cast in32 to decimal, int64 to decimal
+ // - decimal: cast int32 to decimal, int64 to decimal
let array = match target_type {
- ArrowType::Date64 => {
-     // this is cheap as it internally reinterprets the data
-     let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
-     arrow_cast::cast(&a, target_type)?
- }
ArrowType::Decimal128(p, s) => {
// Apply conversion to all elements regardless of null slots as the conversion
// to `i128` is infallible. This improves performance by avoiding a branch in
@@ -305,9 +299,9 @@ mod tests {
use crate::util::test_common::rand_gen::make_pages;
use crate::util::InMemoryPageIterator;
use arrow::datatypes::ArrowPrimitiveType;
- use arrow_array::{Array, PrimitiveArray};
+ use arrow_array::{Array, Date32Array, Date64Array, PrimitiveArray};

- use arrow::datatypes::DataType::Decimal128;
+ use arrow::datatypes::DataType::{Date32, Date64, Decimal128};
use rand::distributions::uniform::SampleUniform;
use std::collections::VecDeque;

@@ -545,6 +539,14 @@ mod tests {
arrow::datatypes::Int32Type,
i32
);
test_primitive_array_reader_one_type!(
crate::data_type::Int64Type,
PhysicalType::INT64,
"DATE",
arrow::datatypes::Date64Type,
arrow::datatypes::Int64Type,
i64
);
test_primitive_array_reader_one_type!(
crate::data_type::Int32Type,
PhysicalType::INT32,
@@ -783,4 +785,104 @@ mod tests {
assert_ne!(array, &data_decimal_array)
}
}

#[test]
fn test_primitive_array_reader_date32_type() {
// parquet `INT32` to date
let message_type = "
message test_schema {
REQUIRED INT32 date1 (DATE);
}
";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);

// create the array reader
{
let mut data = Vec::new();
let mut page_lists = Vec::new();
make_column_chunks::<Int32Type>(
column_desc.clone(),
Encoding::PLAIN,
100,
-99999999,
99999999,
&mut Vec::new(),
&mut Vec::new(),
&mut data,
&mut page_lists,
true,
2,
);
let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();

// read data from the reader
// the data type is date
let array = array_reader.next_batch(50).unwrap();
assert_eq!(array.data_type(), &Date32);
let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
let data_date_array = data[0..50]
.iter()
.copied()
.map(Some)
.collect::<Date32Array>();
assert_eq!(array, &data_date_array);
}
}

#[test]
fn test_primitive_array_reader_date64_type() {
// parquet `INT64` to date
let message_type = "
message test_schema {
REQUIRED INT64 date1 (DATE);
}
";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);

// create the array reader
{
let mut data = Vec::new();
let mut page_lists = Vec::new();
make_column_chunks::<Int64Type>(
column_desc.clone(),
Encoding::PLAIN,
100,
-999999999999999999,
999999999999999999,
&mut Vec::new(),
&mut Vec::new(),
&mut data,
&mut page_lists,
true,
2,
);
let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();

// read data from the reader
// the data type is date
let array = array_reader.next_batch(50).unwrap();
assert_eq!(array.data_type(), &Date64);
let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
let data_date_array = data[0..50]
.iter()
.copied()
.map(Some)
.collect::<Date64Array>();
assert_eq!(array, &data_date_array);
}
}
}
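
For context on the first hunk above: the deleted `ArrowType::Date64` arm implemented the old read path, where a Parquet `INT32` `DATE` column was reinterpreted as `Date32` and then widened to `Date64`. A minimal sketch of that cast chain, assuming only the published `arrow_array`, `arrow_cast`, and `arrow_schema` crates (an illustration, not code from this PR):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array};
use arrow_schema::DataType;

fn main() {
    // Parquet DATE values are INT32 days since the Unix epoch.
    let days: ArrayRef = Arc::new(Int32Array::from(vec![0, 1, 10_000]));

    // Old read path: reinterpret INT32 as Date32 (cheap), then widen to
    // Date64, which multiplies each value by 86_400_000 ms per day.
    let date32 = arrow_cast::cast(&days, &DataType::Date32).unwrap();
    let date64 = arrow_cast::cast(&date32, &DataType::Date64).unwrap();
    assert_eq!(date64.data_type(), &DataType::Date64);
}
```

With this change, a `Date64` field maps to physical `INT64` milliseconds by default (see the new `INT64`/`DATE` test case above), so no cast chain is needed on read.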
115 changes: 113 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -948,8 +948,8 @@ mod tests {
use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::{
-     Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
-     Time32MillisecondType, Time64MicrosecondType,
+     Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
+     Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
};
use arrow_array::*;
use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
@@ -1288,6 +1288,117 @@ mod tests {
Ok(())
}

#[test]
fn test_date32_roundtrip() -> Result<()> {
use arrow_array::Date32Array;

let schema = Arc::new(Schema::new(vec![Field::new(
"date32",
ArrowDataType::Date32,
false,
)]));

let mut buf = Vec::with_capacity(1024);

let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

let original = RecordBatch::try_new(
schema,
vec![Arc::new(Date32Array::from(vec![
-1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
]))],
)?;

writer.write(&original)?;
writer.close()?;

let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
let ret = reader.next().unwrap()?;
assert_eq!(ret, original);

// Ensure it can be downcast to the correct type
ret.column(0).as_primitive::<Date32Type>();

Ok(())
}

#[test]
fn test_date64_roundtrip() -> Result<()> {
use arrow_array::Date64Array;

let schema = Arc::new(Schema::new(vec![
Field::new("small-date64", ArrowDataType::Date64, false),
Field::new("big-date64", ArrowDataType::Date64, false),
Field::new("invalid-date64", ArrowDataType::Date64, false),
]));

let mut default_buf = Vec::with_capacity(1024);
let mut coerce_buf = Vec::with_capacity(1024);

let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
let mut coerce_writer =
ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

let original = RecordBatch::try_new(
schema,
vec![
// small-date64
Arc::new(Date64Array::from(vec![
-1_000_000 * NUM_MILLISECONDS_IN_DAY,
-1_000 * NUM_MILLISECONDS_IN_DAY,
0,
1_000 * NUM_MILLISECONDS_IN_DAY,
1_000_000 * NUM_MILLISECONDS_IN_DAY,
])),
// big-date64
Arc::new(Date64Array::from(vec![
-10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
-1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
0,
1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
])),
// invalid-date64
Arc::new(Date64Array::from(vec![
-1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
-1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1,
1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
])),
],
)?;

default_writer.write(&original)?;
coerce_writer.write(&original)?;

default_writer.close()?;
coerce_writer.close()?;

let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

let default_ret = default_reader.next().unwrap()?;
let coerce_ret = coerce_reader.next().unwrap()?;

// Roundtrip should be successful when the default writer is used
assert_eq!(default_ret, original);

// Only small-date64 should roundtrip successfully when the coerce_types writer is used
assert_eq!(coerce_ret.column(0), original.column(0));
assert_ne!(coerce_ret.column(1), original.column(1));
assert_ne!(coerce_ret.column(2), original.column(2));

// Ensure both can be downcast to the correct type
default_ret.column(0).as_primitive::<Date64Type>();
coerce_ret.column(0).as_primitive::<Date64Type>();

Ok(())
}
struct RandFixedLenGen {}

impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
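
The assertions in `test_date64_roundtrip` above follow from granularity: with `coerce_types` enabled the writer stores `Date64` as the Parquet spec's `INT32` `DATE` (whole days), so sub-day precision is dropped and day counts outside the `i32` range cannot survive. A sketch of that arithmetic, assuming truncating division to day granularity (consistent with the test's expectations, not code from this PR):

```rust
const MS_PER_DAY: i64 = 24 * 60 * 60 * 1000;

/// Model a Date64 (milliseconds) value stored as INT32 days and read back.
fn coerced_roundtrip(ms: i64) -> i64 {
    let days = (ms / MS_PER_DAY) as i32; // day granularity; wraps for huge values
    i64::from(days) * MS_PER_DAY
}

fn main() {
    let small = 1_000 * MS_PER_DAY;
    assert_eq!(coerced_roundtrip(small), small); // "small-date64" is lossless

    let invalid = 1_000 * MS_PER_DAY + 1;
    assert_ne!(coerced_roundtrip(invalid), invalid); // sub-day part is lost

    let big = 10_000_000_000 * MS_PER_DAY;
    assert_ne!(coerced_roundtrip(big), big); // day count overflows i32
}
```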
17 changes: 2 additions & 15 deletions parquet/src/arrow/arrow_reader/statistics.rs
@@ -371,8 +371,7 @@ macro_rules! get_statistics {
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
-     [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
-         .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
+     [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::Timestamp(unit, timezone) =>{
let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
@@ -945,19 +944,7 @@ macro_rules! get_data_page_statistics {
})
},
DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Date64 => Ok(
-     Arc::new(
-         Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
-             .map(|x| {
-                 x.into_iter()
-                     .map(|x| {
-                         x.and_then(|x| i64::try_from(x).ok())
-                     })
-                     .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000))
-             }).flatten()
-     )
- )
- ),
+ DataType::Date64 => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Decimal128(precision, scale) => Ok(Arc::new(
Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
DataType::Decimal256(precision, scale) => Ok(Arc::new(
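
Both statistics paths now read `Date64` min/max directly as `i64` milliseconds instead of widening `INT32` day statistics by `24 * 60 * 60 * 1000`. A small sketch of what the new arms collect (hypothetical values; `Date64Array` is `arrow_array`'s millisecond date array):

```rust
use arrow_array::{Array, Date64Array};

fn main() {
    // Min/max statistics arrive as Option<i64> milliseconds and are collected
    // directly into a Date64Array; a missing statistic becomes a null slot.
    let mins: Vec<Option<i64>> = vec![Some(0), None, Some(86_400_000)];
    let arr: Date64Array = mins.into_iter().collect();
    assert_eq!(arr.value(2), 86_400_000);
    assert!(arr.is_null(1));
}
```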
14 changes: 10 additions & 4 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -180,11 +180,11 @@ impl<W: Write + Send> ArrowWriter<W> {
arrow_schema: SchemaRef,
options: ArrowWriterOptions,
) -> Result<Self> {
+ let mut props = options.properties;
let schema = match options.schema_root {
-     Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
-     None => arrow_to_parquet_schema(&arrow_schema)?,
+     Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
+     None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
};
- let mut props = options.properties;
if !options.skip_arrow_metadata {
// add serialized arrow schema
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -549,8 +549,8 @@ impl ArrowColumnChunk {
/// ]));
///
/// // Compute the parquet schema
- /// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
/// let props = Arc::new(WriterProperties::default());
+ /// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap();
///
/// // Create writers for each of the leaf columns
/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
@@ -858,6 +858,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
}
ColumnWriter::Int64ColumnWriter(ref mut typed) => {
match column.data_type() {
ArrowDataType::Date64 => {
let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

let array = array.as_primitive::<Int64Type>();
write_primitive(typed, array.values(), levels)
}
ArrowDataType::Int64 => {
let array = column.as_primitive::<Int64Type>();
write_primitive(typed, array.values(), levels)
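
Putting the writer-side pieces together: the schema conversion consults `coerce_types`, and `write_leaf` handles a `Date64` column through the `INT64` column writer via a cheap `Date64 → Int64` cast. A minimal end-to-end sketch of the two writer modes, assuming the `set_coerce_types` builder method this PR adds (crate paths per the tests above):

```rust
use std::sync::Arc;

use arrow_array::{Date64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("d", DataType::Date64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Date64Array::from(vec![0, 86_400_000]))],
    )?;

    // Default: Date64 is written as physical INT64 (milliseconds) and
    // round-trips losslessly.
    let mut plain = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut plain, schema.clone(), None)?;
    writer.write(&batch)?;
    writer.close()?;

    // coerce_types: Date64 is coerced to the spec's INT32 DATE (whole days),
    // trading sub-day precision for standard-compliant files.
    let props = WriterProperties::builder().set_coerce_types(true).build();
    let mut coerced = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut coerced, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```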