Xarray: Writing a netCDF file is unexpectedly slow

Created on 21 Apr 2019 · 9 comments · Source: pydata/xarray

ncdat = xr.open_mfdataset(nclist, concat_dim='time')

# The 2-D lat/lon variables carry a redundant time dimension; keep one slice
ncdat['lat'] = ncdat['lat'].isel(time=0).drop('time')
ncdat['lon'] = ncdat['lon'].isel(time=0).drop('time')
ncdat = ncdat.rename({'north_south': 'lat', 'east_west': 'lon'})

lat_coords = ncdat.lat[:, 0]  # Extract 1-D latitudes from the 2-D grid
lon_coords = ncdat.lon[0, :]  # Extract 1-D longitudes from the 2-D grid

ncdat = ncdat.drop(['lat', 'lon'])

reformatted_ncdat = ncdat.assign_coords(lat=lat_coords, lon=lon_coords,
                                        time=ncdat.coords['time'])

ncdat = reformatted_ncdat.sortby('time')
ncdat.to_netcdf('testing.nc')

Problem description

After some processing, I am left with this xarray dataset ncdat which I want to export to a netCDF file.

<xarray.Dataset>
Dimensions:                 (lat: 59, lon: 75, time: 500)
Coordinates:
  * time                    (time) datetime64[ns] 2007-01-22 ... 2008-06-04
  * lat                     (lat) float32 -4.25 -4.15 ... 1.4500003 1.5500002
  * lon                     (lon) float32 29.049988 29.149994 ... 36.450012
Data variables:
    Streamflow_tavg         (time, lat, lon) float32 dask.array<shape=(500, 59, 75), chunksize=(1, 59, 75)>
    RiverDepth_tavg         (time, lat, lon) float32 dask.array<shape=(500, 59, 75), chunksize=(1, 59, 75)>
    RiverFlowVelocity_tavg  (time, lat, lon) float32 dask.array<shape=(500, 59, 75), chunksize=(1, 59, 75)>
    FloodedFrac_tavg        (time, lat, lon) float32 dask.array<shape=(500, 59, 75), chunksize=(1, 59, 75)>
    SurfElev_tavg           (time, lat, lon) float32 dask.array<shape=(500, 59, 75), chunksize=(1, 59, 75)>
    SWS_tavg                (time, lat, lon) float32 dask.array<shape=(500, 59, 75), chunksize=(1, 59, 75)>
Attributes:
    missing_value:           -9999.0
    NUM_SOIL_LAYERS:         1
    SOIL_LAYER_THICKNESSES:  1.0
    title:                   LIS land surface model output
    institution:             NASA GSFC
    source:                  model_not_specified
    history:                 created on date: 2019-04-19T09:11:12.992
    references:              Kumar_etal_EMS_2006, Peters-Lidard_etal_ISSE_2007
    conventions:             CF-1.6
    comment:                 website: http://lis.gsfc.nasa.gov/
    MAP_PROJECTION:          EQUIDISTANT CYLINDRICAL
    SOUTH_WEST_CORNER_LAT:   -4.25
    SOUTH_WEST_CORNER_LON:   29.05
    DX:                      0.1
    DY:                      0.1

But the problem is that it takes an inordinately long time to export: almost 10 minutes for this particular file, which is only 35 MB.

How can I expedite this process? Is there anything wrong with the structure of ncdat?

Expected Output

A netCDF file

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.7.3 | packaged by conda-forge | (default, Mar 27 2019, 23:01:00)
[GCC 7.3.0]
python-bits: 64
OS: Linux
OS-release: 3.0.101-0.47.105-default
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.4
libnetcdf: 4.6.2

xarray: 0.12.1
pandas: 0.24.2
numpy: 1.16.2
scipy: 1.2.1
netCDF4: 1.5.0.1
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: None
cftime: 1.0.3.4
nc_time_axis: None
PseudonetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.2.1
dask: 1.2.0
distributed: 1.27.0
matplotlib: 3.0.3
cartopy: 0.17.0
seaborn: 0.9.0
setuptools: 41.0.0
pip: 19.0.3
conda: None
pytest: None
IPython: 7.4.0
sphinx: None

Labels: backends, usage question

All 9 comments

You're using dask, so the Dataset is being lazily computed. If one part of your pipeline is very expensive (perhaps reading the original data from disk?) then the process of saving can be very slow.

I would suggest doing some profiling, e.g., as shown in this example: http://docs.dask.org/en/latest/diagnostics-local.html#example

Once we know what the slow part is, that will hopefully make opportunities for improvement more obvious.
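For illustration, here is a minimal sketch of that profiling approach using dask's local diagnostics (the output filename is a placeholder, and nclist is the file list from the snippet above; visualize requires bokeh):

import xarray as xr
from dask.diagnostics import Profiler, ResourceProfiler, visualize

# Rebuild the lazy dataset so the deferred reads are part of the profile
ncdat = xr.open_mfdataset(nclist, concat_dim='time')

# Everything dask has queued, including the reads, runs inside this block
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof:
    ncdat.to_netcdf('profiled.nc')

# Renders an interactive plot of the task and CPU/memory timelines
visualize([prof, rprof])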

Are there "best practices" for a situation like this? Parallel writes? save_mfdataset?

ping @jhamman @rabernat

It really depends on the underlying cause. In most cases, writing the file to disk is not itself the slow part, only the place where the slowdown manifests.

Since the final dataset size is quite manageable, I would start by forcing computation before the write step:

ncdat.load().to_netcdf(...)

While writing dask-backed xarray datasets is possible, it is a poorly optimized operation. Most of this comes from constraints in netCDF4/HDF5. There are ways to sidestep some of these challenges (save_mfdataset and the distributed dask scheduler, sketched below), but they are probably overkill for this use case.
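For reference, a hedged sketch of the save_mfdataset route, following the one-file-per-year pattern from the xarray documentation (the output paths are illustrative):

# Split the dataset along time and write each group to its own file;
# separate files allow the per-file write locks to proceed concurrently
years, datasets = zip(*ncdat.groupby('time.year'))
paths = ['ncdat_%d.nc' % y for y in years]
xr.save_mfdataset(datasets, paths)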

Diagnosis

Thank you very much! Here is what I found. For now, I will use the .load() option.

Loading netCDFs

In [8]: time ncdat=reformat_LIS_outputs(outlist)
CPU times: user 7.78 s, sys: 220 ms, total: 8 s
Wall time: 8.02 s

Slower export

In [6]: time ncdat.to_netcdf('test_slow')
CPU times: user 12min, sys: 8.19 s, total: 12min 9s
Wall time: 12min 14s

Faster export

In [9]: time ncdat.load().to_netcdf('test_faster.nc')
CPU times: user 42.6 s, sys: 2.82 s, total: 45.4 s
Wall time: 54.6 s

There are ways to side step some of these challenges (save_mfdataset and the distributed dask scheduler)

@jhamman Could you elaborate on these ways ?

I am having severe slowdowns when writing dask-backed Datasets block by block. I have also noticed that the slowdowns do not occur when writing to a ramdisk. Here are the timings of to_netcdf, which uses the default engine and encoding (the nc file is 4.3 GB):

  • When writing to ramdisk (/dev/shm/) : 2min 1s
  • When writing to /tmp/ : 27min 28s
  • When writing to /tmp/ after .load(), as suggested here : 34s (.load takes 1min 43s)

The workaround suggested here works, but the datasets may not always fit in memory, and it defeats the essential purpose of dask...

Note: I am using dask 2.3.0 and xarray 0.12.3

@fsteinmetz - in my experience, the main thing to consider here is how and when xarray's backends lock/block for certain operations. The hdf5 library is not thread safe, and so we implement a global lock around all hdf5 read/write operations. In most cases, this means we can only do one read or one write at a time per process. We have found that using Dask's distributed (or multiprocessing) scheduler allows us to bypass the thread locks required by hdf5 by using multiple processes. We also need a per-file lock when writing, so using multiple output datasets theoretically allows for concurrent writes (provided your filesystem and OS support this).
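As an illustration only (a local cluster with arbitrary worker counts, not a tuned configuration), switching to the distributed scheduler can look like this:

from dask.distributed import Client

# Each worker process holds its own HDF5 lock, so reads are no longer
# serialized behind a single in-process lock; worker counts are arbitrary
client = Client(n_workers=4, threads_per_worker=1)

# Subsequent dask operations, including the deferred reads triggered by
# the write, now run on the cluster
ncdat.to_netcdf('testing.nc')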

Finally, it's best not to jump to the complicated explanations first. If you have many small dask chunks in your dataset, both reading and writing will be quite inefficient, simply because there is non-trivial overhead when accessing partial datasets. This is even worse when the dataset is chunked/compressed.

Hope that helps.

I suspect it could work pretty well to explicitly rechunk your dataset into larger chunks (e.g., with the Dataset.chunk() method). This way you could continue to use dask for lazy writes, but reduce the overhead of writing individual chunks.
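Concretely, that might look like the following (100 time steps per chunk is an arbitrary illustration, not a tuned value):

# Consolidate the 500 single-time-step chunks into a handful of larger
# ones before writing, to amortize the per-chunk write overhead
ncdat = ncdat.chunk({'time': 100})
ncdat.to_netcdf('testing.nc')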

Thanks for the explanations @jhamman and @shoyer :)
Actually it turns out that I was not using particularly small chunks, but the filesystem for /tmp was faulty... After trying on a reliable filesystem, the results are much more reasonable.

