Reading from Parquet is failing with PyArrow 0.13. Downgrading to PyArrow 0.12.1 seems to fix the problem. I've only encountered this when using the distributed client. Using a Dask dataframe by itself does not appear to be affected.
For example,
from distributed import LocalCluster, Client
import dask.dataframe as dd
client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers = 4))
ddf = dd.read_parquet('DATA/parquet/', engine = 'pyarrow')
ddf.set_index('index')
Gives
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
for key, value in data.items()
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
if type(value) is Serialize}
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
raise TypeError(msg, str(x)[:10000])
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/opt/conda/lib/python3.6/site-packages/distributed/batched.py", line 94, in _background_send
on_error='raise')
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
value = future.result()
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "/opt/conda/lib/python3.6/site-packages/distributed/comm/tcp.py", line 224, in write
'recipient': self._peer_addr})
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
value = future.result()
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 736, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 50, in to_frames
res = yield offload(_to_frames)
File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 729, in run
value = future.result()
File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 425, in result
return self.__get_result()
File "/opt/conda/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/opt/conda/lib/python3.6/concurrent/futures/thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
File "/opt/conda/lib/python3.6/site-packages/distributed/comm/utils.py", line 43, in _to_frames
context=context))
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
for key, value in data.items()
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
if type(value) is Serialize}
File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 163, in serialize
raise TypeError(msg, str(x)[:10000])
Similarly,
from distributed import LocalCluster, Client
import dask.dataframe as dd
client = Client(LocalCluster(diagnostics_port=('0.0.0.0', 8889), n_workers = 4))
ddf = dd.read_parquet('DATA/parquet/', engine = 'pyarrow')
ddf.compute()
Causes this error
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
37 try:
---> 38 result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
39 if len(result) < 1000:
AttributeError: Can't pickle local object 'ParquetDataset._get_open_file_func.<locals>.open_file'
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-3-a90291ebde1e> in <module>
1 ft_inp_ddf = dd.read_parquet('DATA/ft_14_15_inputs_parquet/', engine = 'pyarrow')
----> 2 ft_inp_ddf.compute()
/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
/opt/conda/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2318 retries=retries,
2319 user_priority=priority,
-> 2320 actors=actors,
2321 )
2322 packed = pack_data(keys, futures)
/opt/conda/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2259
2260 self._send_to_scheduler({'op': 'update-graph',
-> 2261 'tasks': valmap(dumps_task, dsk3),
2262 'dependencies': dependencies,
2263 'keys': list(flatkeys),
/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/opt/conda/lib/python3.6/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/opt/conda/lib/python3.6/site-packages/distributed/worker.py in dumps_task(task)
2769 elif not any(map(_maybe_complex, task[1:])):
2770 return {'function': dumps_function(task[0]),
-> 2771 'args': warn_dumps(task[1:])}
2772 return to_serialize(task)
2773
/opt/conda/lib/python3.6/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
2778 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
2779 """ Dump an object to bytes, warn if those bytes are large """
-> 2780 b = dumps(obj)
2781 if not _warn_dumps_warned[0] and len(b) > limit:
2782 _warn_dumps_warned[0] = True
/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in dumps(x)
49 except Exception:
50 try:
---> 51 return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
52 except Exception as e:
53 logger.info("Failed to serialize %s. Exception: %s", x, e)
/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
959 try:
960 cp = CloudPickler(file, protocol=protocol)
--> 961 cp.dump(obj)
962 return file.getvalue()
963 finally:
/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
265 self.inject_addons()
266 try:
--> 267 return Pickler.dump(self, obj)
268 except RuntimeError as e:
269 if 'recursion' in e.args[0]:
/opt/conda/lib/python3.6/pickle.py in dump(self, obj)
407 if self.proto >= 4:
408 self.framer.start_framing()
--> 409 self.save(obj)
410 self.write(STOP)
411 self.framer.end_framing()
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_tuple(self, obj)
749 write(MARK)
750 for element in obj:
--> 751 save(element)
752
753 if id(obj) in memo:
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function(self, obj, name)
398 # func is nested
399 if lookedup_by_name is None or lookedup_by_name is not obj:
--> 400 self.save_function_tuple(obj)
401 return
402
/opt/conda/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in save_function_tuple(self, func)
592 if hasattr(func, '__qualname__'):
593 state['qualname'] = func.__qualname__
--> 594 save(state)
595 write(pickle.TUPLE)
596 write(pickle.REDUCE) # applies _fill_function on the tuple
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_list(self, obj)
779
780 self.memoize(obj)
--> 781 self._batch_appends(obj)
782
783 dispatch[list] = save_list
/opt/conda/lib/python3.6/pickle.py in _batch_appends(self, items)
806 write(APPENDS)
807 elif n:
--> 808 save(tmp[0])
809 write(APPEND)
810 # else tmp is empty, and we're done
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
519
520 # Save the reduce() output and finally memoize the object
--> 521 self.save_reduce(obj=obj, *rv)
522
523 def persistent_id(self, obj):
/opt/conda/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
632
633 if state is not None:
--> 634 save(state)
635 write(BUILD)
636
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
474 f = self.dispatch.get(t)
475 if f is not None:
--> 476 f(self, obj) # Call unbound method with explicit self
477 return
478
/opt/conda/lib/python3.6/pickle.py in save_dict(self, obj)
819
820 self.memoize(obj)
--> 821 self._batch_setitems(obj.items())
822
823 dispatch[dict] = save_dict
/opt/conda/lib/python3.6/pickle.py in _batch_setitems(self, items)
845 for k, v in tmp:
846 save(k)
--> 847 save(v)
848 write(SETITEMS)
849 elif n:
/opt/conda/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
494 reduce = getattr(obj, "__reduce_ex__", None)
495 if reduce is not None:
--> 496 rv = reduce(self.proto)
497 else:
498 reduce = getattr(obj, "__reduce__", None)
/opt/conda/lib/python3.6/site-packages/pyarrow/_parquet.cpython-36m-x86_64-linux-gnu.so in pyarrow._parquet.ParquetSchema.__reduce_cython__()
TypeError: no default __reduce__ due to non-trivial __cinit__
It appears that, in the absence of a serialization mechanism directly in pyarrow, we'll have to write a dask specific one for these schema objects, at least for now. I can have a go early this coming week, if no one beats me to it. In the mean time @johnbensnyder , you can try to downgrade to pyarrow 0.12, or use fastparquet.
Sorry to add noise in this issue, but is this also similar to this -> https://github.com/dask/distributed/issues/2581#issuecomment-480472832?. If yes, I would like to try to solve it for pytorch, too.
@muammar , that issue is indeed the same type of thing happening, but it is not directly related to this one, as it's a completely different sort of object which is causing the problem.
@johnbensnyder , I have not managed to reproduce this locally; could you specify which versions you are using and how you generated the data, please?
@muammar , that issue is indeed the same type of thing happening, but it is not directly related to this one, as it's a completely different sort of object which is causing the problem.
It is happening for me when I use the latest git version and install distributed with python3 setup.py install. If I install the stable distributed version it works.
OK, thanks, updating to current master did indeed reveal the situation you are seeing.
At first glance, I do not know how to fix it. The new pyarrow ParquetDataset includes an opaque parquet schema instance (a C++-managed object) as an attribute, which has no de/serialise mechanism on the python side; apparently an arrow schema instance is not enough here anymore (I assume conversion to/from thrift is implemented within c++).
It also holds onto references to open files in _open_file_func, _get_open_file_func and the pieces' open_file_func, which will not serialise if it is a local file.
I am including this as information for someone who might know how to fix these problems, perhaps on the (py)arrow side.
My guess is that if you want to engage the pyarrow devs, you should raise a JIRA on their issue tracker. I recommend trying to reproduce the issue using only cloudpickle and pyarrow and then raising it there.
Thanks for reporting upstream @martindurant !
OK, so we haven't gotten any response on the arrow issue tracker. Any thoughts on how we should handle this? @martindurant if you have any inclination, I would love it if you were willing to handle/track this problem and find some solution (technical, social, or otherwise).
I have no technical solution from our side, and the cython code for this looks... rather complicated.
My initial hunch points to this changeset https://github.com/apache/arrow/commit/f2fb02b82b60ba9a90c8bad6e5b11e37fc3ea9d3
By experimenting, I could get serialisation of the ParquetDataset by removing the schema attribute and also from its pieces; but the hanging function defs certainly in this diff still fouled pickle if not cloudpickle.
Other connected commits:
https://github.com/apache/arrow/commit/f70dbd1dbdb51a47e6a8a8aac8efd40ccf4d44f2 (at least renames the metadata stuff)
https://github.com/apache/arrow/commit/86f480a2a246f0c885031aff8d21f68640dbd72a (main one for the RangeIndex implementation)
I don't know who at arrow can help ( @kszucs perhaps?)
Yes, some suggestions on how we might proceed:
These are things that I might do when trying to resolve this problem
@pitrou @xhochy any thoughts?
Sorry, I missed the notification. I can take a look tomorrow, if that's OK with you.
(I can also handle this)
https://github.com/apache/arrow/pull/4156 should resolve the pickling issue for ParquetDataset and ParquetPiece.
Note, that this is a new feature, pickling wasn't implemented for these objects, see https://issues.apache.org/jira/browse/ARROW-5144
I am having a very similar problem using bcolz. The system is an offline system so I can't copy over the error message.
Versions in use are:
dask==1.2.0
distributed==1.27.0
bcolz==1.2.1
I haven't been able to find any version combo yet that works...
^ this is completely without pyarrow 0.13?
Correct, I was not using pyarrow at all. I did try installing the version of pyarrow that was mentioned above as working, but it made no difference. I was able to kind of hack around this by not using the dd.from_bcolz method to construct the DataFrame, and instead wrapping the bcolz.todataframe method with delayed, which seems to be working right now... and as I say that, my job just got killed for running out of memory...
bcolz is not that much used so, although there have certainly been some comm changes in distributed, I would suspect this might just be a coincidence. Still, it would be good to work out what in bcolz doesn't serialise, when this happened, and whether it was a change in bcolz or in distributed or something else, like cloudpickle.
The bcolz issue appears to be separate. I recommend raising a separate
issue so that this issue doesn't get off track.
On Fri, Apr 26, 2019 at 10:34 AM Martin Durant notifications@github.com
wrote:
bcolz is not that much used so, although there have certainly been some
comm changes in distributed, I would suspect this might just be a
coincidence. Still, it would be good to work out what in bcolz doesn't
serialise, when this happened, and whether it was a change in bcolz or in
distributed or something else, like cloudpickle.—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
https://github.com/dask/distributed/issues/2597#issuecomment-487101286,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AACKZTC4RFXOKGOATTFPICTPSMOJBANCNFSM4HD76L3Q
.
I have a Dask DataFrame, and when I call map_partitions on it and, inside each partition's DataFrame, try to call cascaded_union from shapely on a list of Points, it gives me the very same error.
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):...............
...................................
Is this the same issue? Does anyone know how to call cascaded_union in each partition?
@pankalos most likely it's a similar, but different issue to this one. You may want to read through https://distributed.dask.org/en/latest/serialization.html, and experiment with why a list of shapely points apparently can't be serialized.
@pankalos @TomAugspurger For reference, I had the same problem as @pankalos, and downgrading pyarrow to 0.12 fixed it.
This ought to now work with pyarrow 0.14, if someone would like to try.
Hi,
I've updated to version:
pyarrow 0.14
arrow-cpp 0.14
and I'm able to serialize and read parquet.
Thx!
I have a somewhat similar issue to @pankalos but with trying to use the dask DataFrame and read_csv function.
The weird thing is that I've used dask (distributed) to create the series of .csv files, but when I try to read them back in (in distributed mode) I get the following error:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "c:\users\...\python\python36\lib\site-packages\distributed\protocol\core.py", line 46, in dumps
for key, value in data.items()
File "c:\users\...\programs\python\python36\lib\site-packages\distributed\protocol\core.py", line 47, in <dictcomp>
if type(value) is Serialize
File "c:\users\...\programs\python\python36\lib\site-packages\distributed\protocol\serialize.py", line 164, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function check_meta at 0x000001F48B4A1EA0>, (<function apply at 0x000001F48B1C4400>, <function pandas_read_text at 0x000001F48B50D158>, [<function _make_parser_function.<locals>.parser_f at 0x000001F4E983D6A8>, (<function read_block_from_file at 0x000001F48B3949D8>, <dask.bytes.core.OpenFile object at 0x000001F48C5E3D30>, 0, 64000000, b'\\n'), etc...
I loaded Dask using the following:
cluster = LocalCluster()
client = Client(cluster, serializers=['dask'])
client
Versions:
dask = 2.0.0
python = 3.6.4
'packages': {'required': (('dask', '2.0.0'),
('distributed', '2.0.1'),
('msgpack', '0.6.1'),
('cloudpickle', '1.2.1'),
('tornado', '5.1.1'),
('toolz', '0.9.0')),
'optional': (('numpy', '1.16.4'),
('pandas', '0.25.0'),
('bokeh', '1.0.4'),
('lz4', None),
('dask_ml', '1.0.0'),
('blosc', None))}},
Is this similar? This is a new issue; I've never had this problem before. Running with processes=False does make it work, but I lose out on the performance gain that I am looking for.
Any thoughts?
Since the tuple is in the error message, you could %debug and try to serialise each member in turn, to find out which it is that is causing the problem.
So I tried using the %debug magic command, but it didn't shed any real light on the source of the error. The original error plus TypeError: ('Could not serialize object of type tuple.' is returned even when I pass a single .csv to the read_csv command:
import dask.dataframe as dd
dd.read_csv('data/raw/file_name_1.csv').compute()
and this file is not serialized by dask; this is the original .csv that I started with. This also only occurs with the read_csv command; to_csv works fine.
The other thing to note is that the dask threads just hang on this failure. They don't quit or restart, but just hang after the error, and I need to restart the notebook kernel to perform other dask functions (apart from read_csv).
If this is a separate issue, I can move this to a new thread.
The point of debug is to go to where the tuple is being serialised, and find out which of its members is causing the problem.
Hi,
we are also getting the error with dd.read_csv.
we created a simple csv file with a list of integers and tried to read them in using:
df = dd.read_csv('hdfs:///tmp/demofile.csv')
We tried setting dtype=object but are still getting the same error.
we did not manage to get additional information using %debug
I can confirm that the HDFS file system instances are pickleable, and so are open-file instances that reference them. I made a dataframe from csv with a file on HDFS, as in the example, and it pickles fine with cloudpickle, at least on my current setup. In short, I cannot reproduce this problem.
In this issue, we still have no idea what it is that is failing to serialise, and since I cannot reproduce, I'm afraid it's down to the rest of you to try and find out!
After running update on the OS and re-running a clean installation on a new conda environment I was not able to reproduce the error .
I have a similar issue. Downgrade pandas from 0.25 to 0.24.2 worked for me.
I confirm that the problem seems to be due to pandas 0.25, since I had similar issue using the azure connection and as such had no dependency on pyarrow. The problem was solved as indicated by @tsonic.
@TomAugspurger , could there be something in new pandas that is not serialisable?
@AlexisMignon , @tsonic would appreciate a sample of your data, so we can reproduce.
I'll take a look if someone has a reproducible example: http://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports
The following code (in a test.py file) raises the problem for me:
import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd
if __name__ == "__main__":
df = pd.DataFrame(
data=[("key_%i" % (i // 2), i) for i in range(100000)],
columns=["key", "value"]
)
df.to_csv("./data.csv", index=False)
c = Client("localhost:8786")
dd_data = dd.read_csv("./data.csv", blocksize=100000)
dd_data.groupby("key")["value"].sum().compute()
Launched by doing:
python3 -m venv venv
source venv/bin/activate
pip install pandas==0.25 dask==2.1.0 distributed==2.1.0
dask-scheduler &
dask-worker localhost:8786 &
python test.py
The same thing with pandas==0.24.2 works
Works with dask 2.2.0+38.g266c314 (master) and pandas 0.25.0
I am using conda-forge packages. Pandas 0.25.0 causes the issue as shown in @AlexisMignon 's comment for dask+dask-distributed 2.3.0. Downgrading pandas to 0.24.2, which also causes dask+dask-distributed to downgrade to 2.2.0, works without TypeError.
@hongzmsft do you have a reproducible example? I think we're still looking for one.
Alternatively, does the example from https://github.com/dask/distributed/issues/2597#issuecomment-521155030 work for you?
@TomAugspurger I am using the example from https://github.com/dask/distributed/issues/2597#issuecomment-521155030 . The program runs fine with dask=2.2.0, distributed=2.2.0, pandas=0.24.2, python=3.6. It also runs fine with dask=2.3.0, distributed=2.3.0, pandas=0.25.0, python=3.7.0, but raises a TypeError as reported in this issue with dask=2.3.0, distributed=2.3.0, pandas=0.25.0, python=3.6.
Here is a docker file to reproduce the issue:
FROM debian:buster-slim
RUN apt-get update && \
apt-get install -qy wget && \
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && \
bash Miniconda3-latest-Linux-x86_64.sh -b -p /opt/conda && \
rm Miniconda3-latest-Linux-x86_64.sh
COPY test.py /
RUN chmod +x /test.py
ENTRYPOINT ["/test.py"]
RUN /opt/conda/bin/conda install -c conda-forge dask=2.3* distributed=2.3* pandas=0.25.0 python=3.6*
A slightly modified test.py from https://github.com/dask/distributed/issues/2597#issuecomment-521155030 :
#!/opt/conda/bin/python
import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd
if __name__ == "__main__":
df = pd.DataFrame(
data=[("key_%i" % (i // 2), i) for i in range(100000)],
columns=["key", "value"]
)
df.to_csv("./data.csv", index=False)
c = Client(n_workers=2)
dd_data = dd.read_csv("./data.csv", blocksize=100000)
dd_data.groupby("key")["value"].sum().compute()
The error is like:
TypeError: ('Could not serialize object of type list.', "[<function _make_parser_function.<locals>.parser_f at 0x7f3c3f884ea0>, (<function read_block_from_file at 0x7f3c3f367d90>, <OpenFile '/data.csv'>, 0, 100000, b'\\n'), b'key,value\\n', (<class 'dict'>, []), (<class 'dict'>, [['key', dtype('O')], ['value', dtype('int64')]]), ['key', 'value']]")
The error seems slightly different from what the original issue is about, but consistent with other responses.
Thanks for the nice example @hongzmsft and @AlexisMignon! I'm able to reproduce the TypeError on Python 3.6 and, as @hongzmsft pointed out, the error does not occur on Python 3.7.
After some digging, it looks like this serialization issue has to do with cloudpickle's handling of type annotations (which are used in pd.read_csv) for Python 3.4-3.6. There's already an upstream issue in cloudpickle for this (ref https://github.com/cloudpipe/cloudpickle/issues/298) and, even better, there's an open PR with a fix (https://github.com/cloudpipe/cloudpickle/pull/299)!
I can confirm that, with the change in https://github.com/cloudpipe/cloudpickle/pull/299, the example in https://github.com/dask/distributed/issues/2597#issuecomment-523209146 runs successfully on Python 3.6
I can confirm that, with the change in cloudpipe/cloudpickle#299, the example in #2597 (comment) runs successfully on Python 3.6
Could you please elaborate on how you implemented that change in cloudpickle locally?
I've tried manually replacing cloudpickle.py and cloudpickle_py in my site-packages with the versions from cloudpipe/cloudpickle#299, since the release that patches that bug isn't available yet. But it didn't work.
Once there's a new release of cloudpickle you'll be able to upgrade to the latest version using pip or conda as you normally would (e.g. pip install cloudpickle --upgrade). In the meantime, you can use pip to install the development version of cloudpickle directly from GitHub with:
pip install git+https://github.com/cloudpipe/cloudpickle
Hopefully that helps @gcoimbra
I had to upgrade PyArrow from 0.12.1 to 0.14 because dask asked me to. But it worked!
Thanks a lot @jrbourbeau !
cloudpickle 1.2.2 was released.
The problem came back even with cloudpickle 1.2.2. It happens with PyArrow and fastparquet. Should I post this to https://github.com/dask/dask/issues/5317?
Traceback (most recent call last):
File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/comm/utils.py", line 29, in _to_frames
msg, serializers=serializers, on_error=on_error, context=context
File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/core.py", line 44, in dumps
for key, value in data.items()
File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
if type(value) is Serialize
File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 167, in serialize
for obj in x
File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
for obj in x
File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 167, in serialize
for obj in x
File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 167, in <listcomp>
for obj in x
File "/home/green/.venvs/agt/lib/python3.6/site-packages/distributed/protocol/serialize.py", line 210, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type list.', "[<function _make_parser_function.<locals>.parser_f at 0x7fb4e13e06a8>, (<function read_block_from_file at 0x7fb4d5eecd90>, <OpenFile '/hdd/home/green/CDC/6Periodo/NESPED/AGT/datasets/towers.csv'>, 2240000000, 64000000, b'\\n'), b'radio,mcc,net,area,cell,unit,lon,lat,range,samples,changeable,created,updated,averageSignal\\n', (<class 'dict'>, [['dtype', (<class 'dict'>, [['radio', 'category'], ['mcc', <class 'numpy.uint16'>], ['net', <class 'numpy.uint8'>], ['area', <class 'numpy.uint16'>], ['cell', <class 'numpy.uint32'>], ['lat', <class 'numpy.float32'>], ['lon', <class 'numpy.float32'>], ['range', <class 'numpy.uint32'>], ['samples', <class 'numpy.uint64'>], ['changeable', <class 'bool'>], ['created', <class 'numpy.uint32'>], ['updated', <class 'numpy.uint32'>], ['averageSignal', <class 'numpy.uint64'>]])], ['usecols', dict_keys(['radio', 'mcc', 'net', 'area', 'cell', 'lat', 'lon', 'range', 'samples', 'changeable', 'created', 'updated', 'averageSignal'])]]), (<class 'dict'>, [['radio', 'category'], ['mcc', dtype('uint16')], ['net', dtype('uint8')], ['area', dtype('uint16')], ['cell', dtype('uint32')], ['lon', dtype('float32')], ['lat', dtype('float32')], ['range', dtype('uint32')], ['samples', dtype('uint64')], ['changeable', dtype('bool')], ['created', dtype('uint32')], ['updated', dtype('uint32')], ['averageSignal', dtype('uint64')]]), ['radio', 'mcc', 'net', 'area', 'cell', 'lon', 'lat', 'range', 'samples', 'changeable', 'created', 'updated', 'averageSignal']]")
The error message is bigger because it happens on other threads too.
@gcoimbra , as before, it would be really useful to us if you were to go through the values in that list, which are probably also in dict(df.dask), to see which thing in there fails with cloudpickle; I assume it's the _make_parser_function thing, which comes from pyarrow. Is it something else with fastparquet?
@martindurant
It seems that I was causing the problem. The problem doesn't seem to be with PyArrow or fastparquet: it happens when I try to read a csv with dask.dataframe.read_csv, passing a dict_keys object instead of a list as the optional usecols argument. Dask then tries to serialize the following object (which can be seen in my previous post):
['usecols', dict_keys(...)]
Removing the usecols argument, or passing it as a list (usecols=list(...)), fixes the problem. I'm sorry for the trouble.
Do you want me to try to fix the problem and submit a pull request?
I'm not sure there's anything to fix: the (pandas) docstring says that usecols must be list-like.
In any case, I'll close this, since we now know what's going on.
I got the same error on Jul 10, 2020
MacPro 2017 macos catalina with miniconda env
How to solve the error? Is this because I have a single laptop and for distributed we need a cluster of multiple computers?
# imports
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
import dask.array as da
import dask_ml
import pyarrow
print([(x.__name__,x.__version__) for x in
[np,pd, dask, dask_ml,pyarrow]])
[('numpy', '1.18.5'), ('pandas', '1.0.5'), ('dask', '2.20.0'), ('dask_ml', '1.5.0'), ('pyarrow', '0.17.1')]
# data
a = da.random.normal(size=(2000, 2000), chunks=(1000, 1000)) # data
res = a.dot(a.T).mean(axis=0) # operation
res = res.persist() # start computation in the background
# code
from dask.distributed import Client, progress
client = Client() # use dask.distributed by default
progress(res) # watch progress
res.compute() # convert to final result when done if desired
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/protocol/core.py", line 106, in loads
header = msgpack.loads(header, use_list=False, **msgpack_opts)
File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 135, in unpackb
ret = unpacker._unpack()
File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 676, in _unpack
ret[key] = self._unpack(EX_CONSTRUCT)
File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 672, in _unpack
"%s is not allowed for map key" % str(type(key))
ValueError: <class 'tuple'> is not allowed for map key
distributed.core - ERROR - <class 'tuple'> is not allowed for map key
Traceback (most recent call last):
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/core.py", line 456, in handle_stream
msgs = await comm.read()
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/tcp.py", line 212, in read
frames, deserialize=self.deserialize, deserializers=deserializers
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 69, in from_frames
res = _from_frames()
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 55, in _from_frames
frames, deserialize=deserialize, deserializers=deserializers
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/protocol/core.py", line 106, in loads
header = msgpack.loads(header, use_list=False, **msgpack_opts)
File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 135, in unpackb
ret = unpacker._unpack()
File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 676, in _unpack
ret[key] = self._unpack(EX_CONSTRUCT)
File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 672, in _unpack
"%s is not allowed for map key" % str(type(key))
ValueError: <class 'tuple'> is not allowed for map key
distributed.core - ERROR - <class 'tuple'> is not allowed for map key
Traceback (most recent call last):
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/core.py", line 412, in handle_comm
result = await result
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/scheduler.py", line 2491, in add_client
await self.handle_stream(comm=comm, extra={"client": client})
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/core.py", line 456, in handle_stream
msgs = await comm.read()
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/tcp.py", line 212, in read
frames, deserialize=self.deserialize, deserializers=deserializers
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 69, in from_frames
res = _from_frames()
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/comm/utils.py", line 55, in _from_frames
frames, deserialize=deserialize, deserializers=deserializers
File "/Users/poudel/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/protocol/core.py", line 106, in loads
header = msgpack.loads(header, use_list=False, **msgpack_opts)
File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 135, in unpackb
ret = unpacker._unpack()
File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 676, in _unpack
ret[key] = self._unpack(EX_CONSTRUCT)
File "/Users/poudel/.local/lib/python3.7/site-packages/msgpack-1.0.0rc1-py3.7-macosx-10.7-x86_64.egg/msgpack/fallback.py", line 672, in _unpack
"%s is not allowed for map key" % str(type(key))
ValueError: <class 'tuple'> is not allowed for map key
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
<ipython-input-7-4d5bc96d9bb1> in <module>
1 progress(res) # watch progress
2
----> 3 res.compute() # convert to final result when done if desired
~/opt/miniconda3/envs/dsk/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
164 dask.base.compute
165 """
--> 166 (result,) = compute(self, traverse=False, **kwargs)
167 return result
168
~/opt/miniconda3/envs/dsk/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
442 postcomputes.append(x.__dask_postcompute__())
443
--> 444 results = schedule(dsk, keys, **kwargs)
445 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
446
~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2590 should_rejoin = False
2591 try:
-> 2592 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2593 finally:
2594 for f in futures.values():
~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1888 direct=direct,
1889 local_worker=local_worker,
-> 1890 asynchronous=asynchronous,
1891 )
1892
~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
766 else:
767 return sync(
--> 768 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
769 )
770
~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
343 if error[0]:
344 typ, exc, tb = error[0]
--> 345 raise exc.with_traceback(tb)
346 else:
347 return result[0]
~/.local/lib/python3.7/site-packages/distributed-2.9.3-py3.7.egg/distributed/utils.py in f()
327 if callback_timeout is not None:
328 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 329 result[0] = yield future
330 except Exception as exc:
331 error[0] = sys.exc_info()
~/opt/miniconda3/envs/dsk/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
CancelledError:
I'm not sure there's anything to fix: the (pandas) docstring says that
usecols must be list-like.
Can't we warn users with a ValueError exception?
I don't think we're generally in a position to check the types of arguments that we pass on to other functions.
Most helpful comment
I have a similar issue. Downgrading pandas from 0.25 to 0.24.2 worked for me.