diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py
index 810a804e428..f0cab953458 100644
--- a/python/dask_cudf/dask_cudf/io/parquet.py
+++ b/python/dask_cudf/dask_cudf/io/parquet.py
@@ -33,6 +33,7 @@
     _is_local_filesystem,
     _open_remote_files,
 )
+from cudf.utils.utils import maybe_filter_deprecation
 
 
 class CudfEngine(ArrowDatasetEngine):
@@ -110,39 +111,50 @@ def _read_paths(
                 ),
             )
 
-        # Use cudf to read in data
-        try:
-            df = cudf.read_parquet(
-                paths_or_fobs,
-                engine="cudf",
-                columns=columns,
-                row_groups=row_groups if row_groups else None,
-                dataset_kwargs=dataset_kwargs,
-                categorical_partitions=False,
-                **kwargs,
-            )
-        except RuntimeError as err:
-            # TODO: Remove try/except after null-schema issue is resolved
-            # (See: https://github.com/rapidsai/cudf/issues/12702)
-            if len(paths_or_fobs) > 1:
-                df = cudf.concat(
-                    [
-                        cudf.read_parquet(
-                            pof,
-                            engine="cudf",
-                            columns=columns,
-                            row_groups=row_groups[i]
-                            if row_groups
-                            else None,
-                            dataset_kwargs=dataset_kwargs,
-                            categorical_partitions=False,
-                            **kwargs,
-                        )
-                        for i, pof in enumerate(paths_or_fobs)
-                    ]
+        # Filter out deprecation warning unless the user
+        # specifies open_file_options and/or use_python_file_object.
+        # Otherwise, the FutureWarning is out of their control.
+        with maybe_filter_deprecation(
+            (
+                not open_file_options
+                and "use_python_file_object" not in kwargs
+            ),
+            message="Support for reading pyarrow's NativeFile is deprecated",
+            category=FutureWarning,
+        ):
+            # Use cudf to read in data
+            try:
+                df = cudf.read_parquet(
+                    paths_or_fobs,
+                    engine="cudf",
+                    columns=columns,
+                    row_groups=row_groups if row_groups else None,
+                    dataset_kwargs=dataset_kwargs,
+                    categorical_partitions=False,
+                    **kwargs,
                 )
-            else:
-                raise err
+            except RuntimeError as err:
+                # TODO: Remove try/except after null-schema issue is resolved
+                # (See: https://github.com/rapidsai/cudf/issues/12702)
+                if len(paths_or_fobs) > 1:
+                    df = cudf.concat(
+                        [
+                            cudf.read_parquet(
+                                pof,
+                                engine="cudf",
+                                columns=columns,
+                                row_groups=row_groups[i]
+                                if row_groups
+                                else None,
+                                dataset_kwargs=dataset_kwargs,
+                                categorical_partitions=False,
+                                **kwargs,
+                            )
+                            for i, pof in enumerate(paths_or_fobs)
+                        ]
+                    )
+                else:
+                    raise err
 
         # Apply filters (if any are defined)
         df = _apply_post_filters(df, filters)
diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py
index 3947c69aaa5..ac3245b3748 100644
--- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py
+++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py
@@ -9,6 +9,8 @@
 import pyarrow.fs as pa_fs
 import pytest
 
+from dask.dataframe import assert_eq
+
 import dask_cudf
 
 moto = pytest.importorskip("moto", minversion="3.1.6")
@@ -102,6 +104,11 @@ def s3_context(s3_base, bucket, files=None):
         pass
 
 
+@pytest.fixture(scope="module")
+def pdf():
+    return pd.DataFrame({"a": [1, 2, 3, 4], "b": [2.1, 2.2, 2.3, 2.4]})
+
+
 def test_read_csv(s3_base, s3so):
     with s3_context(
         s3_base=s3_base, bucket="daskcsv", files={"a.csv": b"a,b\n1,2\n3,4\n"}
@@ -112,6 +119,22 @@ def test_read_csv(s3_base, s3so):
         assert df.a.sum().compute() == 4
 
 
+def test_read_csv_warns(s3_base, s3so):
+    with s3_context(
+        s3_base=s3_base,
+        bucket="daskcsv_warns",
+        files={"a.csv": b"a,b\n1,2\n3,4\n"},
+    ):
+        with pytest.warns(FutureWarning):
+            df = dask_cudf.read_csv(
+                "s3://daskcsv_warns/*.csv",
+                blocksize="50 B",
+                storage_options=s3so,
+                use_python_file_object=True,
+            )
+        assert df.a.sum().compute() == 4
+
+
 @pytest.mark.parametrize(
     "open_file_options",
     [
@@ -120,8 +143,7 @@ def test_read_csv(s3_base, s3so):
         {"open_file_func": None},
     ],
 )
-def test_read_parquet(s3_base, s3so, open_file_options):
-    pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2.1, 2.2, 2.3, 2.4]})
+def test_read_parquet_open_file_options(s3_base, s3so, open_file_options, pdf):
     buffer = BytesIO()
     pdf.to_parquet(path=buffer)
     buffer.seek(0)
@@ -142,3 +164,63 @@ def test_read_parquet(s3_base, s3so, open_file_options):
     assert df.a.sum().compute() == 10
     with pytest.warns(FutureWarning):
         assert df.b.sum().compute() == 9
+
+
+def test_read_parquet(s3_base, s3so, pdf):
+    fname = "test_parquet_reader_dask.parquet"
+    bucket = "parquet"
+    buffer = BytesIO()
+    pdf.to_parquet(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        got = dask_cudf.read_parquet(
+            f"s3://{bucket}/{fname}",
+            storage_options=s3so,
+        )
+        assert_eq(pdf, got)
+
+
+def test_read_parquet_use_python_file_object(s3_base, s3so, pdf):
+    fname = "test_parquet_use_python_file_object.parquet"
+    bucket = "parquet"
+    buffer = BytesIO()
+    pdf.to_parquet(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        with pytest.warns(FutureWarning):
+            got = dask_cudf.read_parquet(
+                f"s3://{bucket}/{fname}",
+                storage_options=s3so,
+                read={"use_python_file_object": True},
+            ).head()
+        assert_eq(pdf, got)
+
+
+def test_read_orc(s3_base, s3so, pdf):
+    fname = "test_orc_reader_dask.orc"
+    bucket = "orc"
+    buffer = BytesIO()
+    pdf.to_orc(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        got = dask_cudf.read_orc(
+            f"s3://{bucket}/{fname}",
+            storage_options=s3so,
+        )
+        assert_eq(pdf, got)
+
+
+def test_read_orc_use_python_file_object(s3_base, s3so, pdf):
+    fname = "test_orc_use_python_file_object.orc"
+    bucket = "orc"
+    buffer = BytesIO()
+    pdf.to_orc(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        with pytest.warns(FutureWarning):
+            got = dask_cudf.read_orc(
+                f"s3://{bucket}/{fname}",
+                storage_options=s3so,
+                use_python_file_object=True,
+            ).head()
+        assert_eq(pdf, got)
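Note: the parquet.py change above relies on maybe_filter_deprecation from cudf.utils.utils, whose implementation is not part of this diff. A minimal, hypothetical sketch of a helper with the same call signature, assuming it is a thin wrapper around warnings.catch_warnings, could look like this (not the actual cudf implementation):

# Hypothetical sketch only -- not the actual cudf.utils.utils implementation.
import contextlib
import warnings


@contextlib.contextmanager
def maybe_filter_deprecation(condition, message, category):
    """Ignore warnings matching message/category while the block runs,
    but only when condition is true; otherwise leave filters untouched."""
    with warnings.catch_warnings():
        if condition:
            # ``message`` is matched as a regex against the warning text.
            warnings.filterwarnings("ignore", message=message, category=category)
        yield

Under that assumption, the with maybe_filter_deprecation(...) block in _read_paths suppresses the NativeFile FutureWarning only when the caller passed neither open_file_options nor use_python_file_object, matching the comment in the diff; the new tests then assert that the warning still surfaces when use_python_file_object is requested explicitly.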