Distributed: Unable to provide workers as dict for client.persist()

Created on 1 Mar 2021 · 7 Comments · Source: dask/distributed

The client.persist() API used to accept workers as a dict, which let you specify which keys (computations) should run on which worker. However, this was removed in PR #4406, and I now get TypeError: unhashable type: 'dict'.

To provide more information on what I'm trying to do:
I have each worker mapped to a GPU in my system. When I run a Dask shuffle, I need partition 0 to go to GPU 0, partition 1 to GPU 1, and so on. Before the change, I accomplished this by passing client.persist a dict mapping keys to the intended workers/GPUs.
1) Is this feature permanently removed?
2) Is there some other way in dask to accomplish what I'm trying to do?

All 7 comments

Thanks for raising an issue @Iroy30. We recently moved the handling of keywords like workers=, priority=, etc. for Dask collections over to Dask's annotation machinery. If you need to pin each partition to a specific worker, you can use something along the lines of:

    import dask

    with dask.annotate(workers=<callable>):
        persisted = client.persist(objects)

where <callable> should be replaced with a callable that takes a partition key as input and returns the worker you want that partition persisted on.
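If you already have the dict that used to be passed as workers= to client.persist, one way to reuse it is to wrap it in a callable. This is a minimal sketch, not part of Dask: the helper name dict_to_worker_callable is hypothetical, and it assumes each dict value is either one worker address or an iterable of addresses, and that the dict covers every key the callable will be asked about.

```python
from typing import Dict, Hashable, Iterable, Tuple, Union

# A worker spec may be one address or several (assumption based on the old dict form).
WorkerSpec = Union[str, Iterable[str]]


def dict_to_worker_callable(key_to_workers: Dict[Hashable, WorkerSpec]):
    """Wrap an old-style {key: worker(s)} dict as a callable for
    dask.annotate(workers=...). Hypothetical helper, not part of Dask."""
    # Normalize every value to a tuple of addresses once, up front.
    normalized: Dict[Hashable, Tuple[str, ...]] = {}
    for key, workers in key_to_workers.items():
        normalized[key] = (workers,) if isinstance(workers, str) else tuple(workers)

    def key_to_worker(key: Hashable) -> Tuple[str, ...]:
        # Assumes the dict covers every key the callable will be asked about.
        try:
            return normalized[key]
        except KeyError:
            raise KeyError(f"no worker assigned for key {key!r}") from None

    return key_to_worker


# Usage with made-up keys and addresses:
mapping = {
    ("shuffle-abc", 0): "tcp://10.0.0.1:1234",
    ("shuffle-abc", 1): ["tcp://10.0.0.2:1234"],
}
key_to_worker = dict_to_worker_callable(mapping)
print(key_to_worker(("shuffle-abc", 1)))  # ('tcp://10.0.0.2:1234',)
```

The resulting key_to_worker can then be passed as dask.annotate(workers=key_to_worker). Raising on an unknown key, rather than silently returning no restriction, is a deliberate choice in this sketch so that a missing entry surfaces early.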

cc @ian-r-rose for visibility

cc @pentschev @quasiben (for vis as well)

@jrbourbeau Thanks. I will try this.

Hi, I tried the annotation machinery. This is a small reproducer of the intended workflow:

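    import dask
    from dask.distributed import futures_of, wait

    ddf["src"] = ddf["src"] * 2
    worker_addresses = Comms.get_workers()  # list of worker addresses

    def key_to_worker(key):
        # key[1] is the partition index; send partition i to worker i
        return (worker_addresses[key[1]],)

    with dask.annotate(workers=key_to_worker):
        pe = client.persist(ddf)
    pa = futures_of(pe)
    wait(pa)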

where ddf is a Dask DataFrame and worker_addresses is the list of worker addresses obtained when the client is initialized.
The key_to_worker function takes a key and returns the worker indexed by the partition number in that key.

This gives me the following error:

    distributed.protocol.core - CRITICAL - Failed to Serialize
    Traceback (most recent call last):
      File "/home/nfs/iroy/anaconda3/envs/env_dev/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
        small_header, small_payload = dumps_msgpack(msg, **compress_opts)
      File "/home/nfs/iroy/anaconda3/envs/env_dev/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
        payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
      File "/home/nfs/iroy/anaconda3/envs/env_dev/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
        return Packer(**kwargs).pack(o)
      File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
      File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
      File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
      File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
      File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
      File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
      File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
    TypeError: can not serialize 'function' object

Is this the correct way to use Dask annotations?

What version of Dask + Distributed are you using?

It looks like this is an issue on the current dask and distributed main branches. Here's a minimal reproducer:

    import dask
    import dask.array as da
    from dask.distributed import Client


    if __name__ == "__main__":

        with Client() as client:
            worker = next(iter(client.scheduler_info()["workers"].keys()))

            def key_to_worker(key):
                return worker

            x = da.ones(100, chunks=100)

            with dask.annotate(workers=key_to_worker):
                xp = client.persist(x)

            print(xp.compute())

The key_to_worker function ends up in the update-graph-hlg message because we pull in the annotations set by dask.annotate here, and in this case the annotation is a function, which msgpack cannot serialize.
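A simplified model of the difference: a raw function cannot go into a plain-data message, but the per-key results of calling it can. The sketch below is hypothetical and is not Dask's implementation; resolve_annotation is an illustrative name, and json stands in for msgpack here because both only handle plain data. Presumably this per-key resolution is why attaching the annotation at collection-creation time (the workaround suggested below) avoids the error, but treat that as an assumption.

```python
import json
from typing import Any, Hashable, Iterable


def resolve_annotation(value: Any, keys: Iterable[Hashable]) -> Any:
    """Replace a callable annotation with its per-key results (plain data).

    Simplified, hypothetical model; not Dask's implementation.
    """
    if callable(value):
        return {key: value(key) for key in keys}
    return value


def key_to_worker(key):
    return ("tcp://10.0.0.1:1234",)


keys = [("ones-abc", 0), ("ones-abc", 1)]

# Sending the raw function fails, much like msgpack's
# "can not serialize 'function' object".
try:
    json.dumps({"workers": key_to_worker})
except TypeError as err:
    print("raw callable:", err)

# After resolving per key, only strings and tuples remain. json needs string
# keys, so the tuple keys are stringified here just for this demo.
resolved = resolve_annotation(key_to_worker, keys)
print(json.dumps({"workers": {str(k): v for k, v in resolved.items()}}))
```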

@Iroy30 as a temporary workaround, you can move the dask.annotate context manager so that it wraps the creation of your Dask collection instead of the persist call, and things should work. That is, instead of

        x = da.ones(100, chunks=100)

        with dask.annotate(workers=key_to_worker):
            xp = client.persist(x)

do

        with dask.annotate(workers=key_to_worker):
            x = da.ones(100, chunks=100)

        xp = client.persist(x)
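For the GPU-per-partition case in the original question, the callable used with this workaround could look like the minimal sketch below. The helper name make_partition_to_worker is hypothetical, and it assumes every key the callable sees is a tuple whose second element is the partition index (the same assumption key[1] makes in the earlier reproducer); keys of any other shape raise ValueError rather than being guessed at.

```python
from typing import Callable, Hashable, Sequence, Tuple


def make_partition_to_worker(
    worker_addresses: Sequence[str],
) -> Callable[[Hashable], Tuple[str]]:
    """Hypothetical helper: route partition i to worker i, wrapping around."""
    if not worker_addresses:
        raise ValueError("need at least one worker address")
    addresses = tuple(worker_addresses)  # snapshot the worker list

    def key_to_worker(key: Hashable) -> Tuple[str]:
        # Assumes keys shaped like (name, partition_index, ...).
        if not (isinstance(key, tuple) and len(key) >= 2 and isinstance(key[1], int)):
            raise ValueError(f"unexpected key shape: {key!r}")
        # Modulo is an illustrative choice: plain indexing would raise
        # IndexError when there are more partitions than workers.
        return (addresses[key[1] % len(addresses)],)

    return key_to_worker


key_to_worker = make_partition_to_worker(["tcp://10.0.0.1:1234", "tcp://10.0.0.2:1234"])
print(key_to_worker(("src-abc123", 0)))  # ('tcp://10.0.0.1:1234',)
print(key_to_worker(("src-abc123", 3)))  # ('tcp://10.0.0.2:1234',)
```

Following the workaround, the result would be passed to dask.annotate(workers=...) around the code that builds the collection, not around client.persist.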

cc @madsbk
