Esmvaltool: Switch to iris 2

Created on 20 Jun 2018  路  26Comments  路  Source: ESMValGroup/ESMValTool

At the moment we are still using iris 1.13 because of issue #318. We would like to switch to iris 2 because the expectation is that iris 1.13 will not be maintained indefinitely.

enhancement iris

All 26 comments

I am having a lot of trouble with PRIMAVERA outputs due to its huge size. I think we can manage this better with Iris 2 and dask, so I am gonna try to make the switch.

Let me know how you're doing, so we don't do double work.

I will upload the branch and update it so you can see what I am up to.

For now, it's just updating iris and adding an option in the config file to pass it a scheduler.json information. Bad news is that I am having a pickle error from dask that I don't know what really means

This is the error. Any idea, @bjlittle?

  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 2343, in save
    fill_value=fill_value)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 977, in write
    fill_value=fill_value)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 2013, in _create_cf_data_variable
    fill_value_to_check)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/iris/fileformats/netcdf.py", line 1990, in store
    da.store([data], [target])
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/dask/array/core.py", line 949, in store
    result.compute(**kwargs)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/dask/base.py", line 154, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/dask/base.py", line 407, in compute
    results = get(dsk, keys, **kwargs)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/client.py", line 2098, in get
    direct=direct)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/client.py", line 1508, in gather
    asynchronous=asynchronous)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/client.py", line 615, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/utils.py", line 253, in sync
    six.reraise(*error[0])
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/client.py", line 1385, in _gather
    traceback)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/home/Earth/jvegas/.conda/envs/esmvaltool3/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
EOFError: Ran out of input

Just to comment that I have problems when trying to use dask.distributed. No problems arise if if don't try to connect to a dask scheduler

See the issue in iris portal https://github.com/SciTools/iris/issues/3157

By the way, we get no performance gain using the threaded scheduler. See the GIL in all its glory:

bokeh_plot

Nice picture, what tool did you use to make it? Are you using dask arrays with that example? Or just our preprocessor? Because in the current preprocessor we convert everything to numpy arrays in many functions, so I wouldn't expect much parallelism there.

Memory issues can be expected when switching to Iris 2, as reported for example in #677.

howdy chaps, I am starting to have a look at this again and will be running some tests with iris 2.2.0 - are there any updates on this so I don't start reinventing the wheel?

also, just to keep track of my previous tests and very informed comments from @bjlittle @jvegasbsc and @pelson, am linking the previous issue: https://github.com/ESMValGroup/ESMValTool/issues/318

@bjlittle Happy New Year, man!! Hope things are good with you and you had a good break :beer:
New year, new testing and some interesting results right here: I finally caught the gist of lazy and realized data in the new iris and I thought about testing iris 2.2.0 against iris 1.13 for a few cases. Basically, my little test case looks like this:

# apply possible fix to dask
import dask
# this id deprecated
# dask.config.set(get=dask.get)
# optionality:
#    x.compute(scheduler='single-threaded')
#    x.compute(scheduler='threads')
#    x.compute(scheduler='processes')
# loop through options
#dask.config.set(scheduler='single-threaded')
#dask.config.set(scheduler='threads')
dask.config.set(scheduler='processes')

import iris
import time

#print(iris.__version__)

c1 = iris.load_cube('data1.nc')
c2 = iris.load_cube('data2.nc')

# lazy/realized data ops
t1 = time.time()
#res = (c1 - c2) * (c1 * c2 / (c1 + c2))**2.
res = (c1.data - c2.data) * (c1.data * c2.data / (c1.data + c2.data))**2.
t2 = time.time()
print('op time', t2 - t1)

# saving: did once when using lazy data only
#iris.save(res, 'res.nc')
#t3 = time.time()
#print('save time', t3 - t2)

where I ran the lazy (no cube.data but just cube in operation) and realized (cube.data in operation) and for iris 2.2.0 I ran with no dask settings and then with different dask settings. For review purposes the test lives on Jasmin at /home/users/valeriu/esmvaltool_dask_test. I tested only timing and no memory so far; timing has been averaged over 20 iterations of the same code (just the operation and save); my numpy versions are identical for both cases 1.15.1; dask is 1.0. Here are my conclusions and attached plots:

  • lazy data operations are MUCH faster with iris2.2.0/dask by a factor of 5;
  • saving a cube is about 3 times slower in iris2.2.0 than in iris1.13 (WTF?);
  • fully realized data operations are slower than lazy data operations (doh!): 50% slower for iris1.13 and 10x slower for iris2.2.0 (!!!!);
  • fully realized data operations for iris2.2.0 are slower than for iris1.13:

    • about 3x slower if no dask.config.set() settings are applied;

    • about 7x slower if run with dask.config.set(scheduler='single-threaded');

    • about 3x slower if run with dask.config.set(scheduler='threads');

    • about 2x slower if run with dask.config.set(scheduler='processes');

So, as @pelson pointed out in the previous (closed) issue, different settings of the dask configuration do indeed influence performnce by quite a bit.
Any thoughts on this?
@jvegasbsc @bouweandela @mattiarighi
lazy_operation
lazy_save
realized_operation
realized_processes_operation
realized_singlethreaded_operation
realized_threads_operation

OK guys, I ran some more tests: bad news twice: once, because scheduler: processes does not work with ESMValTool's multithreading (daemonic tasks can not have children, sounds quite ominous :grin: ) and twice, because a comparison run of a fairly involved preprocessor (see below) with 8 threads clearly shows that iris2+dask is slower than iris1.13:

Preprocessor:

  pptest:
    extract_levels:
      levels: 85000
      scheme: linear
    regrid:
      target_grid: 1x1
      scheme: linear
    mask_fillvalues:
      threshold_fraction: 0.95
    multi_model_statistics:
      span: overlap
      statistics: [mean, median]

run it 10x to get some statistics, max_parallel_tasks=8:

iris 1.13:  13.6 +/- 1.4 s per run
iris 2.2.0 default dask:  25.8 +/- 2.8 s per run
iris 2.2.0 scheduler: 
threads and multiprocessing-method: spawn: 27.2 +/- 2.3 s per run

The way I tested this was by editing the configuration file of dask in ~/.config/dask/distributed.yml; I will next try explicitly importing dask and setting the scheduler to: processes in each of the preprocessor modules. Will report back

bummer, found out that the default dask.config.config(scheduler) is indeed threads so progress equals zero; scheduler=processes offers improvement only if the data is realized everywhere and we use only a single thread so it's not usable for our purposes. I will actually go on now and look at the preprocessor modules and see where and when we can switch from calls to realized to lazy data; It would really help if the iris guys had a best compromise configuration for dask so we can just use that configuration file rather than tweak dask on the fly - @bjlittle ?

@valeriupredoi Here are some comments on your results:

  • From the large variability in runtimes it looks like the system you are using to test is quite overloaded with tasks, which will cause tasks to be executed much slower than necessary.
  • The mean is not representative of the actual runtime, the shortest runtime is more likely to be the correct time. This is because your computer is busy doing other stuff if it takes longer.
  • saving a cube is about 3 times slower in iris2.2.0 than in iris1.13 (WTF?);

    From looking at your script, it looks like the data is only realized (i.e. loaded from file and computed) at the save step. So that explains why this takes longer.

  • I would recommend using actual preprocessor functions for testing instead of synthetic benchmarks, as this gives much more relevant results on which code may be slow(er)/need to be optimized.

Finally, I recommend switching to Iris 2 as soon as possible even if this incurs some slowdown. I believe many people are already using Iris 2, given all switches we have in the code for Iris version selection. We can then start working on making the preprocessor faster/more memory efficient or if we need iris support for that, file an issue with iris. Sticking with Iris 1 is not a real option in the long run, because it will not be further developed. Especially with the upcoming large CMIP6 datasets, I expect that those will be much easier to handle with Iris 2/dask than with Iris 1.

Finally, I recommend switching to Iris 2 as soon as possible even if this incurs some slowdown.

I agree with you. We can not expect to fix all drawbacks coming from the switch before switching: it is just too much work to be done. Also, if we keep using iris 1.13 we just keep adding more stuff that we will need to optimize / fix on the future

same, I think we need to switch to iris 2.2.0 asap - in fact, I have been running all my stuff with it for the past week or so. @bouweandela cheers for the detailed analysis, man, I agree with your points bar the one that says the tests should probably be done on a pristine node - I think the real-life experience (eg a busy node on Jasmin, like my results have been done on) is more useful to get an idea of the differences - indeed one introduces a lot of noise due to the use of the node by other users but we assume that noise to have the same distribution everytime you run stuff enough multiple times (assuming a Poisson statistic, probably the best representation for it ie the more tests you do the closer the mean is to the true value). Actually, comparing min times as you suggest and mean times they are almost exactly at a factor of 2-3 slower with iris2. Before we start thinking how to not realize data in the Preproc modules I think it'd be very useful if we could possibly find a more optimal set of config settings in .config/dask/distributed.yml which is currently empty with default dask. That'd be the no brainer solution that involves the least amount of work :grin:

hey guys, this thing actually bugs me so I ran a few more things: I have followed @bouweandela suggestion to run on a relatively clean node and use actual ESMValTool examples so I did this on my laptop using a very basic recipe that does a regrid preprocessor and a zonal mean for a diagnostic:

Single-machine testing with iris
--------------------------------

Single-machine running: my laptop with no other
transitory process running in the background.

iris 1.13.0
max_parallel_task = 8
0:00:17.270246
0:00:17.195842
0:00:16.610380
0:00:17.019490
0:00:16.886401

iris 2.2.0
max_parallel_task = 8
0:00:20.693597
0:00:20.592311
0:00:21.633979
0:00:20.578326
0:00:20.793864

so expectedly numbers are stable and it looks like the bad wolf is not so bad after all - we're taking a hit of only ~25% in time. I also found the specific iris module that deals with the handling of dask or numpy arrays (lazy/realised data):

lazy/realised data set in _lazy_data.py in iris:

anaconda3/envs/esmvaltool/lib/python3.6/site-packages/iris/_lazy_data.py

chunks are set there based on data shape and a MAX chunks parameter
similar to what biggus used to have. We can actually play with that eg:

 _MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2  # default
 _MAX_CHUNK_SIZE = 4 * 512 * 512 * 2  # not improving
 _MAX_CHUNK_SIZE = 16 * 2048 * 2048 * 4  # not improving
 _MAX_CHUNK_SIZE = 32 * 4096 * 4096 * 8  # not improving
 _MAX_CHUNK_SIZE = 512  # freaking slow
 _MAX_CHUNK_SIZE = 80 * 10240 * 10240 * 20  # not improving

It appears that any change of this constant is not affecting run time
unless it's very small ie 16 or 512 which is pretty straightforward

we can also play with the actual chunks as well:

    chunks = tuple(int(i/10) for i in data.shape)  # slows by factor 4
    chunks = tuple(int(i*10) for i in data.shape)  # no improvement
    chunks = tuple(int(i/2) for i in data.shape)  # slows by 5-10%
    chunks = tuple(int(i*4) for i in data.shape)   # no improvement
    chunks = tuple(int(i*100) for i in data.shape)  # no improvement
    chunks = tuple(2 for i in data.shape)  # superslow
    chunks = tuple(512 for i in data.shape)  # no improvement

and finally try to set a client up:

iris 2.2.0
set in _lazy_data:
from dask.distributed import Client
client = Client(processes=False)
max_parallel_task = 1

(max_parallel_task > 1 stalls completely;
I think workers are not talking to each other!)
about 30% slower than if we didn't create the Client

so it looks like, in terms of looking at the iris code as a possible place to gain in performance it's not gonna happen ie the iris guys have done everything right . I'd reckon that we have to look into not realizing so much of the cubes' data in our scripts. I am also wondering if setting workers properly up would be a good idea, but I don't know yet how to do that - @jvegasbsc has done some tests I believe. Another thing I don't know is when, for instance, we perform cube collapses (we do a lot of them) - will this happen with automatic data realization?

OK I got some good news and onbiously some bad news:
I ran this recipe through both iris 1 and 2 on my laptop so we don't get any node traffic from elsewhere and with single-thread so we can measure the behavior correctly:

datasets:
  - {dataset: MPI-ESM-LR,  project: CMIP5,  mip: Amon,  exp: historical,  ensemble: r1i1p1,  start_year: 1950,  end_year: 2000}
  - {dataset: NorESM1-M,   project: CMIP5,  mip: Amon,  exp: historical,  ensemble: r1i1p1,  start_year: 1950,  end_year: 2000}

preprocessors:
  simple_preproc:
    regrid:
      target_grid: 1x1
      scheme: linear
    multi_model_statistics:
      span: overlap
      statistics: [mean,]

diagnostics:
  validation_basic:
    description: "zonal means"
    variables:
      tas: # simple wee variable
        preprocessor: simple_preproc
        field: T2Ms
    scripts: Null

and see the memory=f(absolute time) plots attached
iris1
iris2

(the black vertical lines are start times of fix_data, regrid and multimodel modules)
The good news:

  • memory usage is very consistently close between iris 1 and 2 (in fact some 10% less in iris 2, but that's probably fluctuation);
  • time (excluding multimodel) is about 30% more for iris 2 vs iris 1 but this is consistent to my previous investigations (see above);
  • memory estimation according to my equations in #810 works great (small number of datasets, R ~ 3, approx average file size before regrid is F ~ 80MB and after regrid F ~ 160MB);

The bad news:

  • multimodel is really slow in iris 2: multimodel lasts for about 160s in iris 1 and about 320s in iris 2, so twice the time

Note that I didn't apply any tweaks to iris 2's dask (out-the-box)

I will create a branch where we can test this further but I have limited data on my laptop and would not want to perform these tests on Jasmin - as @bouweandela mentioned, we should run these on a free node.

How much work would it be to switch to dask arrays in the preprocessor?
Can we expect this to improve the performance?

no need to do that - we just don't have to call cube.data as many times as we are now, if we work with cube only structures, iris will automatically work with dask (lazy) arrays and do all the operations on them rather than on numpy arrays; of course this would be the ideal case only, we'll have to call cube.data at some point and when that happens iris switches from dask to numpy, the question is how many times we really need to do this

here is the bit of iris documentation that tells us about lazy and realized data: https://scitools.org.uk/iris/docs/latest/userguide/real_and_lazy_data.html

what puzzles me still is why such modules like multimodel that in ESMValTool rely almost 100% on realized data operations are so much slower in iris 2 than 1, given that, since all operations are on numpy arrays and numpy hasn't changed, we still take a decent hit in run time....

How much work would it be to switch to dask arrays in the preprocessor?
Can we expect this to improve the performance?

@jvegasbsc do you think this would be possible with a benefit on performance in modules like fix_data or mask that actually rely on direct access of individual data values?

@jvegasbsc do you think this would be possible with a benefit on performance in modules like fix_data or mask that actually rely on direct access of individual data values?

I think that we should update them to become lazy operations on Iris cubes. Unfortunately, I have not seen any examples on Iris documentation about how to do it

with the fix from https://github.com/ESMValGroup/ESMValTool/pull/816 things look better and smoother: same test case as above, same comparison, only the bugfix from _multimodel in
iris1_2
iris2_2

:grin:

the time delta between the two vertical lines is the time for regrid, past the rightmost vertical line is multimodel, check data and save

have created us a branch for dask purposes (ie performance improvements): https://github.com/ESMValGroup/ESMValTool/tree/version2_dask - no PR just yet but note that I have already change multimode to reflect #816 and have started putting dask arrays in it, also recipes/examples/recipe_dask.yml is the test case above. Hopefully we'll be able to find the golden egg and improve something in this branch and not delete it in misery in a couple months :grin:

Was this page helpful?
0 / 5 - 0 ratings