Xarray: apply_ufunc(dask='parallelized') with multiple outputs

Created on 9 Jan 2018  ·  17 comments  ·  Source: pydata/xarray

I have an application where I'd like to use apply_ufunc with dask on a function that requires multiple inputs and outputs. This was left as a TODO item in #1517. However, it's not clear to me, looking at the code, how this can be done given the current form of dask's atop. I'm hoping @shoyer has already thought of a clever solution here...

Code Sample, a copy-pastable example if possible

import numpy as np
import xarray as xr

def func(foo, bar):
    # both inputs have the same shape; return two arrays of that shape
    assert foo.shape == bar.shape
    spam = np.zeros_like(bar)
    spam2 = np.full_like(bar, 2)
    return spam, spam2

foo = xr.DataArray(np.zeros((10, 10))).chunk()
bar = xr.DataArray(np.zeros((10, 10))).chunk() + 5

xrfunc = xr.apply_ufunc(func, foo, bar,
                        output_core_dims=[[], []],
                        dask='parallelized')

Problem description

This currently raises a NotImplementedError.

Expected Output

Multiple dask arrays. In my example above, two dask arrays.

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.6.4.final.0
python-bits: 64
OS: Linux
OS-release: 4.4.86+
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: en_US.UTF-8
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8

xarray: 0.10.0+dev.c92020a
pandas: 0.22.0
numpy: 1.13.3
scipy: 1.0.0
netCDF4: 1.3.1
h5netcdf: 0.5.0
Nio: None
zarr: 2.2.0a2.dev176
bottleneck: 1.2.1
cyordereddict: None
dask: 0.16.0
distributed: 1.20.2+36.g7387410
matplotlib: 2.1.1
cartopy: None
seaborn: None
setuptools: 38.4.0
pip: 9.0.1
conda: 4.3.29
pytest: 3.3.2
IPython: 6.2.1
sphinx: None

cc @mrocklin, @arbennett

All 17 comments

We need atop to support multiple output arguments (https://github.com/dask/dask/issues/702), or potentially a specialized wrapper for generalized ufuncs in dask (https://github.com/dask/dask/issues/1176).

@shoyer - dask now has an apply_gufunc. Is this something we should try to include in xr.apply_ufunc, or a new function xr.apply_gufunc?

xref: https://github.com/dask/dask/pull/3109

FYI @magonser

I think we can do this inside the existing xarray.apply_ufunc, simply by using apply_gufunc instead of atop for the case where signature.num_outputs > 1 (which currently raises NotImplementedError):
https://github.com/pydata/xarray/blob/master/xarray/core/computation.py#L601
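For context, dask's apply_gufunc already handles multiple outputs directly on dask arrays. A minimal sketch of what this would build on, reusing the toy function from the issue (names here are illustrative):

```python
import numpy as np
import dask.array as da

def func(foo, bar):
    # elementwise function with two outputs
    return np.zeros_like(bar), np.full_like(bar, 2)

foo = da.zeros((10, 10), chunks=5)
bar = da.zeros((10, 10), chunks=5) + 5

# "(),()->(),()" declares two scalar-core inputs and two scalar-core outputs,
# so apply_gufunc returns a tuple of two lazy dask arrays
spam, spam2 = da.apply_gufunc(func, "(),()->(),()", foo, bar,
                              output_dtypes=(foo.dtype, bar.dtype))
```

Each output is its own lazy dask array; computing them together (e.g. via a single dask.compute call) evaluates the shared graph once.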

Any updates or progress here? I'm trying to use xarray.apply_ufunc with scipy.stats.linregress, which returns:

  • slope
  • intercept
  • rvalue
  • pvalue
  • stderr

@andersy005 here is a small demo of linear regression using lstsq (not linregress) that keeps only the slope and intercept. It is applied here to an array of sea surface temperature.
I hope it can help.

import numpy as np
import xarray as xr
import matplotlib.pyplot as plt

ds = xr.open_dataset('sst_2D.nc', chunks={'X': 30, 'Y': 30})

def ulinregress(x, y):
    # universal function: y arrives as (ny, nx, nt) after core-dim reordering
    ny, nx, nt = y.shape
    y = np.moveaxis(y, -1, 0).reshape((nt, -1))  # (nt, ny*nx)
    return np.linalg.lstsq(np.vstack([x, np.ones(nt)]).T, y, rcond=None)[0].T.reshape(ny, nx, 2)

time = (ds['time'] - np.datetime64("1950-01-01")) / np.timedelta64(1, 'D')
ab = xr.apply_ufunc(ulinregress, time, ds['sst'], dask='parallelized',
                    input_core_dims=[['time'], ['time']],
                    output_dtypes=['d'], output_sizes={'coef': 2},
                    output_core_dims=[['coef']])

series = ds['sst'][:, 0, 0].load()
line = series.copy()
line[:] = ab[0, 0, 0] * time + ab[0, 0, 1]
series.plot(label='Original'); line.plot(label='Linear regression'); plt.legend()
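The same pattern can be checked on synthetic data with no NetCDF file. A self-contained sketch (note that recent xarray passes output sizes via dask_gufunc_kwargs rather than the older output_sizes argument used above):

```python
import numpy as np
import xarray as xr

nt, ny, nx = 50, 4, 3
time = xr.DataArray(np.arange(nt, dtype='d'), dims=['time'])
# y = 2*t + 1 at every grid point
sst = xr.DataArray(
    2.0 * time.values[:, None, None] + 1.0 + np.zeros((nt, ny, nx)),
    dims=['time', 'Y', 'X'],
).chunk({'Y': 2, 'X': 2})

def ulinregress(x, y):
    # after input_core_dims, y arrives as (Y, X, time)
    ny_, nx_, nt_ = y.shape
    y2 = np.moveaxis(y, -1, 0).reshape((nt_, -1))
    coef = np.linalg.lstsq(np.vstack([x, np.ones(nt_)]).T, y2, rcond=None)[0]
    return coef.T.reshape(ny_, nx_, 2)

ab = xr.apply_ufunc(ulinregress, time, sst, dask='parallelized',
                    input_core_dims=[['time'], ['time']],
                    output_core_dims=[['coef']],
                    output_dtypes=['d'],
                    dask_gufunc_kwargs={'output_sizes': {'coef': 2}})
```

Here ab[..., 0] should recover the slope (2.0) and ab[..., 1] the intercept (1.0) at every grid point.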

@andersy005 - is what you are working on related to #3349?

is what you are working on related to #3349?

@rabernat, indeed! Let me know if you have bandwidth to take on the polyfit implementation in xarray. Otherwise, I'd be happy to help out/work on it late October.

I definitely don't have bandwidth! I'm happy to see you working on it.


This looks essentially the same as @stefraynaud's answer, but I came across this stackoverflow response: https://stackoverflow.com/questions/52094320/with-xarray-how-to-parallelize-1d-operations-on-a-multidimensional-dataset.

@andersy005, I imagine you're far past this now. And this might have been related to discussions with Genevieve and me anyway.

import numpy as np
import xarray as xr
from scipy import stats

def new_linregress(x, y):
    # Wrapper around scipy linregress to use in apply_ufunc
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
    return np.array([slope, intercept, r_value, p_value, std_err])

# return a new DataArray (here ds is a Dataset, and x and y are the names of
# two variables sharing a 'year' dimension); note the result is not named
# `stats`, which would shadow the scipy.stats module used in the wrapper
results = xr.apply_ufunc(new_linregress, ds[x], ds[y],
                         input_core_dims=[['year'], ['year']],
                         output_core_dims=[["parameter"]],
                         vectorize=True,
                         dask="parallelized",
                         output_dtypes=['float64'],
                         output_sizes={"parameter": 5},
                         )
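As a sanity check, the same wrapper can be run on synthetic in-memory data by dropping the dask-specific arguments (which are only needed for chunked input); a hypothetical self-contained run:

```python
import numpy as np
import xarray as xr
from scipy import stats

def new_linregress(x, y):
    # Wrapper around scipy linregress to use in apply_ufunc
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
    return np.array([slope, intercept, r_value, p_value, std_err])

# three "points", each with a perfect line y = 3*x + 1 over 10 years
year = np.arange(10.0)
ds = xr.Dataset({
    'x': (('point', 'year'), np.tile(year, (3, 1))),
    'y': (('point', 'year'), 3.0 * np.tile(year, (3, 1)) + 1.0),
})

out = xr.apply_ufunc(new_linregress, ds['x'], ds['y'],
                     input_core_dims=[['year'], ['year']],
                     output_core_dims=[['parameter']],
                     vectorize=True)
```

The result has dims ('point', 'parameter') with the five linregress statistics stacked along 'parameter'.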

I imagine you're far past this now. And this might have been related to discussions with Genevieve and me anyway.

Thank you for the update, @bradyrx! Yes, it was related to discussions with @gelsworth

I think ideally it would be nice to return multiple DataArrays or a Dataset of variables. But I'm really happy with this solution. I'm using it on a 600GB dataset of particle trajectories and was able to write a ufunc to go through and return each particle's x, y, z location when it met a certain condition.

I think having something simple like the stackoverflow snippet I posted would be great for the docs as an apply_ufunc example. I'd be happy to lead this if folks think it's a good idea.

I think ideally it would be nice to return multiple DataArrays or a Dataset of variables.

What's the current status of this? I have similar requirements: a single DataArray as input, multiple DataArrays as output.

Still needs to be implemented. Stephan's comment suggests a path forward (https://github.com/pydata/xarray/issues/1815#issuecomment-440089606)

One issue I see is that this would return multiple dask objects, correct? So to get the results from them, you'd have to run .compute() on each separately. I think it's a valid assumption to expect that the multiple output objects would share a lot of the same computational pipeline. So would you be re-doing the same computation by running .compute() separately on these objects?

The earlier mentioned code snippets provide a nice path forward, since you can just run compute on one object, and then split its result (or however you name it) dimension into multiple individual objects. Thoughts?
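Splitting a stacked output dimension into named variables is straightforward; a sketch, assuming a hypothetical `result` array with a 'parameter' dimension like the linregress example above:

```python
import numpy as np
import xarray as xr

# hypothetical stacked result (e.g. from the linregress wrapper earlier)
result = xr.DataArray(np.arange(20.0).reshape(4, 5),
                      dims=['point', 'parameter'])

names = ['slope', 'intercept', 'rvalue', 'pvalue', 'stderr']
# one Dataset variable per parameter, each sliced along the stacked dim
split = xr.Dataset({name: result.isel(parameter=i)
                    for i, name in enumerate(names)})
```

Calling .compute() on the resulting Dataset then evaluates the underlying graph once for all variables.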

So would you be re-doing the same computation by running .compute() separately on these objects?

Yes. but you can do dask.compute(xarray_obj1, xarray_obj2,...) or combine those objects appropriately into a Dataset and then call compute on that.
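To illustrate the point about shared work: when multiple results are passed to a single dask.compute call, dask merges their graphs and evaluates the shared tasks only once (variable names here are illustrative):

```python
import dask
import dask.array as da

x = da.ones((1000,), chunks=100)
# two results that both depend on x's chunks
total = x.sum()
shifted_total = (x + 1).sum()

# one call walks the merged graph, so x's chunks are built only once
a, b = dask.compute(total, shifted_total)
```

Calling total.compute() and shifted_total.compute() separately would instead rebuild x's chunks twice.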

Good call. I figured there was a workaround.

