
Memory limitation in netcdf export step #1091

Closed
nvogtvincent opened this issue Oct 13, 2021 · 13 comments
@nvogtvincent
Contributor

The read_from_npy function in particlefilesoa.py generates a 64-bit array with dimensions (number_of_trajectories, number_of_unique_time_records), so being able to hold this array in memory is a requirement for exporting the npy files dumped by parcels to netcdf. For long runs or frequent output, this restriction is much stricter than the memory limits imposed by the number of active particles or the field size. Furthermore, if there is insufficient memory to store this array, this is only flagged as an OOM error during the export to netcdf (i.e. after the simulation has completed), which may result in a lot of wasted time.

I have two suggestions:

  • It is not possible to predict the size of this array before particle tracking has completed, but I think the minimum possible size is (number_of_trajectories, run_time/output_dt), and all of these quantities are available prior to execution. So whilst this would not guarantee that there is sufficient memory, throwing a warning if 8*number_of_trajectories*run_time/output_dt > available_memory would at least catch runs that are guaranteed to fail at the export step (assuming that background memory use does not vary by much). A rough sketch of such a check is included just after this list.
  • Evaluate whether 8*(self.maxid_written+1)*len(self.time_written) >= psutil.virtual_memory()[1] in the early stages of the export function and, if true, read the numpy files and write to the netcdf file in chunks, rather than trying to store all the data for a particular variable in memory at once. I've written up a naive modification of the relevant functions in particlefilesoa.py below (which splits particle IDs into chunks with limits in id_range, and only loads particle IDs satisfying id_range[0] <= id < id_range[1] at a time). This is not a great implementation and I have only carried out basic tests, but I would be happy to work on this further if you think this method is reasonable and useful.
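For the first suggestion, such a pre-run check might look roughly like this (a minimal sketch, not parcels code; psutil is assumed to be available and the function and argument names are hypothetical):

import warnings
import psutil

def warn_if_export_may_exceed_memory(n_particles, runtime, outputdt):
    """Warn before execution if the (n_particles, n_outputs) float64 export array cannot fit in RAM."""
    n_outputs = int(runtime / outputdt) + 1       # lower bound on the number of unique output times
    required_bytes = 8 * n_particles * n_outputs  # one float64 per (trajectory, time) entry
    available_bytes = psutil.virtual_memory().available
    if required_bytes > available_bytes:
        warnings.warn("Estimated netcdf export array (%.1f GB) exceeds available memory (%.1f GB); "
                      "the export step is likely to fail with an OOM error."
                      % (required_bytes / 1e9, available_bytes / 1e9))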
def read_from_npy(self, file_list, time_steps, var, id_range):
    """
    Read NPY-files for one variable using a loop over all files.

    Attention:
    For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.

    :param file_list: List that contains all file names in the output directory
    :param time_steps: Number of time steps that were written in the output directory
    :param var: name of the variable to read
    :param id_range: [min_id, max_id) range of particle IDs to read in this chunk
    """

    valid_id = np.arange(id_range[0], id_range[1])
    n_ids = len(valid_id)

    data = np.nan * np.zeros((n_ids, time_steps))
    time_index = np.zeros(n_ids, dtype=np.int64)
    t_ind_used = np.zeros(time_steps, dtype=np.int64)

    # loop over all files
    for npyfile in file_list:
        try:
            data_dict = np.load(npyfile, allow_pickle=True).item()
        except NameError:
            raise RuntimeError('Cannot combine npy files into netcdf file because your ParticleFile is '
                               'still open on interpreter shutdown.\nYou can use '
                               '"parcels_convert_npydir_to_netcdf %s" to convert these to '
                               'a NetCDF file yourself.\nTo avoid this error, make sure you '
                               'close() your ParticleFile at the end of your script.' % self.tempwritedir)

        id_avail = np.array(data_dict["id"], dtype=np.int64)
        id_mask_full = np.in1d(id_avail, valid_id) # which ids in data are present in this chunk
        id_mask_chunk = np.in1d(valid_id, id_avail) # which ids in this chunk are present in data
        t_ind = time_index[id_mask_chunk] if 'once' not in file_list[0] else 0
        t_ind_used[t_ind] = 1
        data[id_mask_chunk, t_ind] = data_dict[var][id_mask_full]
        time_index[id_mask_chunk] = time_index[id_mask_chunk] + 1

    # remove rows and columns that are completely filled with nan values
    tmp = data[time_index > 0, :]
    return tmp[:, t_ind_used == 1]


def export(self):
    """
    Exports outputs in temporary NPY-files to NetCDF file

    Attention:
    For ParticleSet structures other than SoA, and structures where ID != index, this has to be overridden.
    """

    if MPI:
        # The export can only start when all threads are done.
        MPI.COMM_WORLD.Barrier()
        if MPI.COMM_WORLD.Get_rank() > 0:
            return  # export only on thread 0

    # Retrieve all temporary writing directories and sort them in numerical order
    temp_names = sorted(glob(os.path.join("%s" % self.tempwritedir_base, "*")),
                        key=lambda x: int(os.path.basename(x)))

    if len(temp_names) == 0:
        raise RuntimeError("No npy files found in %s" % self.tempwritedir_base)

    global_maxid_written = -1
    global_time_written = []
    global_file_list = []
    if len(self.var_names_once) > 0:
        global_file_list_once = []
    for tempwritedir in temp_names:
        if os.path.exists(tempwritedir):
            pset_info_local = np.load(os.path.join(tempwritedir, 'pset_info.npy'), allow_pickle=True).item()
            global_maxid_written = np.max([global_maxid_written, pset_info_local['maxid_written']])
            for npyfile in pset_info_local['file_list']:
                tmp_dict = np.load(npyfile, allow_pickle=True).item()
                global_time_written.append([t for t in tmp_dict['time']])
            global_file_list += pset_info_local['file_list']
            if len(self.var_names_once) > 0:
                global_file_list_once += pset_info_local['file_list_once']
    self.maxid_written = global_maxid_written
    self.time_written = np.unique(global_time_written)

    for var in self.var_names:
        # Find available memory to check if output file is too large
        avail_mem = psutil.virtual_memory()[1]
        req_mem   = (self.maxid_written+1)*len(self.time_written)*8*1.2

        if req_mem > avail_mem:
            # Read id_per_chunk ids at a time to keep memory use down
            total_chunks = int(np.ceil(req_mem/avail_mem))
            id_per_chunk = int(np.ceil((self.maxid_written+1)/total_chunks))
        else:
            total_chunks = 1
            id_per_chunk = self.maxid_written+1

        for chunk in range(total_chunks):
            # Minimum and maximum id for this chunk
            id_range = [chunk*id_per_chunk,
                        np.min(((chunk+1)*id_per_chunk, self.maxid_written+1))]

            # Read chunk-sized data from NPY-files
            data = self.read_from_npy(global_file_list, len(self.time_written), var, id_range)
            if var == self.var_names[0]:
                self.open_netcdf_file((self.maxid_written+1, data.shape[1]))
            varout = 'z' if var == 'depth' else var
            getattr(self, varout)[id_range[0]:id_range[1], :] = data

    if len(self.var_names_once) > 0:
        for var in self.var_names_once:
            getattr(self, var)[:] = self.read_from_npy(global_file_list_once, 1, var, [0, self.maxid_written+1])

    self.close_netcdf_file()
@erikvansebille
Member

Thanks for raising this Issue and possible workaround, @nvogtvincent! As a pure coincidence, @SLYpma, @quintenbohte and myself were discussing how to circumvent these memory issues too at the time you raised this Issue!

I think your suggestion of working with chunks could indeed work. I don't entirely understand whether the different chunks all end up in the same NetCDF file (and if so, how can you keep appending data?), or whether you get one NetCDF file per chunk. Feel free to put this into a fork and create a Pull Request; then we can work on it together (and you get the credit for this idea). Creating a Pull Request also better highlights exactly which changes you made to the code.

More generally, I think the problem is that NetCDF just isn't very suited for trajectory data, especially when the trajectories have variable length. I've discussed this before with @CKehl, but maybe we should think of other data structures (databases??) to store the data, ones that are more flexible than the array-based NetCDF format. Although of course it would mean that users also need to learn new tools. Any thoughts welcome!

@nvogtvincent
Contributor Author

nvogtvincent commented Oct 14, 2021

Thanks for the reply @erikvansebille - I have submitted a pull request (the code is very much WIP; there is one significant issue, which I have detailed in the comments). Regardless of whether or not this is an acceptable workaround, I certainly agree that netcdf is not the ideal file format for trajectory data, for precisely the reasons you point out. I believe that Ariane and TRACMASS both output text files, which are certainly very flexible, but I'm not sure about the disadvantages (compression?). I guess a text-based format like json or xml would basically have the same issues.

@daanreijnders
Collaborator

Just wanted to link discussions I saw earlier over at MITgcm/MITgcm#291 and ocean-transport/floater#78, about Lagrangian data structures.

@JamiePringle
Collaborator

JamiePringle commented Mar 25, 2022

I am running a very large particle tracking simulation of global connectivity in the Mercator 1/12 degree model, and this is a problem I am running into. It is best for me to run with lots of particles at once, since that minimizes the reading of model fields in from disk. I have a few questions on this thread:

  1. thanks to @nvogtvincent for his example code. (This comment was edited because I just found parcels_convert_npydir_to_netcdf; thank you to whoever wrote that!)

  2. Can I get this change if I download the development version of the code?

  3. On data formats: HDF5 (the underlying format of modern netCDF) and recent versions of netCDF both support the concept of "ragged" arrays, which have a varying length for each row. This would seem ideal for variable-length trajectory data. I had been planning to explore these for more efficiently storing my output, and would be happy to report back what I find once I have done so. For information on netCDF see https://unidata.github.io/python-training/workshop/Bonus/netcdf-writing/ and for HDF5 see https://docs.h5py.org/en/stable/special.html ; in both, search for "ragged." (A minimal sketch is included at the end of this comment.)

  4. on databases -- databases tend to trade larger file sizes for faster access. I often use them for float data (in particular SQLite), and find they make it very easy and quick to access data sets that are much larger than memory. But they will often lead to larger file sizes (and SQLite does not distinguish single- from double-precision floats).

The advantage of SQLite is that the database is a single file, and the user need not install a database server. But once you set up your code for one SQL database, it will work (with small modification) with most SQL databases.

I am happy to help with this, since it addresses a problem I am having now. It is great if we can avoid duplicating effort.

If no one else is doing so, I might start with writing the output to a ragged netcdf file because my floats have a fixed duration of existence which is less than the total oceanparcels run duration.
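For reference, a minimal sketch of a ragged (variable-length) trajectory variable using the VLEN support in netCDF4-python mentioned in point 3; the file, type and variable names here are hypothetical:

import numpy as np
from netCDF4 import Dataset

ds = Dataset("ragged_trajectories.nc", "w")
ds.createDimension("trajectory", None)            # one row per drifter
vlen_f8 = ds.createVLType(np.float64, "traj_f8")  # variable-length float64 type
lon = ds.createVariable("lon", vlen_f8, ("trajectory",))

# each row can hold a different number of observations
lon[0] = np.array([30.1, 30.2, 30.4])
lon[1] = np.array([31.0, 31.1])
ds.close()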

Jamie

@nvogtvincent
Contributor Author

@JamiePringle This issue you tagged me in was largely solved by PR#1095; however, that was implemented after the 2.3.0 release, so you will need to download the development build to get it.

@JamiePringle
Collaborator

@nvogtvincent Thank you. In the meantime, I will try to convert the output to a netCDF file with ragged rows, to see how practical that is to use. I shall share results on a new thread once I have them. I will write a standalone utility to convert the existing netcdf to ragged rows until people think the format is useful enough to include.

@daanreijnders
Collaborator

I'm thinking that implementing zarr could be very useful. I don't have much experience with it, but as I understand it, zarr arrays can be (dynamically) written in chunks, much in the same way as Parcels now spits out numpy arrays. The added benefit is that the zarr output can be read directly with xarray, without the need for any netCDF conversion. Moreover, zarr also supports ragged arrays, which is useful for simulations similar to @JamiePringle's.
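As an illustration only (not a proposal for how this would be wired into Parcels), appending each output time level to a zarr store along an observation dimension via xarray could look roughly like this; the variable names and store path are made up:

import numpy as np
import xarray as xr

def append_output(lon, lat, obs_index, store="trajectories.zarr"):
    """Append one output time level (all particles) along the 'obs' dimension."""
    # assumes a fixed set of particles; ragged output would need extra handling
    ds = xr.Dataset({
        "lon": (("traj", "obs"), lon[:, np.newaxis]),
        "lat": (("traj", "obs"), lat[:, np.newaxis]),
    })
    if obs_index == 0:
        ds.to_zarr(store, mode="w")          # create the store on the first write
    else:
        ds.to_zarr(store, append_dim="obs")  # grow the store along the time axis

# the result can then be opened lazily with xarray, without any conversion step:
# traj = xr.open_zarr("trajectories.zarr")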

@JamiePringle
Collaborator

@daanreijnders See my netCDF code in "ragged netCDF output #1157"

I am intrigued by Zarr, but do not have experience with it yet. One comment -- even if it can be written in chunks, parcels may not produce the data in the right order. Parcels produces data for many particles at a given time, whereas the best way to write chunks is often as the full time series for a single drifter...

But I don't really know -- it would be nice if someone could re-write my "ragged netCDF" utility to write Zarr, and then the two could be compared. It should not be that hard (TM).

@JamiePringle
Collaborator

@erikvansebille, @nvogtvincent and @CKehl, are there any plans to merge the chunking patch (#1092) into the development branch, or is there an issue with it that I can help fix? As far as I can tell from the unit test output, it is just failing a lint check, but there is no output explaining why it is failing.

Even with a 128 GB memory machine, ocean parcels is failing to save its output due to memory limitations for my dense connectivity runs. I am having to save data from the temporary *.npy files with an ad-hoc script.

If y'all do not want to pursue #1092, I am happy to try to contribute code to solve this problem. Perhaps something along the lines of the ragged netCDF output I shared in the discussion would make it easier to write the output without having to hold all times and trajectories in memory at once.

I will contribute a standalone script to go from the *.npy temporary files to ragged netcdf (I have to do so to get my work done), but I don't want to duplicate the effort on #1092 unless there is some reason y'all think that patch won't go forward.

Jamie

@nvogtvincent
Contributor Author

Hi @JamiePringle, I don't have any immediate plans to finish #1092 because (1) it wasn't a very elegant solution and (2) I remember that I had problems using this code on HPCs because psutil.virtual_memory()[1] wasn't returning the correct amount of available memory on our cluster.

I'm also occasionally running into OOMs during particlefile export so I'm not surprised that you're also having problems, but I've just been working around this because unfortunately I haven't got the time to write a general fix for this. So if you're willing to try to implement a solution, at least from my perspective, that would be great.

But FYI, whilst ragged netCDF output could be a really useful addition to Parcels and could partially mitigate this problem, it isn't a complete fix (unlike some method involving chunking) because I've had OOMs with extremely large particle numbers even using write_ondelete=True, i.e. the smallest possible array that includes all particles. I'm wondering if Dask or something similar has a chunked netcdf export option?
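A rough sketch of what such a dask-based chunked export might look like via xarray (the store path, sizes and chunking are made up, and this has not been tested against parcels output):

import dask.array as da
import xarray as xr

# placeholder dask array standing in for trajectory data assembled lazily from
# the temporary *.npy files; chunked along the trajectory dimension
lon = da.zeros((10_000_000, 100), chunks=(100_000, 100), dtype="float32")
ds = xr.Dataset({"lon": (("traj", "obs"), lon)})

# with a dask-backed dataset, to_netcdf writes the data chunk by chunk rather
# than materialising each full variable in memory
ds.to_netcdf("trajectories.nc")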

@JamiePringle
Collaborator

@nvogtvincent ok, thanks. I am now writing a *.npy to netCDF converter in which no portion of any variable is ever fully in memory at any one time. It is possible to write partial data to netCDF using either extensible dimensions or a pre-allocated array.
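Both of those options look roughly like this with netCDF4-python (a hedged sketch; the dimension sizes, file name and variable names are made up):

import numpy as np
from netCDF4 import Dataset

ds = Dataset("partial_write.nc", "w")
ds.createDimension("traj", 1000000)  # pre-allocated, fixed size
ds.createDimension("obs", None)      # unlimited ("extensible"), grows as slices are written
lon = ds.createVariable("lon", "f4", ("traj", "obs"))

# write one block of trajectories for one block of time steps; nothing else
# needs to be held in memory at the same time
block = np.random.rand(50000, 10).astype("f4")
lon[0:50000, 0:10] = block
ds.close()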

I am writing it as a stand-alone script to make it easy to debug. Once it is done, thought can be put into how to integrate it into parcels.

I don't think this will take long (he says in full naiveté), but it might take me a few days to finish because of other professorial responsibilities.

@JamiePringle
Collaborator

@nvogtvincent @erikvansebille I have created a stand-alone Python script that takes the output in the out-XXXXX directory and converts it into a netCDF file which is, as far as I can tell, exactly the same as the default output from parcels. The only feature I have not implemented is the creation of variables that are only written once.

The maximum memory usage of this code is (maximum number of time records for a single drifter)*(number of drifters in a single MPI rank). So in the case where you use MPI to parallelize the runs, or when drifters only exist for a portion of the parcels run, this memory usage is much less than in the existing code. I can convert a run with 47 million particles, where each drifter exists for 64 time records, using about 12 GB of memory.

The downside is that all of the *.npy files are read twice. I don't think it is much slower than the existing code, but then again my filesystem is heavily tuned to cache often-read files.

This code could easily be tuned to use even less memory by only caching the data for a single variable at a time, but at the expense of reading the *.npy files more times. I do not think that is worth it. As it is, I can use it to process runs with 10s of millions of drifters without memory issues.

I have not yet tried to integrate it into parcels. There are decisions @erikvansebille and @CKehl need to make about tradeoffs between memory usage and IO. ALSO, there are many different things that can affect the *.npy files. Before thinking of incorporating this code into parcels, I think the following should happen:

  • This code should be tested on other folks' output records. There are lots of potential edge cases. I have taken care to process user-defined variables in the particle set properly, but, for example, I have assumed the user-defined variables map onto netCDF types.
  • I have not implemented the *_once variables, since I have no examples of runs that produce them.
  • I don't know what to do if 'lonlatdepth_dtype' is ever inconsistent with 'var_dtypes'
  • I have only tested this on SOA particles... I think...
  • I have assumed that particle ID is unique to each drifter
  • The output is not sorted by particle ID. Not a problem for me, but is different from existing output.
  • I assume dictionaries are ordered, so this limits me to python versions >= 3.7. If this is an issue, I can use OrderedDicts explicitly.

To run the code, one must keep the temporary output directory. I am not sure how to do this properly, but I do it by not calling .close() on the pset. I am not sure this works for non-MPI runs, but it could be made to by changing how the temporary directory is created. I always use MPI.

The code is then called with "python convertToNetcdf_from_npy_lowMem.py outDirectoryName outputFileName". As written it is rather chatty, but if you tire of that you can set "printDebug=False".

The code is in the attached zip file.
convertToNetcdf_from_npy_lowMem.zip

@erikvansebille
Member

This has now been fixed with the implementation of native zarr output in #1199 and v2.4.0.
