From 1b6f02d536d253465d2c601f222fb0acede8a942 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Wed, 4 Sep 2024 12:02:40 -0500
Subject: [PATCH] Multi-file and Parquet-aware prefetching from remote storage
 (#16657)

Follow up to https://github.com/rapidsai/cudf/pull/16613

Supersedes https://github.com/rapidsai/cudf/pull/16166

Improves remote-IO read performance when multiple files are read at once.
Also enables partial IO for remote Parquet files (previously removed in
`24.10` by https://github.com/rapidsai/cudf/pull/16589).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Lawrence Mitchell (https://github.com/wence-)

URL: https://github.com/rapidsai/cudf/pull/16657
---
 python/cudf/cudf/io/parquet.py    |  40 +++++++++
 python/cudf/cudf/tests/test_s3.py |  47 ++++++++++
 python/cudf/cudf/utils/ioutils.py | 141 ++++++++++++++++++++++++++----
 3 files changed, 212 insertions(+), 16 deletions(-)

diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 526f12aa94e..62be7378e9e 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -577,11 +577,51 @@ def read_parquet(
         )
         filepath_or_buffer = paths if paths else filepath_or_buffer
 
+    # Prepare remote-IO options
+    prefetch_options = kwargs.pop("prefetch_options", {})
+    if not ioutils._is_local_filesystem(fs):
+        # The default prefetch method depends on the
+        # `row_groups` argument. In most cases we will use
+        # method="all" by default, because it is fastest
+        # when we need to read most of the file(s).
+        # If a (simple) `row_groups` selection is made, we
+        # use method="parquet" to avoid transferring the
+        # entire file over the network
+        method = prefetch_options.get("method")
+        _row_groups = None
+        if method in (None, "parquet"):
+            if row_groups is None:
+                # If the user didn't specify a method, don't use
+                # 'parquet' prefetcher for column projection alone.
+                method = method or "all"
+            elif all(r == row_groups[0] for r in row_groups):
+                # Row group selection means we are probably
+                # reading half the file or less. We should
+                # avoid a full file transfer by default.
+                method = "parquet"
+                _row_groups = row_groups[0]
+            elif (method := method or "all") == "parquet":
+                raise ValueError(
+                    "The 'parquet' prefetcher requires a uniform "
+                    "row-group selection for all paths within the "
+                    "same `read_parquet` call. "
" + "Got: {row_groups}" + ) + if method == "parquet": + prefetch_options = prefetch_options.update( + { + "method": method, + "columns": columns, + "row_groups": _row_groups, + } + ) + filepaths_or_buffers = ioutils.get_reader_filepath_or_buffer( path_or_data=filepath_or_buffer, fs=fs, storage_options=storage_options, bytes_per_thread=bytes_per_thread, + prefetch_options=prefetch_options, ) # Warn user if they are not using cudf for IO diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 3b23a53091e..0958b68084d 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -229,6 +229,53 @@ def test_read_parquet( assert_eq(expect, got2) +@pytest.mark.parametrize("method", ["all", "parquet"]) +@pytest.mark.parametrize("blocksize", [1024 * 1024, 1024]) +def test_read_parquet_prefetch_options( + s3_base, + s3so, + pdf, + method, + blocksize, +): + bucket = "parquet" + fname_1 = "test_parquet_reader_prefetch_options_1.parquet" + buffer_1 = BytesIO() + pdf.to_parquet(path=buffer_1) + buffer_1.seek(0) + + fname_2 = "test_parquet_reader_prefetch_options_2.parquet" + buffer_2 = BytesIO() + pdf_2 = pdf.copy() + pdf_2["Integer"] += 1 + pdf_2.to_parquet(path=buffer_2) + buffer_2.seek(0) + + with s3_context( + s3_base=s3_base, + bucket=bucket, + files={ + fname_1: buffer_1, + fname_2: buffer_2, + }, + ): + got = cudf.read_parquet( + [ + f"s3://{bucket}/{fname_1}", + f"s3://{bucket}/{fname_2}", + ], + storage_options=s3so, + prefetch_options={ + "method": method, + "blocksize": blocksize, + }, + columns=["String", "Integer"], + ) + + expect = pd.concat([pdf, pdf_2], ignore_index=True)[["String", "Integer"]] + assert_eq(expect, got) + + @pytest.mark.parametrize("bytes_per_thread", [32, 1024]) @pytest.mark.parametrize("columns", [None, ["List", "Struct"]]) @pytest.mark.parametrize("index", [None, "Integer"]) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 6b146be0fa3..1627107b57d 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1,6 +1,8 @@ # Copyright (c) 2019-2024, NVIDIA CORPORATION. import datetime +import functools +import operator import os import urllib import warnings @@ -18,6 +20,12 @@ from cudf.core._compat import PANDAS_LT_300 from cudf.utils.docutils import docfmt_partial +try: + import fsspec.parquet as fsspec_parquet + +except ImportError: + fsspec_parquet = None + _BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024 _ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 @@ -187,6 +195,11 @@ allow_mismatched_pq_schemas : boolean, default False If True, enables reading (matching) columns specified in `columns` and `filters` options from the input files with otherwise mismatched schemas. +prefetch_options : dict, default None + WARNING: This is an experimental feature and may be removed at any + time without warning or deprecation period. + Dictionary of options to use to prefetch bytes from remote storage. + These options are passed through to `get_reader_filepath_or_buffer`. Returns ------- @@ -1439,6 +1452,14 @@ Glob pattern to use when expanding directories into file paths (e.g. "*.json"). If this parameter is not specified, directories will not be expanded. +prefetch_options : dict, default None + WARNING: This is an experimental feature and may be removed at any + time without warning or deprecation period. + Dictionary of options to use to prefetch bytes from remote storage. + These options are only used when `path_or_data` is a list of remote + paths. 
+    option is 'blocksize' (default 256 MB). If method is set to 'parquet',
+    'columns' and 'row_groups' are also supported (default None).
 
 Returns
 -------
@@ -1620,6 +1641,7 @@ def get_reader_filepath_or_buffer(
     warn_on_raw_text_input=None,
     warn_meta=None,
     expand_dir_pattern=None,
+    prefetch_options=None,
 ):
     """{docstring}"""
 
@@ -1690,26 +1712,15 @@
             raw_text_input = True
 
     elif fs is not None:
-        # TODO: We can use cat_ranges and/or parquet-aware logic
-        # to copy all remote data into host memory at once here.
-        # The current solution iterates over files, and copies
-        # ALL data from each file (even when we are performing
-        # partial IO, and don't need the entire file)
         if len(paths) == 0:
             raise FileNotFoundError(
                 f"{input_sources} could not be resolved to any files"
             )
-        filepaths_or_buffers = [
-            BytesIO(
-                _fsspec_data_transfer(
-                    fpath,
-                    fs=fs,
-                    mode=mode,
-                    bytes_per_thread=bytes_per_thread,
-                )
-            )
-            for fpath in paths
-        ]
+        filepaths_or_buffers = _prefetch_remote_buffers(
+            paths,
+            fs,
+            **(prefetch_options or {}),
+        )
     else:
         raw_text_input = True
 
@@ -2099,3 +2110,101 @@ def _read_byte_ranges(
 
     for worker in workers:
         worker.join()
+
+
+def _get_remote_bytes_all(
+    remote_paths, fs, *, blocksize=_BYTES_PER_THREAD_DEFAULT
+):
+    # TODO: Experiment with a heuristic to avoid the fs.sizes
+    # call when we are reading many files at once (the latency
+    # of collecting the file sizes is unnecessary in this case)
+    if max(sizes := fs.sizes(remote_paths)) <= blocksize:
+        # Don't bother breaking up individual files
+        return fs.cat_ranges(remote_paths, None, None)
+    else:
+        # Construct list of paths, starts, and ends
+        paths, starts, ends = map(
+            list,
+            zip(
+                *(
+                    (r, j, min(j + blocksize, s))
+                    for r, s in zip(remote_paths, sizes)
+                    for j in range(0, s, blocksize)
+                )
+            ),
+        )
+
+        # Collect the byte ranges
+        chunks = fs.cat_ranges(paths, starts, ends)
+
+        # Construct local byte buffers
+        # (Need to make sure path offsets are ordered correctly)
+        unique_count = dict(zip(*np.unique(paths, return_counts=True)))
+        offset = np.cumsum([0] + [unique_count[p] for p in remote_paths])
+        buffers = [
+            functools.reduce(operator.add, chunks[offset[i] : offset[i + 1]])
+            for i in range(len(remote_paths))
+        ]
+        return buffers
+
+
+def _get_remote_bytes_parquet(
+    remote_paths,
+    fs,
+    *,
+    columns=None,
+    row_groups=None,
+    blocksize=_BYTES_PER_THREAD_DEFAULT,
+):
+    if fsspec_parquet is None or (columns is None and row_groups is None):
+        return _get_remote_bytes_all(remote_paths, fs, blocksize=blocksize)
+
+    sizes = fs.sizes(remote_paths)
+    data = fsspec_parquet._get_parquet_byte_ranges(
+        remote_paths,
+        fs,
+        columns=columns,
+        row_groups=row_groups,
+        max_block=blocksize,
+    )
+
+    buffers = []
+    for size, path in zip(sizes, remote_paths):
+        path_data = data[path]
+        buf = np.empty(size, dtype="b")
+        for range_offset in path_data.keys():
+            chunk = path_data[range_offset]
+            buf[range_offset[0] : range_offset[1]] = np.frombuffer(
+                chunk, dtype="b"
+            )
+        buffers.append(buf.tobytes())
+    return buffers
+
+
+def _prefetch_remote_buffers(
+    paths,
+    fs,
+    *,
+    method="all",
+    **prefetch_options,
+):
+    # Gather bytes ahead of time for remote filesystems
+    if fs and paths and not _is_local_filesystem(fs):
+        try:
+            prefetcher = {
+                "parquet": _get_remote_bytes_parquet,
+                "all": _get_remote_bytes_all,
+            }[method]
+        except KeyError:
+            raise ValueError(
+                f"{method} is not a supported remote-data prefetcher."
+ " Expected 'parquet' or 'all'." + ) + return prefetcher( + paths, + fs, + **prefetch_options, + ) + + else: + return paths