Currently, client.restart() restarts all workers and the entire cluster. Is there a way to restart a single worker or a list of workers, or to perform stop, start, and restart operations on them, using the client APIs?
This would allow cleaning up specific workers without affecting others where additional tasks may be running.
No, there is not currently an easy way to do this.
Is it possible to restart the scheduler? One thing I noticed is that if you have published datasets and then call client.restart, the datasets are still published in the scheduler. If another client then asks for the result from one of those datasets, the scheduler does not reissue the work to a worker. It seems like client.restart should also restart the scheduler or at least unpublish all datasets.
Unpublishing all datasets when performing a restart would be easy to implement, e.g., by adding a restart() method to PublishExtension.
I recommend raising the published datasets topic as a separate issue.
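Until something like a restart() hook on PublishExtension exists, a client-side workaround is to unpublish every dataset before calling client.restart(). The sketch below is a minimal version of that idea. It assumes the existing Client methods list_datasets(), unpublish_dataset(name) and restart(). The helper name restart_and_unpublish is hypothetical and not part of distributed.

```python
def restart_and_unpublish(client) -> list:
    """Hypothetical helper: drop all published datasets, then restart.

    Assumes `client` is a dask.distributed.Client (or anything exposing
    list_datasets(), unpublish_dataset(name) and restart()).
    Returns the names of the datasets that were unpublished.
    """
    # Snapshot the names first so we do not iterate while mutating.
    names = list(client.list_datasets())
    for name in names:
        # Releases the scheduler's reference to the published futures.
        client.unpublish_dataset(name)
    # Restart workers only after nothing stale is left published.
    client.restart()
    return names
```

Unpublishing before the restart means no other client can fetch a stale dataset name that points at results the restart just discarded.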
The "Add retire_workers" PR looks like it helps shut down workers without removing them completely. But is there a way to start these workers again using an API?
You're right. That PR doesn't solve your issue.
Any ideas or suggestions about this issue? I want to use a worker restart to refresh the local package cache. Does it make sense to make a PR for this?
I would like to restart the worker after every task runs, as there seems to be a memory leak with dask and my tasks (which does not occur when the task is run locally). Any ideas, or should I make a new PR?
I came across this requirement as well:
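I solved it with the code below. This assumes your dask-worker processes are overseen by nanny managers. It will throw a CommClosedError because the scheduler can no longer communicate with the worker, but otherwise the worker should reboot, however hacky.
import os
from dask.distributed import Client

client = Client('scheduler-address:8786')  # placeholder: your scheduler's address
my_worker = 'name-or-ip-address-to-worker'
# Hard-exit the worker process; its nanny notices and starts a fresh one.
client.run(lambda: os._exit(0), workers=[my_worker])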
I'm investigating this as a potential solution to https://github.com/dask/distributed/issues/391#issuecomment-413421527, in which we occasionally see unresponsive workers during network-heavy aggregation operations.
Looking through the existing restart code, it appears that something akin to "retire workers" could be implemented that first makes an attempt to replicate data from the worker that's about to be restarted, via:
Then issues a command to the nanny (if present) to restart the worker process:
@mrocklin, would this make sense as an additional scheduler API?
I have looked at retire_workers in the past, and I agree that it makes sense in the restart-a-particular-worker scenario. However, if the worker is unresponsive, what hope is there of copying its data?
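One way to reconcile the proposal with the unresponsive-worker concern is to bound the "replicate data first" step with a timeout and restart regardless. The sketch below only shows that control flow. The drain and restart coroutines are hypothetical placeholders for "copy data off the worker" and "ask the nanny to restart the process". Neither is a real distributed call, and the OSError catch assumes CommClosedError-style failures.

```python
import asyncio
from typing import Awaitable, Callable


async def restart_worker_gracefully(
    address: str,
    drain: Callable[[str], Awaitable[None]],    # hypothetical: replicate data away
    restart: Callable[[str], Awaitable[None]],  # hypothetical: nanny restart
    drain_timeout: float = 10.0,
) -> bool:
    """Try to drain `address` within `drain_timeout` seconds, then restart it.

    Returns True if draining finished before the restart, False if the
    worker was unresponsive (timeout) or the connection failed (OSError).
    """
    try:
        # wait_for cancels the drain coroutine if it exceeds the timeout.
        await asyncio.wait_for(drain(address), timeout=drain_timeout)
        drained = True
    except (asyncio.TimeoutError, OSError):
        drained = False  # give up on the data; the restart still proceeds
    await restart(address)
    return drained
```

Returning whether the drain succeeded lets the caller decide if results held only by that worker must be recomputed.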