From cd8d73c0f32466142c9865a2bb946101ea3d81a7 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Mon, 26 Aug 2024 10:03:00 -0700
Subject: [PATCH 1/5] add multi-file and parquet-aware prefetching

---
 python/cudf/cudf/io/parquet.py    |  15 ++++
 python/cudf/cudf/utils/ioutils.py | 119 ++++++++++++++++++++++++++----
 2 files changed, 118 insertions(+), 16 deletions(-)

diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 6b895abbf66..b8ca9a2c93a 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -571,11 +571,26 @@ def read_parquet(
         )
         filepath_or_buffer = paths if paths else filepath_or_buffer
 
+    # Prepare remote-IO options
+    prefetch_options = kwargs.pop("prefetch_options", {})
+    if not ioutils._is_local_filesystem(fs):
+        method = prefetch_options.get("method", "parquet")
+        if method == "parquet":
+            prefetch_options.update(
+                {
+                    "method": method,
+                    "columns": columns,
+                    # All paths must have the same row-group selection
+                    "row_groups": row_groups[0] if row_groups else None,
+                }
+            )
+
     filepaths_or_buffers = ioutils.get_reader_filepath_or_buffer(
         path_or_data=filepath_or_buffer,
         fs=fs,
         storage_options=storage_options,
         bytes_per_thread=bytes_per_thread,
+        prefetch_options=prefetch_options,
     )
 
     # Warn user if they are not using cudf for IO
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index e5944d7093c..ea7556291a4 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -18,6 +18,12 @@
 from cudf.core._compat import PANDAS_LT_300
 from cudf.utils.docutils import docfmt_partial
 
+try:
+    import fsspec.parquet as fsspec_parquet
+
+except ImportError:
+    fsspec_parquet = None
+
 _BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024
 _ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024
 
@@ -1617,6 +1623,7 @@ def get_reader_filepath_or_buffer(
     warn_on_raw_text_input=None,
     warn_meta=None,
     expand_dir_pattern=None,
+    prefetch_options=None,
 ):
     """{docstring}"""
 
@@ -1687,26 +1694,15 @@ def get_reader_filepath_or_buffer(
             raw_text_input = True
 
     elif fs is not None:
-        # TODO: We can use cat_ranges and/or parquet-aware logic
-        # to copy all remote data into host memory at once here.
- # The current solution iterates over files, and copies - # ALL data from each file (even when we are performing - # partial IO, and don't need the entire file) if len(paths) == 0: raise FileNotFoundError( f"{input_sources} could not be resolved to any files" ) - filepaths_or_buffers = [ - BytesIO( - _fsspec_data_transfer( - fpath, - fs=fs, - mode=mode, - bytes_per_thread=bytes_per_thread, - ) - ) - for fpath in paths - ] + filepaths_or_buffers = _prefetch_remote_buffers( + paths, + fs, + **(prefetch_options or {}), + ) else: raw_text_input = True @@ -2096,3 +2092,94 @@ def _read_byte_ranges( for worker in workers: worker.join() + + +def _get_remote_bytes_all( + remote_paths, fs, *, blocksize=_BYTES_PER_THREAD_DEFAULT +): + if ( + len(remote_paths) >= 8 # Heuristic to avoid fs.sizes + or max(sizes := fs.sizes(remote_paths)) <= blocksize + ): + # Don't bother braking up individual files + return fs.cat_ranges(remote_paths, None, None) + else: + # Construct list of paths, starts, and ends + paths, starts, ends = [], [], [] + for i, remote_path in enumerate(remote_paths): + for j in range(0, sizes[i], blocksize): + paths.append(remote_path) + starts.append(j) + ends.append(min(j + blocksize, sizes[i])) + + # Collect the byte ranges + chunks = fs.cat_ranges(paths, starts, ends) + + # Construct local byte buffers + buffers = [] + path_counts = np.unique(paths, return_counts=True)[1] + for i, remote_path in enumerate(remote_paths): + bytes_arr = bytearray() + for j in range(path_counts[i]): + bytes_arr.extend(chunks.pop(0)) + buffers.append(bytes(bytes_arr)) + return buffers + + +def _get_remote_bytes_parquet( + remote_paths, + fs, + *, + columns=None, + row_groups=None, + blocksize=_BYTES_PER_THREAD_DEFAULT, +): + if fsspec_parquet is None or (columns is None and row_groups is None): + return _get_remote_bytes_all(remote_paths, fs, blocksize=blocksize) + + sizes = fs.sizes(remote_paths) + data = fsspec_parquet._get_parquet_byte_ranges( + remote_paths, + fs, + columns=columns, + row_groups=row_groups, + max_block=blocksize, + ) + + buffers = [] + for size, path in zip(sizes, remote_paths): + path_data = data.pop(path) + buf = np.zeros(size, dtype="b") + for range_offset in list(path_data.keys()): + chunk = path_data.pop(range_offset) + buf[range_offset[0] : range_offset[1]] = np.frombuffer( + chunk, dtype="b" + ) + buffers.append(buf.tobytes()) + return buffers + + +def _prefetch_remote_buffers( + paths, + fs, + *, + method="all", + **prefetch_options, +): + # Gather bytes ahead of time for remote filesystems + if fs and paths and not _is_local_filesystem(fs): + _prefetchers = { + "parquet": _get_remote_bytes_parquet, + "all": _get_remote_bytes_all, + } + if method not in _prefetchers: + raise NotImplementedError( + f"{method} is not a supported remote-data prefetcher" + ) + return _prefetchers.get(method)( + paths, + fs, + **prefetch_options, + ) + else: + return paths From d12b8bcaaa5fa2301b148c660617de750e19e3c2 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 28 Aug 2024 19:24:46 -0700 Subject: [PATCH 2/5] add test coverage --- python/cudf/cudf/tests/test_s3.py | 27 +++++++++++++++++++++++++++ python/cudf/cudf/utils/ioutils.py | 14 ++++++++------ 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 3b23a53091e..1a6ec927921 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -229,6 +229,33 @@ def test_read_parquet( assert_eq(expect, got2) 
 
 
+@pytest.mark.parametrize("method", [None, "all", "parquet"])
+def test_read_parquet_prefetch_options(
+    s3_base,
+    s3so,
+    pdf,
+    method,
+):
+    fname = "test_parquet_reader_prefetch_options.parquet"
+    bucket = "parquet"
+    buffer = BytesIO()
+    pdf.to_parquet(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        if method is None:
+            prefetch_options = {}
+        else:
+            prefetch_options = {"method": method}
+        got = cudf.read_parquet(
+            f"s3://{bucket}/{fname}",
+            storage_options=s3so,
+            prefetch_options=prefetch_options,
+            columns=["String"],
+        )
+        expect = pdf[["String"]]
+        assert_eq(expect, got)
+
+
 @pytest.mark.parametrize("bytes_per_thread", [32, 1024])
 @pytest.mark.parametrize("columns", [None, ["List", "Struct"]])
 @pytest.mark.parametrize("index", [None, "Integer"])
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index ea7556291a4..13f6f9f1307 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -2168,18 +2168,20 @@ def _prefetch_remote_buffers(
 ):
     # Gather bytes ahead of time for remote filesystems
     if fs and paths and not _is_local_filesystem(fs):
-        _prefetchers = {
-            "parquet": _get_remote_bytes_parquet,
-            "all": _get_remote_bytes_all,
-        }
-        if method not in _prefetchers:
+        try:
+            prefetcher = {
+                "parquet": _get_remote_bytes_parquet,
+                "all": _get_remote_bytes_all,
+            }[method]
+        except KeyError:
             raise NotImplementedError(
                 f"{method} is not a supported remote-data prefetcher"
             )
-        return _prefetchers.get(method)(
+        return prefetcher(
             paths,
             fs,
             **prefetch_options,
         )
+
     else:
         return paths

From b9d060c073440d97c952f48308766d8a7fc5d668 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Fri, 30 Aug 2024 07:48:32 -0700
Subject: [PATCH 3/5] address code review

---
 python/cudf/cudf/io/parquet.py    | 24 +++++++++++--
 python/cudf/cudf/utils/ioutils.py | 57 +++++++++++++++++++------------
 2 files changed, 56 insertions(+), 25 deletions(-)

diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 6733a1f1df1..2d22635d5c0 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -580,14 +580,32 @@ def read_parquet(
     # Prepare remote-IO options
     prefetch_options = kwargs.pop("prefetch_options", {})
     if not ioutils._is_local_filesystem(fs):
-        method = prefetch_options.get("method", "parquet")
+        method = prefetch_options.get("method")
+        _row_groups = None
+        if method in (None, "parquet"):
+            if row_groups is None:
+                # Don't use 'parquet' prefetcher for column
+                # projection alone.
+                method = method or "all"
+            elif all(r == row_groups[0] for r in row_groups):
+                # Row group selection means we are probably
+                # reading half the file or less. We should
+                # avoid a full file transfer by default.
+                method = "parquet"
+                _row_groups = row_groups[0]
+            elif (method := method or "all") == "parquet":
+                raise ValueError(
+                    "The 'parquet' prefetcher requires a uniform "
+                    "row-group selection for all paths within the "
+                    "same `read_parquet` call. "
+                    f"Got: {row_groups}"
+                )
         if method == "parquet":
             prefetch_options.update(
                 {
                     "method": method,
                     "columns": columns,
-                    # All paths must have the same row-group selection
-                    "row_groups": row_groups[0] if row_groups else None,
+                    "row_groups": _row_groups,
                 }
             )
 
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index 22543d805c5..91c9dc52dc5 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -1,6 +1,8 @@
 # Copyright (c) 2019-2024, NVIDIA CORPORATION.
 
 import datetime
+import functools
+import operator
 import os
 import urllib
 import warnings
@@ -193,6 +195,10 @@
 allow_mismatched_pq_schemas : boolean, default False
     If True, enables reading (matching) columns specified in `columns`
     and `filters` options from the input files with otherwise mismatched schemas.
+prefetch_options : dict, default None
+    WARNING: This is an experimental feature (added in 24.10).
+    Dictionary of options to use to prefetch bytes from remote storage.
+    These options are passed through to `get_reader_filepath_or_buffer`.
 
 Returns
 -------
@@ -1445,6 +1451,13 @@
     Glob pattern to use when expanding directories into file paths
     (e.g. "*.json"). If this parameter is not specified, directories
     will not be expanded.
+prefetch_options : dict, default None
+    WARNING: This is an experimental feature (added in 24.10).
+    Dictionary of options to use to prefetch bytes from remote storage.
+    These options are only used when `path_or_data` is a list of remote
+    paths. If 'method' is set to 'all' (the default), the only supported
+    option is 'blocksize' (default 256 MB). If method is set to 'parquet',
+    'columns' and 'row_groups' are also supported (default None).
 
 Returns
 -------
@@ -2100,32 +2113,32 @@ def _read_byte_ranges(
 def _get_remote_bytes_all(
     remote_paths, fs, *, blocksize=_BYTES_PER_THREAD_DEFAULT
 ):
-    if (
-        len(remote_paths) >= 8  # Heuristic to avoid fs.sizes
-        or max(sizes := fs.sizes(remote_paths)) <= blocksize
-    ):
-        # Don't bother braking up individual files
+    # TODO: Experiment with a heuristic to avoid the fs.sizes
+    # call when we are reading many files at once (the latency
+    # of collecting the file sizes is unnecessary in this case)
+    if max(sizes := fs.sizes(remote_paths)) <= blocksize:
+        # Don't bother breaking up individual files
         return fs.cat_ranges(remote_paths, None, None)
     else:
         # Construct list of paths, starts, and ends
-        paths, starts, ends = [], [], []
-        for i, remote_path in enumerate(remote_paths):
-            for j in range(0, sizes[i], blocksize):
-                paths.append(remote_path)
-                starts.append(j)
-                ends.append(min(j + blocksize, sizes[i]))
+        paths, starts, ends = zip(
+            *[
+                (r, j, min(j + blocksize, s))
+                for r, s in zip(remote_paths, sizes)
+                for j in range(0, s, blocksize)
+            ]
+        )
 
         # Collect the byte ranges
         chunks = fs.cat_ranges(paths, starts, ends)
 
         # Construct local byte buffers
-        buffers = []
-        path_counts = np.unique(paths, return_counts=True)[1]
-        for i, remote_path in enumerate(remote_paths):
-            bytes_arr = bytearray()
-            for j in range(path_counts[i]):
-                bytes_arr.extend(chunks.pop(0))
-            buffers.append(bytes(bytes_arr))
+        buffers = [
+            functools.reduce(operator.add, chunks[:counts])
+            for _, counts in zip(
+                remote_paths, np.unique(paths, return_counts=True)[1]
+            )
+        ]
         return buffers
 
 
@@ -2151,10 +2164,10 @@ def _get_remote_bytes_parquet(
 
     buffers = []
     for size, path in zip(sizes, remote_paths):
-        path_data = data.pop(path)
-        buf = np.zeros(size, dtype="b")
-        for range_offset in list(path_data.keys()):
-            chunk = path_data.pop(range_offset)
+        path_data = data[path]
+        buf = np.empty(size, dtype="b")
+        for range_offset in path_data.keys():
+            chunk = path_data[range_offset]
             buf[range_offset[0] : range_offset[1]] = np.frombuffer(
                 chunk, dtype="b"
             )

From 9428b9db0b3b864e195dc1c04f0a57307d1c287d Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Tue, 3 Sep 2024 10:50:10 -0700
Subject: [PATCH 4/5] code review and test coverage update

---
 python/cudf/cudf/io/parquet.py    |  7 +++++
 python/cudf/cudf/tests/test_s3.py | 48 ++++++++++++++++++++++---------
 python/cudf/cudf/utils/ioutils.py | 24 +++++++++-------
 3 files changed, 55 insertions(+), 24 deletions(-)

diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 2d22635d5c0..2b3d4da0990 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -580,6 +580,13 @@ def read_parquet(
     # Prepare remote-IO options
     prefetch_options = kwargs.pop("prefetch_options", {})
     if not ioutils._is_local_filesystem(fs):
+        # The default prefetch method depends on the
+        # `row_groups` argument. In most cases we will use
+        # method="all" by default, because it is fastest
+        # when we need to read most of the file(s).
+        # If a (simple) `row_groups` selection is made, we
+        # use method="parquet" to avoid transferring the
+        # entire file over the network
         method = prefetch_options.get("method")
         _row_groups = None
         if method in (None, "parquet"):
diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py
index 1a6ec927921..0958b68084d 100644
--- a/python/cudf/cudf/tests/test_s3.py
+++ b/python/cudf/cudf/tests/test_s3.py
@@ -229,30 +229,50 @@ def test_read_parquet(
         assert_eq(expect, got2)
 
 
-@pytest.mark.parametrize("method", [None, "all", "parquet"])
+@pytest.mark.parametrize("method", ["all", "parquet"])
+@pytest.mark.parametrize("blocksize", [1024 * 1024, 1024])
 def test_read_parquet_prefetch_options(
     s3_base,
     s3so,
     pdf,
     method,
+    blocksize,
 ):
-    fname = "test_parquet_reader_prefetch_options.parquet"
     bucket = "parquet"
-    buffer = BytesIO()
-    pdf.to_parquet(path=buffer)
-    buffer.seek(0)
-    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
-        if method is None:
-            prefetch_options = {}
-        else:
-            prefetch_options = {"method": method}
+    fname_1 = "test_parquet_reader_prefetch_options_1.parquet"
+    buffer_1 = BytesIO()
+    pdf.to_parquet(path=buffer_1)
+    buffer_1.seek(0)
+
+    fname_2 = "test_parquet_reader_prefetch_options_2.parquet"
+    buffer_2 = BytesIO()
+    pdf_2 = pdf.copy()
+    pdf_2["Integer"] += 1
+    pdf_2.to_parquet(path=buffer_2)
+    buffer_2.seek(0)
+
+    with s3_context(
+        s3_base=s3_base,
+        bucket=bucket,
+        files={
+            fname_1: buffer_1,
+            fname_2: buffer_2,
+        },
+    ):
         got = cudf.read_parquet(
-            f"s3://{bucket}/{fname}",
+            [
+                f"s3://{bucket}/{fname_1}",
+                f"s3://{bucket}/{fname_2}",
+            ],
             storage_options=s3so,
-            prefetch_options=prefetch_options,
-            columns=["String"],
+            prefetch_options={
+                "method": method,
+                "blocksize": blocksize,
+            },
+            columns=["String", "Integer"],
         )
-        expect = pdf[["String"]]
+
+        expect = pd.concat([pdf, pdf_2], ignore_index=True)[["String", "Integer"]]
         assert_eq(expect, got)
 
 
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index 91c9dc52dc5..facb320a566 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -2121,23 +2121,27 @@ def _get_remote_bytes_all(
         return fs.cat_ranges(remote_paths, None, None)
     else:
         # Construct list of paths, starts, and ends
-        paths, starts, ends = zip(
-            *[
-                (r, j, min(j + blocksize, s))
-                for r, s in zip(remote_paths, sizes)
-                for j in range(0, s, blocksize)
-            ]
-        )
+        paths, starts, ends = map(
+            list,
+            zip(
+                *(
+                    (r, j, min(j + blocksize, s))
+                    for r, s in zip(remote_paths, sizes)
+                    for j in range(0, s, blocksize)
+                )
+            ),
+        )
 
         # Collect the byte ranges
         chunks = fs.cat_ranges(paths, starts, ends)
 
         # Construct local byte buffers
+        # (Need to make sure path offsets are ordered correctly)
+        unique_count = dict(zip(*np.unique(paths, return_counts=True)))
+        offset = np.cumsum([0] + [unique_count[p] for p in remote_paths])
         buffers = [
-            functools.reduce(operator.add, chunks[:counts])
-            for _, counts in zip(
-                remote_paths, np.unique(paths, return_counts=True)[1]
-            )
+            functools.reduce(operator.add, chunks[offset[i] : offset[i + 1]])
+            for i in range(len(remote_paths))
         ]
         return buffers
 

From ae9a71eaf46e75f2a825776aa1bae459ea4372b6 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 4 Sep 2024 07:10:49 -0700
Subject: [PATCH 5/5] Update comments to address code review

---
 python/cudf/cudf/io/parquet.py    |  4 ++--
 python/cudf/cudf/utils/ioutils.py | 11 +++++++----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 2b3d4da0990..62be7378e9e 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -591,8 +591,8 @@ def read_parquet(
         _row_groups = None
         if method in (None, "parquet"):
             if row_groups is None:
-                # Don't use 'parquet' prefetcher for column
-                # projection alone.
+                # If the user didn't specify a method, don't use
+                # 'parquet' prefetcher for column projection alone.
                 method = method or "all"
             elif all(r == row_groups[0] for r in row_groups):
                 # Row group selection means we are probably
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index facb320a566..1627107b57d 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -196,7 +196,8 @@
     If True, enables reading (matching) columns specified in `columns`
     and `filters` options from the input files with otherwise mismatched schemas.
 prefetch_options : dict, default None
-    WARNING: This is an experimental feature (added in 24.10).
+    WARNING: This is an experimental feature and may be removed at any
+    time without warning or deprecation period.
     Dictionary of options to use to prefetch bytes from remote storage.
     These options are passed through to `get_reader_filepath_or_buffer`.
 
@@ -1453,7 +1453,8 @@
     (e.g. "*.json"). If this parameter is not specified, directories
     will not be expanded.
 prefetch_options : dict, default None
-    WARNING: This is an experimental feature (added in 24.10).
+    WARNING: This is an experimental feature and may be removed at any
+    time without warning or deprecation period.
    Dictionary of options to use to prefetch bytes from remote storage.
    These options are only used when `path_or_data` is a list of remote
    paths. If 'method' is set to 'all' (the default), the only supported
@@ -2194,8 +2196,9 @@ def _prefetch_remote_buffers(
                 "all": _get_remote_bytes_all,
             }[method]
         except KeyError:
-            raise NotImplementedError(
-                f"{method} is not a supported remote-data prefetcher"
+            raise ValueError(
+                f"{method} is not a supported remote-data prefetcher."
+                " Expected 'parquet' or 'all'."
             )
         return prefetcher(
             paths,