Saving Parcels output directly in zarr format #1199

Merged Oct 5, 2022 · 89 commits

Changes from all commits

Commits
f232cf1
Significant simplification of baseparticlefile by using ds.to_zarr()
erikvansebille Jul 12, 2022
f08fea6
Updating particlefile.write() to support deleting particles
erikvansebille Jul 13, 2022
f354d1a
Updating particlefilesoa and -aos to new simpler writing
erikvansebille Jul 13, 2022
831ca7c
Updating output-logger message in pset.execute()
erikvansebille Jul 13, 2022
684d9cc
Updating test_particle_file to zarr dumping (first part)
erikvansebille Jul 13, 2022
084a68d
Update baseparticlefile.py
erikvansebille Jul 15, 2022
8101eb8
Further additions to test_particle_file
erikvansebille Jul 15, 2022
762a4ea
Updating dump_to_zarr to support adding and removing particles
erikvansebille Jul 19, 2022
d5554a9
dump_to_zarr passing all tests in test_particle_file
erikvansebille Jul 20, 2022
97a2950
Vectorising the dump_to_zarr
erikvansebille Jul 20, 2022
a40c110
Removing convert_npydir_to_netcdf.py script as not needed with dump_t…
erikvansebille Jul 20, 2022
6a4f2c4
Cleaning up particlefilesoa.py and particlefileaos.py
erikvansebille Jul 20, 2022
4ff05ed
Removing old dump_to_zarr attempt via xarray DataSets appending
erikvansebille Jul 20, 2022
8cb7a2d
Removing convert_at_end keyword from baseparticlefile (as not needed w…
erikvansebille Jul 20, 2022
0e6f5d7
Cleaning up test_particle_file
erikvansebille Jul 20, 2022
1909047
Fixing dump_to_zarr in unit tests in functions other than test_particl…
erikvansebille Jul 20, 2022
ad70153
Update example_mitgcm.py
erikvansebille Jul 20, 2022
fe3ae3d
Update parcels_tutorial to zarr output
erikvansebille Jul 20, 2022
e527c00
Update NestedFields tutorial to zarr output
erikvansebille Jul 20, 2022
8bed36e
Update tutorial_Argofloats to use zarr
erikvansebille Jul 20, 2022
01cfbc4
Update tutorial_SummedFields.ipynb
erikvansebille Jul 20, 2022
d98efa2
Update tutorial_delaystart.ipynb
erikvansebille Jul 20, 2022
93e4340
Update tutorial_Argofloats.ipynb
erikvansebille Jul 20, 2022
b97c73e
Update tutorial_diffusion.ipynb
erikvansebille Jul 20, 2022
65cc361
Merge branch 'master' into dump_to_zarr
erikvansebille Jul 20, 2022
494145e
Update tutorial_interaction to zarr output
erikvansebille Jul 20, 2022
c19e112
Update tutorial_interpolation to zarr output
erikvansebille Jul 20, 2022
b172e26
Update tutorial_nemo_curvilinear to use zarr output
erikvansebille Jul 20, 2022
3837830
Updating tutorial_plotting to default zarr output
erikvansebille Jul 20, 2022
6859a42
Update tutorial_parcels_structure to zarr output
erikvansebille Jul 20, 2022
1177146
Update tutorial_periodic_boundaries.ipynb
erikvansebille Jul 20, 2022
7cb9a32
Update tutorial_sampling.ipynb
erikvansebille Jul 20, 2022
b91728e
Update tutorial_timevaryingdepthdimensions.ipynb
erikvansebille Jul 20, 2022
e489036
Update tutorial_particle_field_interaction.ipynb
erikvansebille Jul 20, 2022
8c533e1
Update tutorial_particle_field_interaction.ipynb
erikvansebille Jul 20, 2022
7678189
Updating Field.to_write for new zarr output
erikvansebille Jul 20, 2022
2a95e29
Updating unit test for field.to_write to zarr output
erikvansebille Jul 20, 2022
04cb9ba
Adding support for MPI in dump_to_zarr
erikvansebille Jul 21, 2022
f7b0e6a
Removing .nc extension in output files from tests, tutorials and exam…
erikvansebille Jul 21, 2022
ef946ee
Further changes from .nc to .zarr in unit-test particlefiles
erikvansebille Jul 21, 2022
14b3d03
Adding chunks argument to ParticleFile
erikvansebille Jul 21, 2022
d31c5f3
Update to chunk treatment in dump_to_zarr
erikvansebille Jul 22, 2022
d82a67c
Changing default zarr chunk to 10 timesteps
erikvansebille Jul 25, 2022
e4ea433
Merge branch 'master' into dump_to_zarr
erikvansebille Jul 25, 2022
0c63c2c
Cleanup of particlefile.to_write()
erikvansebille Jul 26, 2022
fa79b21
Merge branch 'master' into dump_to_zarr
erikvansebille Jul 26, 2022
f2e4da9
Updating more examples to zarr output
erikvansebille Jul 26, 2022
da449d8
Moving extending zarr dimensions to separate method
erikvansebille Jul 26, 2022
31b3a6b
Simplifying data_dicts from collections_toDict
erikvansebille Jul 26, 2022
0ef5d96
Removing use of toDictionary
erikvansebille Jul 27, 2022
b997bc3
Making trajectory a to_write='once' variable by default
erikvansebille Jul 27, 2022
cd11094
Further cleanup of baseparticlefile
erikvansebille Jul 28, 2022
b2caf85
Simplifying computation of to_write='once' variable indices
erikvansebille Jul 28, 2022
9e5381b
Updating baseparticlefile to support mpi
erikvansebille Jul 28, 2022
5bf4955
Further fix to mpi and zarr
erikvansebille Jul 28, 2022
cca1a32
Fixing bug in setting fileoffsets when particleset is empty
erikvansebille Jul 29, 2022
381c375
Fixing bug in MPI-zarr writing where extending obs-dimension could be…
erikvansebille Jul 29, 2022
ffd3b42
Cleaning up baseparticlefile
erikvansebille Aug 1, 2022
adfd755
Renaming particle.fileid to particle.once_written
erikvansebille Aug 1, 2022
0d07501
Merge branch 'master' into dump_to_zarr
erikvansebille Aug 1, 2022
641ad20
Merge branch 'master' into dump_to_zarr
erikvansebille Aug 3, 2022
4bac223
Merge branch 'master' into dump_to_zarr
erikvansebille Aug 4, 2022
0667291
Removing for-loop from baseparticlefile
erikvansebille Aug 5, 2022
c2e0965
Merge branch 'master' into dump_to_zarr
erikvansebille Aug 22, 2022
229488d
Updating test_mpi to also run with 8 particles
erikvansebille Aug 24, 2022
b1c5d1e
Fixing small bug when chunks < maxids in baseparticlefile
erikvansebille Aug 24, 2022
8a18cfb
Merge branch 'master' into dump_to_zarr
erikvansebille Aug 25, 2022
18c75c2
Updating delaystart tutorial to use zarr output
erikvansebille Aug 25, 2022
f789075
Updating MPI version of zarr writing
erikvansebille Aug 25, 2022
6e2c05e
Fixing bug when MPI not installed
erikvansebille Aug 25, 2022
dd61b21
Making trajectory a coordinate variable in zarr
erikvansebille Aug 26, 2022
6a9a32b
Merge branch 'master' into dump_to_zarr
erikvansebille Aug 29, 2022
31ea667
Simplifying code in baseparticlefile for finding ids to write
erikvansebille Aug 29, 2022
1a9731d
Adding an obs coordinate to the zarr file
erikvansebille Aug 29, 2022
44b8593
Add unit test to check if zarr trajectory.dtype is int64
erikvansebille Aug 29, 2022
4279a20
Fixing bug where trajectory written as float
erikvansebille Aug 29, 2022
8a02ccd
Fixing naming of zarr files for MPI
erikvansebille Aug 30, 2022
3365460
Adding info on zarr output in MPI to documentation
erikvansebille Aug 30, 2022
1f2b2a0
Adding info on MPI concatenation to output tutorial
erikvansebille Aug 30, 2022
051c8bf
Updating text of zarr explanation in MPI and output tutorials
erikvansebille Aug 30, 2022
66807da
Updating default zarr chunks to 1 obs
erikvansebille Aug 30, 2022
8846861
Updating MPI documentation to also import os.path
erikvansebille Aug 31, 2022
01bf12f
Add documentation to combine MPI run output into single Zarr store
JamiePringle Sep 6, 2022
125dc61
Moving documentation_largeoutputfile to parcels/examples folder
erikvansebille Sep 13, 2022
0b1f4a2
Adding cell on large output runs to mpi documentation
erikvansebille Sep 13, 2022
cfd4821
Update documentation to fix xarray datetime64[ns] bug
JamiePringle Sep 13, 2022
a2dacfd
Merge branch 'master' into dump_to_zarr
erikvansebille Oct 4, 2022
c1f4f36
Pinning dask version to 2022.9.0
erikvansebille Oct 5, 2022
0326f77
Fixing 3D plotting bug in Windows
erikvansebille Oct 5, 2022
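The net effect of these commits is that Parcels appends particle data directly into a zarr store, growing the output along the obs dimension each output step (and along the trajectory dimension when particles are added), instead of collecting per-timestep .npy files and converting to NetCDF at the end. A minimal numpy-only sketch of that grow-and-pad bookkeeping — function names here are illustrative, not the parcels API:

```python
import numpy as np

def extend_obs(data, n_new_obs, fill=np.nan):
    """Grow a (trajectory, obs) array along the obs axis, padding with fill."""
    n_traj, n_obs = data.shape
    grown = np.full((n_traj, n_obs + n_new_obs), fill, dtype=data.dtype)
    grown[:, :n_obs] = data
    return grown

def extend_traj(data, n_new_traj, fill=np.nan):
    """Grow along the trajectory axis when new particles are released."""
    n_traj, n_obs = data.shape
    grown = np.full((n_traj + n_new_traj, n_obs), fill, dtype=data.dtype)
    grown[:n_traj, :] = data
    return grown

lon = np.zeros((3, 2))      # 3 particles, 2 obs written so far
lon = extend_obs(lon, 1)    # one more output step
lon = extend_traj(lon, 2)   # two particles released later
print(lon.shape)            # (5, 3)
```

In the real zarr store these extensions are resizes of the on-disk arrays, so output can be written incrementally without holding the full trajectory history in memory.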
2 changes: 1 addition & 1 deletion environment_py3_win.yml
@@ -19,7 +19,7 @@ dependencies:
- scipy>=0.16.0
- tqdm
- xarray>=0.5.1
- dask>=2.0
- dask<=2022.9.0
- cftime>=1.3.1
- ipykernel
- pytest
98 changes: 24 additions & 74 deletions parcels/collection/collectionaos.py
@@ -1,4 +1,3 @@
from datetime import timedelta as delta
from operator import attrgetter # NOQA

from ctypes import c_void_p
@@ -28,24 +27,6 @@
__all__ = ['ParticleCollectionAOS', 'ParticleCollectionIterableAOS', 'ParticleCollectionIteratorAOS']


def _to_write_particles(pd, time):
"""We don't want to write a particle that is not started yet.
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
"""
return [i for i, p in enumerate(pd) if (((time - np.abs(p.dt/2) <= p.time < time + np.abs(p.dt))
or (np.isnan(p.dt) and np.equal(time, p.time)))
and np.isfinite(p.id))]


def _is_particle_started_yet(particle, time):
"""We don't want to write a particle that is not started yet.
Particle will be written if:
* particle.time is equal to time argument of pfile.write()
* particle.time is before time (in case particle was deleted between previous export and current one)
"""
return (particle.dt*particle.time <= particle.dt*time or np.isclose(particle.time, time))


def _convert_to_flat_array(var):
"""Convert lists and single integers/floats to one-dimensional numpy arrays

@@ -890,64 +871,33 @@ def cstruct(self):
cstruct = self._data_c.ctypes.data_as(c_void_p)
return cstruct

def toDictionary(self, pfile, time, deleted_only=False):
"""
Convert all Particle data from one time step to a python dictionary.
:param pfile: ParticleFile object requesting the conversion
:param time: Time at which to write ParticleSet
:param deleted_only: Flag to write only the deleted Particles
returns two dictionaries: one for all variables to be written each outputdt,
and one for all variables to be written once

This function depends on the specific collection in question and thus needs to be specified in specific
derivative classes.
def _to_write_particles(self, pd, time):
"""We don't want to write a particle that is not started yet.
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
"""
data_dict = {}
data_dict_once = {}

time = time.total_seconds() if isinstance(time, delta) else time
return np.array([i for i, p in enumerate(pd) if (((time - np.abs(p.dt/2) <= p.time < time + np.abs(p.dt))
or (np.isnan(p.dt) and np.equal(time, p.time)))
and np.isfinite(p.id))])

indices_to_write = []
if pfile.lasttime_written != time and \
(pfile.write_ondelete is False or deleted_only):
if self._ncount == 0:
logger.warning("ParticleSet is empty on writing as array at time %g" % time)
else:
if deleted_only:
if type(deleted_only) not in [list, np.ndarray] and deleted_only in [True, 1]:
data_states = [p.state for p in self._data]
indices_to_write = np.where(np.isin(data_states, [OperationCode.Delete]))[0]
elif type(deleted_only) in [list, np.ndarray] and len(deleted_only) > 0:
if type(deleted_only[0]) in [int, np.int32, np.uint32]:
indices_to_write = deleted_only
elif isinstance(deleted_only[0], ScipyParticle):
indices_to_write = [i for i, p in self._data if p in deleted_only]
else:
indices_to_write = _to_write_particles(self._data, time)
if len(indices_to_write) > 0:
for var in pfile.var_names:
if 'id' in var:
data_dict[var] = np.array([np.int64(getattr(p, var)) for p in self._data[indices_to_write]])
else:
data_dict[var] = np.array([getattr(p, var) for p in self._data[indices_to_write]])

pset_errs = [p for p in self._data[indices_to_write] if p.state != OperationCode.Delete and abs(time-p.time) > 1e-3 and np.isfinite(p.time)]
for p in pset_errs:
logger.warning_once('time argument in pfile.write() is %g, but a particle has time % g.' % (time, p.time))

if len(pfile.var_names_once) > 0:
# _to_write_particles(self._data, time)
first_write = [p for p in self._data if _is_particle_started_yet(p, time) and (np.int64(p.id) not in pfile.written_once)]
if np.any(first_write):
data_dict_once['id'] = np.array([p.id for p in first_write]).astype(dtype=np.int64)
for var in pfile.var_names_once:
data_dict_once[var] = np.array([getattr(p, var) for p in first_write])
pfile.written_once.extend(np.array(data_dict_once['id']).astype(dtype=np.int64).tolist())

if deleted_only is False:
pfile.lasttime_written = time
def getvardata(self, var, indices=None):
if indices is None:
return np.array([getattr(p, var) for p in self._data])
else:
try:
return np.array([getattr(p, var) for p in self._data[indices]])
except: # Can occur for zero-length ParticleSets
return None

def setvardata(self, var, index, val):
if isinstance(index, (np.int64, int, np.int32)):
setattr(self._data[index], var, val)
else:
for i, v in zip(index, val):
setattr(self._data[i], var, v)

return data_dict, data_dict_once
def setallvardata(self, var, val):
for i in range(len(self._data)):
setattr(self._data[i], var, val)

def toArray(self):
"""
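In place of the deleted `toDictionary`, the collection now exposes small accessors (`getvardata`, `setvardata`, `setallvardata`) that the particle file calls directly. A stripped-down sketch of the AoS (array-of-structs) flavour — plain Python objects stand in for parcels particles, and all names are illustrative:

```python
import numpy as np

class Particle:
    def __init__(self, pid, lon):
        self.id, self.lon = pid, lon

class AoSCollection:
    """Array-of-Structs: one Python object per particle (illustrative)."""
    def __init__(self, particles):
        self._data = np.array(particles, dtype=object)

    def getvardata(self, var, indices=None):
        # Gather one attribute across (a subset of) particles into an array
        pool = self._data if indices is None else self._data[indices]
        return np.array([getattr(p, var) for p in pool])

    def setvardata(self, var, index, val):
        setattr(self._data[index], var, val)

    def setallvardata(self, var, val):
        for p in self._data:
            setattr(p, var, val)

coll = AoSCollection([Particle(i, 0.0) for i in range(4)])
coll.setvardata('lon', 2, 1.5)
print(coll.getvardata('lon', indices=[0, 2]))  # lon of particles 0 and 2
```

The gather in `getvardata` is what makes AoS writing more expensive than SoA: each attribute read walks the object list, whereas the SoA version below slices a contiguous array.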
21 changes: 10 additions & 11 deletions parcels/collection/collections.py
@@ -898,19 +898,18 @@ def __getattr__(self, name):
else:
return False

def has_write_once_variables(self):
for var in self.ptype.variables:
if var.to_write == 'once':
return True
return False

@abstractmethod
def toDictionary(self):
"""
Convert all Particle data from one time step to a python dictionary.
:param pfile: ParticleFile object requesting the conversion
:param time: Time at which to write ParticleSet
:param deleted_only: Flag to write only the deleted Particles
returns two dictionaries: one for all variables to be written each outputdt,
and one for all variables to be written once
def getvardata(self, var, indices=None):
pass

This function depends on the specific collection in question and thus needs to be specified in specific
derivatives classes.
"""
@abstractmethod
def setvardata(self, var, index, val):
pass

@abstractmethod
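The base class trades the abstract `toDictionary` for abstract `getvardata`/`setvardata` hooks, alongside a concrete `has_write_once_variables` helper. A sketch of that contract — the `Variable` and collection classes here are toy stand-ins mirroring the diff, not the real parcels classes:

```python
from abc import ABC, abstractmethod

class Variable:
    def __init__(self, name, to_write=True):
        self.name, self.to_write = name, to_write

class BaseCollection(ABC):
    def __init__(self, variables):
        self.variables = variables

    def has_write_once_variables(self):
        # True if any variable is flagged to be written only once per particle
        return any(v.to_write == 'once' for v in self.variables)

    @abstractmethod
    def getvardata(self, var, indices=None):
        ...

    @abstractmethod
    def setvardata(self, var, index, val):
        ...

class DictCollection(BaseCollection):
    """Toy concrete collection backed by a dict of lists."""
    def __init__(self, variables, data):
        super().__init__(variables)
        self._data = data

    def getvardata(self, var, indices=None):
        col = self._data[var]
        return col if indices is None else [col[i] for i in indices]

    def setvardata(self, var, index, val):
        self._data[var][index] = val

coll = DictCollection([Variable('lon'), Variable('trajectory', to_write='once')],
                      {'lon': [0.0, 1.0]})
print(coll.has_write_once_variables())  # True
```

With this split, the particle file no longer needs to know whether the backing store is AoS or SoA; it just asks the collection for variable data by name.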
91 changes: 21 additions & 70 deletions parcels/collection/collectionsoa.py
@@ -1,4 +1,3 @@
from datetime import timedelta as delta
from operator import attrgetter
from ctypes import Structure, POINTER
from bisect import bisect_left
@@ -26,26 +25,6 @@
'See http://oceanparcels.org/#parallel_install for more information')


def _to_write_particles(pd, time):
"""We don't want to write a particle that is not started yet.
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
"""
return ((np.less_equal(time - np.abs(pd['dt']/2), pd['time'], where=np.isfinite(pd['time']))
& np.greater_equal(time + np.abs(pd['dt'] / 2), pd['time'], where=np.isfinite(pd['time']))
| ((np.isnan(pd['dt'])) & np.equal(time, pd['time'], where=np.isfinite(pd['time']))))
& (np.isfinite(pd['id']))
& (np.isfinite(pd['time'])))


def _is_particle_started_yet(pd, time):
"""We don't want to write a particle that is not started yet.
Particle will be written if:
* particle.time is equal to time argument of pfile.write()
* particle.time is before time (in case particle was deleted between previous export and current one)
"""
return np.less_equal(pd['dt']*pd['time'], pd['dt']*time) | np.isclose(pd['time'], time)


def _convert_to_flat_array(var):
"""Convert lists and single integers/floats to one-dimensional numpy arrays

@@ -153,7 +132,7 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p
self._data['depth'][:] = depth
self._data['time'][:] = time
self._data['id'][:] = pid
self._data['fileid'][:] = -1
self._data['once_written'][:] = 0

# special case for exceptions which can only be handled from scipy
self._data['exception'] = np.empty(self.ncount, dtype=object)
@@ -815,58 +794,30 @@ def flatten_dense_data_array(vname):
cstruct = CParticles(*cdata)
return cstruct

def toDictionary(self, pfile, time, deleted_only=False):
def _to_write_particles(self, pd, time):
"""We don't want to write a particle that is not started yet.
Particle will be written if particle.time is between time-dt/2 and time+dt (/2)
"""
Convert all Particle data from one time step to a python dictionary.
:param pfile: ParticleFile object requesting the conversion
:param time: Time at which to write ParticleSet
:param deleted_only: Flag to write only the deleted Particles
returns two dictionaries: one for all variables to be written each outputdt,
and one for all variables to be written once
return np.where((np.less_equal(time - np.abs(pd['dt'] / 2), pd['time'], where=np.isfinite(pd['time']))
& np.greater_equal(time + np.abs(pd['dt'] / 2), pd['time'], where=np.isfinite(pd['time']))
| ((np.isnan(pd['dt'])) & np.equal(time, pd['time'], where=np.isfinite(pd['time']))))
& (np.isfinite(pd['id']))
& (np.isfinite(pd['time'])))[0]

This function depends on the specific collection in question and thus needs to be specified in specific
derivative classes.
"""

data_dict = {}
data_dict_once = {}
def getvardata(self, var, indices=None):
if indices is None:
return self._data[var]
else:
try:
return self._data[var][indices]
except: # Can occur for zero-length ParticleSets
return None

time = time.total_seconds() if isinstance(time, delta) else time
def setvardata(self, var, index, val):
self._data[var][index] = val

indices_to_write = []
if pfile.lasttime_written != time and \
(pfile.write_ondelete is False or deleted_only is not False):
if self._data['id'].size == 0:
logger.warning("ParticleSet is empty on writing as array at time %g" % time)
else:
if deleted_only is not False:
if type(deleted_only) not in [list, np.ndarray] and deleted_only in [True, 1]:
indices_to_write = np.where(np.isin(self._data['state'],
[OperationCode.Delete]))[0]
elif type(deleted_only) in [list, np.ndarray]:
indices_to_write = deleted_only
else:
indices_to_write = _to_write_particles(self._data, time)
if np.any(indices_to_write):
for var in pfile.var_names:
data_dict[var] = self._data[var][indices_to_write]

pset_errs = ((self._data['state'][indices_to_write] != OperationCode.Delete) & np.greater(np.abs(time - self._data['time'][indices_to_write]), 1e-3, where=np.isfinite(self._data['time'][indices_to_write])))
if np.count_nonzero(pset_errs) > 0:
logger.warning_once('time argument in pfile.write() is {}, but particles have time {}'.format(time, self._data['time'][pset_errs]))

if len(pfile.var_names_once) > 0:
first_write = (_to_write_particles(self._data, time) & _is_particle_started_yet(self._data, time) & np.isin(self._data['id'], pfile.written_once, invert=True))
if np.any(first_write):
data_dict_once['id'] = np.array(self._data['id'][first_write]).astype(dtype=np.int64)
for var in pfile.var_names_once:
data_dict_once[var] = self._data[var][first_write]
pfile.written_once.extend(np.array(self._data['id'][first_write]).astype(dtype=np.int64).tolist())

if deleted_only is False:
pfile.lasttime_written = time

return data_dict, data_dict_once
def setallvardata(self, var, val):
self._data[var][:] = val

def toArray(self):
"""
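The SoA replacement for `_to_write_particles` turns the per-particle loop into one vectorised mask over the structured data array: a particle is written when its time lies within half a `dt` of the requested output time (or, for particles with `dt` NaN, exactly at it) and its id and time are finite. A numpy sketch of that selection, simplified from the diff (the `where=` guards of the original are dropped since NaN comparisons are already False):

```python
import numpy as np

def to_write_particles(pd, time):
    """Indices of particles to write at `time`, mirroring the SoA mask."""
    # Within half a timestep of the output time
    in_window = ((time - np.abs(pd['dt']) / 2 <= pd['time'])
                 & (pd['time'] <= time + np.abs(pd['dt']) / 2))
    # Particles with dt NaN are written only exactly at their own time
    frozen = np.isnan(pd['dt']) & (pd['time'] == time)
    return np.where((in_window | frozen)
                    & np.isfinite(pd['id'])
                    & np.isfinite(pd['time']))[0]

pd = np.zeros(4, dtype=[('id', 'f8'), ('time', 'f8'), ('dt', 'f8')])
pd['id'] = [0, 1, 2, np.nan]           # NaN id: slot not in use
pd['time'] = [10.0, 10.2, 50.0, 10.0]
pd['dt'] = 1.0
print(to_write_particles(pd, 10.0))    # [0 1]
```

Because the result is an index array, the particle file can hand it straight to `getvardata(var, indices)` and write one contiguous slab per variable — this is the vectorisation the "Vectorising the dump_to_zarr" commit refers to.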