Reducing memory errors in dict_to_netcdf #1095

Merged
10 commits merged on Oct 25, 2021
5 changes: 3 additions & 2 deletions parcels/collection/collectionaos.py
@@ -32,7 +32,9 @@ def _to_write_particles(pd, time):
"""We don't want to write a particle that is not started yet.
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
"""
return [i for i, p in enumerate(pd) if time - np.abs(p.dt/2) <= p.time < time + np.abs(p.dt) and np.isfinite(p.id)]
return [i for i, p in enumerate(pd) if (((time - np.abs(p.dt/2) <= p.time < time + np.abs(p.dt))
or (np.isnan(p.dt) and np.equal(time, p.time)))
Contributor: Why this part? Where does that come from? How could that condition occur?

and np.isfinite(p.id))]
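To illustrate the write window described in the docstring, here is a minimal sketch with hypothetical stand-in particles (plain SimpleNamespace objects, not the Parcels particle classes): a particle is written when its time falls in [time - |dt|/2, time + |dt|), or, with the new clause, when its dt is still NaN but it is due to be written at exactly this time.

import numpy as np
from types import SimpleNamespace

# Hypothetical stand-in particles, not the Parcels particle classes:
# p0 is inside the write window, p1 has not started yet, p2 still has dt=NaN
# (it has not been through pset.execute()) but was released at time 0.
pd = [SimpleNamespace(id=0, time=0.0, dt=1.0),
      SimpleNamespace(id=1, time=5.0, dt=1.0),
      SimpleNamespace(id=2, time=0.0, dt=np.nan)]
time = 0.0
written = [i for i, p in enumerate(pd)
           if (((time - np.abs(p.dt/2) <= p.time < time + np.abs(p.dt))
                or (np.isnan(p.dt) and np.equal(time, p.time)))
               and np.isfinite(p.id))]
print(written)  # [0, 2]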


def _is_particle_started_yet(particle, time):
@@ -928,7 +930,6 @@ def toDictionary(self, pfile, time, deleted_only=False):
data_dict[var] = np.array([np.int64(getattr(p, var)) for p in self._data[indices_to_write]])
else:
data_dict[var] = np.array([getattr(p, var) for p in self._data[indices_to_write]])
pfile.maxid_written = np.maximum(pfile.maxid_written, np.max(data_dict['id']))

pset_errs = [p for p in self._data[indices_to_write] if p.state != OperationCode.Delete and abs(time-p.time) > 1e-3 and np.isfinite(p.time)]
for p in pset_errs:
6 changes: 3 additions & 3 deletions parcels/collection/collectionsoa.py
@@ -30,8 +30,9 @@ def _to_write_particles(pd, time):
"""We don't want to write a particle that is not started yet.
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
"""
return (np.less_equal(time - np.abs(pd['dt']/2), pd['time'], where=np.isfinite(pd['time']))
& np.greater_equal(time + np.abs(pd['dt'] / 2), pd['time'], where=np.isfinite(pd['time']))
return ((np.less_equal(time - np.abs(pd['dt']/2), pd['time'], where=np.isfinite(pd['time']))
& np.greater_equal(time + np.abs(pd['dt'] / 2), pd['time'], where=np.isfinite(pd['time']))
| ((np.isnan(pd['dt'])) & np.equal(time, pd['time'], where=np.isfinite(pd['time']))))
Contributor: Why this part? Where does that come from? How could that condition occur?

Member Author: This is because particle.dt is not set yet before a call to pset.execute; so previously we couldn't write particles before execution. Because of a bug in the unit tests (now fixed), this wasn't picked up.

& (np.isfinite(pd['id']))
& (np.isfinite(pd['time'])))
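The same idea in the vectorised SoA form, as a minimal sketch on a toy dict of arrays (hypothetical data; the assumption, per the author's comment above, is that a particle that has not yet been through pset.execute() carries dt = NaN but should still be written at its release time):

import numpy as np

# Toy SoA-style particle data; the third particle still has dt = NaN.
pd = {'id':   np.array([0., 1., 2.]),
      'time': np.array([0., 5., 0.]),
      'dt':   np.array([1., 1., np.nan])}
time = 0.0
to_write = ((np.less_equal(time - np.abs(pd['dt']/2), pd['time'], where=np.isfinite(pd['time']))
             & np.greater_equal(time + np.abs(pd['dt']/2), pd['time'], where=np.isfinite(pd['time']))
             | (np.isnan(pd['dt']) & np.equal(time, pd['time'], where=np.isfinite(pd['time']))))
            & np.isfinite(pd['id'])
            & np.isfinite(pd['time']))
print(to_write)  # [ True False  True]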

@@ -849,7 +850,6 @@ def toDictionary(self, pfile, time, deleted_only=False):
if np.any(indices_to_write):
for var in pfile.var_names:
data_dict[var] = self._data[var][indices_to_write]
pfile.maxid_written = np.maximum(pfile.maxid_written, np.max(data_dict['id']))

pset_errs = ((self._data['state'][indices_to_write] != OperationCode.Delete) & np.greater(np.abs(time - self._data['time'][indices_to_write]), 1e-3, where=np.isfinite(self._data['time'][indices_to_write])))
if np.count_nonzero(pset_errs) > 0:
46 changes: 26 additions & 20 deletions parcels/particlefile/particlefileaos.py
@@ -56,7 +56,7 @@ def get_pset_info_attributes(self):
For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.
"""
attributes = ['name', 'var_names', 'var_names_once', 'time_origin', 'lonlatdepth_dtype',
'file_list', 'file_list_once', 'maxid_written', 'parcels_mesh', 'metadata']
'file_list', 'file_list_once', 'parcels_mesh', 'metadata']
return attributes

def read_from_npy(self, file_list, time_steps, var):
@@ -70,9 +70,14 @@ def read_from_npy(self, file_list, time_steps, var):
:param time_steps: Number of time steps that were written in out directory
:param var: name of the variable to read
"""
data = np.nan * np.zeros((self.maxid_written+1, time_steps))
time_index = np.zeros(self.maxid_written+1, dtype=np.int64)
t_ind_used = np.zeros(time_steps, dtype=np.int64)
maxtime_steps = max(time_steps.values()) if time_steps.keys() else 0
Contributor: I think here the variable name is misleading - is it really the max. 'timestep' or rather the max. 'time' itself?

Member Author: Good point, I've now renamed to n_timesteps and max_timesteps.

data = np.nan * np.zeros((len(time_steps), maxtime_steps))
time_index = np.zeros(len(time_steps))
id_index = {}
count = 0
for i in sorted(time_steps.keys()):
Contributor: Sorting the field here can potentially be quite time-consuming, especially with millions of elements. But if your approach requires it: OK.

Member Author: Well, we only do this ParticleFile.export() once. If we want reproducibility of the output file (i.e. the particle IDs increase with row number), then it's important.

Contributor: OK, I see that. Obviously, for ordered-collection psets, I would not do that sorting (as the collection is pre-sorted). That said, for AoS and SoA, one may need to do that trick indeed.

id_index[i] = count
count += 1
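As a minimal sketch of the point discussed above (toy IDs, not real output), sorting the keys of time_steps once gives a deterministic id-to-row mapping, so rows in the exported file are ordered by particle ID no matter in which order the IDs were encountered:

# time_steps maps particle id -> number of timesteps written for that id.
time_steps = {7: 3, 2: 5, 11: 1}   # toy example
id_index = {pid: row for row, pid in enumerate(sorted(time_steps))}
print(id_index)  # {2: 0, 7: 1, 11: 2}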

# loop over all files
for npyfile in file_list:
@@ -84,15 +89,14 @@ def read_from_npy(self, file_list, time_steps, var):
'"parcels_convert_npydir_to_netcdf %s" to convert these to '
'a NetCDF file yourself.\nTo avoid this error, make sure you '
'close() your ParticleFile at the end of your script.' % self.tempwritedir)
id_ind = np.array(data_dict["id"], dtype=np.int64)
t_ind = time_index[id_ind] if 'once' not in file_list[0] else 0
t_ind_used[t_ind] = 1
data[id_ind, t_ind] = data_dict[var]
time_index[id_ind] = time_index[id_ind] + 1
for ii, i in enumerate(data_dict["id"]):
id_ind = id_index[i]
t_ind = int(time_index[id_ind]) if 'once' not in file_list[0] else 0
data[id_ind, t_ind] = data_dict[var][ii]
time_index[id_ind] = time_index[id_ind] + 1

# remove rows and columns that are completely filled with nan values
tmp = data[time_index > 0, :]
return tmp[:, t_ind_used == 1]
return data[time_index > 0, :]
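A rough back-of-the-envelope comparison of why this reduces memory, with purely hypothetical numbers: the old buffer was sized by the largest particle ID ever written times the number of global output times, whereas the new one is sized by the number of particles that actually appear in the output times the largest per-particle timestep count.

import numpy as np

# Hypothetical run: IDs run up to one million, but only every 100th particle
# is ever written, each for at most 50 output times.
maxid_written = 1_000_000
time_steps = {pid: 50 for pid in range(0, 1_000_000, 100)}   # 10,000 written IDs

old_shape = (maxid_written + 1, 50)                        # indexed by raw ID
new_shape = (len(time_steps), max(time_steps.values()))    # indexed by written ID
print(np.prod(old_shape) * 8 / 1e9)   # ~0.4 GB per float64 variable
print(np.prod(new_shape) * 8 / 1e6)   # ~4 MB per float64 variable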

def export(self):
"""
@@ -114,34 +118,36 @@ def export(self):
if len(temp_names) == 0:
raise RuntimeError("No npy files found in %s" % self.tempwritedir_base)

global_maxid_written = -1
global_time_written = []
time_steps = {}
global_file_list = []
global_file_list_once = None
if len(self.var_names_once) > 0:
global_file_list_once = []
for tempwritedir in temp_names:
if os.path.exists(tempwritedir):
pset_info_local = np.load(os.path.join(tempwritedir, 'pset_info.npy'), allow_pickle=True).item()
global_maxid_written = np.max([global_maxid_written, pset_info_local['maxid_written']])
for npyfile in pset_info_local['file_list']:
tmp_dict = np.load(npyfile, allow_pickle=True).item()
global_time_written.append([t for t in tmp_dict['time']])
for i in tmp_dict['id']:
if i in time_steps:
time_steps[i] += 1
else:
time_steps[i] = 1
global_file_list += pset_info_local['file_list']
if len(self.var_names_once) > 0:
global_file_list_once += pset_info_local['file_list_once']
self.maxid_written = global_maxid_written
self.time_written = np.unique(global_time_written)

for var in self.var_names:
data = self.read_from_npy(global_file_list, len(self.time_written), var)
data = self.read_from_npy(global_file_list, time_steps, var)
if var == self.var_names[0]:
self.open_netcdf_file(data.shape)
varout = 'z' if var == 'depth' else var
getattr(self, varout)[:, :] = data

if len(self.var_names_once) > 0:
time_steps_once = {}
for i in time_steps:
time_steps_once[i] = 1
for var in self.var_names_once:
getattr(self, var)[:] = self.read_from_npy(global_file_list_once, 1, var)
getattr(self, var)[:] = self.read_from_npy(global_file_list_once, time_steps_once, var)

self.close_netcdf_file()
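For clarity, the time_steps dictionary consumed by read_from_npy is built in the export loop above by counting how often each particle ID occurs across the temporary npy files; here is a minimal sketch with toy ID arrays (collections.Counter is used only for brevity and is not what the PR itself does):

from collections import Counter
import numpy as np

# Toy stand-ins for the 'id' arrays stored in the temporary npy files.
npy_id_arrays = [np.array([0, 1, 2]),    # first written timestep
                 np.array([0, 2]),       # particle 1 was deleted
                 np.array([0, 2, 3])]    # particle 3 released later

time_steps = Counter()
for ids in npy_id_arrays:
    time_steps.update(ids.tolist())
print(dict(time_steps))  # {0: 3, 1: 1, 2: 3, 3: 1}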
45 changes: 26 additions & 19 deletions parcels/particlefile/particlefilesoa.py
@@ -56,7 +56,7 @@ def get_pset_info_attributes(self):
For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.
"""
attributes = ['name', 'var_names', 'var_names_once', 'time_origin', 'lonlatdepth_dtype',
'file_list', 'file_list_once', 'maxid_written', 'parcels_mesh', 'metadata']
'file_list', 'file_list_once', 'parcels_mesh', 'metadata']
return attributes

def read_from_npy(self, file_list, time_steps, var):
@@ -70,9 +70,14 @@ def read_from_npy(self, file_list, time_steps, var):
:param time_steps: Number of time steps that were written in out directory
:param var: name of the variable to read
"""
data = np.nan * np.zeros((self.maxid_written+1, time_steps))
time_index = np.zeros(self.maxid_written+1, dtype=np.int64)
t_ind_used = np.zeros(time_steps, dtype=np.int64)
maxtime_steps = max(time_steps.values()) if time_steps.keys() else 0
data = np.nan * np.zeros((len(time_steps), maxtime_steps))
time_index = np.zeros(len(time_steps))
id_index = {}
count = 0
for i in sorted(time_steps.keys()):
id_index[i] = count
count += 1

# loop over all files
for npyfile in file_list:
@@ -84,15 +89,14 @@ def read_from_npy(self, file_list, time_steps, var):
'"parcels_convert_npydir_to_netcdf %s" to convert these to '
'a NetCDF file yourself.\nTo avoid this error, make sure you '
'close() your ParticleFile at the end of your script.' % self.tempwritedir)
id_ind = np.array(data_dict["id"], dtype=np.int64)
t_ind = time_index[id_ind] if 'once' not in file_list[0] else 0
t_ind_used[t_ind] = 1
data[id_ind, t_ind] = data_dict[var]
time_index[id_ind] = time_index[id_ind] + 1
for ii, i in enumerate(data_dict["id"]):
id_ind = id_index[i]
t_ind = int(time_index[id_ind]) if 'once' not in file_list[0] else 0
data[id_ind, t_ind] = data_dict[var][ii]
time_index[id_ind] = time_index[id_ind] + 1

# remove rows and columns that are completely filled with nan values
tmp = data[time_index > 0, :]
return tmp[:, t_ind_used == 1]
return data[time_index > 0, :]

def export(self):
"""
@@ -115,33 +119,36 @@ def export(self):
if len(temp_names) == 0:
raise RuntimeError("No npy files found in %s" % self.tempwritedir_base)

global_maxid_written = -1
global_time_written = []
time_steps = {}
global_file_list = []
if len(self.var_names_once) > 0:
global_file_list_once = []
for tempwritedir in temp_names:
if os.path.exists(tempwritedir):
pset_info_local = np.load(os.path.join(tempwritedir, 'pset_info.npy'), allow_pickle=True).item()
global_maxid_written = np.max([global_maxid_written, pset_info_local['maxid_written']])
for npyfile in pset_info_local['file_list']:
tmp_dict = np.load(npyfile, allow_pickle=True).item()
global_time_written.append([t for t in tmp_dict['time']])
for i in tmp_dict['id']:
if i in time_steps:
time_steps[i] += 1
else:
time_steps[i] = 1
global_file_list += pset_info_local['file_list']
if len(self.var_names_once) > 0:
global_file_list_once += pset_info_local['file_list_once']
self.maxid_written = global_maxid_written
self.time_written = np.unique(global_time_written)

for var in self.var_names:
data = self.read_from_npy(global_file_list, len(self.time_written), var)
data = self.read_from_npy(global_file_list, time_steps, var)
if var == self.var_names[0]:
self.open_netcdf_file(data.shape)
varout = 'z' if var == 'depth' else var
getattr(self, varout)[:, :] = data

if len(self.var_names_once) > 0:
time_steps_once = {}
for i in time_steps:
time_steps_once[i] = 1
for var in self.var_names_once:
getattr(self, var)[:] = self.read_from_npy(global_file_list_once, 1, var)
getattr(self, var)[:] = self.read_from_npy(global_file_list_once, time_steps_once, var)

self.close_netcdf_file()
9 changes: 7 additions & 2 deletions tests/test_particle_file.py
@@ -62,12 +62,16 @@ def test_pfile_array_remove_particles(fieldset, pset_mode, mode, tmpdir, npart=1
filepath = tmpdir.join("pfile_array_remove_particles.nc")
pset = pset_type[pset_mode]['pset'](fieldset, pclass=ptype[mode],
lon=np.linspace(0, 1, npart),
lat=0.5*np.ones(npart))
lat=0.5*np.ones(npart), time=0)
pfile = pset.ParticleFile(filepath)
pfile.write(pset, 0)
pset.remove_indices(3)
for p in pset:
p.time = 1
pfile.write(pset, 1)
ncfile = close_and_compare_netcdffiles(filepath, pfile)
timearr = ncfile.variables['time'][:]
assert type(timearr[3, 1]) is not type(timearr[3, 0]) # noqa
ncfile.close()
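The type assertion above relies on the netCDF file handing back a masked array, so a slot that was never written (the removed particle at the second output time) comes out as a masked constant rather than a float; a small plain-numpy sketch of that behaviour, not an actual file read:

import numpy as np

# Row for a particle that was written at the first output time only; the
# second slot stayed NaN in the buffer and is masked in the returned array.
timearr = np.ma.masked_invalid(np.array([[0.0, np.nan]]))
print(type(timearr[0, 0]))  # <class 'numpy.float64'>
print(type(timearr[0, 1]))  # <class 'numpy.ma.core.MaskedConstant'>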


@@ -104,14 +108,15 @@ def test_pfile_array_remove_all_particles(fieldset, pset_mode, mode, tmpdir, npa
filepath = tmpdir.join("pfile_array_remove_particles.nc")
pset = pset_type[pset_mode]['pset'](fieldset, pclass=ptype[mode],
lon=np.linspace(0, 1, npart),
lat=0.5*np.ones(npart))
lat=0.5*np.ones(npart), time=0)
pfile = pset.ParticleFile(filepath)
pfile.write(pset, 0)
for _ in range(npart):
pset.remove_indices(-1)
pfile.write(pset, 1)
pfile.write(pset, 2)
ncfile = close_and_compare_netcdffiles(filepath, pfile)
assert ncfile.variables['time'][:].shape == (npart, 1)
ncfile.close()

