Skip to content

Commit

Permalink
feat: postgres type conversions
Browse files Browse the repository at this point in the history
  • Loading branch information
aljazerzen committed Feb 24, 2024
1 parent d1b76bb commit bc07d0b
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 110 deletions.
42 changes: 21 additions & 21 deletions connector_arrow/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,40 +88,40 @@ macro_rules! impl_arrow_value_tuple {
};
}

impl_arrow_value_tuple!(
i32,
(
Date32Type,
Time32SecondType,
Time32MillisecondType,
IntervalYearMonthType,
)
);

impl_arrow_value_tuple!(
i64,
(
TimestampSecondType,
TimestampMillisecondType,
TimestampMicrosecondType,
TimestampNanosecondType,
)
);

impl_produce_unsupported!(
&'r dyn ArrowValue,
(
NullType,
Float16Type,
// BinaryType,
// Utf8Type,
Date32Type,
Date64Type,
Time32SecondType,
Time32MillisecondType,
Time64MicrosecondType,
Time64NanosecondType,
IntervalYearMonthType,
IntervalDayTimeType,
IntervalMonthDayNanoType,
DurationSecondType,
DurationMillisecondType,
DurationMicrosecondType,
DurationNanosecondType,
LargeBinaryType,
FixedSizeBinaryType,
LargeUtf8Type,
Decimal128Type,
Decimal256Type,
IntervalDayTimeType,
)
);

impl_arrow_value_tuple!(i128, (IntervalMonthDayNanoType, Decimal128Type,));

impl_arrow_value_tuple!(i256, (Decimal256Type,));

impl_arrow_value_tuple!(String, (LargeUtf8Type,));

impl_arrow_value_tuple!(Vec<u8>, (LargeBinaryType, FixedSizeBinaryType,));

impl_produce_unsupported!(&'r dyn ArrowValue, (NullType, Float16Type,));
21 changes: 17 additions & 4 deletions connector_arrow/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod protocol_simple;
mod schema;
mod types;

use arrow::datatypes::{DataType, TimeUnit};
use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
use postgres::Client;
use std::marker::PhantomData;
use thiserror::Error;
Expand Down Expand Up @@ -130,9 +130,9 @@ where
DataType::Time64(_) => Some(DataType::Int64),
DataType::Duration(_) => Some(DataType::Int64),

DataType::Utf8 => Some(DataType::LargeUtf8),
DataType::Binary => Some(DataType::LargeBinary),
DataType::FixedSizeBinary(_) => Some(DataType::LargeBinary),
DataType::LargeUtf8 => Some(DataType::Utf8),
DataType::LargeBinary => Some(DataType::Binary),
DataType::FixedSizeBinary(_) => Some(DataType::Binary),

DataType::Decimal128(_, _) => Some(DataType::Utf8),
DataType::Decimal256(_, _) => Some(DataType::Utf8),
Expand All @@ -156,6 +156,19 @@ where
"timestamptz" | "timestamp with time zone" => {
DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into()))
}
"date" => DataType::Date32,
"time" | "time without time zone" => DataType::Time64(TimeUnit::Microsecond),
"interval" => DataType::Interval(IntervalUnit::MonthDayNano),

"bytea" => DataType::Binary,
"bit" | "bit varying" | "varbit" => DataType::Binary,
_ if ty.starts_with("bit") => DataType::Binary,

"text" | "varchar" | "char" | "bpchar" => DataType::Utf8,
_ if ty.starts_with("varchar") | ty.starts_with("char") | ty.starts_with("bpchar") => {
DataType::Utf8
}

_ => return None,
})
}
Expand Down
185 changes: 128 additions & 57 deletions connector_arrow/src/postgres/protocol_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use itertools::Itertools;
use postgres::fallible_iterator::FallibleIterator;
use postgres::types::FromSql;
use postgres::types::{FromSql, Type};
use postgres::{Row, RowIter};

use crate::api::{ArrowValue, ResultReader, Statement};
Expand Down Expand Up @@ -153,27 +153,21 @@ impl_produce!(Int32Type, i32, Result::Ok);
impl_produce!(Int64Type, i64, Result::Ok);
impl_produce!(Float32Type, f32, Result::Ok);
impl_produce!(Float64Type, f64, Result::Ok);
impl_produce!(LargeBinaryType, Vec<u8>, Result::Ok);
impl_produce!(BinaryType, Binary, Binary::into_arrow);
impl_produce!(LargeBinaryType, Binary, Binary::into_arrow);
impl_produce!(Utf8Type, StrOrNum, StrOrNum::into_arrow);
impl_produce!(LargeUtf8Type, String, Result::Ok);
impl_produce!(
TimestampSecondType,
TimestampY2000,
TimestampY2000::into_second
);
impl_produce!(
TimestampMillisecondType,
TimestampY2000,
TimestampY2000::into_millisecond
);
impl_produce!(
TimestampMicrosecondType,
TimestampY2000,
TimestampY2000::into_microsecond
);
impl_produce!(Time64MicrosecondType, Time64, Time64::into_microsecond);
impl_produce!(Date32Type, DaysSinceY2000, DaysSinceY2000::into_date32);
impl_produce!(
TimestampNanosecondType,
TimestampY2000,
TimestampY2000::into_nanosecond
IntervalMonthDayNanoType,
IntervalMonthDayMicros,
IntervalMonthDayMicros::into_arrow
);

crate::impl_produce_unsupported!(
Expand All @@ -184,96 +178,173 @@ crate::impl_produce_unsupported!(
UInt32Type,
UInt64Type,
Float16Type,
// TimestampSecondType,
// TimestampMillisecondType,
// TimestampMicrosecondType,
// TimestampNanosecondType,
Date32Type,
TimestampSecondType,
TimestampMillisecondType,
TimestampNanosecondType,
Date64Type,
Time32SecondType,
Time32MillisecondType,
Time64MicrosecondType,
Time64NanosecondType,
IntervalYearMonthType,
IntervalDayTimeType,
IntervalMonthDayNanoType,
DurationSecondType,
DurationMillisecondType,
DurationMicrosecondType,
DurationNanosecondType,
BinaryType,
FixedSizeBinaryType,
Decimal128Type,
Decimal256Type,
)
);

struct Numeric(String);
struct StrOrNum(String);

impl StrOrNum {
fn into_arrow(self) -> Result<String, ConnectorError> {
Ok(self.0)
}
}

impl<'a> FromSql<'a> for Numeric {
impl<'a> FromSql<'a> for StrOrNum {
fn from_sql(
_ty: &postgres::types::Type,
ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
Ok(super::decimal::from_sql(raw).map(Numeric)?)
if matches!(ty, &Type::NUMERIC) {
Ok(super::decimal::from_sql(raw).map(StrOrNum)?)
} else {
let slice = postgres_protocol::types::text_from_sql(raw)?;
Ok(StrOrNum(slice.to_string()))
}
}

fn accepts(_ty: &postgres::types::Type) -> bool {
fn accepts(_ty: &Type) -> bool {
true
}
}

impl<'c> transport::ProduceTy<'c, Utf8Type> for CellRef<'c> {
fn produce(self) -> Result<String, ConnectorError> {
Ok(self.0.get::<_, Numeric>(self.1).0)
/// Number of whole days from 1970-01-01 to 2000-01-01
/// (30 years of 365 days plus 7 leap days).
const DUR_1970_TO_2000_DAYS: i32 = 10_957;
/// The same offset expressed in seconds (86 400 seconds per day).
const DUR_1970_TO_2000_SEC: i64 = (DUR_1970_TO_2000_DAYS as i64) * 86_400;

struct TimestampY2000(i64);

impl<'a> FromSql<'a> for TimestampY2000 {
fn from_sql(
_ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
postgres_protocol::types::timestamp_from_sql(raw).map(TimestampY2000)
}

fn produce_opt(self) -> Result<Option<String>, ConnectorError> {
Ok(self.0.get::<_, Option<Numeric>>(self.1).map(|n| n.0))
fn accepts(_ty: &Type) -> bool {
true
}
}

struct TimestampY2000(i64);

const DUR_1970_TO_2000_SEC: i64 = 10957 * 24 * 60 * 60;

impl TimestampY2000 {
fn into_nanosecond(self) -> Result<i64, ConnectorError> {
self.0
.checked_add(DUR_1970_TO_2000_SEC * 1000 * 1000)
.and_then(|micros_y1970| micros_y1970.mul_checked(1000).ok())
.ok_or(ConnectorError::DataOutOfRange)
}
fn into_microsecond(self) -> Result<i64, ConnectorError> {
self.0
.checked_add(DUR_1970_TO_2000_SEC * 1000 * 1000)
.ok_or(ConnectorError::DataOutOfRange)
}
fn into_millisecond(self) -> Result<i64, ConnectorError> {
self.0
.div_checked(1000)
.ok()
.and_then(|millis_y2000| millis_y2000.checked_add(DUR_1970_TO_2000_SEC * 1000))
.ok_or(ConnectorError::DataOutOfRange)
}

struct DaysSinceY2000(i32);

impl<'a> FromSql<'a> for DaysSinceY2000 {
fn from_sql(
_ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
postgres_protocol::types::date_from_sql(raw).map(DaysSinceY2000)
}

fn accepts(_ty: &Type) -> bool {
true
}
fn into_second(self) -> Result<i64, ConnectorError> {
}

impl DaysSinceY2000 {
fn into_date32(self) -> Result<i32, ConnectorError> {
self.0
.div_checked(1_000_000)
.ok()
.and_then(|sec_y2000| sec_y2000.checked_add(DUR_1970_TO_2000_SEC))
.checked_add(DUR_1970_TO_2000_DAYS)
.ok_or(ConnectorError::DataOutOfRange)
}
}

impl<'a> FromSql<'a> for TimestampY2000 {
struct Time64(i64);

impl<'a> FromSql<'a> for Time64 {
fn from_sql(
_ty: &postgres::types::Type,
_ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
postgres_protocol::types::timestamp_from_sql(raw).map(TimestampY2000)
postgres_protocol::types::time_from_sql(raw).map(Time64)
}
fn accepts(_ty: &Type) -> bool {
true
}
}

fn accepts(_ty: &postgres::types::Type) -> bool {
impl Time64 {
fn into_microsecond(self) -> Result<i64, ConnectorError> {
Ok(self.0)
}
}

/// A decoded interval value, stored as `(months, days, microseconds)`.
struct IntervalMonthDayMicros(i32, i32, i64);

impl<'a> FromSql<'a> for IntervalMonthDayMicros {
    /// Decodes an interval from its binary form.
    ///
    /// The buffer is read as three consecutive fields: 8 bytes of
    /// microseconds, 4 bytes of days and 4 bytes of months.
    ///
    /// # Errors
    ///
    /// Returns an error when `raw` holds fewer than 16 bytes, or when one of
    /// the field decoders rejects its slice.
    fn from_sql(
        _ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        // Guard the slicing below: indexing a short buffer would panic
        // instead of surfacing a decode error to the caller.
        if raw.len() < 16 {
            return Err(format!(
                "invalid interval: expected 16 bytes, got {}",
                raw.len()
            )
            .into());
        }

        // `time_from_sql` is reused here only to decode the 8-byte
        // microsecond field.
        let micros = postgres_protocol::types::time_from_sql(&raw[0..8])?;
        let days = postgres_protocol::types::int4_from_sql(&raw[8..12])?;
        let months = postgres_protocol::types::int4_from_sql(&raw[12..16])?;
        Ok(IntervalMonthDayMicros(months, days, micros))
    }

    /// Accepts every type; the caller is responsible for requesting this
    /// wrapper only for interval columns.
    fn accepts(_ty: &Type) -> bool {
        true
    }
}

impl IntervalMonthDayMicros {
fn into_arrow(self) -> Result<i128, ConnectorError> {
let nanos = (self.2.checked_mul(1000)).ok_or(ConnectorError::DataOutOfRange)?;

let mut bytes = [0; 16];
bytes[0..4].copy_from_slice(&self.0.to_be_bytes());
bytes[4..8].copy_from_slice(&self.1.to_be_bytes());
bytes[8..16].copy_from_slice(&nanos.to_be_bytes());
Ok(i128::from_be_bytes(bytes))
}
}

/// A borrowed byte payload taken directly from the raw column buffer.
struct Binary<'a>(&'a [u8]);

impl<'a> FromSql<'a> for Binary<'a> {
    /// Decodes a binary-like column without copying.
    ///
    /// `BIT` and `VARBIT` values are parsed with `varbit_from_sql` and only
    /// their byte payload is kept. Every other type is read with
    /// `bytea_from_sql`.
    ///
    /// # Errors
    ///
    /// Returns the error from `varbit_from_sql` when a bit-string value
    /// cannot be parsed.
    fn from_sql(
        ty: &Type,
        raw: &'a [u8],
    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
        Ok(if matches!(ty, &Type::VARBIT | &Type::BIT) {
            let varbit = postgres_protocol::types::varbit_from_sql(raw)?;
            Binary(varbit.bytes())
        } else {
            Binary(postgres_protocol::types::bytea_from_sql(raw))
        })
    }

    /// Accepts every type; the caller is responsible for requesting this
    /// wrapper only for binary-like columns.
    fn accepts(_ty: &Type) -> bool {
        true
    }
}

impl Binary<'_> {
fn into_arrow(self) -> Result<Vec<u8>, ConnectorError> {
// this is a clone, that is needed because Produce requires Vec<u8>
Ok(self.0.to_vec())
}
}
6 changes: 3 additions & 3 deletions connector_arrow/src/postgres/protocol_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ crate::impl_produce_unsupported!(
DurationMillisecondType,
DurationMicrosecondType,
DurationNanosecondType,
BinaryType,
LargeBinaryType,
FixedSizeBinaryType,
Decimal128Type,
Decimal256Type,
Expand Down Expand Up @@ -181,9 +181,9 @@ impl<'r> transport::ProduceTy<'r, LargeUtf8Type> for CellRef<'r> {
}
}

impl<'r> transport::ProduceTy<'r, LargeBinaryType> for CellRef<'r> {
impl<'r> transport::ProduceTy<'r, BinaryType> for CellRef<'r> {
fn produce(self) -> Result<Vec<u8>, ConnectorError> {
transport::ProduceTy::<LargeBinaryType>::produce_opt(self)?.ok_or_else(err_null)
transport::ProduceTy::<BinaryType>::produce_opt(self)?.ok_or_else(err_null)
}

fn produce_opt(self) -> Result<Option<Vec<u8>>, ConnectorError> {
Expand Down
Loading

0 comments on commit bc07d0b

Please sign in to comment.