From 87781ea41ab174b50177d9e4543bfc47c5ccd4fc Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Tue, 15 Aug 2023 12:26:01 -0700 Subject: [PATCH 01/13] changed types from float64 to float32 --- neuralprophet/df_utils.py | 2 ++ neuralprophet/time_dataset.py | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/neuralprophet/df_utils.py b/neuralprophet/df_utils.py index e51cbe745..77a0575ef 100644 --- a/neuralprophet/df_utils.py +++ b/neuralprophet/df_utils.py @@ -463,6 +463,8 @@ def check_dataframe( df["ds"] = df.loc[:, "ds"].astype(str) if not np.issubdtype(df["ds"].to_numpy().dtype, np.datetime64): df["ds"] = pd.to_datetime(df.loc[:, "ds"], utc=True).dt.tz_convert(None) + if df["y"].dtype != np.float32: + df["y"] = df["y"].astype(np.float32) if df.groupby("ID").apply(lambda x: x.duplicated("ds").any()).any(): raise ValueError("Column ds has duplicate values. Please remove duplicates.") diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index b3bcfe320..a7b2bb2d5 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -136,7 +136,15 @@ def init_after_tabularized(self, inputs, targets=None): if key in self.two_level_inputs: self.inputs[key] = OrderedDict({}) for name, features in data.items(): - self.inputs[key][name] = torch.from_numpy(features.astype(float)).type(inputs_dtype[key]) + if features.dtype != np.float32: # 1196.5 MiB + features = features.astype(np.float32, copy=False) + + tensor = torch.from_numpy(features) + + if tensor.dtype != inputs_dtype[key]: + self.inputs[key][name] = tensor.to(dtype=inputs_dtype[key]) + else: + self.inputs[key][name] = tensor else: if key == "timestamps": self.inputs[key] = data @@ -335,7 +343,7 @@ def _stride_lagged_features(df_col_name, feature_dims): series = df.loc[:, df_col_name].values # Added dtype=np.float64 to solve the problem with np.isnan for ubuntu test return np.array( - [series[i + max_lags - feature_dims : i + max_lags] for i in range(n_samples)], dtype=np.float64 + [series[i + max_lags - feature_dims : i + max_lags] for i in range(n_samples)], dtype=np.float32 ) def _stride_timestamps_for_forecasts(x): @@ -488,7 +496,7 @@ def fourier_series(dates, period, series_order): Matrix with seasonality features """ # convert to days since epoch - t = np.array((dates - datetime(1970, 1, 1)).dt.total_seconds().astype(float)) / (3600 * 24.0) + t = np.array((dates - datetime(1970, 1, 1)).dt.total_seconds().astype(np.float32)) / (3600 * 24.0) return fourier_series_t(t, period, series_order) @@ -603,7 +611,7 @@ def make_events_features(df, config_events: Optional[configure.ConfigEvents] = N if config_events is not None: for event, configs in config_events.items(): if event not in df.columns: - df[event] = np.zeros_like(df["ds"], dtype=np.float64) + df[event] = np.zeros_like(df["ds"], dtype=np.float32) feature = df[event] _create_event_offset_features(event, configs, feature, additive_events, multiplicative_events) From a5d714b9febaf0247e4fba1e0ea37ef1388bbc6a Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Tue, 15 Aug 2023 12:34:41 -0700 Subject: [PATCH 02/13] location of float 32 conversion --- neuralprophet/data/process.py | 2 ++ neuralprophet/df_utils.py | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/neuralprophet/data/process.py b/neuralprophet/data/process.py index 961eff4ad..c50147f78 100644 --- a/neuralprophet/data/process.py +++ b/neuralprophet/data/process.py @@ -517,6 +517,8 @@ def _handle_missing_data( df_to_add = df.groupby("ID", 
group_keys=False).apply(lambda x: x.loc[last_valid_index[x.name] + 1 :]) df = df_dropped log.info(f"Dropped {n_dropped} rows at the end with NaNs in 'y' column.") + if df["y"].dtype != np.float32: + df["y"] = df["y"].astype(np.float32) if config_missing.impute_missing: # impute missing values diff --git a/neuralprophet/df_utils.py b/neuralprophet/df_utils.py index 77a0575ef..e51cbe745 100644 --- a/neuralprophet/df_utils.py +++ b/neuralprophet/df_utils.py @@ -463,8 +463,6 @@ def check_dataframe( df["ds"] = df.loc[:, "ds"].astype(str) if not np.issubdtype(df["ds"].to_numpy().dtype, np.datetime64): df["ds"] = pd.to_datetime(df.loc[:, "ds"], utc=True).dt.tz_convert(None) - if df["y"].dtype != np.float32: - df["y"] = df["y"].astype(np.float32) if df.groupby("ID").apply(lambda x: x.duplicated("ds").any()).any(): raise ValueError("Column ds has duplicate values. Please remove duplicates.") From 34b67af05c6ef0b82e5486559bb5588715da91d8 Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Tue, 15 Aug 2023 14:42:03 -0700 Subject: [PATCH 03/13] deleted unnecessary type conversion --- neuralprophet/df_utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/neuralprophet/df_utils.py b/neuralprophet/df_utils.py index e51cbe745..116ecac78 100644 --- a/neuralprophet/df_utils.py +++ b/neuralprophet/df_utils.py @@ -459,8 +459,6 @@ def check_dataframe( raise ValueError("Dataframe must have columns 'ds' with the dates.") if df["ds"].isnull().any(): raise ValueError("Found NaN in column ds.") - if df["ds"].dtype == np.int64: - df["ds"] = df.loc[:, "ds"].astype(str) if not np.issubdtype(df["ds"].to_numpy().dtype, np.datetime64): df["ds"] = pd.to_datetime(df.loc[:, "ds"], utc=True).dt.tz_convert(None) if df.groupby("ID").apply(lambda x: x.duplicated("ds").any()).any(): From f84fb9eee857ff92dd0f0888585b31993f387a4a Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Tue, 15 Aug 2023 16:15:09 -0700 Subject: [PATCH 04/13] only for debugging --- neuralprophet/time_dataset.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index a7b2bb2d5..dc7d68557 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -6,6 +6,7 @@ import numpy as np import pandas as pd import torch +from memory_profiler import profile from torch.utils.data.dataset import Dataset from neuralprophet import configure, utils @@ -63,6 +64,7 @@ def __init__(self, df, name, **kwargs): "events", "regressors", ] + log.info(f"Creating dataset for {name}") inputs, targets, drop_missing = tabularize_univariate_datetime(df, **kwargs) self.init_after_tabularized(inputs, targets) self.filter_samples_after_init(kwargs["prediction_frequency"]) @@ -111,6 +113,7 @@ def drop_nan_after_init(self, df, predict_steps, drop_missing): "Please either adjust imputation parameters, or set 'drop_missing' to True to drop those samples." ) + @profile def init_after_tabularized(self, inputs, targets=None): """Create Timedataset with data. 
Parameters From 23681cedb395a67cb3385a1199cc05e3c528636a Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Tue, 15 Aug 2023 20:17:27 -0700 Subject: [PATCH 05/13] parallelized time dataset --- neuralprophet/data/process.py | 6 +++++- neuralprophet/time_dataset.py | 32 +++++++++++++++++++++++++++----- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/neuralprophet/data/process.py b/neuralprophet/data/process.py index c50147f78..4c0312446 100644 --- a/neuralprophet/data/process.py +++ b/neuralprophet/data/process.py @@ -572,7 +572,7 @@ def _handle_missing_data( return df -def _create_dataset(model, df, predict_mode, prediction_frequency=None): +def _create_dataset(model, df, predict_mode, prediction_frequency=None, num_workers=0): """Construct dataset from dataframe. (Configured Hyperparameters can be overridden by explicitly supplying them. @@ -605,6 +605,9 @@ def _create_dataset(model, df, predict_mode, prediction_frequency=None): value: int forecast origin of the predictions to be made, e.g. 7 for 7am in case of 'daily-hour'. + num_workers : int + number of workers to use for data loading + Returns ------- TimeDataset @@ -612,6 +615,7 @@ def _create_dataset(model, df, predict_mode, prediction_frequency=None): df, _, _, _ = df_utils.prep_or_copy_df(df) return time_dataset.GlobalTimeDataset( df, + num_workers=num_workers, predict_mode=predict_mode, n_lags=model.n_lags, n_forecasts=model.n_forecasts, diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index dc7d68557..e5b90173e 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -1,6 +1,7 @@ import logging from collections import OrderedDict, defaultdict from datetime import datetime +from multiprocessing import Pool from typing import Optional import numpy as np @@ -17,7 +18,7 @@ class GlobalTimeDataset(Dataset): - def __init__(self, df, **kwargs): + def __init__(self, df, num_workers, **kwargs): """Initialize Timedataset from time-series df. 
Parameters ---------- @@ -27,11 +28,34 @@ def __init__(self, df, **kwargs): **kwargs : dict Identical to :meth:`tabularize_univariate_datetime` """ - # # TODO (future): vectorize - timedatasets = [TimeDataset(df_i, df_name, **kwargs) for df_name, df_i in df.groupby("ID")] + # TODO (future): vectorize + if num_workers > 0: + print(f"num_workers: {num_workers}") + grouped_dfs = list(df.groupby("ID")) + df_names = [item[0] for item in grouped_dfs] + dataframes = [item[1] for item in grouped_dfs] + timedatasets = self.parallel_time_datasets(dataframes, df_names, kwargs, num_workers) + else: + timedatasets = [TimeDataset(df_i, df_name, **kwargs) for df_name, df_i in df.groupby("ID")] + self.combined_timedataset = [item for timedataset in timedatasets for item in timedataset] self.length = sum(timedataset.length for timedataset in timedatasets) + @staticmethod + def worker_function(args): + df_i, df_name, kwargs = args + print(f"Creating dataset for {df_name}") + return TimeDataset(df_i, df_name, **kwargs) + + @staticmethod + def parallel_time_datasets(dataframes, df_names, kwargs, num_workers): + args = [(df_i, df_name, kwargs) for df_i, df_name in zip(dataframes, df_names)] + + with Pool(num_workers) as pool: + results = pool.map(GlobalTimeDataset.worker_function, args) + + return results + def __len__(self): return self.length @@ -64,7 +88,6 @@ def __init__(self, df, name, **kwargs): "events", "regressors", ] - log.info(f"Creating dataset for {name}") inputs, targets, drop_missing = tabularize_univariate_datetime(df, **kwargs) self.init_after_tabularized(inputs, targets) self.filter_samples_after_init(kwargs["prediction_frequency"]) @@ -113,7 +136,6 @@ def drop_nan_after_init(self, df, predict_steps, drop_missing): "Please either adjust imputation parameters, or set 'drop_missing' to True to drop those samples." ) - @profile def init_after_tabularized(self, inputs, targets=None): """Create Timedataset with data. 
Parameters From c13757e5ccda6d7c5bf767eab4159bfc53b61c81 Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Tue, 15 Aug 2023 20:24:57 -0700 Subject: [PATCH 06/13] fixed flake8 --- neuralprophet/time_dataset.py | 1 - 1 file changed, 1 deletion(-) diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index e5b90173e..8a60511d8 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -7,7 +7,6 @@ import numpy as np import pandas as pd import torch -from memory_profiler import profile from torch.utils.data.dataset import Dataset from neuralprophet import configure, utils From 23ca37447ea335ece32aabe8ae33487dc1e1cf57 Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Wed, 16 Aug 2023 09:24:52 -0700 Subject: [PATCH 07/13] print to log --- neuralprophet/time_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index 8a60511d8..8f2ca3886 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -29,7 +29,7 @@ def __init__(self, df, num_workers, **kwargs): """ # TODO (future): vectorize if num_workers > 0: - print(f"num_workers: {num_workers}") + log.info(f"num_workers: {num_workers}") grouped_dfs = list(df.groupby("ID")) df_names = [item[0] for item in grouped_dfs] dataframes = [item[1] for item in grouped_dfs] @@ -43,7 +43,7 @@ def __init__(self, df, num_workers, **kwargs): @staticmethod def worker_function(args): df_i, df_name, kwargs = args - print(f"Creating dataset for {df_name}") + log.info(f"Creating dataset for {df_name}") return TimeDataset(df_i, df_name, **kwargs) @staticmethod From 52ba9a874340cfc3cc9904247535f79ea296a808 Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Wed, 16 Aug 2023 09:56:08 -0700 Subject: [PATCH 08/13] changed position log statement --- neuralprophet/time_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index 8f2ca3886..4dbe3f338 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -28,8 +28,8 @@ def __init__(self, df, num_workers, **kwargs): Identical to :meth:`tabularize_univariate_datetime` """ # TODO (future): vectorize + log.info(f"num_workers: {num_workers}") if num_workers > 0: - log.info(f"num_workers: {num_workers}") grouped_dfs = list(df.groupby("ID")) df_names = [item[0] for item in grouped_dfs] dataframes = [item[1] for item in grouped_dfs] From 971ad6fd038154cbbdfd9bb4876672ac7fa49819 Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Thu, 17 Aug 2023 10:44:27 -0700 Subject: [PATCH 09/13] reverted parallelisation --- neuralprophet/forecaster.py | 3 +-- neuralprophet/time_dataset.py | 32 ++++---------------------------- 2 files changed, 5 insertions(+), 30 deletions(-) diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index dc865d14f..d0b03392a 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -2567,7 +2567,7 @@ def _init_train_loader(self, df, num_workers=0): self.config_country_holidays.init_holidays(df_merged) dataset = _create_dataset( - self, df, predict_mode=False, prediction_frequency=self.prediction_frequency + self, df, predict_mode=False, prediction_frequency=self.prediction_frequency, num_workers=num_workers ) # needs to be called after set_auto_seasonalities # Determine the max_number of epochs @@ -2577,7 +2577,6 @@ def _init_train_loader(self, df, num_workers=0): dataset, batch_size=self.config_train.batch_size, shuffle=True, - 
num_workers=num_workers, ) return loader diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index 4dbe3f338..b7a34122c 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -1,7 +1,6 @@ import logging from collections import OrderedDict, defaultdict from datetime import datetime -from multiprocessing import Pool from typing import Optional import numpy as np @@ -17,7 +16,7 @@ class GlobalTimeDataset(Dataset): - def __init__(self, df, num_workers, **kwargs): + def __init__(self, df, **kwargs): """Initialize Timedataset from time-series df. Parameters ---------- @@ -27,34 +26,11 @@ def __init__(self, df, num_workers, **kwargs): **kwargs : dict Identical to :meth:`tabularize_univariate_datetime` """ - # TODO (future): vectorize - log.info(f"num_workers: {num_workers}") - if num_workers > 0: - grouped_dfs = list(df.groupby("ID")) - df_names = [item[0] for item in grouped_dfs] - dataframes = [item[1] for item in grouped_dfs] - timedatasets = self.parallel_time_datasets(dataframes, df_names, kwargs, num_workers) - else: - timedatasets = [TimeDataset(df_i, df_name, **kwargs) for df_name, df_i in df.groupby("ID")] - + # # TODO (future): vectorize + timedatasets = [TimeDataset(df_i, df_name, **kwargs) for df_name, df_i in df.groupby("ID")] self.combined_timedataset = [item for timedataset in timedatasets for item in timedataset] self.length = sum(timedataset.length for timedataset in timedatasets) - @staticmethod - def worker_function(args): - df_i, df_name, kwargs = args - log.info(f"Creating dataset for {df_name}") - return TimeDataset(df_i, df_name, **kwargs) - - @staticmethod - def parallel_time_datasets(dataframes, df_names, kwargs, num_workers): - args = [(df_i, df_name, kwargs) for df_i, df_name in zip(dataframes, df_names)] - - with Pool(num_workers) as pool: - results = pool.map(GlobalTimeDataset.worker_function, args) - - return results - def __len__(self): return self.length @@ -160,7 +136,7 @@ def init_after_tabularized(self, inputs, targets=None): if key in self.two_level_inputs: self.inputs[key] = OrderedDict({}) for name, features in data.items(): - if features.dtype != np.float32: # 1196.5 MiB + if features.dtype != np.float32: features = features.astype(np.float32, copy=False) tensor = torch.from_numpy(features) From 2f8171458a71058dbe2485549d3810331d5aae27 Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Thu, 17 Aug 2023 10:48:57 -0700 Subject: [PATCH 10/13] reverted parallelisation --- neuralprophet/data/process.py | 6 +----- neuralprophet/forecaster.py | 3 ++- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/neuralprophet/data/process.py b/neuralprophet/data/process.py index 4c0312446..c50147f78 100644 --- a/neuralprophet/data/process.py +++ b/neuralprophet/data/process.py @@ -572,7 +572,7 @@ def _handle_missing_data( return df -def _create_dataset(model, df, predict_mode, prediction_frequency=None, num_workers=0): +def _create_dataset(model, df, predict_mode, prediction_frequency=None): """Construct dataset from dataframe. (Configured Hyperparameters can be overridden by explicitly supplying them. @@ -605,9 +605,6 @@ def _create_dataset(model, df, predict_mode, prediction_frequency=None, num_work value: int forecast origin of the predictions to be made, e.g. 7 for 7am in case of 'daily-hour'. 
- num_workers : int - number of workers to use for data loading - Returns ------- TimeDataset @@ -615,7 +612,6 @@ def _create_dataset(model, df, predict_mode, prediction_frequency=None, num_work df, _, _, _ = df_utils.prep_or_copy_df(df) return time_dataset.GlobalTimeDataset( df, - num_workers=num_workers, predict_mode=predict_mode, n_lags=model.n_lags, n_forecasts=model.n_forecasts, diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py index d0b03392a..dc865d14f 100644 --- a/neuralprophet/forecaster.py +++ b/neuralprophet/forecaster.py @@ -2567,7 +2567,7 @@ def _init_train_loader(self, df, num_workers=0): self.config_country_holidays.init_holidays(df_merged) dataset = _create_dataset( - self, df, predict_mode=False, prediction_frequency=self.prediction_frequency, num_workers=num_workers + self, df, predict_mode=False, prediction_frequency=self.prediction_frequency ) # needs to be called after set_auto_seasonalities # Determine the max_number of epochs @@ -2577,6 +2577,7 @@ def _init_train_loader(self, df, num_workers=0): dataset, batch_size=self.config_train.batch_size, shuffle=True, + num_workers=num_workers, ) return loader From 9e73a00ca5a1282267919cc318bb8b44c05c09d5 Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Thu, 17 Aug 2023 12:09:37 -0700 Subject: [PATCH 11/13] finished all type conversions --- neuralprophet/data/process.py | 12 +++++++----- neuralprophet/df_utils.py | 2 +- neuralprophet/time_dataset.py | 6 +++--- tests/test_unit.py | 14 ++++++++++++++ 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/neuralprophet/data/process.py b/neuralprophet/data/process.py index c50147f78..4832e7e9e 100644 --- a/neuralprophet/data/process.py +++ b/neuralprophet/data/process.py @@ -76,7 +76,7 @@ def _reshape_raw_predictions_to_forecst_df( forecast = predicted[:, forecast_lag - 1, j] pad_before = max_lags + forecast_lag - 1 pad_after = n_forecasts - forecast_lag - yhat = np.concatenate(([np.NaN] * pad_before, forecast, [np.NaN] * pad_after)) + yhat = np.pad(forecast, (pad_before, pad_after), mode="constant", constant_values=np.NaN) if prediction_frequency is not None: ds = df_forecast["ds"].iloc[pad_before : -pad_after if pad_after > 0 else None] mask = df_utils.create_mask_for_prediction_frequency( @@ -86,7 +86,7 @@ def _reshape_raw_predictions_to_forecst_df( ) yhat = np.full((len(ds),), np.nan) yhat[mask] = forecast - yhat = np.concatenate(([np.NaN] * pad_before, yhat, [np.NaN] * pad_after)) + yhat = np.pad(yhat, (pad_before, pad_after), mode="constant", constant_values=np.NaN) # 0 is the median quantile index if j == 0: name = f"yhat{forecast_lag}" @@ -111,7 +111,7 @@ def _reshape_raw_predictions_to_forecst_df( forecast = components[comp][:, forecast_lag - 1, j] # 0 is the median quantile pad_before = max_lags + forecast_lag - 1 pad_after = n_forecasts - forecast_lag - yhat = np.concatenate(([np.NaN] * pad_before, forecast, [np.NaN] * pad_after)) + yhat = np.pad(forecast, (pad_before, pad_after), mode="constant", constant_values=np.NaN) if prediction_frequency is not None: ds = df_forecast["ds"].iloc[pad_before : -pad_after if pad_after > 0 else None] mask = df_utils.create_mask_for_prediction_frequency( @@ -121,7 +121,7 @@ def _reshape_raw_predictions_to_forecst_df( ) yhat = np.full((len(ds),), np.nan) yhat[mask] = forecast - yhat = np.concatenate(([np.NaN] * pad_before, yhat, [np.NaN] * pad_after)) + yhat = np.pad(yhat, (pad_before, pad_after), mode="constant", constant_values=np.NaN) if j == 0: # temporary condition to add only the median component name = 
f"{comp}{forecast_lag}" df_forecast[name] = yhat @@ -132,7 +132,9 @@ def _reshape_raw_predictions_to_forecst_df( for j in range(len(quantiles)): forecast_0 = components[comp][0, :, j] forecast_rest = components[comp][1:, n_forecasts - 1, j] - yhat = np.concatenate(([np.NaN] * max_lags, forecast_0, forecast_rest)) + yhat = yhat = np.pad( + np.concatenate((forecast_0, forecast_rest)), (max_lags, 0), mode="constant", constant_values=np.NaN + ) if prediction_frequency is not None: date_list = [] for key, value in prediction_frequency.items(): diff --git a/neuralprophet/df_utils.py b/neuralprophet/df_utils.py index 116ecac78..322b41be0 100644 --- a/neuralprophet/df_utils.py +++ b/neuralprophet/df_utils.py @@ -1020,7 +1020,7 @@ def convert_events_to_features(df, config_events: ConfigEvents, events_df): """ for event in config_events.keys(): - event_feature = pd.Series([0.0] * df.shape[0]) + event_feature = pd.Series(0, index=range(df.shape[0]), dtype="float32") # events_df may be None in case ID from original df is not provided in events df if events_df is None: dates = None diff --git a/neuralprophet/time_dataset.py b/neuralprophet/time_dataset.py index b7a34122c..1331dcae3 100644 --- a/neuralprophet/time_dataset.py +++ b/neuralprophet/time_dataset.py @@ -142,7 +142,9 @@ def init_after_tabularized(self, inputs, targets=None): tensor = torch.from_numpy(features) if tensor.dtype != inputs_dtype[key]: - self.inputs[key][name] = tensor.to(dtype=inputs_dtype[key]) + self.inputs[key][name] = tensor.to( + dtype=inputs_dtype[key] + ) # this can probably be removed, but was included in the previous code else: self.inputs[key][name] = tensor else: @@ -610,8 +612,6 @@ def make_events_features(df, config_events: Optional[configure.ConfigEvents] = N # create all user specified events if config_events is not None: for event, configs in config_events.items(): - if event not in df.columns: - df[event] = np.zeros_like(df["ds"], dtype=np.float32) feature = df[event] _create_event_offset_features(event, configs, feature, additive_events, multiplicative_events) diff --git a/tests/test_unit.py b/tests/test_unit.py index 6fc4a8913..37ec6961c 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -962,3 +962,17 @@ def test_multiple_countries(): assert "Christmas Day" not in holiday_names assert "Erster Weihnachtstag" in holiday_names assert "Neujahr" in holiday_names + + +def test_float32_inputs(): + # test if float32 inputs are forecasted as float32 outputs + df = pd.read_csv(PEYTON_FILE, nrows=NROWS) + df["y"] = df["y"].astype(np.float32) + m = NeuralProphet( + epochs=EPOCHS, + batch_size=BATCH_SIZE, + learning_rate=LR, + ) + m.fit(df, freq="D") + forecast = m.predict(df) + assert forecast["yhat1"].dtype == np.float32 From 5355a82588bd84fd5b88bf34c2b14204038194d8 Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Thu, 17 Aug 2023 12:11:40 -0700 Subject: [PATCH 12/13] deleted type conversion for input --- neuralprophet/data/process.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/neuralprophet/data/process.py b/neuralprophet/data/process.py index 4832e7e9e..88c7ab956 100644 --- a/neuralprophet/data/process.py +++ b/neuralprophet/data/process.py @@ -519,8 +519,6 @@ def _handle_missing_data( df_to_add = df.groupby("ID", group_keys=False).apply(lambda x: x.loc[last_valid_index[x.name] + 1 :]) df = df_dropped log.info(f"Dropped {n_dropped} rows at the end with NaNs in 'y' column.") - if df["y"].dtype != np.float32: - df["y"] = df["y"].astype(np.float32) if config_missing.impute_missing: # impute missing values 
From b3b5b369b9ab5bb48d1becbb31e989b6f4c1888c Mon Sep 17 00:00:00 2001 From: leoniewgnr Date: Thu, 17 Aug 2023 12:34:20 -0700 Subject: [PATCH 13/13] fixed typo --- neuralprophet/data/process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neuralprophet/data/process.py b/neuralprophet/data/process.py index 88c7ab956..9f8861016 100644 --- a/neuralprophet/data/process.py +++ b/neuralprophet/data/process.py @@ -132,7 +132,7 @@ def _reshape_raw_predictions_to_forecst_df( for j in range(len(quantiles)): forecast_0 = components[comp][0, :, j] forecast_rest = components[comp][1:, n_forecasts - 1, j] - yhat = yhat = np.pad( + yhat = np.pad( np.concatenate((forecast_0, forecast_rest)), (max_lags, 0), mode="constant", constant_values=np.NaN ) if prediction_frequency is not None:
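A note on the core change in PATCH 01: astype(np.float32, copy=False) performs the float64-to-float32 conversion exactly once (copy=False only skips the copy when the array is already float32), and torch.from_numpy then wraps the result without copying, so the .to(dtype=...) branch runs only when the dtypes genuinely differ. A minimal sketch of that path, with illustrative shapes that are not taken from the patch:

    import numpy as np
    import torch

    features = np.random.rand(1000, 8)                  # float64 by default
    features = features.astype(np.float32, copy=False)  # single down-conversion, halves memory
    tensor = torch.from_numpy(features)                 # zero-copy view over the array's buffer
    assert tensor.dtype == torch.float32                # no further .to(...) copy is needed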
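PATCH 05 parallelized dataset construction itself, one multiprocessing.Pool job per ID group; PATCHES 09 and 10 revert that and return num_workers to its usual role as a DataLoader argument, where each worker is a subprocess that pulls samples from the already-built dataset. A self-contained sketch of that final pattern, with a toy TensorDataset standing in for GlobalTimeDataset and an arbitrary worker count:

    import torch
    from torch.utils.data import DataLoader, TensorDataset

    if __name__ == "__main__":  # guard required on platforms that spawn worker processes
        dataset = TensorDataset(torch.arange(100, dtype=torch.float32).unsqueeze(1))
        loader = DataLoader(dataset, batch_size=16, shuffle=True, num_workers=2)
        for (batch,) in loader:  # each worker subprocess calls dataset.__getitem__
            pass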
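The np.pad swap in PATCH 11 matters for dtype as much as readability: a padding list such as [np.NaN] * pad becomes a float64 array, so concatenating it with a float32 forecast silently promotes the whole result back to float64, whereas np.pad preserves the input dtype. A small demonstration with made-up values:

    import numpy as np

    forecast = np.array([1.0, 2.0, 3.0], dtype=np.float32)
    pad_before, pad_after = 2, 1

    upcast = np.concatenate(([np.nan] * pad_before, forecast, [np.nan] * pad_after))
    kept = np.pad(forecast, (pad_before, pad_after), mode="constant", constant_values=np.nan)

    assert upcast.dtype == np.float64  # the Python-float padding promotes the result
    assert kept.dtype == np.float32    # np.pad keeps the forecast's dtype
    assert np.array_equal(upcast.astype(np.float32), kept, equal_nan=True)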
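The same discipline reaches the event features in PATCH 11: pd.Series([0.0] * n) allocates a float64 column, whereas passing dtype="float32" builds it at the target precision from the start. Sketch with an arbitrary length:

    import pandas as pd

    n = 5
    default = pd.Series([0.0] * n)                             # float64 by default
    compact = pd.Series(0, index=range(n), dtype="float32")    # float32 from the start
    assert default.dtype == "float64" and compact.dtype == "float32"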