The `client.persist()` API used to accept `workers` as a dict, so you could specify which keys/computations run on which worker. PR #4406 removed this, and I now get `TypeError: unhashable type: 'dict'`.
To provide more information on what I'm trying to do:
I have each worker mapped to a GPU in my system. When I run a Dask shuffle, I need partition 0 to go to GPU 0, partition 1 to GPU 1, and so on. Before this change, I did this by passing `client.persist` a dict mapping keys to the intended workers/GPUs.
1) Is this feature permanently removed?
2) Is there another way in Dask to accomplish what I'm trying to do?
Thanks for raising an issue @Iroy30. We recently moved keywords like `workers=`, `priority=`, etc. for Dask collections over to Dask's annotation machinery. If you need to pin each partition to a specific worker, you can use something along the lines of:
```python
with dask.annotate(workers=<callable>):
    client.persist(objects)
```
where `<callable>` should be replaced with a callable that takes a given partition key as input and returns the worker you want that partition to be persisted on.
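If you already have an old-style `{key: worker}` dict, one way to bridge to the callable form is to wrap the dict in a lookup function. The sketch below assumes the callable is invoked once per task key (for example a tuple like `("shuffle-abc", 0)` for a partition); the helper name `dict_to_worker_callable` is hypothetical, and what the scheduler does with a `None` return for unmapped keys is an assumption to check against your Dask version.

```python
from typing import Callable, Hashable, Mapping, Optional, Sequence, Union

# A worker restriction: one address, or a sequence of allowed addresses.
Worker = Union[str, Sequence[str]]


def dict_to_worker_callable(
    key_to_workers: Mapping[Hashable, Worker],
    default: Optional[Worker] = None,
) -> Callable[[Hashable], Optional[Worker]]:
    """Wrap an old-style {key: worker(s)} mapping as a per-key callable.

    Hypothetical helper. Keys missing from the mapping fall back to
    `default`; treating None as "no restriction" is an assumption.
    """
    lookup = dict(key_to_workers)  # copy so later edits to the input don't leak in

    def key_to_worker(key: Hashable) -> Optional[Worker]:
        return lookup.get(key, default)

    return key_to_worker
```

It would then be used as `with dask.annotate(workers=dict_to_worker_callable(old_mapping)): ...`, with `old_mapping` being the dict you previously passed to `client.persist`.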
cc @ian-r-rose for visibility
cc @pentschev @quasiben (for vis as well)
@jrbourbeau Thanks. I will try this.
Hi, I tried the annotation machinery. This is a small reproducer of the intended workflow:
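```python
import dask
from dask.distributed import futures_of, wait

# `ddf`, `client`, and `Comms` come from the surrounding application.
ddf['src'] = ddf['src'] * 2
worker_addresses = Comms.get_workers()

def key_to_worker(key):
    # key looks like (name, partition_index): pin partition i to worker i
    return tuple([worker_addresses[key[1]]])

with dask.annotate(workers=key_to_worker):
    pe = client.persist(ddf)
pa = futures_of(pe)
wait(pa)
```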
Here, `ddf` is a Dask DataFrame and `worker_addresses` is the list of worker addresses obtained when the client is initialized.
The `key_to_worker` function takes a key and returns the worker at the index given by the partition number in that key.
This gives me the following error:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/home/nfs/iroy/anaconda3/envs/env_dev/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
small_header, small_payload = dumps_msgpack(msg, **compress_opts)
File "/home/nfs/iroy/anaconda3/envs/env_dev/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
File "/home/nfs/iroy/anaconda3/envs/env_dev/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'function' object
Is this the correct way to use Dask annotations?
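As a side note on the mapping itself: the `key_to_worker` above indexes `worker_addresses` directly by partition number, so it raises `IndexError` if there are ever more partitions than workers. A minimal round-robin variant is sketched below; the helper name is hypothetical, and it assumes partition keys are tuples whose second element is the integer partition index (as the original function does). Keys that don't match that shape get no restriction in this sketch, which is likewise an assumption.

```python
from typing import Callable, Hashable, List, Optional, Tuple


def make_round_robin_key_to_worker(
    worker_addresses: List[str],
) -> Callable[[Hashable], Optional[Tuple[str]]]:
    """Map partition i to worker i % len(worker_addresses).

    Hypothetical helper. Assumes keys look like (name, partition_index).
    """
    if not worker_addresses:
        raise ValueError("need at least one worker address")
    addresses = list(worker_addresses)

    def key_to_worker(key: Hashable) -> Optional[Tuple[str]]:
        if isinstance(key, tuple) and len(key) >= 2 and isinstance(key[1], int):
            # Wrap around instead of indexing past the end of the list.
            return (addresses[key[1] % len(addresses)],)
        # Non-partition keys (plain strings, other tuple shapes): no restriction.
        return None

    return key_to_worker
```

With exactly one partition per GPU this behaves the same as `worker_addresses[key[1]]`; the modulo only matters once partitions outnumber workers.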
What version of Dask + Distributed are you using?
It looks like this is an issue on the current Dask and Distributed main branches. Here's a minimal reproducer:
```python
import dask
import dask.array as da
from dask.distributed import Client

if __name__ == "__main__":
    with Client() as client:
        worker = next(iter(client.scheduler_info()["workers"].keys()))

        def key_to_worker(key):
            return worker

        x = da.ones(100, chunks=100)
        with dask.annotate(workers=key_to_worker):
            xp = client.persist(x)
        print(xp.compute())
```
The `key_to_worker` function ends up in the `update-graph-hlg` message because we pull in the annotations set by `dask.annotate` here. In this case the annotation is a function, which msgpack cannot serialize.
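The general fix for this class of error is to evaluate a callable annotation on the client, once per key, so that only plain data is sent over the wire. The sketch below illustrates that idea only; it is not Dask's actual code, the helper name `resolve_annotations` is hypothetical, and it uses the standard-library `json` module as a stand-in for msgpack (both reject function objects with a `TypeError`).

```python
import json
from typing import Any, Callable, Dict, Hashable, Iterable


def resolve_annotations(
    annotations: Dict[str, Any], keys: Iterable[Hashable]
) -> Dict[str, Any]:
    """Replace callable annotation values with plain {str(key): value} dicts.

    Hypothetical helper: callables are evaluated locally for every key, so
    the result contains only serializable data (strings, lists, dicts).
    """
    key_list = list(keys)
    resolved: Dict[str, Any] = {}
    for name, value in annotations.items():
        if callable(value):
            # Evaluate the callable per key; stringify keys for JSON/msgpack maps.
            resolved[name] = {str(k): value(k) for k in key_list}
        else:
            resolved[name] = value
    return resolved


def key_to_worker(key: Hashable) -> str:
    return "tcp://127.0.0.1:8786"  # hypothetical worker address
```

Serializing `{"workers": key_to_worker}` directly fails, while the output of `resolve_annotations` serializes without error.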
@Iroy30, as a temporary workaround, you can move the `dask.annotate` context manager so that it wraps the creation of your Dask collection instead of the `persist` call, and things should work. That is, instead of
```python
x = da.ones(100, chunks=100)
with dask.annotate(workers=key_to_worker):
    xp = client.persist(x)
```
do
```python
with dask.annotate(workers=key_to_worker):
    x = da.ones(100, chunks=100)
xp = client.persist(x)
```
cc @madsbk