I have a situation where I build large zarr arrays based on chunks which correspond to how I am reading data off a filesystem, for best I/O performance. Then I set these as variables on an xarray dataset which I want to persist to zarr, but with different chunks more optimal for querying.
One problem I ran into is that manually selecting chunks of a dataset prior to to_zarr results in https://github.com/pydata/xarray/blob/66be9c5db7d86ea385c3a4cd4295bfce67e3f25b/xarray/backends/zarr.py#L83
It's difficult for me to understand exactly how to select chunks manually at the dataset level which would also make this zarr "final chunk" constraint happy. I would have been satisfied however with letting zarr choose chunks for me, but could not find a way to trigger this through the xarray API short of "unchunking" it first, which would lead to loading entire variables into memory. I came up with the following hack to trigger zarr's automatic chunking despite having differently defined chunks on my xarray dataset:
# monkey patch to get zarr to ignore dask chunks and use its own heuristics
def copy_func(f):
g = types.FunctionType(f.__code__, f.__globals__, name=f.__name__,
argdefs=f.__defaults__,
closure=f.__closure__)
g = functools.update_wrapper(g, f)
g.__kwdefaults__ = f.__kwdefaults__
return g
orig_determine_zarr_chunks = copy_func(xr.backends.zarr._determine_zarr_chunks)
xr.backends.zarr._determine_zarr_chunks = lambda enc_chunks, var_chunks, ndim: orig_determine_zarr_chunks(enc_chunks, None, ndim)
The next problem to contend with is that da.store between zarr stores with differing chunks between source and destination is astronomically slow. The first thing to attempt would be to rechunk the dask arrays according to the destination zarr chunks, but xarray's consistent chunks constraint blocks this strategy as far as I can tell. Once again I took the dirty hack approach and inject a rechunking on a per-variable basis during the to_zarr operation, as follows:
# monkey patch to make dask arrays writable with different chunks than zarr dest
# could do without this but would have to contend with 'inconsistent chunks' on dataset
def sync_using_zarr_copy(self, compute=True):
if self.sources:
import dask.array as da
rechunked_sources = [source.rechunk(target.chunks)
for source, target in zip(self.sources, self.targets)]
delayed_store = da.store(rechunked_sources, self.targets,
lock=self.lock, compute=compute,
flush=True)
self.sources = []
self.targets = []
return delayed_store
xr.backends.common.ArrayWriter.sync = sync_using_zarr_copy
I may have missed something in the API that would have made this easier, or another workaround which would be less hacky, but in any case I'm wondering if this scenario could be handled elegantly in xarray.
I'm not sure if there is a plan going forward to make legal xarray chunks 100% compatible with zarr; if so that would go a fair ways in alleviating the first problem. Alternatively, perhaps the xarray API could expose some ability to adjust chunks according to zarr's liking, as well as the option of defaulting entirely to zarr's heuristics for chunking.
As for the performance issue with differing chunks, I'm not sure whether my rechunking patch could be applied without causing side-effects. Or where the right place to solve this would be-- perhaps it could be more naturally addressed within da.store.
I just pushed a new xarray release (0.10.8) earlier today. We had a fix for zarr chunking in there (https://github.com/pydata/xarray/pull/2228) -- does that solve your issue?
Ah, that's great. I do see some improvement. Specifically, I can now set chunks using xarray, and successfully write to zarr, and reopen it. However, when reopening it I do find that the chunks have been inconsistently applied (some fields have the expected chunksize whereas some small fields have the entire variable in one chunk). Furthermore, trying to write a second time with to_zarr leads to:
*** NotImplementedError: Specified zarr chunks (100,) would overlap multiple dask chunks ((100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 4),). This is not implemented in xarray yet. Consider rechunking the data usingchunk()or specifying different chunks in encoding.
Trying to reapply the original chunks with xr.Dataset.chunk succeeds, and ds.chunks no longer reports "inconsistent chunks", but trying to write still produces the same error.
I also tried loading my entire dataset into memory, allowing the initial to_zarr to default to zarr's chunking heuristics. Trying to read and write a second time again results in the same error:
NotImplementedError: Specified zarr chunks (63170,) would overlap multiple dask chunks ((63170, 63170, 63170, 63170, 63170, 63170, 63170, 63169),). This is not implemented in xarray yet. Consider rechunking the data usingchunk()or specifying different chunks in encoding.
I tried this round-tripping experiment with my monkey patches, and it works for a sequence of read/write/read/write... without any intervention in between. This only works for default zarr-chunking, however, since the patch to xr.backends.zarr._determine_zarr_chunks overrides whatever chunks are on the originating dataset.
Curious: Is there any downside in xarray to using datasets with inconsistent chunks? I take it that it is a supported configuration because xarray allows it to happen, but just outputs that error when calling ds.chunks, which is just a sort of convenience method for looking at chunks across a whole dataset which happens to have consistent chunks...?
One other thing to add: it might be nice to have an option to allow zarr auto-chunking even when chunks!={}. I don't know how sensitive zarr performance is to chunksizes, but it'd be nice to have some form of sane auto-chunking available when you don't want to bother with manually choosing.
Curious: Is there any downside in xarray to using datasets with inconsistent chunks?
No, there's no downside here. It's just not possible to define a single dict of chunks in this case.
Can you look into the encoding attributes of any variables you load from disk?
It would also help to come up with a self-contained example that reproduces this using dummy data.
I took a closer look and noticed my one-dimensional fields of size 505359 were reporting a chunksize or 63170. Turns out that's enough to come up with a minimal repro:
>>> xr.__version__
'0.10.8'
>>> ds=xr.Dataset({'foo': (['bar'], np.zeros((505359,)))})
>>> ds.to_zarr('test.zarr')
<xarray.backends.zarr.ZarrStore object at 0x7fd9680f7fd0>
>>> ds2=xr.open_zarr('test.zarr')
>>> ds2
<xarray.Dataset>
Dimensions: (bar: 505359)
Dimensions without coordinates: bar
Data variables:
foo (bar) float64 dask.array<shape=(505359,), chunksize=(63170,)>
>>> ds2.foo.encoding
{'chunks': (63170,), 'compressor': Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0), 'filters':
None, '_FillValue': nan, 'dtype': dtype('float64')}
>>> ds2.to_zarr('test2.zarr')
raises
NotImplementedError: Specified zarr chunks (63170,) would overlap multiple dask chunks ((63170, 63170, 63
170, 63170, 63170, 63170, 63170, 63169),). This is not implemented in xarray yet. Consider rechunking th
e data using `chunk()` or specifying different chunks in encoding.
Hi, I'm new to xarray & zarr ,
After reading a zarr file, I re-chunk the data using xarray.Dataset.chunk. Then create a newly chunked data stored as zarr file with xarray.Dataset.to_zarr But I get error message:
'NotImplementedError: Specified zarr chunks (200, 100, 1) would overlap multiple dask chunks ((50, 50, 50, 50), (25, 25, 25, 25), (10000,)). This is not implemented in xarray yet. Consider rechunking the data using chunk() or specifying different chunks in encoding.'
My xarray version is12.1, & and my understanding is that according to this post https://github.com/pydata/xarray/issues/2300 .it is fixed, thus so it is implemented to 12.1??
Then why do I get 'notimplemented error ?
Do I have to use 'del dsread.data.encoding['chunks']. each time before using 'Dataset.to_zarr' as a workaround? but probably I am missing somthing. I hope someone can point me out...
I made a notebook here for reproducing the pb.
https://github.com/tinaok/Pangeo-for-beginners/blob/master/3-1%20zarr%20and%20re-chunking%20bug%20report.ipynb
thanks for your help, regards Tina
I am getting the same error too.
Hi all. I am looking into this issue, trying to figure out if it is still a thing. I just tried @chrisbarber's MRE above using xarray v 0.15.
import xarray as xr
ds=xr.Dataset({'foo': (['bar'], np.zeros((505359,)))})
ds.to_zarr('test.zarr', mode='w')
ds2=xr.open_zarr('test.zarr')
ds2.to_zarr('test2.zarr', mode='w')
This now works without error, thanks to #2487.
I can trigger the error in a third step:
ds3 = ds2.chunk({'bar': 10000})
ds3.to_zarr('test3.zarr', mode='w')
raises
NotImplementedError: Specified zarr chunks (63170,) would overlap multiple dask chunks ((10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 5359),). This is not implemented in xarray yet. Consider rechunking the data using `chunk()` or specifying different chunks in encoding.
The problem is that, even though we rechunked the data, chunks key is still present in encoding.
>>> print(ds3.foo.encoding)
{'chunks': (63170,), 'compressor': Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0), 'filters': None, '_FillValue': nan, 'dtype': dtype('float64')}
This was populated with the variable was read from test.zarr.
As a workaround, you can delete the encoding (either just the chunk attribute or all of it):
ds3.foo.encoding = {}
ds3.to_zarr('test3.zarr', mode='w')
This allows the operation to complete successfully.
For all the users stuck on this problem (e.g. @abarciauskas-bgse):
encoding keyword in to_zarr.For xarray developers, the question is whether the chunk() method should delete existing chunks attributes from encoding.
the question is whether the chunk() method should delete existing chunks attributes from encoding.
IMO this is the user-friendly thing to do.
If there is a non-dimension coordinate, the error is also tickled.
import xarray as xr
import numpy as np
ds=xr.Dataset({'foo': (['bar'], np.zeros((100,)))})
# Problem also affects non-dimension coords
ds.coords['baz'] = ('bar', ['mah'] * 100)
ds.to_zarr('test.zarr', mode='w')
ds2 = xr.open_zarr('test.zarr')
ds3 = ds2.chunk({'bar': 2})
ds3.foo.encoding = {}
ds3.coords['baz'].encoding = {} # Need this, too.
ds3.to_zarr('test3.zarr', mode='w')
I arrived here due to a different use case / problem, which ultimately I solved, but I think there's value in documenting it here.
My use case is the following workflow:
1 . take raw data, build a dataset, append it to a zarr store Z
2 . analyze the data on Z, then maybe goto 1.
Step 2's performance is much better when data on Z is chunked properly along the appending dimension 'frame' (chunks of size 50), however step 1 only adds 1 element along it. I end up with Z having chunks (1,1,1,1,1...) on 'frame'.
On xarray 0.16.0, this seems solvable via the encoding parameter, if we take care to only use it on the store creation.
Before that version, I was using something like the monkey patch posted by @chrisbarber .
Code:
import shutil
import xarray as xr
import numpy as np
import tempfile
zarr_path = tempfile.mkdtemp()
def append_test(ds,chunks):
shutil.rmtree(zarr_path)
for i in range(21):
d = ds.isel(frame=slice(i,i+1))
d = d.chunk(chunks)
d.to_zarr(zarr_path,consolidated=True,**(dict(mode='a',append_dim='frame') if i>0 else {}))
dsa = xr.open_zarr(str(zarr_path),consolidated=True)
print(dsa.chunks,dsa.dims)
#sometime before 0.16.0
import contextlib
@contextlib.contextmanager
def change_determine_zarr_chunks(chunks):
orig_determine_zarr_chunks = xr.backends.zarr._determine_zarr_chunks
try:
def new_determine_zarr_chunks( enc_chunks, var_chunks, ndim, name):
da = ds[name]
zchunks = tuple(chunks[dim] if (dim in chunks and chunks[dim] is not None) else da.shape[i] for i,dim in enumerate(da.dims))
return zchunks
xr.backends.zarr._determine_zarr_chunks = new_determine_zarr_chunks
yield
finally:
xr.backends.zarr._determine_zarr_chunks = orig_determine_zarr_chunks
chunks = {'frame':10,'other':50}
ds = xr.Dataset({'data':xr.DataArray(data=np.random.rand(100,100),dims=('frame','other'))})
append_test(ds,chunks)
with change_determine_zarr_chunks(chunks):
append_test(ds,chunks)
#with 0.16.0
def append_test_encoding(ds,chunks):
shutil.rmtree(zarr_path)
encoding = {}
for k,v in ds.variables.items():
encoding[k]={'chunks':tuple(chunks[dk] if dk in chunks else v.shape[i] for i,dk in enumerate(v.dims))}
for i in range(21):
d = ds.isel(frame=slice(i,i+1))
d = d.chunk(chunks)
d.to_zarr(zarr_path,consolidated=True,**(dict(mode='a',append_dim='frame') if i>0 else dict(encoding = encoding)))
dsa = xr.open_zarr(str(zarr_path),consolidated=True)
print(dsa.chunks,dsa.dims)
append_test_encoding(ds,chunks)
Frozen(SortedKeysDict({'frame': (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 'other': (50, 50)})) Frozen(SortedKeysDict({'frame': 21, 'other': 100}))
Frozen(SortedKeysDict({'frame': (10, 10, 1), 'other': (50, 50)})) Frozen(SortedKeysDict({'frame': 21, 'other': 100}))
Frozen(SortedKeysDict({'frame': (10, 10, 1), 'other': (50, 50)})) Frozen(SortedKeysDict({'frame': 21, 'other': 100}))
Most helpful comment
Hi, I'm new to xarray & zarr ,
After reading a zarr file, I re-chunk the data using xarray.Dataset.chunk. Then create a newly chunked data stored as zarr file with xarray.Dataset.to_zarr But I get error message:
'NotImplementedError: Specified zarr chunks (200, 100, 1) would overlap multiple dask chunks ((50, 50, 50, 50), (25, 25, 25, 25), (10000,)). This is not implemented in xarray yet. Consider rechunking the data using
chunk()or specifying different chunks in encoding.'My xarray version is12.1, & and my understanding is that according to this post https://github.com/pydata/xarray/issues/2300 .it is fixed, thus so it is implemented to 12.1??
Then why do I get 'notimplemented error ?
Do I have to use 'del dsread.data.encoding['chunks']. each time before using 'Dataset.to_zarr' as a workaround? but probably I am missing somthing. I hope someone can point me out...
I made a notebook here for reproducing the pb.
https://github.com/tinaok/Pangeo-for-beginners/blob/master/3-1%20zarr%20and%20re-chunking%20bug%20report.ipynb
thanks for your help, regards Tina