"compute_on_defer" not working with dask array #741

Closed
claudiofgcardoso opened this issue Feb 10, 2020 · 18 comments

@claudiofgcardoso commented Feb 10, 2020

Hello all,

I realized that 'fieldset.U.data' is now a dask array:
dask.array<where, shape=(3, 28, 205, 223), dtype=float32, chunksize=(1, 28, 205, 223), chunktype=numpy.ndarray>

This makes even the simplest operation using 'compute_on_defer' fail:

def compute(fieldset):
    for tind in fieldset.W[0].loaded_time_indices:
        fieldset.W[0].data[tind, 0] = 0
        fieldset.W[1].data[tind, 0] = fieldset.W[1].data[tind, 0].clip(min=0)

Result:
NotImplementedError: Item assignment with <class 'tuple'> not supported

Any ideas on how to work around this?

@erikvansebille (Member) commented

Thanks for reporting, @claudiofgcardoso. You're right that with the new dask development, compute_on_defer may not work as expected anymore, because mixing numpy methods in the compute function with dask Fields may lead to the entire array being converted to numpy.

However, the output from your function seems to hint at something else being wrong. Can you investigate which of the two lines in your compute function breaks? And what happens if you try 'simpler' functions? This kind of info is really useful to us for debugging!

@claudiofgcardoso (Author) commented

I changed the compute function to something much simpler, such as:

def compute(fieldset):
    fieldset.W[0].data[:, 0, :, :] = 0.
    fieldset.W[1].data[:, 0, :, :] = 0.

The outcome was the same:

fieldset.computeTimeChunk(1,1)
Traceback (most recent call last):

  File "<ipython-input-5-a7923779232e>", line 1, in <module>
    fieldset.computeTimeChunk(1,1)

  File "/home/ccardoso/anaconda3/lib/python3.7/site-packages/parcels/fieldset.py", line 853, in computeTimeChunk
    self.compute_on_defer(self)

  File "<ipython-input-3-1f2061b49552>", line 19, in compute
    fieldset.W[0].data[:, 0, :, :] = 0.

  File "/home/ccardoso/anaconda3/lib/python3.7/site-packages/dask/array/core.py", line 1480, in __setitem__
    "Item assignment with %s not supported" % type(key)

NotImplementedError: Item assignment with <class 'tuple'> not supported 

I believe this happens because I'm trying to assign values directly to individual elements (the values at depth level 0) of a dask array, which does not seem to be possible for such arrays (https://stackoverflow.com/questions/40935756/item-assignment-not-supported-in-dask)

@claudiofgcardoso (Author) commented

The dask implementation also seems to be incompatible with the pset.show() function:

pset.show(field=fieldset.V[1])
Traceback (most recent call last):

  File "<ipython-input-16-3676ee07a0bd>", line 1, in <module>
    pset.show(field=fieldset.V[1])

  File "/home/ccardoso/anaconda3/lib/python3.7/site-packages/parcels/particleset.py", line 521, in show
    projection=projection, land=land, vmin=vmin, vmax=vmax, savefile=savefile, animation=animation, **kwargs)

  File "/home/ccardoso/anaconda3/lib/python3.7/site-packages/parcels/plotting.py", line 78, in plotparticles
    titlestr='Particles and ', depth_level=depth_level)

  File "/home/ccardoso/anaconda3/lib/python3.7/site-packages/parcels/plotting.py", line 201, in plotfield
    d[:-1, :-1] = (data[0][:-1, 1:] + data[0][1:, 1:]) / 2.

  File "/home/ccardoso/anaconda3/lib/python3.7/site-packages/dask/array/core.py", line 1480, in __setitem__
    "Item assignment with %s not supported" % type(key)

NotImplementedError: Item assignment with <class 'tuple'> not supported 

@erikvansebille (Member) commented

> The dask implementation also seems to be incompatible with the pset.show() function:

Hmm, this is surprising because in principle the Field should be computed with dask as soon as it is needed by the plotting routine. I can't reproduce your error here. Can you send me the full script you're using that gives you this error?

@claudiofgcardoso
Copy link
Author

claudiofgcardoso commented Feb 12, 2020

Here's the script: run_Jan2014.txt. You can download a ROMS file example here.

@erikvansebille (Member) commented

Thank you. Can you also provide the roms_grid_d02_Zlev_Okean.nc file?

@claudiofgcardoso (Author) commented

Sorry about that! Here are the missing files:
roms_grid_d02_Zlev_Okean.nc
roms_d02_Z_Okean_unbeaching_vel.nc

@erikvansebille (Member) commented

I still can't run it because I'm missing the wind and mohid files. Could you either provide all the required files, or make your script simpler (but still failing) so that I can test it here?

CKehl added the bug label Feb 13, 2020
@claudiofgcardoso (Author) commented

You could run the script without the wind forcing (by defining wind=0), but it is better to include it to reproduce the errors.
Mohid files (river data to release particles): Node_Rib.zip
and the wind file: WRF

@CKehl (Contributor) commented Feb 13, 2020

Hi, just as a little background: Claudio, you are right, direct item assignment on 'lazy-load' dask arrays is not possible, as the dask documentation says. This is because dask explicitly manages its data as a 'virtual array': a dask array is just a description of which arrays (by name/index) it consists of and what their bounding-box values are. A dask array holds no data until the moment dask.array.core.Array.data (as a property) is accessed. That access transforms the virtual array into a physical array by loading the chunks required for the requested array indices. One can also see this because the accessed data is of type numpy.ndarray, whereas type(Field.data) is dask.array.core.Array (when the field is loaded via from_nemo, from_pop, from_netcdf and so forth).
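To illustrate with a minimal, self-contained sketch (a dummy array rather than parcels data; in the dask versions current at the time, item assignment on a lazy array raised exactly the error reported above, while the materialized numpy copy accepts it):

import dask.array as da
import numpy as np

# A lazy 'virtual' array: no chunk data is loaded yet
lazy = da.ones((3, 28, 205, 223), chunks=(1, 28, 205, 223))

try:
    lazy[:, 0, :, :] = 0.  # item assignment on the lazy dask array
except NotImplementedError as err:
    print(err)  # "Item assignment with <class 'tuple'> not supported"

# Materializing the array yields a plain numpy.ndarray, which does
# support item assignment
physical = np.asarray(lazy)  # equivalent to lazy.compute()
physical[:, 0, :, :] = 0.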

That means that, e.g. in your plotting call, changing pset.show(field=fieldset.V[1]) to pset.show(field=fieldset.V.data[1]) may actually work. I'll test that case locally with some data; the behaviour should be independent of the actual dataset.

@erikvansebille and I had a talk about the behaviour of deferred arrays and how they work with dask, because it is a more involved problem: fields that are not generated from NetCDF but via from_xarray or from_data actually hold physical array data. This means that, depending on how a Field is constructed, the library used to sum/concatenate/etc. the field data within the deferred array needs to adapt dynamically, and ideally in a simple-to-use manner for the ocean modeller. Thus, the general error with deferred arrays may take a bit more time to fix. Still, thank you for raising the error, because this side effect of making the lazy loading work was outside my field of view until now.

@claudiofgcardoso (Author) commented

Hi @CKehl, thank you very much for the detailed explanation of how dask works! I will test whether your suggestion solves the issue for compute_on_defer.

No need to thank me for that; I'm well aware that all user input is very important for you at this stage. I will keep doing this, as long as it helps!

@claudiofgcardoso (Author) commented

For info, I tested the suggestion from @CKehl (pset.show(field=fieldset.U[1].data[1])) and the following occurred (my U is a summed field):

Traceback (most recent call last):

  File "<ipython-input-20-8992ce18526e>", line 1, in <module>
    pset.show(field=fieldset.U[1].data[1])

  File "/home/ccardoso/anaconda3/lib/python3.7/site-packages/parcels/particleset.py", line 521, in show
    projection=projection, land=land, vmin=vmin, vmax=vmax, savefile=savefile, animation=animation, **kwargs)

  File "/home/ccardoso/anaconda3/lib/python3.7/site-packages/parcels/plotting.py", line 73, in plotparticles
    field = getattr(particles.fieldset, field)

TypeError: getattr(): attribute name must be string

@CKehl (Contributor) commented Feb 24, 2020

I'm looking into it in more depth now.

Update: I'll test the potential fix locally later. The issue here is that, because the plotting relies on having a graphical output, it cannot easily be tested in the CI workflow. Still, we'll take care of it. That also means that your original call, pset.show(field=fieldset.U[1]), should itself work after the fix.

CKehl added a commit that referenced this issue Feb 25, 2020
CKehl added a commit to CKehl/parcels that referenced this issue Feb 26, 2020
CKehl added a commit to CKehl/parcels that referenced this issue Feb 26, 2020
CKehl linked a pull request Mar 4, 2020 that will close this issue
CKehl removed a link to a pull request Mar 4, 2020
@CKehl (Contributor) commented Apr 21, 2020

Dear @claudiofgcardoso,
Could you pull the new parcels version 2.1.5 from conda-forge, update your conda environment with the new version, re-run your experiments, and give us an update on the state of affairs? We invested considerable effort in expanding and re-working the lazy-load chunking interface, which potentially resolves your issue. Your feedback on the new version would be very welcome.
Cheers,
Christian

CKehl closed this as completed May 11, 2020
@claudiofgcardoso (Author) commented

Hi @CKehl,

Sorry for the very late reply (I needed to finish a pending task before diving back into this one). I updated Parcels to the latest version. While testing compute_on_defer, I realized that my compute functions still don't work:

def compute(fieldset):
    for tind in fieldset.W[0].loaded_time_indices:
        fieldset.W[0].data[tind, 0] = 0
        fieldset.W[1].data[tind, 0] = fieldset.W[1].data[tind, 0].clip(min=0)

or the simplified version:

def compute(fieldset):
    fieldset.W[0].data[:, 0, :, :] = 0.
    fieldset.W[1].data[:, 0, :, :] = 0.

I need this operation in order to set the ROMS vertical velocities at the surface to 0 m/s (otherwise particles may leave the domain with depth < 0). I believe this is related to the fact that dask arrays cannot be edited in place. I'll have to find a workaround for this one; any suggestion is highly welcome! I will keep you updated about the other issues.

Cheers,
Cláudio

@erikvansebille (Member) commented

Hi @claudiofgcardoso, there are at least a few workaround options that could work in this case:

  1. Use the ErrorCode.ThroughSurface error to explicitly catch when particles go out of bounds through the surface, and then push them back into the water (see the sketch at the end of this comment). The drawback is that error handling is done in scipy, so this could be slow if a lot of particles cross the surface

  2. Run with field_chunksize=False as in this documentation. That could, however, lead to much higher memory use (and again a slower simulation, depending on the balance between ParticleSet size and FieldSet size).

  3. It may be that your compute functions don't work because fieldset.W is a list in your case. This is because you are using either SummedFields or NestedFields, I assume. You could forgo these 'fancy' objects by writing your own advection kernel instead of using the built-in one. Especially for SummedFields this would be fairly trivial; e.g. the simple Euler forward scheme for two fieldsets UV1 and UV2 would become

def AdvectionEE_2fieldset(particle, fieldset, time):
    """Advection of particles using Explicit Euler (aka Euler Forward) integration for two fieldsets"""
    (u1, v1) = fieldset.UV1[time, particle.depth, particle.lat, particle.lon]
    (u2, v2) = fieldset.UV2[time, particle.depth, particle.lat, particle.lon]
    particle.lon += (u1 + u2) * particle.dt
    particle.lat += (v1 + v2) * particle.dt

Let me know whether these three workarounds have worked for you!
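
For option 1, a minimal sketch of such a recovery kernel (assuming the parcels v2.x recovery API; the enum member may be spelled ErrorCode.ErrorThroughSurface, and the kernel body, which simply puts the particle back at the surface, is only an illustration):

from parcels import ErrorCode

def PushBackIntoWater(particle, fieldset, time):
    # Hypothetical recovery kernel: put the particle back at the surface
    particle.depth = 0.

# Attach it when executing the ParticleSet (kernel, runtime and dt as in your script)
pset.execute(kernel, runtime=runtime, dt=dt,
             recovery={ErrorCode.ErrorThroughSurface: PushBackIntoWater})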

@claudiofgcardoso (Author) commented

Thanks for the suggestions @erikvansebille,

I'm trying to implement the following approach (last lines) in my advection kernel:

def AdvectionRK4_3D(particle, fieldset, time):
    """Advection of particles using fourth-order Runge-Kutta integration including vertical velocity.
    W downward velocity (positive goes downward, negative goes upwards)  

    Function needs to be converted to Kernel object before execution"""
    if particle.beached == 0:
        if fieldset.track_kernels: print('Particle [%d] advection' %particle.id)
        particle.prev_lon = particle.lon  # Set the stored values for next iteration.
        particle.prev_lat = particle.lat   
        (u1, v1, w) = fieldset.UVW[time, particle.depth, particle.lat, particle.lon]
        lon1 = particle.lon + u1*.5*particle.dt
        lat1 = particle.lat + v1*.5*particle.dt
        (u2, v2) = fieldset.UV[time + .5 * particle.dt, particle.depth, lat1, lon1]
        lon2 = particle.lon + u2*.5*particle.dt
        lat2 = particle.lat + v2*.5*particle.dt
        (u3, v3) = fieldset.UV[time + .5 * particle.dt, particle.depth, lat2, lon2]
        lon3 = particle.lon + u3*particle.dt
        lat3 = particle.lat + v3*particle.dt
        (u4, v4) = fieldset.UV[time + particle.dt, particle.depth, lat3, lon3]
        
        particle.lon += (u1 + 2*u2 + 2*u3 + u4) / 6. * particle.dt
        particle.lat += (v1 + 2*v2 + 2*v3 + v4) / 6. * particle.dt
        d = particle.depth + w * particle.dt
        d += particle.Ws * particle.dt #settling velocity: positive values for sinking
        # particle.depth = d if d > 0 else 0
        if d >= 0:
            particle.depth = d 
        else:
            print('Particle [%d] left the domain through the surface with depth = %g' % (particle.id, d))
            particle.depth = 0
        particle.beached = 2  

This approach does not allow using a 4th-order scheme to calculate the vertical displacement of a particle (if 'dep1' is already above the surface, the calculation of 'dep2' will already throw an error). So I use the 4th-order advection scheme for the horizontal advection and a simple Euler forward step for the vertical advection. It works, but it is not entirely accurate. I'm thinking that maybe I should just set the positive velocities (W = upward velocity) at the surface to 0 in the ROMS files before loading them into parcels.
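
A minimal sketch of that preprocessing idea (using xarray; the filenames and the variable name 'w' are assumptions, not taken from the actual files in this thread):

import xarray as xr

# Load eagerly so the arrays are in-memory numpy and therefore editable
ds = xr.open_dataset('roms_d02_Z_Okean.nc').load()  # hypothetical filename

# Zero the vertical velocity at the surface (depth level 0), as in the
# compute functions above
ds['w'].values[:, 0, :, :] = 0.

ds.to_netcdf('roms_d02_Z_Okean_w0.nc')  # then point parcels at this file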

@claudiofgcardoso (Author) commented

Also, I noticed that in this new version of Parcels one cannot assign a variable named "p" within kernels in JIT mode, although Scipy mode can handle it. I believe this is related to the fact that parcels already uses the variable "p" internally for accessing particles in the generated code.
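
A minimal sketch of the clash (a made-up kernel; before the fix referenced below, assigning to a local named p reportedly failed in JIT mode but worked in Scipy mode):

def MyKernel(particle, fieldset, time):
    p = particle.lon + 1.  # a local variable named 'p' clashed with the JIT code generator
    particle.lon = p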

erikvansebille added a commit that referenced this issue May 15, 2020
This fixes the Issue that users can't use variable named `p` in their Custom Kernels (see also #741)
hrsdawson pushed a commit to hrsdawson/parcels that referenced this issue Jul 29, 2020