Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
feat: better error message when reader feather v1
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 6, 2023
1 parent d5c78e7 commit 302265b
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 15 deletions.
14 changes: 11 additions & 3 deletions src/ffi/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ unsafe fn get_buffer_ptr<T: NativeType>(
let ptr = *buffers.add(index);
if ptr.is_null() {
return Err(Error::oos(format!(
"An array of type {data_type:?}
"An array of type {data_type:?} with a non-zero length

Check warning on line 212 in src/ffi/array.rs

View check run for this annotation

Codecov / codecov/patch

src/ffi/array.rs#L212

Added line #L212 was not covered by tests
must have a non-null buffer {index}"
)));
}
Expand All @@ -235,9 +235,14 @@ unsafe fn create_buffer<T: NativeType>(
owner: InternalArrowArray,
index: usize,
) -> Result<Buffer<T>> {
let len = buffer_len(array, data_type, index)?;

if len == 0 {
return Ok(Buffer::new());
}

let ptr = get_buffer_ptr(array, data_type, index)?;

let len = buffer_len(array, data_type, index)?;
let offset = buffer_offset(array, data_type, index);
let bytes = Bytes::from_foreign(ptr, len, BytesAllocator::InternalArrowArray(owner));

Expand All @@ -258,9 +263,12 @@ unsafe fn create_bitmap(
// we can use the null count directly
is_validity: bool,
) -> Result<Bitmap> {
let len: usize = array.length.try_into().expect("length to fit in `usize`");
if len == 0 {
return Ok(Bitmap::new());

Check warning on line 268 in src/ffi/array.rs

View check run for this annotation

Codecov / codecov/patch

src/ffi/array.rs#L268

Added line #L268 was not covered by tests
}
let ptr = get_buffer_ptr(array, data_type, index)?;

let len: usize = array.length.try_into().expect("length to fit in `usize`");
let offset: usize = array.offset.try_into().expect("offset to fit in `usize`");
let bytes_len = bytes_for(offset + len);
let bytes = Bytes::from_foreign(ptr, bytes_len, BytesAllocator::InternalArrowArray(owner));
Expand Down
3 changes: 2 additions & 1 deletion src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ pub mod append;
pub mod read;
pub mod write;

const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const ARROW_MAGIC_V1: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const ARROW_MAGIC_V2: [u8; 4] = [b'F', b'E', b'A', b'1'];
pub(crate) const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Struct containing `dictionary_id` and nested `IpcField`, allowing users
Expand Down
9 changes: 6 additions & 3 deletions src/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datatypes::Schema;
use crate::error::{Error, Result};
use crate::io::ipc::IpcSchema;

use super::super::{ARROW_MAGIC, CONTINUATION_MARKER};
use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};
use super::common::*;
use super::schema::fb_to_schema;
use super::Dictionaries;
Expand Down Expand Up @@ -151,7 +151,7 @@ fn read_footer_len<R: Read + Seek>(reader: &mut R) -> Result<(u64, usize)> {
reader.read_exact(&mut footer)?;
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

if footer[4..] != ARROW_MAGIC {
if footer[4..] != ARROW_MAGIC_V1 {
return Err(Error::from(OutOfSpecKind::InvalidFooter));
}
let footer_len = footer_len
Expand Down Expand Up @@ -215,7 +215,10 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
let mut magic_buffer: [u8; 6] = [0; 6];
let start = reader.stream_position()?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
if magic_buffer != ARROW_MAGIC_V1 {
if &magic_buffer[..4] == ARROW_MAGIC_V2 {
return Err(Error::NotYetImplemented("feather v1 not supported".into()));
}

Check warning on line 221 in src/io/ipc/read/file.rs

View check run for this annotation

Codecov / codecov/patch

src/io/ipc/read/file.rs#L219-L221

Added lines #L219 - L221 were not covered by tests
return Err(Error::from(OutOfSpecKind::InvalidHeader));
}

Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{Error, Result};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC_V1, CONTINUATION_MARKER};

use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
use super::file::{deserialize_footer, get_record_batch};
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn read_footer_len<R: AsyncRead + AsyncSeek + Unpin>(reader: &mut R) -> Re
reader.read_exact(&mut footer).await?;
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

if footer[4..] != ARROW_MAGIC {
if footer[4..] != ARROW_MAGIC_V1 {
return Err(Error::from(OutOfSpecKind::InvalidFooter));
}
footer_len
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/write/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::schema::serialize_schema;
use super::{default_ipc_fields, schema_to_bytes, Record};
use crate::datatypes::*;
use crate::error::{Error, Result};
use crate::io::ipc::{IpcField, ARROW_MAGIC};
use crate::io::ipc::{IpcField, ARROW_MAGIC_V1};

type WriteOutput<W> = (usize, Option<Block>, Vec<Block>, Option<W>);

Expand Down Expand Up @@ -105,7 +105,7 @@ where
}

async fn start(mut writer: W, encoded: EncodedData) -> Result<WriteOutput<W>> {
writer.write_all(&ARROW_MAGIC[..]).await?;
writer.write_all(&ARROW_MAGIC_V1[..]).await?;
writer.write_all(&[0, 0]).await?;
let (meta, data) = write_message(&mut writer, encoded).await?;

Expand Down Expand Up @@ -149,7 +149,7 @@ where
writer
.write_all(&(footer.len() as i32).to_le_bytes())
.await?;
writer.write_all(&ARROW_MAGIC).await?;
writer.write_all(&ARROW_MAGIC_V1).await?;
writer.close().await?;

Ok((0, None, vec![], None))
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow_format::ipc::planus::Builder;

use super::{
super::IpcField,
super::ARROW_MAGIC,
super::ARROW_MAGIC_V1,
common::{DictionaryTracker, EncodedData, WriteOptions},
common_sync::{write_continuation, write_message},
default_ipc_fields, schema, schema_to_bytes,
Expand Down Expand Up @@ -114,7 +114,7 @@ impl<W: Write> FileWriter<W> {
return Err(Error::oos("The IPC file can only be started once"));
}
// write magic to header
self.writer.write_all(&ARROW_MAGIC[..])?;
self.writer.write_all(&ARROW_MAGIC_V1[..])?;
// create an 8-byte boundary after the header
self.writer.write_all(&[0, 0])?;
// write the schema, set the written bytes to the schema
Expand Down Expand Up @@ -205,7 +205,7 @@ impl<W: Write> FileWriter<W> {
self.writer.write_all(footer_data)?;
self.writer
.write_all(&(footer_data.len() as i32).to_le_bytes())?;
self.writer.write_all(&ARROW_MAGIC)?;
self.writer.write_all(&ARROW_MAGIC_V1)?;
self.writer.flush()?;
self.state = State::Finished;

Expand Down

0 comments on commit 302265b

Please sign in to comment.