Dask is awesome, but it isn't always easy to use it for parallel operations. In many cases, especially when wrapping routines from external libraries, it is most straightforward to express operations in terms of a function that expects and returns xray objects loaded into memory.
Dask array has a map_blocks function/method, but its applicability is limited because dask.array doesn't have axis names for unambiguously identifying dimensions. da.atop can handle many of these cases, but it's not the easiest to use. Fortunately, we have sufficient metadata in xray that we could probably parallelize many atop operations automatically by inferring result dimensions and dtypes from applying the function once. See here for more discussion on the dask side: https://github.com/blaze/dask/issues/702
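To make the limitation concrete, here is a minimal sketch of map_blocks on a plain dask array. The function `demean_rows` and the array shape are made up for illustration. Note that the function can only refer to dimensions by position (`axis=1`); nothing in the array says which named dimension that is.

```python
import numpy as np
import dask.array as da


def demean_rows(block):
    # Plain NumPy function applied to each in-memory block.
    # axis=1 is purely positional: the array carries no dimension names.
    return block - block.mean(axis=1, keepdims=True)


# A 4 x 6 array split into two blocks along axis 0, so each block holds whole rows.
x = da.from_array(np.arange(24, dtype=float).reshape(4, 6), chunks=(2, 6))

# The dtype is passed explicitly so dask does not have to guess it.
result = da.map_blocks(demean_rows, x, dtype=x.dtype)
computed = result.compute()
```

This only gives the right answer because the chunking leaves axis 1 in a single block; with named dimensions that kind of requirement could be checked rather than assumed.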
So I would like to add some convenience methods for automatic parallelization with dask of a function defined on xray objects loaded into memory. In addition to a map_blocks method/function, it would be useful to add some sort of parallel_apply method to groupby objects that works very similarly, by lazily applying a function that takes and returns xray objects loaded into memory.
But do the xray objects have to exist in memory? I was thinking this could also work along with open_mfdataset. It just loads and operates on the chunk it needs.
Like the idea of applying this to groupby objects. I wonder if it could be done transparently to the user...
Indeed, there's no need to load the entire dataset into memory first. I think open_mfdataset is the model to emulate here -- it's parallelism that just works.
I'm not quite sure how to do this transparently in groupby operations yet. The problem is that you do want to apply some groupby operations on dask arrays without loading the entire group into memory, if there are only a few groups on a large dataset and the function itself is written in terms of dask operations. I think we will probably need some syntax to disambiguate that scenario.
:+1: Very useful idea!
With the single machine version of dask, we need to run one block first to infer the appropriate metadata for constructing the combined dataset.
Potentially a better approach would be to optionally leverage dask.distributed, which has the ability to run computation at the same time as graph construction. map_blocks could then kick off a bunch of map tasks to execute in parallel, and only worry about reassembling the blocks in a reduce after the results have come in.
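A rough sketch of the two steps described above, using only NumPy and the standard library. `map_blocks_sketch` is a hypothetical helper, not an xarray or dask function, and the thread pool is just a stand-in for a real scheduler such as dask.distributed.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def map_blocks_sketch(func, blocks, axis=0):
    """Hypothetical helper: apply func to each in-memory block and reassemble."""
    # Step 1: run the function on one block to learn the output dtype and ndim.
    first = func(blocks[0])
    template = (first.dtype, first.ndim)

    # Step 2: map the remaining blocks in parallel.
    # The thread pool is a stand-in for a scheduler; it is not dask.
    with ThreadPoolExecutor() as pool:
        rest = list(pool.map(func, blocks[1:]))

    # Step 3: check each result against the inferred metadata, then combine.
    for r in rest:
        if (r.dtype, r.ndim) != template:
            raise ValueError("block result does not match the inferred metadata")
    return np.concatenate([first] + rest, axis=axis)


blocks = np.array_split(np.arange(12, dtype=float).reshape(6, 2), 3, axis=0)
out = map_blocks_sketch(lambda b: b * 2.0, blocks)
```

With a scheduler that can run tasks during graph construction, step 1 would not need to be a separate blocking call; the metadata could be read off whichever block finishes first.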
I'm adding this note to express an interest in the functionality described in Stephan's original description, i.e. a parallel_apply method/function which would apply a function in parallel utilizing multiple CPUs. I have (finally) worked out how to use groupby and apply for my application but it would be much more useful if I could apply functions in parallel to take advantage of multiple CPUs. What's the expected effort to make something like this available in xarray? Several months ago I worked on doing this sort of thing without xarray using the multiprocessing module and a shared memory object and I may revisit that soon, but I expect that a solution using xarray will be more elegant so if such a thing is coming in the foreseeable future then I may wait on that and focus on other tasks. Can anyone advise?
Does #964 help on this?
I think #964 provides a viable path forward here.
Previously, I was imagining the user provides a function that maps xarray.DataArray -> xarray.DataArray. Such functions are tricky to parallelize with dask.array because we need to run them to figure out the result dimensions/coordinates.
In contrast, with a user-defined function ndarray -> ndarray, it's fairly straightforward to parallelize these with dask array (e.g., using dask.array.elemwise or dask.array.map_blocks). Then we could add the metadata back in afterwards with #964.
In principle, we could do this automatically -- especially if dask had a way to parallelize arbitrary NumPy generalized universal functions. Then the user could write something like xarray.apply(func, data, signature=signature, dask_array='auto') to automatically parallelize func over their data. In fact, I had this in some previous commits for #964, but took it out for now, just to reduce scope for the change.
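For readers arriving later: a minimal sketch of what this looks like with xarray.apply_ufunc and its dask="parallelized" option, assuming a reasonably recent xarray with dask installed. This is not the xarray.apply(..., dask_array='auto') spelling proposed above, and the function `scale` and the example data are made up for illustration.

```python
import numpy as np
import xarray as xr


def scale(values):
    # ndarray -> ndarray; the function knows nothing about dimension names.
    return values * 2.0


data = xr.DataArray(
    np.arange(12, dtype=float).reshape(3, 4),
    dims=("time", "x"),
    coords={"time": [0, 1, 2]},
).chunk({"time": 1})  # back the array with dask, one chunk per time step

# xarray applies `scale` blockwise on the dask array and restores the
# dimension names and coordinates on the result.
result = xr.apply_ufunc(
    scale,
    data,
    dask="parallelized",
    output_dtypes=[data.dtype],
)
computed = result.compute()
```

The output dtype is declared up front, so the function does not have to be run once just to discover it.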
This is good news for me as the functions I will apply take an ndarray as input and return a corresponding ndarray as output. Once this is available in xarray I'll be eager to give it a whirl...
I have a preliminary implementation up in https://github.com/pydata/xarray/pull/1517
This issue was closed by #1517. But there was plenty of discussion above about parallelizing groupby. Does #1517 make parallel groupby automatically work? My understanding is no. If that's the case, we probably need to open a new issue for parallel groupby.
cc @mrocklin
@rabernat Agreed, let's open a new issue for that.