[WIP] Implement filesystem="arrow" in dask_cudf.read_parquet #16684

Draft
rjzamora wants to merge 26 commits into branch-24.10 from dask-cudf-arrow-filesystem
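For context, a minimal usage sketch of the option named in the title; this is illustrative and not part of the PR itself. The dataset path is hypothetical, and the requirements noted in the comments come from the checks added in backends.py below.

```python
# Minimal sketch, assuming the option lands as spelled in the PR title.
# The arrow path requires pyarrow>=15 and `distributed` (per checks in backends.py).
import dask_cudf

df = dask_cudf.read_parquet(
    "s3://my-bucket/dataset/",  # hypothetical dataset path
    filesystem="arrow",         # routes to the new pyarrow-filesystem reader
)
print(df.head())
```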
Commits (26)
469bc5e
allow pyarrow-based read with cudf backend
rjzamora Aug 27, 2024
f20cc25
re-org
rjzamora Aug 27, 2024
8f0f598
temporary change for debugging
rjzamora Aug 28, 2024
64fd701
adjust for upstream bug
rjzamora Aug 28, 2024
8e0c902
remove stale comment
rjzamora Aug 28, 2024
18e1c08
add file aggregation
rjzamora Aug 28, 2024
5215a05
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Aug 29, 2024
c51a7bb
test coverage
rjzamora Aug 29, 2024
b7a90c1
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Aug 29, 2024
43274e2
allow aggregate_files=True
rjzamora Aug 30, 2024
63c3f04
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Aug 30, 2024
a1bd43c
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Aug 30, 2024
e3ca47f
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 3, 2024
12c09a5
fix test
rjzamora Sep 3, 2024
daee7ec
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 4, 2024
d068103
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 4, 2024
257eb26
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 5, 2024
ec38b1e
Make isinstance check pass for proxy ndarrays (#16601)
Matt711 Sep 5, 2024
853c76b
Performance improvement for strings::slice for wide strings (#16574)
davidwendt Sep 5, 2024
bdd2bab
skip for pyarrow<15
rjzamora Sep 6, 2024
d943d8d
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 6, 2024
eb9eee0
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 10, 2024
b9c5147
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 10, 2024
ec04e78
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 18, 2024
e391789
Merge branch 'branch-24.10' into dask-cudf-arrow-filesystem
rjzamora Sep 19, 2024
e154d01
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-a…
rjzamora Sep 19, 2024
183 changes: 156 additions & 27 deletions python/dask_cudf/dask_cudf/backends.py
@@ -5,9 +5,11 @@
from functools import partial

import cupy as cp
import fsspec
import numpy as np
import pandas as pd
import pyarrow as pa
from packaging.version import Version
from pandas.api.types import is_scalar

import dask.dataframe as dd
@@ -52,6 +54,10 @@
get_parallel_type.register(cudf.BaseIndex, lambda _: Index)


# Required for Arrow filesystem support in read_parquet
PYARROW_GE_15 = Version(pa.__version__) >= Version("15.0.0")


@meta_nonempty.register(cudf.BaseIndex)
@_dask_cudf_performance_tracking
def _nonempty_index(idx):
@@ -507,25 +513,6 @@ def _unsupported_kwargs(old, new, kwargs):
)


def _raise_unsupported_parquet_kwargs(
open_file_options=None, filesystem=None, **kwargs
):
import fsspec

if open_file_options is not None:
raise ValueError(
"The open_file_options argument is no longer supported "
"by the 'cudf' backend."
)

if filesystem not in ("fsspec", None) and not isinstance(
filesystem, fsspec.AbstractFileSystem
):
raise ValueError(
f"filesystem={filesystem} is not supported by the 'cudf' backend."
)


# Register cudf->pandas
to_pandas_dispatch = PandasBackendEntrypoint.to_backend_dispatch()

@@ -604,13 +591,32 @@ def from_dict(
)

@staticmethod
def read_parquet(*args, engine=None, **kwargs):
def read_parquet(
*args,
open_file_options=None,
filesystem=None,
engine=None,
**kwargs,
):
from dask_cudf.io.parquet import CudfEngine

_raise_unsupported_parquet_kwargs(**kwargs)
if open_file_options is not None:
raise ValueError(
"The open_file_options argument is no longer supported "
"by the 'cudf' backend."
)

if filesystem not in ("fsspec", None) and not isinstance(
filesystem, fsspec.AbstractFileSystem
):
raise ValueError(
f"filesystem={filesystem} is not supported by the legacy 'cudf' backend."
)

return _default_backend(
dd.read_parquet,
*args,
filesystem=filesystem,
engine=CudfEngine,
**kwargs,
)
@@ -695,15 +701,138 @@ def from_dict(
)

@staticmethod
def read_parquet(*args, engine=None, **kwargs):
def read_parquet(
path,
columns=None,
filters=None,
categories=None,
index=None,
storage_options=None,
dtype_backend=None,
calculate_divisions=False,
ignore_metadata_file=False,
metadata_task_size=None,
split_row_groups="infer",
blocksize="default",
aggregate_files=None,
parquet_file_extension=(".parq", ".parquet", ".pq"),
filesystem="fsspec",
engine=None,
arrow_to_pandas=None,
open_file_options=None,
**kwargs,
):
import dask_expr as dx
from fsspec.utils import stringify_path
from pyarrow import fs as pa_fs

from dask_cudf.io.parquet import CudfEngine
from dask.core import flatten
from dask.dataframe.utils import pyarrow_strings_enabled

_raise_unsupported_parquet_kwargs(**kwargs)
return _default_backend(
dx.read_parquet, *args, engine=CudfEngine, **kwargs
)
if not isinstance(path, str):
path = stringify_path(path)

kwargs["dtype_backend"] = dtype_backend
if arrow_to_pandas:
kwargs["arrow_to_pandas"] = arrow_to_pandas

if dtype_backend is not None:
raise NotImplementedError()
if arrow_to_pandas is not None:
raise NotImplementedError()
if open_file_options is not None:
raise ValueError(
"The open_file_options argument is no longer supported "
"by the 'cudf' backend."
)

if filters is not None:
for filter in flatten(filters, container=list):
col, op, val = filter
if op == "in" and not isinstance(val, (set, list, tuple)):
raise TypeError(
"Value of 'in' filter must be a list, set or tuple."
)

if (
isinstance(filesystem, pa_fs.FileSystem)
or isinstance(filesystem, str)
and filesystem.lower() in ("arrow", "pyarrow")
):
# Depends on distributed
# (See: https://github.com/dask/dask/issues/11352)
import distributed # noqa: F401

from dask_cudf.expr._expr import CudfReadParquetPyarrowFS

if not PYARROW_GE_15:
raise RuntimeError(
"Arrow filesystem support requires pyarrow>=15"
)

if metadata_task_size is not None:
raise NotImplementedError(
"metadata_task_size is not supported when using the pyarrow filesystem."
)
if split_row_groups != "infer":
raise NotImplementedError(
"split_row_groups is not supported when using the pyarrow filesystem."
)
if blocksize == "default":
blocksize = "256 MiB"
if aggregate_files not in (None, True):
raise NotImplementedError(
f"aggregate_files={aggregate_files} is not supported when using the pyarrow filesystem."
)
if parquet_file_extension != (".parq", ".parquet", ".pq"):
raise NotImplementedError(
"parquet_file_extension is not supported when using the pyarrow filesystem."
)
if engine is not None:
raise NotImplementedError(
"engine is not supported when using the pyarrow filesystem."
)

return dx.new_collection(
CudfReadParquetPyarrowFS(
path,
columns=dx._util._convert_to_list(columns),
filters=filters,
categories=categories,
index=index,
calculate_divisions=calculate_divisions,
storage_options=storage_options,
filesystem=filesystem,
ignore_metadata_file=ignore_metadata_file,
arrow_to_pandas=arrow_to_pandas,
pyarrow_strings_enabled=pyarrow_strings_enabled(),
kwargs=kwargs,
_series=isinstance(columns, str),
_blocksize=blocksize,
)
)
else:
from dask_cudf.io.parquet import CudfEngine

return _default_backend(
dx.read_parquet,
path,
columns=columns,
filters=filters,
categories=categories,
index=index,
storage_options=storage_options,
calculate_divisions=calculate_divisions,
ignore_metadata_file=ignore_metadata_file,
metadata_task_size=metadata_task_size,
split_row_groups=split_row_groups,
blocksize=blocksize,
aggregate_files=aggregate_files,
parquet_file_extension=parquet_file_extension,
filesystem=filesystem,
engine=CudfEngine,
**kwargs,
)

@staticmethod
def read_csv(
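The backends.py changes above accept either the string "arrow"/"pyarrow" or an actual pyarrow.fs.FileSystem instance. Below is a hedged sketch of driving that branch through the public dask API; it is illustrative only, the local path is made up, and the config call is standard dask backend selection rather than anything introduced by this PR.

```python
# Sketch only: exercises the new arrow branch with an explicit pyarrow filesystem.
# Assumes pyarrow>=15 and `distributed` are installed, as the code above requires.
import dask
import dask.dataframe as dd
import pyarrow.fs as pa_fs

dask.config.set({"dataframe.backend": "cudf"})  # route dd.read_parquet to the cudf backend

fs = pa_fs.LocalFileSystem()  # a FileSystem instance takes the same path as filesystem="arrow"
df = dd.read_parquet(
    "/tmp/example_dataset",  # hypothetical local parquet dataset
    filesystem=fs,
    blocksize="256 MiB",     # the value the new code picks when blocksize="default"
)
```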
120 changes: 120 additions & 0 deletions python/dask_cudf/dask_cudf/expr/_expr.py
@@ -2,10 +2,13 @@
import functools

import dask_expr._shuffle as _shuffle_module
import pandas as pd
from dask_expr import new_collection
from dask_expr._cumulative import CumulativeBlockwise
from dask_expr._expr import Elemwise, Expr, VarColumns
from dask_expr._reductions import Reduction, Var
from dask_expr.io.io import FusedParquetIO
from dask_expr.io.parquet import ReadParquetPyarrowFS

from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty
from dask.dataframe.dispatch import is_categorical_dtype
@@ -17,6 +20,123 @@
##


class CudfFusedParquetIO(FusedParquetIO):
@staticmethod
def _load_multiple_files(
frag_filters,
columns,
schema,
*to_pandas_args,
):
import pyarrow as pa

from dask.base import apply, tokenize
from dask.threaded import get

token = tokenize(frag_filters, columns, schema)
name = f"pq-file-{token}"
dsk = {
(name, i): (
CudfReadParquetPyarrowFS._fragment_to_table,
frag,
filter,
columns,
schema,
)
for i, (frag, filter) in enumerate(frag_filters)
}
dsk[name] = (
apply,
pa.concat_tables,
[list(dsk.keys())],
{"promote_options": "permissive"},
)
return CudfReadParquetPyarrowFS._table_to_pandas(
get(dsk, name),
*to_pandas_args,
)


class CudfReadParquetPyarrowFS(ReadParquetPyarrowFS):
_parameters = ReadParquetPyarrowFS._parameters + ["_blocksize"]
_defaults = ReadParquetPyarrowFS._defaults | {"_blocksize": None}

@functools.cached_property
def _dataset_info(self):
from dask_cudf.io.parquet import set_object_dtypes_from_pa_schema

dataset_info = super()._dataset_info
meta_pd = dataset_info["base_meta"]
if isinstance(meta_pd, cudf.DataFrame):
return dataset_info

# Convert to cudf
# (drop unsupported timezone information)
for k, v in meta_pd.dtypes.items():
if isinstance(v, pd.DatetimeTZDtype) and v.tz is not None:
meta_pd[k] = meta_pd[k].dt.tz_localize(None)
meta_cudf = cudf.from_pandas(meta_pd)

# Re-set "object" dtypes to align with pa schema
kwargs = dataset_info.get("kwargs", {})
set_object_dtypes_from_pa_schema(
meta_cudf,
kwargs.get("schema", None),
)

dataset_info["base_meta"] = meta_cudf
self.operands[type(self)._parameters.index("_dataset_info_cache")] = (
dataset_info
)
return dataset_info

@staticmethod
def _table_to_pandas(
table,
index_name,
arrow_to_pandas,
dtype_backend,
pyarrow_strings_enabled,
):
df = cudf.DataFrame.from_arrow(table)
if index_name is not None:
df = df.set_index(index_name)
return df

def _tune_up(self, parent):
if self._fusion_compression_factor >= 1:
return
if isinstance(parent, CudfFusedParquetIO):
return
return parent.substitute(self, CudfFusedParquetIO(self))

@property
def _fusion_compression_factor(self):
from dask import config
from dask.utils import parse_bytes

approx_stats = self.approx_statistics()
total_uncompressed = 0
after_projection = 0
col_op = self.operand("columns") or self.columns
for col in approx_stats["columns"]:
total_uncompressed += col["total_uncompressed_size"]
if col["path_in_schema"] in col_op:
after_projection += col["total_uncompressed_size"]

min_size = parse_bytes(
config.get("dataframe.parquet.minimum-partition-size")
)

total_uncompressed = max(total_uncompressed, min_size)
ratio = after_projection / total_uncompressed

if self._blocksize:
ratio *= total_uncompressed / parse_bytes(self._blocksize)

return max(ratio, 0.001)


class ToCudfBackend(Elemwise):
# TODO: Inherit from ToBackend when rapids-dask-dependency
# is pinned to dask>=2024.8.1
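A note on `_fusion_compression_factor` above: dask-expr uses this factor to decide how many input files to fuse into one output partition, roughly `1 / factor` files each. Because the clamped total cancels when `_blocksize` is set, the factor effectively reduces to `after_projection / blocksize`. The helper below is a hypothetical re-statement with made-up numbers, mirroring but not importing the class:

```python
# Illustrative only; mirrors the arithmetic in _fusion_compression_factor above.
from dask.utils import parse_bytes


def fusion_factor_sketch(total_uncompressed, after_projection, min_size, blocksize):
    # Clamp small files up to the configured minimum partition size.
    total = max(total_uncompressed, min_size)
    # Column projection shrinks each file by this ratio.
    ratio = after_projection / total
    if blocksize:
        # Scale so ~blocksize bytes of projected data land in one partition.
        # `total` cancels, leaving after_projection / parse_bytes(blocksize).
        ratio *= total / parse_bytes(blocksize)
    return max(ratio, 0.001)


# Example: 64 MiB files, projection keeps 32 MiB, 256 MiB target partitions,
# and a 75 MB minimum-partition-size (assumed config value for illustration).
factor = fusion_factor_sketch(
    total_uncompressed=64 * 2**20,
    after_projection=32 * 2**20,
    min_size=parse_bytes("75 MB"),
    blocksize="256 MiB",
)
print(factor)  # 0.125, i.e. roughly 8 files fused per output partition
```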