What happened:
With python 3.6.9, my job got stuck at 998/1000 and left the following log message on the worker. At this point the job didn't continue for >10 minutes. I had to stop it manually.
distributed.worker - ERROR - 'buffers' is an invalid keyword argument for this function
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/distributed/worker.py", line 2511, in execute
    data[k] = self.data[k]
  File "/usr/local/lib/python3.6/site-packages/zict/buffer.py", line 78, in __getitem__
    return self.slow_to_fast(key)
  File "/usr/local/lib/python3.6/site-packages/zict/buffer.py", line 65, in slow_to_fast
    value = self.slow[key]
  File "/usr/local/lib/python3.6/site-packages/zict/func.py", line 38, in __getitem__
    return self.load(self.d[key])
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 502, in deserialize_bytes
    return deserialize(header, frames)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 302, in deserialize
    return loads(header, frames)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 64, in loads
    return pickle.loads(x, buffers=buffers)
TypeError: 'buffers' is an invalid keyword argument for this function
What you expected to happen:
My job finishes
Minimal Complete Verifiable Example:
On python 3.6, run
>>> from distributed.protocol import pickle
>>> pickle.loads(b"", buffers=(1))
...
TypeError: 'buffers' is an invalid keyword argument for this function
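For context, `buffers` is the loading-side keyword of pickle protocol 5 (PEP 574), which the standard library `pickle` only gained in Python 3.8; older interpreters reject it with the TypeError shown here. A minimal sketch of what the keyword does, assuming Python 3.8 or newer and using only the standard library (the payload and variable names are illustrative, not taken from distributed):

```python
import pickle

# Illustrative payload; wrapping it in PickleBuffer marks it as eligible
# for out-of-band transfer under protocol 5.
payload = bytearray(b"some large binary payload")

buffers = []  # receives the out-of-band buffers during dumps
data = pickle.dumps(
    pickle.PickleBuffer(payload), protocol=5, buffer_callback=buffers.append
)

# The same buffers must be handed back when loading.
restored = pickle.loads(data, buffers=buffers)

# Loading without them fails, because the stream only references the data.
try:
    pickle.loads(data)
    missing_buffers_error = None
except pickle.UnpicklingError as exc:
    missing_buffers_error = exc
```

The pickle stream itself stays small because the payload travels separately, which is why the loader needs the `buffers` argument at all.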
Anything else we need to know?:
There is a backport of pickle5 to 3.6 on https://pypi.org/project/pickle5/
I am trying to work around that problem as follows. Thx @samaust for the remark in #3843.
import pickle5
from distributed.protocol import pickle
pickle.pickle = pickle5

def pickle_dumps(x):
    header = {"serializer": "pickle"}
    frames = [None]
    buffer_callback = lambda f: frames.append(memoryview(f))
    frames[0] = pickle.dumps(x, buffer_callback=buffer_callback)
    return header, frames

def pickle_loads(header, frames):
    x, buffers = frames[0], frames[1:]
    return pickle.loads(x, buffers=buffers)

from distributed.protocol.serialize import register_serialization_family
register_serialization_family('pickle', pickle_dumps, pickle_loads)
Environment:
With python 3.6.9, my job got stuck at 998/1000 and left the following log message on the worker. At this point the job didn't continue for >10 minutes. I had to stop it manually.
Oof, that sounds frustrating. My apologies.
Minimal Complete Verifiable Example:
Ideally this would be something that you were trying to do with Dask. Is it easy for you to provide us an example that someone like you would try doing normally that would fail? I ask because our normal test suite passes fine on Python 3.6 and somehow failed to trigger this failure. It would be useful to know what was going on and have a test for it.
cc @jakirkham because of the pickle connection.
Unfortunately, it is not very easy to show an example. It seems that my job usually doesn't fill the buffer and executes normally. In my use case, it is rather an edge case that dask uses the buffers=buffers keyword.
Oof, that sounds frustrating. My apologies.
No worries anymore :) Thx for your empathy.
Can you verify that all of your machines are running the same version of Python with client.get_versions(check=True)?
Yes, all have the same versions, as they run from the same docker image: python 3.6.9.final.0. I am quite sure that the problem is related to the following commit: https://github.com/dask/distributed/commit/ddc6377b76f957ef4a3a607f1dc2cf2ba772abc7
Ended up with the following workaround. It seems to help:
from dask.distributed import Client, Worker
from distributed.diagnostics.plugin import WorkerPlugin

class Pickle5Hack(WorkerPlugin):
    def setup(self, worker: Worker):
        # Runs on each worker: later `import pickle` statements resolve to pickle5
        import sys
        import pickle5
        sys.modules['pickle'] = pickle5

client = Client('localhost:8786')
client.register_worker_plugin(Pickle5Hack())
Thx for the great project and great documentation.
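A note on how a `sys.modules` replacement like this takes effect: it only changes what later `import` statements return, while names already bound to the old module keep pointing at it. A minimal standard-library sketch of that behaviour, using `json` as a stand-in for `pickle` and a hypothetical fake module (nothing here is distributed's code):

```python
import sys
import types
import json as early_ref  # bound before the swap, like a module imported at startup


def swap_and_import():
    """Swap sys.modules['json'] for a fake module and import it again."""
    fake = types.ModuleType("json")  # hypothetical replacement module
    fake.dumps = lambda obj: "patched"
    original = sys.modules["json"]
    sys.modules["json"] = fake
    try:
        import json as late_ref  # resolved through sys.modules, so this is the fake
        return late_ref.dumps(1)
    finally:
        sys.modules["json"] = original  # restore the real module


late_result = swap_and_import()    # comes from the fake module
early_result = early_ref.dumps(1)  # still the real json.dumps
```

So a hack of this kind can only influence modules that import `pickle` after the swap has happened; anything that imported it earlier is unaffected.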
I'm glad to see that you were able to find a solution to help work around things.
Let's wait a bit to see if @jakirkham has some thoughts. If memory serves he's responsible for the pickle5 changes and may know more.
Yeah, while I believe there could be an issue, Jim and I haven't been able to find anything and we still haven't identified a reproducer, which makes it hard to do anything helpful here. Would you be able to come up with a reproducer for the behavior @michaelnarodovitch?
Independently we would like to get pickle5 working with this, but it doesn't currently ( https://github.com/dask/distributed/issues/2495 ) as this would depend on cloudpickle having this functionality ( https://github.com/cloudpipe/cloudpickle/pull/370 ), which depends on some structural changes to cloudpickle ( https://github.com/cloudpipe/cloudpickle/pull/368 ) that still need wrapping up.
+1 on getting a reproducer if possible
If we're unable to find out what's going on then maybe we revert the change, or apply it only for appropriate versions of Python?
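One way to read the second option is a guard on the interpreter's pickle capabilities. A minimal sketch, assuming protocol 5 support can be detected via `pickle.HIGHEST_PROTOCOL`; `loads_compat` is a hypothetical helper, not distributed's API and not necessarily how any eventual fix works:

```python
import pickle

# Protocol 5 (and with it the `buffers` keyword) is in the stdlib from Python 3.8.
HAS_PICKLE5 = pickle.HIGHEST_PROTOCOL >= 5


def loads_compat(data, buffers=()):
    """Hypothetical wrapper: pass `buffers` only where pickle understands it."""
    if HAS_PICKLE5:
        return pickle.loads(data, buffers=buffers)
    if buffers:
        # Older interpreters cannot consume out-of-band buffers at all.
        raise ValueError("out-of-band buffers require pickle protocol 5")
    return pickle.loads(data)


roundtrip = loads_compat(pickle.dumps({"key": [1, 2, 3]}))
```

On interpreters without protocol 5 this falls back to a plain `pickle.loads` call, so the TypeError from the traceback could not be raised by the wrapper itself.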
FWIW someone reported an issue with this the other day and it turned out to be some other usage error ( https://github.com/dask/distributed/issues/3843 ). So it may not be this change per se, but instead some other upstream error that is not being handled gracefully.
Mh, I see that this is a tricky one. I was able to reproduce on my cluster. The job forced disk spills with subsequent shuffles, which collect ~4 GB pandas dataframes on 7 GB memory nodes (to make it reproduce pretty fast, I reduced the memory of the nodes). The following stacktrace might provide more insight.
distributed.core - INFO - Event loop was unresponsive in Worker for 4.36s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.protocol.pickle - INFO - Failed to deserialize b'<MY_BIG_BUFFER - pandas_dataframe_holds_4GB_of_data>'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 64, in loads
    return pickle.loads(x, buffers=buffers)
TypeError: 'buffers' is an invalid keyword argument for this function
distributed.worker - ERROR - 'buffers' is an invalid keyword argument for this function
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/distributed/worker.py", line 2511, in execute
    data[k] = self.data[k]
  File "/usr/local/lib/python3.6/site-packages/zict/buffer.py", line 78, in __getitem__
    return self.slow_to_fast(key)
  File "/usr/local/lib/python3.6/site-packages/zict/buffer.py", line 65, in slow_to_fast
    value = self.slow[key]
  File "/usr/local/lib/python3.6/site-packages/zict/func.py", line 38, in __getitem__
    return self.load(self.d[key])
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 502, in deserialize_bytes
    return deserialize(header, frames)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 302, in deserialize
    return loads(header, frames)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 64, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/usr/local/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 64, in loads
    return pickle.loads(x, buffers=buffers)
TypeError: 'buffers' is an invalid keyword argument for this function
Dask: 2.17.2
Dask Distributed: 2.17.0
Python: 3.6.9
Thank you so much for following up on this.
Can you please share the code? Otherwise I'm afraid we won't be able to reproduce it.
So I've dug into this a bit and this is what I have come up with. Sharing the reproducer for now, though I haven't debugged it at all yet. This passes on 2.16.0 and fails on 2.17.0 using Python 3.7.
from distributed.protocol import deserialize_bytes, serialize_bytes
b = 2**27 * b"a"
deserialize_bytes(serialize_bytes(b, serializers=["pickle"]))
Heh, on the bright side we already solved this before ( https://github.com/dask/distributed/pull/3639 ). We were just missing the test, which we now have. Will clean that up.
This is ready for testing/review.
Oh great that you found something :) I didn't really understand the failure mode in my use-case, and the code of my use-case was not something to share directly.
Just from looking at the serializer code, it is anything but obvious.
The reproducer looks promising to me.
PS: My 'fixes' didn't help with 2.17.0. On 2.16.0, the job is executing just fine.
Upgrading to distributed==2.17.0 has been giving me headaches - I've since found
distributed.worker - ERROR - 'buffers' is an invalid keyword argument for this function
in my logs while my workers' memory use rises slowly to 80% where they pause indefinitely.
I can confirm that when running the branch in #3639, I no longer have this problem!
(Python 3.7, dask==2.17.2, distributed==2.17.0)
Pulled via pip install git+https://github.com/jakirkham/distributed.git@33594d3c22dfa3fac4fdb82f7860da3492dafd49 and reran my (previously failing) use case with that version. Works flawlessly now.
Great, thanks for the feedback!
The fix is in distributed version 2.18.0. So please upgrade to get the fix.
Thank you! It works as expected now.