Skip to content

Commit

Permalink
Multi-file and Parquet-aware prefetching from remote storage (#16657)
Browse files Browse the repository at this point in the history
Follow up to #16613
Supersedes #16166

Improves remote-IO read performance when multiple files are read at once. Also enables partial IO for remote Parquet files (previously removed in `24.10` by #16589).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #16657
  • Loading branch information
rjzamora committed Sep 4, 2024
1 parent 26091a4 commit 1b6f02d
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 16 deletions.
40 changes: 40 additions & 0 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,51 @@ def read_parquet(
)
filepath_or_buffer = paths if paths else filepath_or_buffer

# Prepare remote-IO options
prefetch_options = kwargs.pop("prefetch_options", {})
if not ioutils._is_local_filesystem(fs):
# The default prefetch method depends on the
# `row_groups` argument. In most cases we will use
# method="all" by default, because it is fastest
# when we need to read most of the file(s).
# If a (simple) `row_groups` selection is made, we
# use method="parquet" to avoid transferring the
# entire file over the network
method = prefetch_options.get("method")
_row_groups = None
if method in (None, "parquet"):
if row_groups is None:
# If the user didn't specify a method, don't use
# 'parquet' prefetcher for column projection alone.
method = method or "all"
elif all(r == row_groups[0] for r in row_groups):
# Row group selection means we are probably
# reading half the file or less. We should
# avoid a full file transfer by default.
method = "parquet"
_row_groups = row_groups[0]
elif (method := method or "all") == "parquet":
raise ValueError(
"The 'parquet' prefetcher requires a uniform "
"row-group selection for all paths within the "
"same `read_parquet` call. "
"Got: {row_groups}"
)
if method == "parquet":
prefetch_options = prefetch_options.update(
{
"method": method,
"columns": columns,
"row_groups": _row_groups,
}
)

filepaths_or_buffers = ioutils.get_reader_filepath_or_buffer(
path_or_data=filepath_or_buffer,
fs=fs,
storage_options=storage_options,
bytes_per_thread=bytes_per_thread,
prefetch_options=prefetch_options,
)

# Warn user if they are not using cudf for IO
Expand Down
47 changes: 47 additions & 0 deletions python/cudf/cudf/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,53 @@ def test_read_parquet(
assert_eq(expect, got2)


@pytest.mark.parametrize("method", ["all", "parquet"])
@pytest.mark.parametrize("blocksize", [1024 * 1024, 1024])
def test_read_parquet_prefetch_options(
s3_base,
s3so,
pdf,
method,
blocksize,
):
bucket = "parquet"
fname_1 = "test_parquet_reader_prefetch_options_1.parquet"
buffer_1 = BytesIO()
pdf.to_parquet(path=buffer_1)
buffer_1.seek(0)

fname_2 = "test_parquet_reader_prefetch_options_2.parquet"
buffer_2 = BytesIO()
pdf_2 = pdf.copy()
pdf_2["Integer"] += 1
pdf_2.to_parquet(path=buffer_2)
buffer_2.seek(0)

with s3_context(
s3_base=s3_base,
bucket=bucket,
files={
fname_1: buffer_1,
fname_2: buffer_2,
},
):
got = cudf.read_parquet(
[
f"s3://{bucket}/{fname_1}",
f"s3://{bucket}/{fname_2}",
],
storage_options=s3so,
prefetch_options={
"method": method,
"blocksize": blocksize,
},
columns=["String", "Integer"],
)

expect = pd.concat([pdf, pdf_2], ignore_index=True)[["String", "Integer"]]
assert_eq(expect, got)


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["List", "Struct"]])
@pytest.mark.parametrize("index", [None, "Integer"])
Expand Down
141 changes: 125 additions & 16 deletions python/cudf/cudf/utils/ioutils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) 2019-2024, NVIDIA CORPORATION.

import datetime
import functools
import operator
import os
import urllib
import warnings
Expand All @@ -18,6 +20,12 @@
from cudf.core._compat import PANDAS_LT_300
from cudf.utils.docutils import docfmt_partial

try:
import fsspec.parquet as fsspec_parquet

except ImportError:
fsspec_parquet = None

_BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024
_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024

Expand Down Expand Up @@ -187,6 +195,11 @@
allow_mismatched_pq_schemas : boolean, default False
If True, enables reading (matching) columns specified in `columns` and `filters`
options from the input files with otherwise mismatched schemas.
prefetch_options : dict, default None
WARNING: This is an experimental feature and may be removed at any
time without warning or deprecation period.
Dictionary of options to use to prefetch bytes from remote storage.
These options are passed through to `get_reader_filepath_or_buffer`.
Returns
-------
Expand Down Expand Up @@ -1439,6 +1452,14 @@
Glob pattern to use when expanding directories into file paths
(e.g. "*.json"). If this parameter is not specified, directories
will not be expanded.
prefetch_options : dict, default None
WARNING: This is an experimental feature and may be removed at any
time without warning or deprecation period.
Dictionary of options to use to prefetch bytes from remote storage.
These options are only used when `path_or_data` is a list of remote
paths. If 'method' is set to 'all' (the default), the only supported
option is 'blocksize' (default 256 MB). If method is set to 'parquet',
'columns' and 'row_groups' are also supported (default None).
Returns
-------
Expand Down Expand Up @@ -1620,6 +1641,7 @@ def get_reader_filepath_or_buffer(
warn_on_raw_text_input=None,
warn_meta=None,
expand_dir_pattern=None,
prefetch_options=None,
):
"""{docstring}"""

Expand Down Expand Up @@ -1690,26 +1712,15 @@ def get_reader_filepath_or_buffer(
raw_text_input = True

elif fs is not None:
# TODO: We can use cat_ranges and/or parquet-aware logic
# to copy all remote data into host memory at once here.
# The current solution iterates over files, and copies
# ALL data from each file (even when we are performing
# partial IO, and don't need the entire file)
if len(paths) == 0:
raise FileNotFoundError(
f"{input_sources} could not be resolved to any files"
)
filepaths_or_buffers = [
BytesIO(
_fsspec_data_transfer(
fpath,
fs=fs,
mode=mode,
bytes_per_thread=bytes_per_thread,
)
)
for fpath in paths
]
filepaths_or_buffers = _prefetch_remote_buffers(
paths,
fs,
**(prefetch_options or {}),
)
else:
raw_text_input = True

Expand Down Expand Up @@ -2099,3 +2110,101 @@ def _read_byte_ranges(

for worker in workers:
worker.join()


def _get_remote_bytes_all(
remote_paths, fs, *, blocksize=_BYTES_PER_THREAD_DEFAULT
):
# TODO: Experiment with a heuristic to avoid the fs.sizes
# call when we are reading many files at once (the latency
# of collecting the file sizes is unnecessary in this case)
if max(sizes := fs.sizes(remote_paths)) <= blocksize:
# Don't bother breaking up individual files
return fs.cat_ranges(remote_paths, None, None)
else:
# Construct list of paths, starts, and ends
paths, starts, ends = map(
list,
zip(
*(
(r, j, min(j + blocksize, s))
for r, s in zip(remote_paths, sizes)
for j in range(0, s, blocksize)
)
),
)

# Collect the byte ranges
chunks = fs.cat_ranges(paths, starts, ends)

# Construct local byte buffers
# (Need to make sure path offsets are ordered correctly)
unique_count = dict(zip(*np.unique(paths, return_counts=True)))
offset = np.cumsum([0] + [unique_count[p] for p in remote_paths])
buffers = [
functools.reduce(operator.add, chunks[offset[i] : offset[i + 1]])
for i in range(len(remote_paths))
]
return buffers


def _get_remote_bytes_parquet(
remote_paths,
fs,
*,
columns=None,
row_groups=None,
blocksize=_BYTES_PER_THREAD_DEFAULT,
):
if fsspec_parquet is None or (columns is None and row_groups is None):
return _get_remote_bytes_all(remote_paths, fs, blocksize=blocksize)

sizes = fs.sizes(remote_paths)
data = fsspec_parquet._get_parquet_byte_ranges(
remote_paths,
fs,
columns=columns,
row_groups=row_groups,
max_block=blocksize,
)

buffers = []
for size, path in zip(sizes, remote_paths):
path_data = data[path]
buf = np.empty(size, dtype="b")
for range_offset in path_data.keys():
chunk = path_data[range_offset]
buf[range_offset[0] : range_offset[1]] = np.frombuffer(
chunk, dtype="b"
)
buffers.append(buf.tobytes())
return buffers


def _prefetch_remote_buffers(
paths,
fs,
*,
method="all",
**prefetch_options,
):
# Gather bytes ahead of time for remote filesystems
if fs and paths and not _is_local_filesystem(fs):
try:
prefetcher = {
"parquet": _get_remote_bytes_parquet,
"all": _get_remote_bytes_all,
}[method]
except KeyError:
raise ValueError(
f"{method} is not a supported remote-data prefetcher."
" Expected 'parquet' or 'all'."
)
return prefetcher(
paths,
fs,
**prefetch_options,
)

else:
return paths

0 comments on commit 1b6f02d

Please sign in to comment.