distributed.joblib: AssertionError: yield from wasn't used with future

Created on 1 Aug 2018 · 18 Comments · Source: dask/distributed

Hi all, first of all, thank you for the excellent work on this great library. I use distributed as a joblib backend, but when delayed functions are fed large data, distributed.joblib seems to scatter the data first, and the following AssertionError is raised:

tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x000001CB0DF54AE8>, <Future finished exception=AssertionError("yield from wasn't used with future",)>)
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 204, in maybe_to_futures
    f = call_data_futures[arg]
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 67, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 1971626397592

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\ioloop.py", line 759, in _run_callback
    ret = callback()
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\ioloop.py", line 780, in _discard_future_result
    future.result()
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 244, in callback_wrapper
    callback(result)  # gets called in separate thread
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 343, in __call__
    self.parallel.dispatch_next()
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 763, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 791, in dispatch_one_batch
    self._dispatch(tasks)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 748, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 234, in apply_async
    func, args = self._to_func_args(func)
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 224, in _to_func_args
    args = list(maybe_to_futures(args))
  File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 212, in maybe_to_futures
    [f] = self.client.scatter([arg])
AssertionError: yield from wasn't used with future

After commenting out this scatter call, my code ran successfully, so I think this may be a bug.
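In the tracebacks, the first `KeyError` is only a lookup miss in a cache of already-scattered arguments keyed by `id(obj)`; after that miss the backend calls `client.scatter()`, which is where the `AssertionError` is raised. Below is a minimal sketch of such an identity-keyed, weak-reference-guarded cache, modelled on the `self._data[id(obj)]` and `ref() is not obj` lines visible in the tracebacks in this thread. It is not joblib's actual class: `IdKeyedWeakCache`, `Payload` and the string standing in for a scattered future are hypothetical.

```python
import gc
import weakref


class IdKeyedWeakCache:
    """Hypothetical identity-keyed cache that never keeps its keys alive."""

    def __init__(self):
        self._data = {}

    def __getitem__(self, obj):
        ref, val = self._data[id(obj)]  # raises KeyError on a cache miss
        if ref() is not obj:
            # Same id, different object: the original was collected and its id reused.
            raise KeyError(obj)
        return val

    def __setitem__(self, obj, value):
        key = id(obj)
        # The callback drops the entry as soon as obj is garbage-collected.
        ref = weakref.ref(obj, lambda _, key=key: self._data.pop(key, None))
        self._data[key] = (ref, value)

    def __len__(self):
        return len(self._data)


class Payload:
    """Stand-in for a large argument such as a NumPy array."""


cache = IdKeyedWeakCache()
arg = Payload()
try:
    future = cache[arg]          # first lookup: cache miss, KeyError
except KeyError:
    future = "scattered-future"  # stand-in for client.scatter([arg])
    cache[arg] = future
assert cache[arg] == "scattered-future"  # second lookup hits the cache

del arg       # drop the only strong reference to the argument
gc.collect()  # CPython frees it immediately; this helps other interpreters
print(len(cache))
```

Keying by `id()` allows unhashable arguments such as NumPy arrays to be cached, and the weak reference ensures the cache neither keeps arguments alive nor returns a stale entry when an id is reused.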


All 18 comments

The error went away after updating both joblib and distributed to their latest versions.

@ZuoMatthew I am facing the same error with the latest stable versions of dask, distributed and joblib. Do you remember which versions you used to make it work?

A minimal reproducible example would be useful if this problem still exists.

Sorry, my bad.

Here it is:

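import numpy as np

from dask.distributed import Client, LocalCluster
from joblib import Parallel, delayed, parallel_backend

def sum_values(array, scalar):
    return (array + scalar).sum()

# Start a local dask cluster and connect a client to it.
cluster = LocalCluster()
client = Client(cluster)

# Each np.zeros(i) below holds about 400 kB of float64 data, large enough
# to be scattered to the workers before the tasks run.
with parallel_backend("dask"):
    results = Parallel()(delayed(sum_values)(np.zeros(i), 5) for i in range(50000, 50010))

print(results)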

Edit: using the libraries:

  • dask==1.2.0
  • distributed==1.27.0
  • joblib==0.13.2

@julioasotodv in my experience, when I run your example with smaller numbers, things work fine. When I run it with the numbers you've provided, things take a while. In neither case does it look like the error raised in the original issue. Are you sure this is the same issue?

If so, I unfortunately wasn't able to reproduce the error reported with the code that you've provided.

Hmm, that's weird.

The same code hangs indefinitely on my machine (with 0% CPU utilisation) until I press Ctrl+C, at which point this traceback appears:

Traceback (most recent call last):
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 223, in maybe_to_futures
    f = call_data_futures[arg]
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 56, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 4669293792

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.send(value)
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 264, in callback_wrapper
    callback(result)  # gets called in separate thread
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
    self.parallel.dispatch_next()
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 254, in apply_async
    func, args = self._to_func_args(func)
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 243, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 231, in maybe_to_futures
    [f] = self.client.scatter([arg])
AssertionError: yield from wasn't used with future

However, I just discovered that it happens with:

  • dask==1.2.0
  • distributed==1.27.0

But it doesn't with:

  • dask==0.20.0
  • distributed==1.24.0

That said, it only happens with large arrays (such as np.zeros(50000)); with small arrays it works fine.

I can reproduce the issue on my machine.


Machine details

$ uname -a
Darwin 18.6.0 Darwin Kernel Version 18.6.0: Thu Apr 25 23:16:27 PDT 2019; root:xnu-4903.261.4~2/RELEASE_X86_64 x86_64

In [1]: import numpy as np
   ...:
   ...: from dask.distributed import Client, LocalCluster
   ...: from joblib import Parallel, delayed, parallel_backend
   ...:
   ...: def sum_values(array, scalar):
   ...:     return (array + scalar).sum()
   ...:
   ...: cluster = LocalCluster()
   ...: client = Client(cluster)
   ...:
   ...: with parallel_backend("dask"):
   ...:     results = Parallel()(delayed(sum_values)(np.zeros(i), 5) for i in range(50000, 50010))
   ...:
   ...: print(results)
```python-traceback
tornado.application - ERROR - Exception in callback functools.partial(>, )
Traceback (most recent call last):
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py", line 223, in maybe_to_futures
    f = call_data_futures[arg]
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py", line 56, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 4457179776

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/tornado/gen.py", line 748, in run
    yielded = self.gen.send(value)
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py", line 264, in callback_wrapper
    callback(result)  # gets called in separate thread
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
    self.parallel.dispatch_next()
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py", line 254, in apply_async
    func, args = self._to_func_args(func)
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py", line 243, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py", line 231, in maybe_to_futures
    [f] = self.client.scatter([arg])
AssertionError: yield from wasn't used with future
distributed.utils - ERROR - 'tcp://127.0.0.1:55265'
Traceback (most recent call last):
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/distributed/utils.py", line 693, in log_errors
    yield
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/distributed/scheduler.py", line 2903, in update_data
    ws = self.workers[w]
KeyError: 'tcp://127.0.0.1:55265'
distributed.core - ERROR - 'tcp://127.0.0.1:55265'
Traceback (most recent call last):
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/distributed/core.py", line 361, in handle_comm
    result = yield result
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/tornado/gen.py", line 748, in run
    yielded = self.gen.send(value)
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/distributed/scheduler.py", line 2298, in scatter
    self.update_data(who_has=who_has, nbytes=nbytes, client=client)
  File "/Users/jjerphanion/.virtualenvs/asserttest/lib/python3.6/site-packages/distributed/scheduler.py", line 2903, in update_data
    ws = self.workers[w]
KeyError: 'tcp://127.0.0.1:55265'
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py in maybe_to_futures(args)
    222             try:
--> 223                 f = call_data_futures[arg]
    224             except KeyError:

~/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py in __getitem__(self, obj)
     55     def __getitem__(self, obj):
---> 56         ref, val = self._data[id(obj)]
     57         if ref() is not obj:

KeyError: 4459636096

During handling of the above exception, another exception occurred:

KeyboardInterrupt                         Traceback (most recent call last)
 in
     11 
     12 with parallel_backend("dask"):
---> 13     results = Parallel()(delayed(sum_values)(np.zeros(i), 5) for i in range(50000, 50010))
     14 
     15 print(results)

~/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/parallel.py in __call__(self, iterable)
    922             self._iterating = self._original_iterator is not None
    923 
--> 924             while self.dispatch_one_batch(iterator):
    925                 pass
    926 

~/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/parallel.py in dispatch_one_batch(self, iterator)
    757                 return False
    758             else:
--> 759                 self._dispatch(tasks)
    760                 return True
    761 

~/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/parallel.py in _dispatch(self, batch)
    714         with self._lock:
    715             job_idx = len(self._jobs)
--> 716             job = self._backend.apply_async(batch, callback=cb)
    717             # A job can complete so quickly than its callback is
    718             # called before we get here, causing self._jobs to

~/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py in apply_async(self, func, callback)
    252     def apply_async(self, func, callback=None):
    253         key = '%s-batch-%s' % (_funcname(func), uuid4().hex)
--> 254         func, args = self._to_func_args(func)
    255 
    256         future = self.client.submit(func, *args, key=key, **self.submit_kwargs)

~/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py in _to_func_args(self, func)
    241         tasks = []
    242         for f, args, kwargs in func.items:
--> 243             args = list(maybe_to_futures(args))
    244             kwargs = dict(zip(kwargs.keys(),
    245                               maybe_to_futures(kwargs.values())))

~/.virtualenvs/asserttest/lib/python3.6/site-packages/joblib/_dask.py in maybe_to_futures(args)
    229                         # more workers need to reuse this data
    230                         # concurrently.
--> 231                         [f] = self.client.scatter([arg])
    232                         call_data_futures[arg] = f
    233 

~/.virtualenvs/asserttest/lib/python3.6/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, maxsize, timeout, asynchronous)
   1874                          broadcast=broadcast, direct=direct,
   1875                          local_worker=local_worker, timeout=timeout,
-> 1876                          asynchronous=asynchronous, hash=hash)
   1877 
   1878     @gen.coroutine

~/.virtualenvs/asserttest/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    675             return future
    676         else:
--> 677             return sync(self.loop, func, *args, **kwargs)
    678 
    679     def __repr__(self):

~/.virtualenvs/asserttest/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    317     else:
    318         while not e.is_set():
--> 319             e.wait(10)
    320     if error[0]:
    321         six.reraise(*error[0])

/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py in wait(self, timeout)
    549             signaled = self._flag
    550             if not signaled:
--> 551                 signaled = self._cond.wait(timeout)
    552             return signaled
    553 

/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py in wait(self, timeout)
    297             else:
    298                 if timeout > 0:
--> 299                     gotit = waiter.acquire(True, timeout)
    300                 else:
    301                     gotit = waiter.acquire(False)

KeyboardInterrupt:
```


Python Packages

$ pip freeze
appnope==0.1.0
backcall==0.1.0
Click==7.0
cloudpickle==1.2.1
dask==1.2.0
decorator==4.4.0
distributed==1.27.0
HeapDict==1.0.0
ipython==7.6.0
ipython-genutils==0.2.0
jedi==0.14.0
joblib==0.13.2
msgpack==0.6.1
numpy==1.16.4
parso==0.5.0
pexpect==4.7.0
pickleshare==0.7.5
prompt-toolkit==2.0.9
psutil==5.6.3
ptyprocess==0.6.0
Pygments==2.4.2
PyYAML==5.1.1
six==1.12.0
sortedcontainers==2.1.0
tblib==1.4.0
toolz==0.9.0
tornado==6.0.3
traitlets==4.3.2
wcwidth==0.1.7
zict==1.0.0

@jjerphan confirmed, thanks. Will take a closer look later today.

Contrary to @julioasotodv's report, it also hangs on my machine with:

dask==0.20.0
distributed==1.24.0
joblib==0.13.2
Python version: 3.6.8
Machine:
Darwin 18.6.0 Darwin Kernel Version 18.6.0: Thu Apr 25 23:16:27 PDT 2019; root:xnu-4903.261.4~2/RELEASE_X86_64 x86_64
Python Packages:
Click==7.0
cloudpickle==1.2.1
dask==0.20.0
distributed==1.24.0
HeapDict==1.0.0
joblib==0.13.2
msgpack==0.6.1
numpy==1.16.4
psutil==5.6.3
PyYAML==5.1.1
six==1.12.0
sortedcontainers==2.1.0
tblib==1.4.0
toolz==0.9.0
tornado==6.0.3
zict==1.0.0
Traceback
Traceback (most recent call last):
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 223, in maybe_to_futures
    f = call_data_futures[arg]
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 56, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 4597948288

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "", line 2, in 
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 924, in __call__
    while self.dispatch_one_batch(iterator):
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 254, in apply_async
    func, args = self._to_func_args(func)
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 243, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 231, in maybe_to_futures
    [f] = self.client.scatter([arg])
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/distributed/client.py", line 1866, in scatter
tornado.application - ERROR - Exception in callback functools.partial(>, )
Traceback (most recent call last):
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 223, in maybe_to_futures
    f = call_data_futures[arg]
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 56, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 4598141776

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/tornado/gen.py", line 748, in run
    yielded = self.gen.send(value)
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 264, in callback_wrapper
    callback(result)  # gets called in separate thread
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
    self.parallel.dispatch_next()
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 254, in apply_async
    func, args = self._to_func_args(func)
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 243, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 231, in maybe_to_futures
    [f] = self.client.scatter([arg])
AssertionError: yield from wasn't used with future
    asynchronous=asynchronous, hash=hash)
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/distributed/client.py", line 665, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/distributed/utils.py", line 275, in sync
    e.wait(10)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
>>> distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker

This problem also happens on CentOS 7 with both environments.


Python version: 3.6.8
Machine:

$ uname -a
Linux 6ea377206ec7 4.9.125-linuxkit #1 SMP Fri Sep 7 08:20:28 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux

This problem also happens on CentOS 7 and macOS with the latest versions of joblib, dask and distributed.


Python version: 3.6.8
Python environment:

Click==7.0
cloudpickle==1.2.1
dask==2.0.0
distributed==2.0.1
HeapDict==1.0.0
joblib==0.13.2
msgpack==0.6.1
numpy==1.16.4
psutil==5.6.3
PyYAML==5.1.1
six==1.12.0
sortedcontainers==2.1.0
tblib==1.4.0
toolz==0.9.0
tornado==6.0.3
zict==1.0.0

I ran this example again with logging enabled in joblib and distributed and got the following logs.

The issue can also be reproduced with the following setup, which might help with debugging (cc @TomAugspurger).

By setting batch_size to the number of tasks (here 10), all the tasks are submitted together in a single batch.

This can be done by modifying the previous snippet with:

# ...
with parallel_backend("dask"):
    results = Parallel(batch_size=10)(delayed(sum_values)(np.zeros(i), 5) for i in range(50000, 50010))
# ...

In this case, the execution does not hang but crashes immediately.
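A sketch of what changing `batch_size` does to the grouping of the ten tasks, assuming tasks are grouped into consecutive batches in submission order. `make_batches` is a hypothetical helper, not joblib's internal code: with `batch_size=1` each task is its own batch, while `batch_size=10` puts all ten into one batch.

```python
from itertools import islice


def make_batches(tasks, batch_size):
    """Hypothetical helper: group tasks into consecutive batches of batch_size."""
    it = iter(tasks)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


tasks = list(range(50000, 50010))  # one task per array size in the reproducer
print([len(b) for b in make_batches(tasks, 1)])   # [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
print([len(b) for b in make_batches(tasks, 10)])  # [10]
```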

My intuition is that some part of joblib is not playing well with the distributed engine.

If the input arrays given to delayed() are smaller, everything works fine. I believe the problem is related to the max_nbytes argument of joblib.Parallel: if the array sizes exceed this number, dask's side triggers a client.scatter(), which ultimately causes the trouble.

However, I am not 100% sure.

Yes, I also think this is related to the pre-scattering of large arguments, since the errors are raised while scattering.

This mechanism was introduced in distributed#2020: task arguments are pre-scattered if their size exceeds a given threshold. The threshold was originally 1e6 bytes; it is now 1e3 bytes.

The arguments in your example exceed the latter threshold and are therefore pre-scattered; for instance:

>>> import numpy as np
>>> np.zeros(50000).nbytes
400000
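The size arithmetic above can be checked against both thresholds without NumPy. This is a hypothetical model of the pre-scatter rule described above (an argument is scattered when its size in bytes exceeds the threshold), not the library's code; `zeros_nbytes` and `would_prescatter` are illustrative names, and it assumes `np.zeros` produces float64 data at 8 bytes per element.

```python
OLD_THRESHOLD = 1e6   # bytes, the original threshold quoted above
NEW_THRESHOLD = 1e3   # bytes, the current threshold quoted above
FLOAT64_ITEMSIZE = 8  # np.zeros defaults to float64, i.e. 8 bytes per element


def zeros_nbytes(n_elements, itemsize=FLOAT64_ITEMSIZE):
    """Size in bytes of np.zeros(n_elements), computed without allocating it."""
    return n_elements * itemsize


def would_prescatter(nbytes, threshold):
    """Hypothetical rule: scatter ahead of time when the size exceeds the threshold."""
    return nbytes > threshold


for n in (100, 50000):
    nbytes = zeros_nbytes(n)
    print(n, nbytes,
          would_prescatter(nbytes, OLD_THRESHOLD),
          would_prescatter(nbytes, NEW_THRESHOLD))
# 100 800 False False
# 50000 400000 False True
```

Under the 1e6-byte threshold, the 400 kB arrays from the reproducer stay below the limit; under the 1e3-byte threshold they are pre-scattered.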

I am not completely sure either, but based on the execution logs, the problem may lie in the behaviour of the Client and the Scheduler, and/or in their communication, when BatchCompletionCallBack calls fire after some tasks complete and make Parallel dispatch the next tasks.

It is also possible that this pre-scattering interferes with the remaining tasks being dispatched in the while loop of Parallel.__call__.
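The message in the issue title is the one asyncio produces when a future that has not finished is iterated directly instead of being awaited. One plausible reading, not confirmed in this thread, is that `client.scatter([arg])` returned a pending future rather than a list when it was called from the completion callback running on the event loop, so `[f] = ...` iterated that future. The sketch below only reproduces the message. A bare `asyncio` future stands in for the scatter result, and `unpack_single` and `demo` are hypothetical names. Python 3.6 raises `AssertionError`, which matches the tracebacks in this thread. More recent Python versions raise a `RuntimeError` with a similar message instead, so the sketch catches both.

```python
import asyncio


def unpack_single(result):
    # Mirrors the `[f] = self.client.scatter([arg])` line from the tracebacks.
    [item] = result
    return item


def demo():
    loop = asyncio.new_event_loop()
    try:
        pending = loop.create_future()  # a future that has not completed yet
        try:
            unpack_single(pending)
        except (AssertionError, RuntimeError) as exc:
            return exc
        return None
    finally:
        loop.close()


err = demo()
print(type(err).__name__, err)
```

Single-element unpacking calls `next()` twice. The first call yields the pending future itself. The second call checks that there are no extra elements, and that is where the error is raised.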

I believe this is related to joblib/joblib#852

This seems to be fixed by joblib#914.

@jjerphan indeed. Once the PR is merged, this issue can be closed.
