Xarray: Parallel tasks on subsets of a dask array wrapped in an xarray Dataset

Created on 21 Jul 2020 · 5 comments · Source: pydata/xarray

I have a large xarray.Dataset stored as zarr. I want to perform some custom operations on it that cannot be expressed with the numpy-like functions a Dask cluster handles automatically. I therefore partition the dataset into small subsets and, for each subset, submit to my Dask cluster a task of the form:

def my_task(zarr_path, subset_index):
    ds = xarray.open_zarr(zarr_path)  # returns an xarray.Dataset backed by dask arrays
    sel = ds.sel(subset_index)  # select this task's subset
    sel = sel.load()  # I want to get the data into memory
    # then do my custom operations
    ...

However, I have noticed this creates a "task within a task": when a worker receives my_task, it in turn submits tasks to the cluster to load the relevant part of the dataset. To avoid this, and to ensure the full task executes within the worker, I instead submit:

def my_task_2(zarr_path, subset_index):
    with dask.config.set(scheduler="threading"):
        return my_task(zarr_path, subset_index)

Is this the best way to do this? What's the best practice for this kind of situation?

I have already posted this on stackoverflow but did not get any answer, so I am adding this here hoping it increases visibility. Apologies if this is considered "pollution".
https://stackoverflow.com/questions/62874267/parallel-tasks-on-subsets-of-a-dask-array-wrapped-in-an-xarray-dataset

Most helpful comment

The reason is that my function here must be applied along the time dimension (e.g., a rolling median in time), but my data is chunked across the time dimension

This is a fundamental problem that is rather hard to solve without creating a copy of the data.

We just released the rechunker package, which makes it easy to create a copy of your data with a different chunking scheme (e.g., contiguous in time, chunked in space). If you have enough disk space to store a copy, this might be a good solution.

All 5 comments

cannot be done by just using numpy-like functions

Did you look at apply_ufunc (examples) and map_blocks? Functions applied with apply_ufunc receive the bare arrays that xarray wraps (dask arrays, in this case), while map_blocks lets you work with xarray objects. See also the docs.
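For reference, a minimal `map_blocks` sketch (toy data; the per-block `demean` function is hypothetical) showing that the function receives in-memory xarray objects, one per chunk:

```python
import numpy as np
import xarray as xr

ds = xr.Dataset(
    {"temp": (("time", "x"), np.arange(20.0).reshape(4, 5))}
).chunk({"x": 2})

def demean(block):
    # `block` is an in-memory xarray.Dataset holding a single chunk.
    return block - block.mean()

result = xr.map_blocks(demean, ds, template=ds).compute()
```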

Thanks for your answer. Yes I looked at apply_ufunc and map_blocks and cannot use these here. The reason is that my function here must be applied along the time dimension (e.g., a rolling median in time), but my data is chunked across the time dimension. I could of course re-chunk the data (create a copy where there are no chunks along the time dimension), but I would like to know if this can be avoided.

You could try dask's map_overlap to share "halo" (ghost) points between chunks. Also see https://image.dask.org/en/latest/dask_image.ndfilters.html#dask_image.ndfilters.median_filter
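A sketch of that idea with `dask.array.map_overlap` (a window-3 rolling median on a toy 1-D array; the `rolling_median` helper is made up for illustration):

```python
import numpy as np
import dask.array as da

def rolling_median(block, window=3):
    # Window-3 median; edge-pad so the output keeps the block's length.
    pad = window // 2
    padded = np.pad(block, pad, mode="edge")
    windows = np.lib.stride_tricks.sliding_window_view(padded, window)
    return np.median(windows, axis=-1)

data = np.arange(10.0)
data[5] = 100.0  # a spike the median filter should remove
x = da.from_array(data, chunks=5)
# depth=1 shares one halo point with each neighbouring chunk, which is
# enough context for a window of 3 to be correct across chunk edges.
y = x.map_overlap(rolling_median, depth=1, boundary="none").compute()
# y == [0, 1, 2, 3, 4, 6, 7, 7, 8, 9]; the spike at index 5 is gone
```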

The reason is that my function here must be applied along the time dimension (e.g., a rolling median in time), but my data is chunked across the time dimension

This is a fundamental problem that is rather hard to solve without creating a copy of the data.

We just released the rechunker package, which makes it easy to create a copy of your data with a different chunking scheme (e.g., contiguous in time, chunked in space). If you have enough disk space to store a copy, this might be a good solution.


Thanks for confirming and pointing me to rechunker, that looks nice.
