Hi all, first of all, thank you for the excellent work on such a great library. I use distributed as a backend for joblib, but when delayed functions are fed large data, distributed.joblib seems to try to scatter the data first, and an AssertionError is raised as below:
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x000001CB0DF54AE8>, <Future finished exception=AssertionError("yield from wasn't used with future",)>)
Traceback (most recent call last):
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 204, in maybe_to_futures
f = call_data_futures[arg]
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 67, in __getitem__
ref, val = self._data[id(obj)]
KeyError: 1971626397592
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\ioloop.py", line 759, in _run_callback
ret = callback()
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\stack_context.py", line 276, in null_wrapper
return fn(*args, **kwargs)
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\ioloop.py", line 780, in _discard_future_result
future.result()
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\tornado\gen.py", line 1113, in run
yielded = self.gen.send(value)
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 244, in callback_wrapper
callback(result) # gets called in separate thread
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 343, in __call__
self.parallel.dispatch_next()
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 763, in dispatch_next
if not self.dispatch_one_batch(self._original_iterator):
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 791, in dispatch_one_batch
self._dispatch(tasks)
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\joblib\parallel.py", line 748, in _dispatch
job = self._backend.apply_async(batch, callback=cb)
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 234, in apply_async
func, args = self._to_func_args(func)
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 224, in _to_func_args
args = list(maybe_to_futures(args))
File "C:\ProgramData\Anaconda3\envs\test\lib\site-packages\distributed\joblib.py", line 212, in maybe_to_futures
[f] = self.client.scatter([arg])
AssertionError: yield from wasn't used with future
By commenting out this scatter call, my code ran successfully, so I think this may be a bug.
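The first exception in the traceback above is a `KeyError` from an `id()`-keyed lookup (`ref, val = self._data[id(obj)]`), and the `AssertionError` is raised while that `KeyError` is being handled, on the `client.scatter` line. Reading the frames, the `KeyError` looks like an ordinary cache miss, with scattering as the fallback. The sketch below models that control flow under this assumption; `IdKeyedCache`, `fake_scatter` and `maybe_to_future` are hypothetical names, not joblib's or distributed's API.

```python
import weakref


class IdKeyedCache:
    """Hypothetical cache keyed by id(obj), modeled on the traceback frames."""

    def __init__(self):
        self._data = {}

    def __getitem__(self, obj):
        ref, val = self._data[id(obj)]  # KeyError here just means "not cached yet"
        if ref() is not obj:
            # The id was reused by another object after the original was freed.
            raise KeyError(id(obj))
        return val

    def __setitem__(self, obj, value):
        # Keep only a weak reference so the cache does not keep obj alive.
        self._data[id(obj)] = (weakref.ref(obj), value)


scatter_calls = []


def fake_scatter(items):
    """Hypothetical stand-in for client.scatter: one fake future per item."""
    scatter_calls.append(len(items))
    return ["future-%d" % len(scatter_calls) for _ in items]


def maybe_to_future(arg, cache):
    try:
        return cache[arg]  # cache hit: reuse the existing future
    except KeyError:
        # Cache miss: scatter the argument. In the traceback, this is the
        # step that raised while the KeyError was being handled.
        [f] = fake_scatter([arg])
        cache[arg] = f
        return f


class Payload:
    """Weak-referenceable stand-in for a large argument such as an array."""


cache = IdKeyedCache()
data = Payload()
first = maybe_to_future(data, cache)   # miss: triggers one scatter
second = maybe_to_future(data, cache)  # hit: no further scatter
```

Only the first lookup for a given object scatters; later lookups reuse the cached future, which is why commenting out the scatter call changes the behaviour.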
The error is gone after updating to the latest versions of both joblib and distributed.
@ZuoMatthew I am facing the same error with the latest stable versions of dask, distributed and joblib. Do you remember which versions you used to make it work?
A minimal reproducible example would be useful if this problem still exists.
Sorry, my bad.
Here it is:
import numpy as np
from dask.distributed import Client, LocalCluster
from joblib import Parallel, delayed, parallel_backend
def sum_values(array, scalar):
    return (array + scalar).sum()
cluster = LocalCluster()
client = Client(cluster)
with parallel_backend("dask"):
    results = Parallel()(delayed(sum_values)(np.zeros(i), 5) for i in range(50000, 50010))
print(results)
Edit: using the libraries:
dask==1.2.0
distributed==1.27.0
joblib==0.13.2
@julioasotodv my experience is that when I run your example with smaller numbers, things work fine. When I run it with the numbers you've provided, things take a while. In neither case does this look like the error raised in the original issue. Are you sure that this is the same issue?
If so, I unfortunately wasn't able to reproduce the error reported with the code that you've provided.
Hmm, that's weird.
The same code on my machine hangs forever (with 0% CPU utilisation) until I press Ctrl+C, and then the following traceback appears:
Traceback (most recent call last):
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 223, in maybe_to_futures
f = call_data_futures[arg]
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 56, in __getitem__
ref, val = self._data[id(obj)]
KeyError: 4669293792
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/tornado/gen.py", line 742, in run
yielded = self.gen.send(value)
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 264, in callback_wrapper
callback(result) # gets called in separate thread
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
self.parallel.dispatch_next()
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
if not self.dispatch_one_batch(self._original_iterator):
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
self._dispatch(tasks)
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
job = self._backend.apply_async(batch, callback=cb)
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 254, in apply_async
func, args = self._to_func_args(func)
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 243, in _to_func_args
args = list(maybe_to_futures(args))
File "/Users/n128766/anaconda3/envs/dalton/lib/python3.6/site-packages/joblib/_dask.py", line 231, in maybe_to_futures
[f] = self.client.scatter([arg])
AssertionError: yield from wasn't used with future
However, I just discovered that it happens with:
dask==1.2.0
distributed==1.27.0
But it doesn't with:
dask==0.20.0
distributed==1.24.0
That said, it only happens with large arrays (such as np.zeros(50000)). With small arrays it works just fine.
I can reproduce the issue on my machine.
Machine details
$ uname -a Darwin 18.6.0 Darwin Kernel Version 18.6.0: Thu Apr 25 23:16:27 PDT 2019; root:xnu-4903.261.4~2/RELEASE_X86_64 x86_64
In [1]: import numpy as np
...:
...: from dask.distributed import Client, LocalCluster
...: from joblib import Parallel, delayed, parallel_backend
...:
...: def sum_values(array, scalar):
...:     return (array + scalar).sum()
...:
...: cluster = LocalCluster()
...: client = Client(cluster)
...:
...: with parallel_backend("dask"):
...:     results = Parallel()(delayed(sum_values)(np.zeros(i), 5) for i in range(50000, 50010))
...:
...: print(results)
```python-traceback
tornado.application - ERROR - Exception in callback functools.partial(
```
Python Packages
$ pip freeze
appnope==0.1.0
backcall==0.1.0
Click==7.0
cloudpickle==1.2.1
dask==1.2.0
decorator==4.4.0
distributed==1.27.0
HeapDict==1.0.0
ipython==7.6.0
ipython-genutils==0.2.0
jedi==0.14.0
joblib==0.13.2
msgpack==0.6.1
numpy==1.16.4
parso==0.5.0
pexpect==4.7.0
pickleshare==0.7.5
prompt-toolkit==2.0.9
psutil==5.6.3
ptyprocess==0.6.0
Pygments==2.4.2
PyYAML==5.1.1
six==1.12.0
sortedcontainers==2.1.0
tblib==1.4.0
toolz==0.9.0
tornado==6.0.3
traitlets==4.3.2
wcwidth==0.1.7
zict==1.0.0
@jjerphan confirmed, thanks. Will take a closer look later today.
Contrary to @julioasotodv's experience, it also hangs on my machine with:
dask==0.20.0
distributed==1.24.0
joblib==0.13.2
---
**Python version:** 3.6.8
**Machine:**
Darwin 18.6.0 Darwin Kernel Version 18.6.0: Thu Apr 25 23:16:27 PDT 2019; root:xnu-4903.261.4~2/RELEASE_X86_64 x86_64
**Python Packages:**
Click==7.0
cloudpickle==1.2.1
dask==0.20.0
distributed==1.24.0
HeapDict==1.0.0
joblib==0.13.2
msgpack==0.6.1
numpy==1.16.4
psutil==5.6.3
PyYAML==5.1.1
six==1.12.0
sortedcontainers==2.1.0
tblib==1.4.0
toolz==0.9.0
tornado==6.0.3
zict==1.0.0
Traceback (most recent call last):
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 223, in maybe_to_futures
f = call_data_futures[arg]
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 56, in __getitem__
ref, val = self._data[id(obj)]
KeyError: 4597948288
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 2, in <module>
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 924, in __call__
while self.dispatch_one_batch(iterator):
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
self._dispatch(tasks)
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
job = self._backend.apply_async(batch, callback=cb)
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 254, in apply_async
func, args = self._to_func_args(func)
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 243, in _to_func_args
args = list(maybe_to_futures(args))
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 231, in maybe_to_futures
[f] = self.client.scatter([arg])
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/distributed/client.py", line 1866, in scatter
tornado.application - ERROR - Exception in callback functools.partial(>, )
Traceback (most recent call last):
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 223, in maybe_to_futures
f = call_data_futures[arg]
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 56, in __getitem__
ref, val = self._data[id(obj)]
KeyError: 4598141776
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/tornado/gen.py", line 748, in run
yielded = self.gen.send(value)
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 264, in callback_wrapper
callback(result) # gets called in separate thread
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
self.parallel.dispatch_next()
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
if not self.dispatch_one_batch(self._original_iterator):
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
self._dispatch(tasks)
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
job = self._backend.apply_async(batch, callback=cb)
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 254, in apply_async
func, args = self._to_func_args(func)
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 243, in _to_func_args
args = list(maybe_to_futures(args))
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/joblib/_dask.py", line 231, in maybe_to_futures
[f] = self.client.scatter([arg])
AssertionError: yield from wasn't used with future
asynchronous=asynchronous, hash=hash)
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/distributed/client.py", line 665, in sync
return sync(self.loop, func, *args, **kwargs)
File "/Users/jjerphanion/.virtualenvs/asserttest2/lib/python3.6/site-packages/distributed/utils.py", line 275, in sync
e.wait(10)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 551, in wait
signaled = self._cond.wait(timeout)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 299, in wait
gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
>>> distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
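The main-thread frames at the end of this traceback (`sync` → `e.wait(10)` → `KeyboardInterrupt`) show a thread blocked while waiting for a result from the event loop. The sketch below is a simplified, hypothetical model of that pattern using asyncio instead of tornado (`sync_call` and `fake_scatter` are illustrative names, not distributed's API): while the loop thread is stuck, a synchronous caller simply waits without using CPU, which is consistent with the 0% CPU hang reported earlier in the thread.

```python
import asyncio
import concurrent.futures
import threading


async def fake_scatter(data):
    # Hypothetical stand-in for an asynchronous scatter: echoes the data back.
    return [data]


def sync_call(loop, coro, timeout):
    """Run coro on the loop thread and block the caller until it finishes."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        return "timed out"


# Event loop running in a background thread, similar to a synchronous Client.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# Simulate a callback that is stuck on the loop thread.
release = threading.Event()
loop.call_soon_threadsafe(release.wait)

# While the loop is blocked, the synchronous call cannot make progress.
first = sync_call(loop, fake_scatter("x"), timeout=0.5)

# Unblock the loop; queued work now runs and a new call succeeds.
release.set()
second = sync_call(loop, fake_scatter("y"), timeout=5)

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

Without the timeout, the first call would wait indefinitely, much like `e.wait(10)` being re-entered until Ctrl+C.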
This problem also happens on CentOS 7 in both environments.
Python version: 3.6.8
Machine:
$ uname -a
Linux 6ea377206ec7 4.9.125-linuxkit #1 SMP Fri Sep 7 08:20:28 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
This problem also happens on CentOS 7 and macOS with the latest versions of joblib, dask and distributed.
Python version: 3.6.8
Python Environment :
Click==7.0
cloudpickle==1.2.1
dask==2.0.0
distributed==2.0.1
HeapDict==1.0.0
joblib==0.13.2
msgpack==0.6.1
numpy==1.16.4
psutil==5.6.3
PyYAML==5.1.1
six==1.12.0
sortedcontainers==2.1.0
tblib==1.4.0
toolz==0.9.0
tornado==6.0.3
zict==1.0.0
I ran this example again with logging enabled in joblib and distributed and got the following logs.
This can be reproduced with the following setup, which might help with debugging (cc @TomAugspurger).
By setting batch_size to the number of tasks (here 10), we can submit the full set of tasks in one go.
This can be done by modifying the previous snippet with:
# ...
with parallel_backend("dask"):
    results = Parallel(batch_size=10)(delayed(sum_values)(np.zeros(i), 5) for i in range(50000, 50010))
# ...
In this case, the execution does not hang but crashes immediately.
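To make the effect of `batch_size` concrete, here is a simplified model of grouping tasks into batches (a hypothetical `make_batches` helper, not joblib's dispatch code). With `batch_size=10`, the ten tasks of the reproducer form a single batch submitted at once, whereas with one task per batch each later batch is dispatched separately, for example from a completion callback as in the `callback_wrapper` → `dispatch_next` frames above.

```python
from itertools import islice


def make_batches(tasks, batch_size):
    """Group tasks into consecutive batches of at most batch_size items.

    A simplified model of batching, not joblib's actual dispatch logic.
    """
    it = iter(tasks)
    batches = []
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return batches
        batches.append(batch)


tasks = list(range(50000, 50010))  # one task per array length in the reproducer
one_batch = make_batches(tasks, batch_size=10)
per_task = make_batches(tasks, batch_size=1)
```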
My intuition is that some part of joblib is not playing well with the distributed engine.
If the input arrays given to delayed() are smaller, everything works fine. I believe that the problem is related to the max_nbytes argument in joblib.Parallel: if the array sizes are larger than this number, a client.scatter() is triggered on dask's side, which ultimately causes the trouble.
However, I am not 100% sure.
Yes, I also think that this relates to the pre-scattering of large arguments, as the errors are raised while scattering.
This system was introduced in distributed#2020: tasks' arguments are pre-scattered if their size exceeds a given threshold. This was originally 1e6 bytes; it is now 1e3 bytes.
The arguments that you give in your example exceed this latter threshold and hence are pre-scattered ; for instance:
>>> import numpy as np
>>> np.zeros(50000).nbytes
400000
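As a quick arithmetic check of the reproducer against both thresholds: each element of `np.zeros(n)` is a float64 (8 bytes), so the arrays range from 400,000 to 400,072 bytes. The sketch assumes the size measure equals the array's `nbytes` and that scattering happens on a strict `>` comparison; `nbytes_of_zeros` and `is_prescattered` are hypothetical helpers.

```python
FLOAT64_BYTES = 8    # np.zeros(n) defaults to float64, 8 bytes per element
OLD_THRESHOLD = 1e6  # original pre-scatter threshold in bytes, as stated above
NEW_THRESHOLD = 1e3  # current pre-scatter threshold in bytes, as stated above


def nbytes_of_zeros(n):
    """Size in bytes of np.zeros(n), computed without importing NumPy."""
    return n * FLOAT64_BYTES


def is_prescattered(nbytes, threshold):
    # Assumption: an argument is scattered when its size is strictly above the threshold.
    return nbytes > threshold


# Array lengths used in the reproducer: np.zeros(i) for i in range(50000, 50010)
sizes = [nbytes_of_zeros(n) for n in range(50000, 50010)]
old = [is_prescattered(s, OLD_THRESHOLD) for s in sizes]
new = [is_prescattered(s, NEW_THRESHOLD) for s in sizes]

# A small array, like the ones reported to work fine
small_new = is_prescattered(nbytes_of_zeros(100), NEW_THRESHOLD)
```

Every reproducer array is above 1e3 but below 1e6 bytes, so only the newer threshold pre-scatters them, while `np.zeros(100)` (800 bytes) stays below both.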
I am not completely sure either, but based on the execution logs, the problem may lie in the behaviour of the Client and the Scheduler, and/or in their communication, when BatchCompletionCallBack calls happen after some tasks complete and thus make Parallel dispatch the next tasks.
It is also possible that this pre-scattering interferes with the remaining tasks being dispatched in the while loop in Parallel.__call__.
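One plausible reading of the final frame, `[f] = self.client.scatter([arg])`, offered as an assumption rather than a diagnosis confirmed in this thread: if `scatter`, when invoked from code running on the event loop (as on the `callback_wrapper` → `dispatch_next` path), handed back a pending future instead of a list, list unpacking would iterate the future itself. The stdlib-only sketch below shows that unpacking an unfinished asyncio future produces this family of error; `unpack_single` is a hypothetical helper.

```python
import asyncio


def unpack_single(result):
    """Mimic the `[f] = self.client.scatter([arg])` line from the traceback."""
    [f] = result
    return f


# Normal case: a one-element list unpacks fine.
ok_value = unpack_single(["future-for-arg"])

# Hypothetical failure case: an unfinished asyncio future is unpacked instead.
error = None
loop = asyncio.new_event_loop()
pending = loop.create_future()
try:
    unpack_single(pending)
except (AssertionError, RuntimeError) as exc:
    # Python 3.6: AssertionError("yield from wasn't used with future")
    # Python 3.7+: RuntimeError("await wasn't used with future")
    error = exc
finally:
    loop.close()
```

Unpacking calls `iter()` on the future: the first step yields the future itself, and the check for a second item finds the future still pending, which raises.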
I believe this is related to joblib/joblib#852
This seems to be fixed by joblib#914.
@jjerphan indeed. Once the PR is merged, this issue can be closed.