Distributed: `map_blocks` leads to downstream TypeError: can not serialize 'function' object

Created on 9 Mar 2021 · 6 Comments · Source: dask/distributed

What happened:

This report is motivated by the following observation: calling dask.array.map_blocks on a function def func(block_info=None) while a distributed.Client is active, and then visualizing the resulting array in napari, raises TypeError: can not serialize 'function' object.

The error occurs only when using the dask.distributed backend; it works with the plain dask backend.

The error occurs only when mapping a function def func(block_info=None) and not when using def func() or def func(x, block_info=None).

For more context, see https://github.com/dask/dask-image/issues/194.

What you expected to happen:

Passing dask arrays to napari (which slices the array and calls np.asarray) should work regardless of whether the dask or the dask.distributed backend is used.

Minimal Complete Verifiable Example:

The original error occurring in combination with napari:

%gui qt

import dask.array as da
import numpy as np

import napari
from dask.distributed import Client
from dask_image.imread import imread
client = Client()

xn = np.random.randint(0, 100, (20, 30, 30))
xd = da.from_array(xn, chunks=(1, 5, 5))

def func(block_info=None):
    return np.random.randint(0, 100, (1, 5, 5))

# this instead works (no `block_info` argument)
# def func():
#    return np.random.randint(0, 100, (1, 5, 5))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)
napari.view_image(xm)

Likely, this minimal example reproduces the error without napari:

import dask.array as da
import numpy as np

from dask.core import flatten
from dask.distributed import Client
client = Client()

xn = np.random.randint(0, 100, (2, 4, 4))
xd = da.from_array(xn, chunks=(1, 2, 2))

# fails
def func(block_info=None):
    return np.random.randint(0, 100, (1, 2, 2))

# works
# def func():
#     return np.random.randint(0, 100, (1, 2, 2))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)

xm.dask.__dask_distributed_pack__(client, set(flatten(xm.__dask_keys__())))

Anything else we need to know?:

The original error does not occur with dask and dask.distributed version 2.30.0 and lower (the minimal example is incompatible with older versions).

Environment:

  • Dask version: 2021.03.0
  • Python version: 3.9.2
  • Operating System: macOS
  • Install method (conda, pip, source): pip

All 6 comments

@madsbk, do you have any thoughts here? 🙂

Thanks for reporting, @m-albert! I've not looked deeply into this, but dask.array.map_blocks goes down some additional code paths when the function being mapped has a block_info= keyword. Based on the code snippets above, that might be a good place to start looking.
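
The kind of signature check hinted at in this comment can be sketched as follows. This is only an illustration, assuming the decision is made by inspecting the mapped function's parameters; accepts_keyword is a hypothetical helper, not a dask function, and dask's own logic may differ.

```python
import inspect


def accepts_keyword(func, name):
    """Return True if `func` declares a parameter called `name`."""
    try:
        params = inspect.signature(func).parameters
    except (TypeError, ValueError):
        # Some callables expose no introspectable signature.
        return False
    return name in params


# The three variants mentioned in the report.
def plain():
    return 0


def with_info(block_info=None):
    return 0


def with_array_and_info(x, block_info=None):
    return 0


for f in (plain, with_info, with_array_and_info):
    print(f.__name__, accepts_keyword(f, "block_info"))
```

Only the second variant both requests block_info and takes no array argument, which is the combination the report identifies as failing.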

Just to clarify, the issue is that HLG (HighLevelGraph) serialization uses MsgPack, but it encounters a function in the Dask graph, which MsgPack cannot serialize. Please see this traceback for context ( https://github.com/dask/dask-image/issues/194#issuecomment-785625110 ). This came up when using napari with Dask, but the issue appears to trace back to Dask alone.
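
The failure mode described in this comment can be illustrated without dask. The sketch below uses the standard library's json module purely as a stand-in for MsgPack (an assumption, made so the snippet has no third-party dependencies): both encode plain data such as dicts, lists, strings, and numbers, and both raise TypeError when they meet a function object they have no encoder for. The layer dicts and the try_pack helper are hypothetical and do not mirror dask's real packed-graph layout.

```python
import json


def func(block_info=None):
    return None


def try_pack(obj):
    """Attempt to encode `obj` with a data-only serializer (json as a stand-in)."""
    try:
        json.dumps(obj)
    except TypeError as exc:
        return f"failed: {exc}"
    return "ok"


# Hypothetical layer contents, not dask's actual packed-graph format.
data_only_layer = {"layers": {"chunk-0": [1, 5, 5]}}
layer_with_function = {"layers": {"chunk-0": func}}

print(try_pack(data_only_layer))
print(try_pack(layer_with_function))
```

The first call succeeds and the second reports a TypeError, which is the same class of failure as the MsgPack error in the linked traceback.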

Also, as @jni points out, a critical piece here seems to be whether array fusion is enabled or disabled:

import dask
import distributed

import dask.array as da
import numpy as np


c = distributed.Client()

xn = np.random.randint(0, 100, (20, 30, 30))
xd = da.from_array(xn, chunks=(1, 5, 5))

def func(block_info=None):
    return np.random.randint(0, 100, (1, 5, 5))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)


dask.config.set({"optimization.fuse.active": True})
xm[10:17, 4:6, 4:6].compute()  # works

dask.config.set({"optimization.fuse.active": False})
xm[10:17, 4:6, 4:6].compute()  # fails


Traceback:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-1-b2e0d1fb182c> in <module>
     20 
     21 dask.config.set({"optimization.fuse.active": False})
---> 22 xm[10:17, 4:6, 4:6].compute()  # fails

~/Developer/dask/dask/base.py in compute(self, **kwargs)
    281         dask.base.compute
    282         """
--> 283         (result,) = compute(self, traverse=False, **kwargs)
    284         return result
    285 

~/Developer/dask/dask/base.py in compute(*args, **kwargs)
    563         postcomputes.append(x.__dask_postcompute__())
    564 
--> 565     results = schedule(dsk, keys, **kwargs)
    566     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    567 

~/Developer/distributed/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2640         Client.compute : Compute asynchronous collections
   2641         """
-> 2642         futures = self._graph_to_futures(
   2643             dsk,
   2644             keys=set(flatten([keys])),

~/Developer/distributed/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2548                 dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
   2549 
-> 2550             dsk = dsk.__dask_distributed_pack__(self, keyset)
   2551 
   2552             annotations = {}

~/Developer/dask/dask/highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys)
    942                 }
    943             )
--> 944         return dumps_msgpack({"layers": layers})
    945 
    946     @staticmethod

~/Developer/distributed/distributed/protocol/core.py in dumps_msgpack(msg, compression)
    120     """
    121     header = {}
--> 122     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
    123 
    124     fmt, payload = maybe_compress(payload, compression=compression)

~/miniconda/envs/dask/lib/python3.8/site-packages/msgpack/__init__.py in packb(o, **kwargs)
     33     See :class:`Packer` for options.
     34     """
---> 35     return Packer(**kwargs).pack(o)
     36 
     37 

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

TypeError: can not serialize 'function' object

Closed via https://github.com/dask/dask/pull/7353. Thanks @m-albert for reporting and @madsbk for fixing!

Wow that was so quick :) Thanks @madsbk @jakirkham @jrbourbeau
