Distributed: BlockwiseIO is not always msgpack serializable

Created on 17 Dec 2020 · 17 Comments · Source: dask/distributed

Multiple TypeErrors are raised during (de-)serialization on master CI

TypeError: can not serialize 'CreateArraySubgraph' object

E.g. https://travis-ci.org/github/dask/distributed/jobs/750189428

All 17 comments

cc @ian-r-rose @rjzamora

It looks like the new BlockwiseIO functionality is not fully msgpack-serializable (on both the dataframe and array side).

Two MWEs, which don't optimize the high-level-graph in order to delay materialization:

# MWE 1: dataframe
import dask.dataframe
import distributed

client = distributed.Client()
ddf = dask.dataframe.read_parquet("test.parquet")
# optimize_graph=False leaves the high-level graph unoptimized, delaying materialization
ddf.compute(optimize_graph=False)

# MWE 2: array
import dask.array
import distributed

client = distributed.Client()
arr = dask.array.ones((1000, 1000), chunks=(100, 100))
arr.compute(optimize_graph=False)

We need to (a) make sure that these mappings can be serialized, and (b) make sure that is tested in core to catch it sooner in the future.
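To make the failure mode concrete, here is a minimal standard-library sketch of why a lazy subgraph mapping is rejected while its materialized form is not. The `LazyOnesSubgraph` class and the `is_msgpack_friendly` helper are hypothetical stand-ins, not Dask or msgpack APIs, and the type list only approximates what msgpack packs by default.

```python
from collections.abc import Mapping

# Rough approximation of the built-in types msgpack can pack without a
# custom `default` hook (an assumption for illustration only).
BASIC_TYPES = (type(None), bool, int, float, str, bytes)


def is_msgpack_friendly(obj):
    """Return True if obj is built only from basic types and plain containers."""
    if isinstance(obj, BASIC_TYPES):
        return True
    if isinstance(obj, (list, tuple)):
        return all(is_msgpack_friendly(item) for item in obj)
    if isinstance(obj, dict):
        return all(
            is_msgpack_friendly(k) and is_msgpack_friendly(v)
            for k, v in obj.items()
        )
    return False  # any other object, e.g. a custom Mapping


class LazyOnesSubgraph(Mapping):
    """Hypothetical stand-in for a lazy IO subgraph such as CreateArraySubgraph."""

    def __init__(self, name, nblocks, chunk):
        self.name, self.nblocks, self.chunk = name, nblocks, chunk

    def __getitem__(self, key):
        name, i = key
        if name != self.name or not 0 <= i < self.nblocks:
            raise KeyError(key)
        # Describe the task with plain data instead of a function object.
        return ("ones", self.chunk)

    def __iter__(self):
        return ((self.name, i) for i in range(self.nblocks))

    def __len__(self):
        return self.nblocks


lazy = LazyOnesSubgraph("ones-abc", nblocks=3, chunk=(100, 100))
materialized = dict(lazy)  # plain dict of plain tuples
```

A check along these lines, run against every layer of an unoptimized graph, is one possible shape for the "tested in core" part of the proposal; real materialized tasks also hold function objects, which need separate handling.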

Interesting - I had not tested any distributed code with optimize_graph=False. Thanks for pointing this out @ian-r-rose !

Sorry about this!

As Ian already mentioned, none of the BlockwiseIO functionality is serializable. This is because the __dask_distributed_pack__/__dask_distributed_unpack__ definitions do not handle the dictionary of IO dependencies (io_deps). I am trying to figure out if it makes more sense to require subgraphs (like CSVSubgraph) to implement their own serialization, or if BlockwiseIO should just materialize the IO subgraph to perform serialization. The latter might be better, because it would be agnostic to the underlying IO subgraph.

I will try to think through a solution tonight, but I am also curious to know if @madsbk has any ideas :)

Thanks for looking into this @rjzamora. I suspect it would be easier for downstream implementers if it was handled closer to BlockwiseIO, rather than needing to pass packing/unpacking info down the chain. The good news is that subgraphs like CSVSubgraph are by design not very stateful, so rehydrating them on command is probably not too difficult.

I'd be interested to hear what you think best practices are for serializing functions (as I did with ones/zeros in BlockwiseCreateArray). distributed.worker.dumps_function?

Yeah, I think that is what we do with tasks, so that makes sense. It basically uses pickle and, if that doesn't work, falls back to cloudpickle.
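The pickle-then-cloudpickle idea can be sketched roughly as follows. This is not the actual implementation of distributed.worker.dumps_function; the name `dumps_function_sketch` is hypothetical, and cloudpickle is treated as an optional dependency that is only imported when plain pickle fails.

```python
import math
import pickle


def dumps_function_sketch(func):
    """Serialize func with pickle, falling back to cloudpickle if available."""
    try:
        return pickle.dumps(func, protocol=pickle.HIGHEST_PROTOCOL)
    except Exception:
        # Lambdas and locally defined functions usually end up here.
        try:
            import cloudpickle  # optional third-party dependency
        except ImportError:
            raise TypeError(
                f"cannot serialize {func!r} without cloudpickle"
            ) from None
        return cloudpickle.dumps(func, protocol=pickle.HIGHEST_PROTOCOL)


# An importable function takes the plain-pickle path.
payload = dumps_function_sketch(math.sqrt)
restored = pickle.loads(payload)
```

The resulting bytes are something msgpack can carry, which is why this approach suits functions such as ones/zeros embedded in a layer.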

I suggest that BlockwiseIO.__dask_distributed_pack__() calls self.io_deps.__reduce__(), which is available and msgpack compatible if they inherit from Layer. If not, materialize them using dict().

Does that seem reasonable to you @rjzamora? 🙂

@madsbk and I had a chat about this earlier today. It looks like we should be able to use dumps_task to serialize io_deps (after materializing the subgraphs). From there, it is probably just a matter of accounting for stringified key names during the injection of IO tasks in the resulting Blockwise graph.

I would think that there is still value to avoiding materializing the subgraph until it reaches the scheduler. If it is possible to write some custom pack/unpack logic for a BlockwiseIO layer, might it be worth it?

Yep - Step 1 is to support serialization of the io-subgraph as a raw graph (to get everything working). Step 2 is to check if the underlying subgraph supports a special serialization routine.
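The two steps could look something like the sketch below. Everything here is hypothetical and for illustration only: `RangeSubgraph`, the `sketch_pack`/`sketch_unpack` hooks, `REGISTRY`, and the `pack_io_deps`/`unpack_io_deps` helpers are not part of Dask. Subgraphs without the hook are materialized with dict() (Step 1), while subgraphs that provide it ship only their small state and are rebuilt on the other side (Step 2).

```python
from collections.abc import Mapping


class RangeSubgraph(Mapping):
    """Hypothetical lazy subgraph whose whole state is (name, n)."""

    def __init__(self, name, n):
        self.name, self.n = name, n

    def __getitem__(self, key):
        prefix, _, index = key.rpartition("-")
        if prefix != self.name or not index.isdigit() or int(index) >= self.n:
            raise KeyError(key)
        return ["read_block", int(index)]

    def __iter__(self):
        return (f"{self.name}-{i}" for i in range(self.n))

    def __len__(self):
        return self.n

    # Hypothetical "Step 2" hooks; not part of any Dask protocol.
    def sketch_pack(self):
        return {"name": self.name, "n": self.n}

    @classmethod
    def sketch_unpack(cls, state):
        return cls(state["name"], state["n"])


# Maps a packed "kind" back to the class that can rebuild it.
REGISTRY = {"RangeSubgraph": RangeSubgraph}


def pack_io_deps(io_deps):
    packed = {}
    for name, subgraph in io_deps.items():
        if hasattr(subgraph, "sketch_pack"):
            # Step 2: the subgraph knows a compact representation of itself.
            packed[name] = {
                "kind": type(subgraph).__name__,
                "state": subgraph.sketch_pack(),
            }
        else:
            # Step 1: fall back to materializing the subgraph as a raw dict.
            packed[name] = {"kind": "raw", "state": dict(subgraph)}
    return packed


def unpack_io_deps(packed):
    io_deps = {}
    for name, entry in packed.items():
        if entry["kind"] == "raw":
            io_deps[name] = entry["state"]
        else:
            cls = REGISTRY[entry["kind"]]
            io_deps[name] = cls.sketch_unpack(entry["state"])
    return io_deps


io_deps = {
    "lazy": RangeSubgraph("blocks", 3),
    "plain": {"x-0": ["read_block", 0]},
}
packed = pack_io_deps(io_deps)
restored = unpack_io_deps(packed)
```

String keys are used throughout so the packed form stays close to the stringified key names mentioned earlier in the thread.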

Short term, I wonder if it makes sense to temporarily revert https://github.com/dask/dask/pull/6931? That way CI for distributed would be unblocked until we have a fix like https://github.com/dask/distributed/issues/4374#issuecomment-748217714 implemented. I'd be happy to learn if we think https://github.com/dask/distributed/issues/4374#issuecomment-748217714 is tractable and could be implemented soon-ish, but with folks beginning to take off for the holidays it would be nice to leave distributed's test suite in a runnable state.

I think it would be fine to revert dask/dask#6931 for the time being. A "Step 1" fix may be easy, "Step 2" perhaps less so, but in either event it would give us the space to talk through it.

This was reverted in PR https://github.com/dask/dask/pull/6995, though I am leaving this open to discuss reintegrating the original change with fixes.

We can close this when dask#7042 is merged.

Closing this since dask#7048 resolves the problem.

Thanks for following up here @rjzamora!
