Xarray: Limiting threads/cores used by xarray(/dask?)

Created on 17 Sep 2018 · 9 comments · Source: pydata/xarray

I'm fairly new to xarray and I'm currently trying to leverage it to subset some NetCDF files. I'm running this on a shared server and would like to know how best to limit the processing power used by xarray so that it plays nicely with others. I've read through the dask and xarray documentation a bit, but it isn't clear to me how to set a cap on CPUs/threads. Here's an example of a spatial subset:

import glob
import os
import xarray as xr

from multiprocessing.pool import ThreadPool
import dask

wd = os.getcwd()

test_data = os.path.join(wd, 'test_data')
lat_bnds = (43, 50)
lon_bnds = (-67, -80)
output = 'test_data_subset'

def subset_nc(ncfile, lat_bnds, lon_bnds, output):
    if not os.path.exists(output):
        os.makedirs(output)
    outfile = os.path.join(output, os.path.basename(ncfile).replace('.nc', '_subset.nc'))

    with dask.config.set(scheduler='threads', pool=ThreadPool(5)):
        ds = xr.open_dataset(ncfile, decode_times=False)

        # Keep only the grid cells inside the lat/lon bounding box.
        ds_sub = ds.where(
            (ds.lon >= min(lon_bnds)) & (ds.lon <= max(lon_bnds))
            & (ds.lat >= min(lat_bnds)) & (ds.lat <= max(lat_bnds)),
            drop=True)

        # Write a compressed NETCDF4 copy of the subset.
        comp = dict(zlib=True, complevel=5)
        encoding = {var: comp for var in ds.data_vars}
        ds_sub.to_netcdf(outfile, format='NETCDF4', encoding=encoding)

list_files = glob.glob(os.path.join(test_data, '*'))
print(list_files)

for i in list_files:
    subset_nc(i, lat_bnds, lon_bnds, output)

I've tried a few variations on this by moving the ThreadPool configuration around, but I still see far too much activity in the server's top (>3000% CPU). I'm not sure where the issue lies.

Labels: awaiting response, backends, dask, usage question

All 9 comments

Step 1 would be making sure that you're actually using dask :). Xarray only uses dask with open_dataset() if you supply the chunks keyword argument.

That said, xarray's only built-in support for parallelism is through Dask, so I'm not sure what is using all your CPU.
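
For example, here is a minimal sketch of the difference (the file name and chunk size are hypothetical):

import dask
import xarray as xr
from multiprocessing.pool import ThreadPool

# Without chunks=, open_dataset loads plain NumPy arrays eagerly and dask
# (including any thread-pool settings) never enters the picture.
ds = xr.open_dataset('myfile.nc', chunks={'time': 10})  # dask-backed

# The pool cap only takes effect once dask is actually doing the work.
with dask.config.set(scheduler='threads', pool=ThreadPool(5)):
    ds.mean(dim='time').compute()  # runs on at most 5 threads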

As per your suggestion, I retried with chunking and ran into a new error (because my data has rotated poles, dask demanded that I save my data with astype(); that isn't my main concern, so I'll deal with it elsewhere).

What I did notice was that when chunking was specified (ds = xr.open_dataset(ncfile).chunk({'time': 10})), I lost all parallelism: although I specified different thread counts, CPU usage never crossed 110% (I imagine the extra 10% was due to I/O).

This is really a mystery, and unfortunately I haven't a clue how this behaviour is possible if parallel processing is disabled by default. The speed of my results when dask multiprocessing isn't specified suggests that it must be using more processing power:

  • multiprocessing calls to CDO with 5 ForkPoolWorkers: ~2 h / 5 files (100% × 5 CPUs)
  • xarray without dask multiprocessing specified: ~3 min / 5 files (spikes of 3500% on one CPU)

Could these spikes in CPU usage be due to other processes (e.g. memory usage, I/O)?

If your data uses in-file HDF5 chunks/compression, it's possible that HDF5 is uncompressing the data in parallel, though I haven't seen that before personally.

@Zeitsperre - this issue has been inactive for a while. Did you find a solution to your problem?

I am also interested. I am running a lot of critical processes and I want to keep at least 5 cores idle.

On a few systems, I've noticed that I need to set the environment variable OMP_NUM_THREADS to 1 to limit parallel evaluation within dask threads. I wonder if something like this is happening here?

xref: https://stackoverflow.com/questions/39422092/error-with-omp-num-threads-when-using-dask-distributed
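
For reference, a minimal sketch of that workaround (the value 1 is illustrative; the variable has to be set before numpy, or anything else that loads an OpenMP runtime, is imported):

import os

# Set this before importing numpy/xarray/dask; otherwise the OpenMP
# runtime may already have started its thread pool and ignore it.
os.environ['OMP_NUM_THREADS'] = '1'

import numpy as np
import xarray as xr

# Equivalently, from the shell: OMP_NUM_THREADS=1 python my_script.py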

Hi, my test code is running properly on 5 threads now.
Thanks for the help!

import xarray as xr
import dask
from multiprocessing.pool import ThreadPool

with dask.config.set(scheduler='threads', pool=ThreadPool(5)):
    dset = xr.open_mfdataset("/data/Environmental_Data/Sea_Surface_Height/*/*.nc",
                             engine='netcdf4', concat_dim='time',
                             chunks={"latitude": 180, "longitude": 360})
    # Mean sea surface height = absolute dynamic topography minus anomaly.
    dset["ssh_mean"] = dset["adt"] - dset["sla"]
    # Drop auxiliary variables that shouldn't be averaged.
    dset = dset.drop(["crs", "lat_bnds", "lon_bnds",
                      "__xarray_dataarray_variable__", "nv"])
    dset_all_over_monthly_mean = dset.groupby("time.month").mean(dim="time", skipna=True)
    dset_all_over_season1_mean = dset_all_over_monthly_mean.sel(month=[1, 2, 3])
    dset_all_over_season1_mean = dset_all_over_season1_mean.mean(dim="month", skipna=True)
    dset_all_over_season1_mean.to_netcdf("/data/Environmental_Data/dump/mean/all_over_season1_mean_ssh_copernicus_0.25deg_season1_data_mean.nc")

@Zeitsperre - are you still having problems in this area? If not, is it okay if we close this issue?

Hi @jhamman, please excuse the lateness of this reply. It turned out that in the end, all I needed to do was set OMP_NUM_THREADS based on the number of cores I wanted to use (2 threads/core) before launching my processes. Thanks for the help and for keeping this open. Feel free to close this thread.
