From 81e88190af463c4ae85938a653e9973dd1701fc2 Mon Sep 17 00:00:00 2001
From: Anu-Ra-g
Date: Thu, 27 Jun 2024 17:52:01 +0530
Subject: [PATCH 1/3] added parse_grib_idx function

---
 kerchunk/grib2.py | 71 +++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 69 insertions(+), 2 deletions(-)

diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py
index 52b02269..e92668f1 100644
--- a/kerchunk/grib2.py
+++ b/kerchunk/grib2.py
@@ -3,10 +3,12 @@
 import io
 import logging
 from collections import defaultdict
-from typing import Iterable, List, Dict, Set
-
+from typing import Iterable, List, Dict, Set, TYPE_CHECKING, Optional
 import ujson
 
+if TYPE_CHECKING:
+    import pandas as pd
+
 try:
     import cfgrib
 except ModuleNotFoundError as err:  # pragma: no cover
@@ -582,3 +584,68 @@ def correct_hrrr_subhf_step(group: Dict) -> Dict:
         group["refs"]["step/0"] = enocded_val
 
     return group
+
+
+def parse_grib_idx(
+    fs: fsspec.AbstractFileSystem,
+    *,
+    basename: str,
+    suffix: str = "idx",
+    tstamp: Optional["pd.Timestamp"] = None,
+    validate: bool = False,
+) -> "pd.DataFrame":
+    """
+    Standalone method used to extract metadata from a grib2 idx file(text) from NODD.
+
+    The function takes idx file, extracts the metadata known as attrs (variables with
+    level and forecast time) from each idx entry and converts it into pandas
+    DataFrame. The dataframe is later to build the one-to-one mapping to the grib file metadata.
+
+    Parameters
+    ----------
+    fs : fsspec.AbstractFileSystem
+        The file system to read from.
+    basename : str
+        The base name is the full path to the grib file.
+    suffix : str
+        The suffix is the ending for the idx file.
+    tstamp : Optional[pd.Timestamp]
+        The timestamp to use for when the data was indexed
+    validate : bool
+        The validation if the metadata table has duplicate attrs.
+
+    Returns
+    -------
+    pandas.DataFrame : The data frame containing the results.
+    """
+    import pandas as pd
+
+    fname = f"{basename}.{suffix}"
+
+    baseinfo = fs.info(basename)
+
+    result = None
+
+    try:
+        result = pd.read_csv(fname, sep=":", header=None).loc[:, :5]
+        result.columns = ["idx", "offset", "date", "attrs", "level", "forecast"]
+        result["attrs"] = (
+            result["attrs"] + ":" + result["level"] + ":" + result["forecast"]
+        )
+        result.drop(columns=["level", "forecast"], inplace=True)
+    except Exception as e:
+        raise ValueError(f"Could not parse {fname}") from e
+
+    result = result.assign(
+        length=(
+            result.offset.shift(periods=-1, fill_value=baseinfo["size"]) - result.offset
+        ),
+        idx_uri=fname,
+        grib_uri=basename,
+        indexed_at=tstamp if tstamp else pd.Timestamp.now(),
+    )
+
+    if validate and not result["attrs"].is_unique:
+        raise ValueError(f"Attribute mapping for grib file {basename} is not unique)")
+
+    return result.set_index("idx")

From eca50f21fb9886fed49f073adc91e44d3a38654a Mon Sep 17 00:00:00 2001
From: Anu-Ra-g
Date: Mon, 8 Jul 2024 18:36:20 +0530
Subject: [PATCH 2/3] removed fs parameter in parse_grib_idx

---
 kerchunk/grib2.py | 26 ++++++++++++--------------
 1 file changed, 12 insertions(+), 14 deletions(-)

diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py
index e92668f1..bdda84fc 100644
--- a/kerchunk/grib2.py
+++ b/kerchunk/grib2.py
@@ -587,10 +587,9 @@ def correct_hrrr_subhf_step(group: Dict) -> Dict:
 
 
 def parse_grib_idx(
-    fs: fsspec.AbstractFileSystem,
-    *,
     basename: str,
     suffix: str = "idx",
+    storage_options: Dict = {},
     tstamp: Optional["pd.Timestamp"] = None,
     validate: bool = False,
 ) -> "pd.DataFrame":
@@ -603,12 +602,12 @@ def parse_grib_idx(
 
     Parameters
     ----------
-    fs : fsspec.AbstractFileSystem
-        The file system to read from.
     basename : str
         The base name is the full path to the grib file.
     suffix : str
         The suffix is the ending for the idx file.
+    storage_options: dict
+        For accessing the data, passed to filesystem
     tstamp : Optional[pd.Timestamp]
         The timestamp to use for when the data was indexed
     validate : bool
@@ -620,21 +619,20 @@ def parse_grib_idx(
     """
     import pandas as pd
 
+    fs, _ = fsspec.core.url_to_fs(basename, **storage_options)
+
     fname = f"{basename}.{suffix}"
 
     baseinfo = fs.info(basename)
 
-    result = None
+    result = pd.read_csv(fs.open(fname), header=None, names=["raw_data"])
+    result[["idx", "offset", "date", "attrs"]] = result["raw_data"].str.split(
+        ":", expand=True, n=3
+    )
+    result["offset"] = result["offset"].astype(int)
 
-    try:
-        result = pd.read_csv(fname, sep=":", header=None).loc[:, :5]
-        result.columns = ["idx", "offset", "date", "attrs", "level", "forecast"]
-        result["attrs"] = (
-            result["attrs"] + ":" + result["level"] + ":" + result["forecast"]
-        )
-        result.drop(columns=["level", "forecast"], inplace=True)
-    except Exception as e:
-        raise ValueError(f"Could not parse {fname}") from e
+    # dropping the original single "raw_data" column before the formatting
+    result.drop(columns=["raw_data"], inplace=True)
 
     result = result.assign(
         length=(

From d37bd5690dcc1b84f5d800368d556593e5a56815 Mon Sep 17 00:00:00 2001
From: Anu-Ra-g
Date: Mon, 8 Jul 2024 20:03:36 +0530
Subject: [PATCH 3/3] made suggested updates

---
Review notes (this region is ignored when the patch is applied):
 * The ValueError raised when validate=True still ends with an unbalanced
   ")" ("... is not unique)"). It was added in PATCH 1/3 and no later hunk
   in this series touches that line.
 * pd.read_csv() is called without sep=, so the single "raw_data" column
   assumes idx lines never contain the default "," separator -- TODO confirm
   against the idx files this is meant to read.

 kerchunk/grib2.py | 27 ++++++++++++---------------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py
index bdda84fc..18cf3ec5 100644
--- a/kerchunk/grib2.py
+++ b/kerchunk/grib2.py
@@ -589,14 +589,13 @@ def correct_hrrr_subhf_step(group: Dict) -> Dict:
 def parse_grib_idx(
     basename: str,
     suffix: str = "idx",
-    storage_options: Dict = {},
-    tstamp: Optional["pd.Timestamp"] = None,
+    storage_options: Optional[Dict] = None,
     validate: bool = False,
 ) -> "pd.DataFrame":
     """
-    Standalone method used to extract metadata from a grib2 idx file(text) from NODD.
+    Parses per-message metadata from a grib2.idx file (text-type) to a dataframe of attributes
 
-    The function takes idx file, extracts the metadata known as attrs (variables with
+    The function uses the idx file, extracts the metadata known as attrs (variables with
     level and forecast time) from each idx entry and converts it into pandas
     DataFrame. The dataframe is later to build the one-to-one mapping to the grib file metadata.
 
@@ -608,8 +607,6 @@ def parse_grib_idx(
         The suffix is the ending for the idx file.
     storage_options: dict
         For accessing the data, passed to filesystem
-    tstamp : Optional[pd.Timestamp]
-        The timestamp to use for when the data was indexed
     validate : bool
         The validation if the metadata table has duplicate attrs.
 
@@ -619,20 +616,21 @@ def parse_grib_idx(
     """
     import pandas as pd
 
-    fs, _ = fsspec.core.url_to_fs(basename, **storage_options)
+    fs, _ = fsspec.core.url_to_fs(basename, **(storage_options or {}))
 
     fname = f"{basename}.{suffix}"
 
     baseinfo = fs.info(basename)
 
-    result = pd.read_csv(fs.open(fname), header=None, names=["raw_data"])
-    result[["idx", "offset", "date", "attrs"]] = result["raw_data"].str.split(
-        ":", expand=True, n=3
-    )
-    result["offset"] = result["offset"].astype(int)
+    with fs.open(fname) as f:
+        result = pd.read_csv(f, header=None, names=["raw_data"])
+        result[["idx", "offset", "date", "attrs"]] = result["raw_data"].str.split(
+            ":", expand=True, n=3
+        )
+        result["offset"] = result["offset"].astype(int)
 
-    # dropping the original single "raw_data" column before the formatting
-    result.drop(columns=["raw_data"], inplace=True)
+        # dropping the original single "raw_data" column after formatting
+        result.drop(columns=["raw_data"], inplace=True)
 
     result = result.assign(
         length=(
@@ -640,7 +638,6 @@ def parse_grib_idx(
         ),
         idx_uri=fname,
         grib_uri=basename,
-        indexed_at=tstamp if tstamp else pd.Timestamp.now(),
     )
 
     if validate and not result["attrs"].is_unique: