[Bug] Remove loud NativeFile deprecation noise for read_parquet from S3 (#16415)

Important follow-up to #16132

Without this PR, using `dask_cudf.read_parquet("s3://...", ...)` will
result in loud deprecation warnings after `compute`/`persist` is called.
This is because dask will always pass `NativeFile` objects down to cudf.

My fault for missing this earlier!
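
For readers unfamiliar with the helper imported below, `maybe_filter_deprecation` is used as a conditional warning filter: suppress the `NativeFile` `FutureWarning` only when the user did not explicitly ask for `NativeFile` behavior themselves. A minimal sketch of what such a helper can look like, assuming a plain `warnings`-based implementation (illustrative only, not the actual `cudf.utils.utils` code):

```python
# Hypothetical sketch only -- not the actual implementation of
# cudf.utils.utils.maybe_filter_deprecation.  The idea: suppress a specific
# FutureWarning while the body runs, but only when `condition` is true
# (i.e. the user did not explicitly opt in to NativeFile reads).
import contextlib
import warnings


@contextlib.contextmanager
def maybe_filter_deprecation(condition, message, category):
    with warnings.catch_warnings():
        if condition:
            # `message` is matched as a regex against the warning text
            warnings.filterwarnings("ignore", message=message, category=category)
        yield
```

Wrapping the `cudf.read_parquet` call in a context manager like this keeps the warning visible whenever `open_file_options` or `use_python_file_object` is passed explicitly.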
rjzamora committed Jul 30, 2024
1 parent bd302d7 commit 5feeaf3
Showing 2 changed files with 128 additions and 34 deletions.
python/dask_cudf/dask_cudf/io/parquet.py (44 additions & 32 deletions)
@@ -33,6 +33,7 @@
_is_local_filesystem,
_open_remote_files,
)
+from cudf.utils.utils import maybe_filter_deprecation


class CudfEngine(ArrowDatasetEngine):
@@ -110,39 +111,50 @@ def _read_paths(
),
)

-            # Use cudf to read in data
-            try:
-                df = cudf.read_parquet(
-                    paths_or_fobs,
-                    engine="cudf",
-                    columns=columns,
-                    row_groups=row_groups if row_groups else None,
-                    dataset_kwargs=dataset_kwargs,
-                    categorical_partitions=False,
-                    **kwargs,
-                )
-            except RuntimeError as err:
-                # TODO: Remove try/except after null-schema issue is resolved
-                # (See: https://github.com/rapidsai/cudf/issues/12702)
-                if len(paths_or_fobs) > 1:
-                    df = cudf.concat(
-                        [
-                            cudf.read_parquet(
-                                pof,
-                                engine="cudf",
-                                columns=columns,
-                                row_groups=row_groups[i]
-                                if row_groups
-                                else None,
-                                dataset_kwargs=dataset_kwargs,
-                                categorical_partitions=False,
-                                **kwargs,
-                            )
-                            for i, pof in enumerate(paths_or_fobs)
-                        ]
-                    )
-                else:
-                    raise err
+            # Filter out deprecation warning unless the user
+            # specifies open_file_options and/or use_python_file_object.
+            # Otherwise, the FutureWarning is out of their control.
+            with maybe_filter_deprecation(
+                (
+                    not open_file_options
+                    and "use_python_file_object" not in kwargs
+                ),
+                message="Support for reading pyarrow's NativeFile is deprecated",
+                category=FutureWarning,
+            ):
+                # Use cudf to read in data
+                try:
+                    df = cudf.read_parquet(
+                        paths_or_fobs,
+                        engine="cudf",
+                        columns=columns,
+                        row_groups=row_groups if row_groups else None,
+                        dataset_kwargs=dataset_kwargs,
+                        categorical_partitions=False,
+                        **kwargs,
+                    )
+                except RuntimeError as err:
+                    # TODO: Remove try/except after null-schema issue is resolved
+                    # (See: https://github.com/rapidsai/cudf/issues/12702)
+                    if len(paths_or_fobs) > 1:
+                        df = cudf.concat(
+                            [
+                                cudf.read_parquet(
+                                    pof,
+                                    engine="cudf",
+                                    columns=columns,
+                                    row_groups=row_groups[i]
+                                    if row_groups
+                                    else None,
+                                    dataset_kwargs=dataset_kwargs,
+                                    categorical_partitions=False,
+                                    **kwargs,
+                                )
+                                for i, pof in enumerate(paths_or_fobs)
+                            ]
+                        )
+                    else:
+                        raise err

# Apply filters (if any are defined)
df = _apply_post_filters(df, filters)
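
To illustrate the intended user-facing effect of the change above (the S3 path below is a placeholder, not part of this commit): the default read path stays quiet at `compute` time, while explicitly opting in to `NativeFile`-based reads still raises the `FutureWarning`.

```python
# Illustrative sketch of the intended behavior; the S3 path is a
# placeholder and not part of this commit.
import dask_cudf

# Default options: no NativeFile FutureWarning when the graph is computed.
df = dask_cudf.read_parquet("s3://bucket/data/*.parquet")
df.a.sum().compute()

# Explicitly opting in to NativeFile-based reads still warns, since the
# deprecation is then within the user's control.
df = dask_cudf.read_parquet(
    "s3://bucket/data/*.parquet",
    read={"use_python_file_object": True},
)
df.a.sum().compute()
```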
python/dask_cudf/dask_cudf/io/tests/test_s3.py (84 additions & 2 deletions)
@@ -9,6 +9,8 @@
import pyarrow.fs as pa_fs
import pytest

from dask.dataframe import assert_eq

import dask_cudf

moto = pytest.importorskip("moto", minversion="3.1.6")
@@ -102,6 +104,11 @@ def s3_context(s3_base, bucket, files=None):
pass


@pytest.fixture
def pdf(scope="module"):
return pd.DataFrame({"a": [1, 2, 3, 4], "b": [2.1, 2.2, 2.3, 2.4]})


def test_read_csv(s3_base, s3so):
with s3_context(
s3_base=s3_base, bucket="daskcsv", files={"a.csv": b"a,b\n1,2\n3,4\n"}
@@ -112,6 +119,22 @@ def test_read_csv(s3_base, s3so):
assert df.a.sum().compute() == 4


def test_read_csv_warns(s3_base, s3so):
with s3_context(
s3_base=s3_base,
bucket="daskcsv_warns",
files={"a.csv": b"a,b\n1,2\n3,4\n"},
):
with pytest.warns(FutureWarning):
df = dask_cudf.read_csv(
"s3://daskcsv_warns/*.csv",
blocksize="50 B",
storage_options=s3so,
use_python_file_object=True,
)
assert df.a.sum().compute() == 4


@pytest.mark.parametrize(
"open_file_options",
[
@@ -120,8 +143,7 @@ def test_read_csv(s3_base, s3so):
{"open_file_func": None},
],
)
-def test_read_parquet(s3_base, s3so, open_file_options):
-    pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2.1, 2.2, 2.3, 2.4]})
+def test_read_parquet_open_file_options(s3_base, s3so, open_file_options, pdf):
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
@@ -142,3 +164,63 @@ def test_read_parquet(s3_base, s3so, open_file_options):
assert df.a.sum().compute() == 10
with pytest.warns(FutureWarning):
assert df.b.sum().compute() == 9


def test_read_parquet(s3_base, s3so, pdf):
fname = "test_parquet_reader_dask.parquet"
bucket = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
got = dask_cudf.read_parquet(
f"s3://{bucket}/{fname}",
storage_options=s3so,
)
assert_eq(pdf, got)


def test_read_parquet_use_python_file_object(s3_base, s3so, pdf):
fname = "test_parquet_use_python_file_object.parquet"
bucket = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
with pytest.warns(FutureWarning):
got = dask_cudf.read_parquet(
f"s3://{bucket}/{fname}",
storage_options=s3so,
read={"use_python_file_object": True},
).head()
assert_eq(pdf, got)


def test_read_orc(s3_base, s3so, pdf):
fname = "test_orc_reader_dask.orc"
bucket = "orc"
buffer = BytesIO()
pdf.to_orc(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
got = dask_cudf.read_orc(
f"s3://{bucket}/{fname}",
storage_options=s3so,
)
assert_eq(pdf, got)


def test_read_orc_use_python_file_object(s3_base, s3so, pdf):
fname = "test_orc_use_python_file_object.orc"
bucket = "orc"
buffer = BytesIO()
pdf.to_orc(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
with pytest.warns(FutureWarning):
got = dask_cudf.read_orc(
f"s3://{bucket}/{fname}",
storage_options=s3so,
use_python_file_object=True,
).head()
assert_eq(pdf, got)
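
The new tests reuse the existing moto-backed S3 fixtures in this module. One way to run just this file locally, assuming `moto>=3.1.6`, `boto3`, and `s3fs` are installed (illustrative invocation, not part of the commit):

```python
# Illustrative: run only the dask_cudf S3 reader tests from a cudf checkout.
import pytest

pytest.main(["-v", "python/dask_cudf/dask_cudf/io/tests/test_s3.py"])
```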
