Is your feature request related to a problem? Please describe.
I would like to supply a preprocess argument to save_mfdataset that is applied to each dataset before it is written to disk, similar to the option open_mfdataset already offers. Specifically, I have a dataset that I want to split by unique values along a dimension, apply some further logic to each sub-dataset, and then save each sub-dataset to a different file. Currently I'm able to split and save using the following code from the API docs:
years, datasets = zip(*ds.groupby("time.year"))
paths = ["%s.nc" % y for y in years]
xr.save_mfdataset(datasets, paths)
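For concreteness, the snippet above can be exercised on a small synthetic dataset (the variable name `temp` and the two-year span are illustrative, not from the original report):

```python
import numpy as np
import pandas as pd
import xarray as xr

# toy two-year daily dataset (variable/coordinate names are illustrative)
times = pd.date_range("2000-01-01", "2001-12-31", freq="D")
ds = xr.Dataset(
    {"temp": ("time", np.arange(times.size, dtype=float))},
    coords={"time": times},
)

# split into one sub-dataset per year, exactly as in the API-docs snippet
years, datasets = zip(*ds.groupby("time.year"))
paths = ["%s.nc" % y for y in years]
print(paths)  # ['2000.nc', '2001.nc']
```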
What's missing is the ability to apply further logic to each of the sub-datasets produced by the groupby. If I iterate through datasets here and chain further operations onto each element, the computations begin to execute serially, even though ds is backed by dask arrays:
save_mfdataset([ds.foo() for ds in datasets], paths)
Describe the solution you'd like
Instead, I'd like the ability to do:
xr.save_mfdataset(datasets, paths, preprocess=lambda ds: ds.foo())
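The requested kwarg would essentially be shorthand for mapping preprocess over the datasets before writing. A minimal sketch of those semantics with plain-Python stand-ins (the wrapper name and the toy "datasets" are hypothetical, purely to illustrate the call order):

```python
def save_mfdataset_with_preprocess(datasets, paths, preprocess=None, save=None):
    # hypothetical wrapper showing the requested semantics: map preprocess
    # over the datasets, then hand the results to the underlying writer
    if preprocess is not None:
        datasets = [preprocess(ds) for ds in datasets]
    save(datasets, paths)

out = {}  # stand-in for files on disk
save_mfdataset_with_preprocess(
    [1, 2], ["2000.nc", "2001.nc"],
    preprocess=lambda ds: ds * 10,
    save=lambda ds_list, ps: out.update(zip(ps, ds_list)),
)
print(out)  # {'2000.nc': 10, '2001.nc': 20}
```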
Describe alternatives you've considered
Not sure.
you could use dask.delayed here
new_datasets = [dask.delayed(your_function)(dset) for dset in datasets]
xr.save_mfdataset(new_datasets, paths)
I think this will work, but I've never used save_mfdataset. This is how preprocess is implemented with open_mfdataset btw.
Unfortunately that doesn't work:
TypeError: save_mfdataset only supports writing Dataset objects, received type <class 'dask.delayed.Delayed'>
You could write to netCDF in your_function and avoid save_mfdataset altogether...
I guess this is a good argument for adding a preprocess kwarg.
I think we could support delayed objects in save_mfdataset, at least in principle. But if you're OK using delayed objects, you might as well write each netCDF file separately using dask.delayed, e.g.,
def write_dataset(dataset, path):
    your_function(dataset).to_netcdf(path)

results = [dask.delayed(write_dataset)(ds, path) for ds, path in zip(datasets, paths)]
dask.compute(results)
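This per-file delayed-write pattern can be exercised end to end without xarray or netCDF by substituting a dummy write step (all names and values below are stand-ins, not real datasets or files):

```python
import dask

# stand-in "datasets" and output paths (illustrative only)
datasets = [1, 2, 3]
paths = ["2000.nc", "2001.nc", "2002.nc"]

written = {}

def write_dataset(dataset, path):
    # stand-in for your_function(dataset).to_netcdf(path)
    written[path] = dataset * 10

# one delayed write task per (dataset, path) pair, run in parallel
tasks = [dask.delayed(write_dataset)(ds, p) for ds, p in zip(datasets, paths)]
dask.compute(*tasks)
print(sorted(written.items()))
# [('2000.nc', 10), ('2001.nc', 20), ('2002.nc', 30)]
```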
Thank you, this works for me. However, it's quite slow, and the runtime seems to grow faster than linearly as the length of datasets (the number of groups in the groupby) increases.
Could it be connected to https://github.com/pydata/xarray/issues/2912#issuecomment-485497398 where they suggest to use save_mfdataset instead of to_netcdf? If so, there's a stronger case for supporting delayed objects in save_mfdataset as you said.
Appreciate the help!
Are you using multiple threads or multiple processes? IIUC you should be using multiple processes for max writing efficiency.
Multiple threads (the default), because it's recommended "for numeric code that releases the GIL (like NumPy, Pandas, Scikit-Learn, Numba, …)" according to the dask docs.
I guess I could do multi-threaded for the compute part (everything up to the definition of ds), then multi-process for the write part, but doesn't that then require me to load everything into memory before writing?
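For what it's worth, the scheduler is a per-call knob in dask, so the compute and write stages could in principle use different schedulers. A minimal sketch (the `double` function is a toy stand-in; `"processes"` is selected the same way as `"threads"`, with the caveat that inputs and results are pickled between worker processes, which is where the materialization cost shows up):

```python
import dask

@dask.delayed
def double(x):
    return 2 * x

tasks = [double(i) for i in range(4)]

# per-call scheduler selection; "threads" is the default for dask.delayed
print(dask.compute(*tasks, scheduler="threads"))  # (0, 2, 4, 6)
# dask.compute(*tasks, scheduler="processes")  # same call shape, but
# arguments/results are serialized between processes
```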
doesn't that then require me to load everything into memory before writing?
I think so.
I would try multiple processes and see if that is fast enough for what you want to do. Alternatively, write to zarr: writes are parallelized, and it's a lot easier than dealing with HDF5.
Sounds good, I'll do this in the meantime. I'm still quite interested in save_mfdataset handling these lower-level details, if possible. The ideal case would be loading with open_mfdataset, defining some operations lazily, then piping the result directly to save_mfdataset.