```python
import numpy as np
import xarray as xr

ds = xr.Dataset({
    'signal': (['das_time', 'das', 'record'], np.empty((1000, 120, 45))),
    'min_height': (['das'], np.empty((120,)))  # each DAS has a different resolution
})

def some_peak_finding_func(data1d, min_height):
    """process data1d with constraints given by min_height"""
    result = np.zeros((4, 2))  # summary matrix with 2 peak characteristics
    return result

ds_dask = ds.chunk({'record': 3})

xr.apply_ufunc(some_peak_finding_func, ds_dask['signal'], ds_dask['min_height'],
               input_core_dims=[['das_time'], []],  # apply peak finding along each trace
               output_core_dims=[['peak_pos', 'pulse']],
               vectorize=True,  # up to here works without dask!
               dask='parallelized',
               output_sizes={'peak_pos': 4, 'pulse': 2},
               output_dtypes=[float],
               )
```
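For reference, the vectorization that `apply_ufunc(..., vectorize=True)` performs can be sketched with plain NumPy; the signature's placeholder names and the array shapes below are illustrative, not the actual xarray internals:

```python
import numpy as np

def some_peak_finding_func(data1d, min_height):
    """process data1d with constraints given by min_height"""
    return np.zeros((4, 2))  # summary matrix with 2 peak characteristics

# apply_ufunc roughly builds this wrapper: the core dims ('das_time' in,
# 'peak_pos'/'pulse' out) become placeholder names in a gufunc signature
vectorized = np.vectorize(some_peak_finding_func, signature='(t),()->(p,q)')

signal = np.empty((120, 45, 1000))  # loop dims (das, record); core dim das_time moved last
min_height = np.empty((120, 1))     # broadcasts against the loop dims
out = vectorized(signal, min_height)
print(out.shape)  # (120, 45, 4, 2)
```

The output sizes 4 and 2 are inferred from the first actual call, which is why this works on in-memory data but breaks on the size-0 probe arrays dask uses.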
This fails with `ValueError: cannot call vectorize with a signature including new output dimensions on size 0 inputs`, because `dask.array.utils.compute_meta()` passes the function 0-sized arrays.
This should work, and indeed works well on the non-chunked `ds` without `dask='parallelized'` and the associated `output_*` parameters.
I'm trying to parallelize a peak finding routine with dask (it works well without it) and I hoped that `dask='parallelized'` would make that simple. However, the peak finding needs to be vectorized; it works well with `vectorize=True`, but `np.vectorize` appears to have issues in `compute_meta`, which dask calls internally in blockwise application, as indicated in the source code:
https://github.com/dask/dask/blob/e6ba8f5de1c56afeaed05c39c2384cd473d7c893/dask/array/utils.py#L118
A possible solution might be for `apply_ufunc` to pass `meta` directly to dask, if it is possible to foresee what `meta` should be. I suppose we are aiming for `np.ndarray` most of the time, though sparse might change that in the future.
I know I could use groupby-apply as an alternative, but there are several issues that made us use `apply_ufunc` instead.
Output of `xr.show_versions()`:

```
commit: None
python: 3.7.4 (default, Aug 13 2019, 20:35:49)
[GCC 7.3.0]
python-bits: 64
OS: Linux
OS-release: 4.9.0-11-amd64
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.4
libnetcdf: 4.6.1
xarray: 0.14.0
pandas: 0.25.1
numpy: 1.17.2
scipy: 1.3.1
netCDF4: 1.4.2
pydap: None
h5netcdf: 0.7.4
h5py: 2.9.0
Nio: None
zarr: None
cftime: 1.0.4.2
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.2.1
dask: 2.5.2
distributed: 2.5.2
matplotlib: 3.1.1
cartopy: None
seaborn: 0.9.0
numbagg: None
setuptools: 41.4.0
pip: 19.2.3
conda: 4.7.12
pytest: 5.2.1
IPython: 7.8.0
sphinx: 2.2.0
```
Another approach would be to bypass `compute_meta` in `dask.blockwise` if `dtype` is provided, which seems to be hinted at here.
Perhaps this is an oversight in dask; what do you think?
I am having a similar problem. This impacts some of my frequently used code to compute correlations.
Here is a simplified example that used to work with older dependencies:
```python
import xarray as xr
import numpy as np
from scipy.stats import linregress

def _ufunc(aa, bb):
    out = linregress(aa, bb)
    return np.array([out.slope, out.intercept])

def wrapper(a, b, dim='time'):
    return xr.apply_ufunc(
        _ufunc, a, b,
        input_core_dims=[[dim], [dim]],
        output_core_dims=[["parameter"]],
        vectorize=True,
        dask="parallelized",
        output_dtypes=[a.dtype],
        output_sizes={"parameter": 2},
    )
```
This works when passing numpy arrays:
```python
a = xr.DataArray(np.random.rand(3, 13, 5), dims=['x', 'time', 'y'])
b = xr.DataArray(np.random.rand(3, 5, 13), dims=['x', 'y', 'time'])
wrapper(a, b)
```
Output (beginning truncated):

```
        [-0.54445474,  0.66997513],
        [-0.22894182,  0.65433402],
        [ 0.38536482,  0.20656073],
        [ 0.25083224,  0.46955618]],

       [[-0.21684891,  0.55521932],
        [ 0.51621616,  0.20869272],
        [-0.1502755 ,  0.55526262],
        [-0.25452988,  0.60823538],
        [-0.20571622,  0.56950115]],

       [[-0.22810421,  0.50423622],
        [ 0.33002345,  0.36121484],
        [ 0.37744774,  0.33081058],
        [-0.10825559,  0.53772493],
        [-0.12576656,  0.51722167]]])
Dimensions without coordinates: x, y, parameter
```
But when I convert both arrays to dask arrays, I get the same error as @smartass101.
```python
wrapper(a.chunk({'x': 2, 'time': -1}), b.chunk({'x': 2, 'time': -1}))
```
```
ValueError                                Traceback (most recent call last)
      1 a = xr.DataArray(np.random.rand(3, 13, 5), dims=['x', 'time', 'y'])
      2 b = xr.DataArray(np.random.rand(3, 5, 13), dims=['x','y', 'time'])
----> 3 wrapper(a.chunk({'x':2, 'time':-1}),b.chunk({'x':2, 'time':-1}))
     16     dask="parallelized",
     17     output_dtypes=[a.dtype],
---> 18     output_sizes={"parameter": 2},)

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in apply_ufunc(func, input_core_dims, output_core_dims, exclude_dims, vectorize, join, dataset_join, dataset_fill_value, keep_attrs, kwargs, dask, output_dtypes, output_sizes, *args)
   1042             join=join,
   1043             exclude_dims=exclude_dims,
-> 1044             keep_attrs=keep_attrs
   1045         )
   1046     elif any(isinstance(a, Variable) for a in args):

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in apply_dataarray_vfunc(func, signature, join, exclude_dims, keep_attrs, args)
    232
    233     data_vars = [getattr(a, "variable", a) for a in args]
--> 234     result_var = func(data_vars)
    235
    236     if signature.num_outputs > 1:

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in apply_variable_ufunc(func, signature, exclude_dims, dask, output_dtypes, output_sizes, keep_attrs, args)
    601                 "apply_ufunc: {}".format(dask)
    602             )
--> 603     result_data = func(input_data)
    604
    605     if signature.num_outputs == 1:

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in func(*arrays)
    591                 signature,
    592                 output_dtypes,
--> 593                 output_sizes,
    594             )
    595

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/xarray/core/computation.py in _apply_blockwise(func, args, input_dims, output_dims, signature, output_dtypes, output_sizes)
    721         dtype=dtype,
    722         concatenate=True,
--> 723         new_axes=output_sizes
    724     )
    725

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/dask/array/blockwise.py in blockwise(func, out_ind, name, token, dtype, adjust_chunks, new_axes, align_arrays, concatenate, meta, args, *kwargs)
    231         from .utils import compute_meta
    232
--> 233         meta = compute_meta(func, dtype, args[::2], *kwargs)
    234     if meta is not None:
    235         return Array(graph, out, chunks, meta=meta)

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/dask/array/utils.py in compute_meta(func, _dtype, args, *kwargs)
    119         # with np.vectorize, such as dask.array.routines._isnonzero_vec().
    120         if isinstance(func, np.vectorize):
--> 121             meta = func(*args_meta)
    122         else:
    123             try:

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/numpy/lib/function_base.py in __call__(self, args, *kwargs)
   2089             vargs.extend([kwargs[_n] for _n in names])
   2090
-> 2091         return self._vectorize_call(func=func, args=vargs)
   2092
   2093     def _get_ufunc_and_otypes(self, func, args):

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/numpy/lib/function_base.py in _vectorize_call(self, func, args)
   2155         """Vectorized call to func over positional args."""
   2156         if self.signature is not None:
-> 2157             res = self._vectorize_call_with_signature(func, args)
   2158         elif not args:
   2159             res = func()

~/miniconda/envs/euc_dynamics/lib/python3.7/site-packages/numpy/lib/function_base.py in _vectorize_call_with_signature(self, func, args)
   2229                    for dims in output_core_dims
   2230                    for dim in dims):
-> 2231             raise ValueError('cannot call vectorize with a signature '
   2232                              'including new output dimensions on size 0 '
   2233                              'inputs')

ValueError: cannot call vectorize with a signature including new output dimensions on size 0 inputs
```
This used to work like a charm... I, however, was sloppy in testing this functionality (a good reminder to always write tests immediately 🙄), and I was not able to determine a combination of dependencies that would work. I am still experimenting and will report back.
Could this behaviour be a bug introduced in dask at some point (as indicated by @smartass101 above)? cc'ing @dcherian @shoyer @mrocklin
EDIT: I can confirm that it seems to be a dask issue. If I restrict my dask version to <2.0, my tests (very similar to the above example) work.
Sounds similar. But I'm not sure why you get the size-0 issue, since your chunks don't (from a quick reading) seem to have size 0 in any dimension. Could you show us the resulting chunk setup?
This is the chunk setup:

Might this be a problem resulting from `numpy.vectorize`?
The problem is that dask, as of version 2.0, calls functions applied to dask arrays with size-0 inputs to figure out the output array type, e.g., is the output a dense `numpy.ndarray` or a sparse array?
Unfortunately, `numpy.vectorize` doesn't know how large a size-0 array to make, because it doesn't have anything like the `output_sizes` argument.
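The failure can be reproduced with `np.vectorize` alone, independent of dask and xarray; `f` and `vf` below are illustrative names:

```python
import numpy as np

def f(x):
    # returns a new core dimension of size 3
    return np.zeros(3)

vf = np.vectorize(f, signature='(n)->(m)')

print(vf(np.ones((2, 5))).shape)  # (2, 3): m is inferred from an actual call

# on a size-0 input there is no call to infer m from, so vectorize raises
try:
    vf(np.empty((0, 5)))
except ValueError as e:
    print(e)
```

This is exactly the probe that `compute_meta` performs, which is why the error appears only once the data is chunked.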
For xarray, we have a couple of options:

1. If xarray is going to apply `np.vectorize`, then it should pass `meta=np.ndarray` into the relevant dask functions (e.g., `dask.array.blockwise`). This should avoid the need to evaluate with size 0 arrays.
2. Add an `output_sizes` argument to `np.vectorize`, either upstream in NumPy or in a wrapper in xarray.

(1) is probably easiest here.
> The problem is that dask, as of version 2.0, calls functions applied to dask arrays with size-0 inputs to figure out the output array type, e.g., is the output a dense `numpy.ndarray` or a sparse array?
Yes, now I recall that this was the issue, yeah. It doesn't even depend on your actual data really.
Possible option 3 is to address https://github.com/dask/dask/issues/5642 directly (I haven't found time for a PR yet). Essentially, from the code described in that issue I have the feeling that if a `dtype` is passed (as `apply_ufunc` does), then `meta` should not need to be calculated.
@shoyer's option 1 should be a relatively simple xarray PR, if one of you is up for it.
I can give it a shot if you could point me to the appropriate place, since I have never messed with the dask internals of xarray.
`meta` should be passed to `blockwise` through `_apply_blockwise`, with default `None` (I think) and `np.ndarray` if `vectorize` is True. You'll have to pass the `vectorize` kwarg down to this level, I think.
> `meta` should be passed to `blockwise` through `_apply_blockwise`, with default `None` (I think) and `np.ndarray` if `vectorize` is True. You'll have to pass the `vectorize` kwarg down to this level, I think.
I'm afraid that passing `meta=None` will not help, as explained in https://github.com/dask/dask/issues/5642 and seen around this line, because in that case `compute_meta` will be called, which might fail with a `np.vectorize`-wrapped function.
I believe a better solution would be to address https://github.com/dask/dask/issues/5642, so that `meta` isn't computed when we already provide an output `dtype`.
Right, the xarray solution is to set `meta = np.ndarray if vectorize is True else None` if the user doesn't explicitly provide `meta`. Or am I missing something?
> `meta = np.ndarray if vectorize is True else None` if the user doesn't explicitly provide `meta`.
Yes, sorry, written this way I now see what you meant and that will likely work indeed.
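For concreteness, the selection being proposed could be sketched like this; `select_meta` is an illustrative name, not the actual xarray code:

```python
import numpy as np

def select_meta(vectorize, meta=None):
    # if the caller supplied meta, pass it through to dask.array.blockwise;
    # otherwise, when np.vectorize is involved, declare the output a plain
    # ndarray so dask skips its size-0 compute_meta probe
    if meta is None and vectorize:
        meta = np.ndarray
    return meta
```

With `meta` non-`None`, `dask.array.blockwise` does not call `compute_meta`, so the size-0 probe that trips up `np.vectorize` never happens.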