This might be related to #344. I'm using code examples here and here.
In both cases I found that when running search.fit(digits.data, digits.target), an error occurred:
distributed.utils - ERROR - No module named dask_searchcv.methods.
I checked I could import dask_searchcv.methods (as I'd previously run conda install dask-searchcv -c conda-forge -y).
I realised the problem was the module wasn't on the workers.
I connected to each and ran conda to install the module needed:
local$ dask-ec2 ssh 0
dask1$ conda install dask-searchcv -c conda-forge -y
dask1$ exit
local$ dask-ec2 ssh 1
dask1$ conda install dask-searchcv -c conda-forge -y
The original code now works.
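A quick way to confirm this kind of diagnosis before logging in to each machine is to ask every worker whether it can find the module. The sketch below is only an illustration: `missing_modules` and `check_workers` are hypothetical helper names, and it assumes a connected `distributed.Client` is passed in as `client`. It relies on `Client.run`, which runs a function on every worker and returns a dict keyed by worker address.

```python
import importlib.util


def missing_modules(names):
    """Return the subset of `names` that cannot be found in this process."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # For dotted names find_spec imports the parent package first,
            # so a missing parent surfaces as ImportError.
            found = False
        if not found:
            missing.append(name)
    return missing


def check_workers(client, names):
    """Hypothetical helper: run the check on every worker.

    Assumes `client` is a connected distributed.Client. The result maps
    each worker address to the list of modules that worker cannot find.
    """
    return client.run(missing_modules, names)
```

With a live cluster, `check_workers(client, ["dask_searchcv.methods"])` would return an empty list for each worker that has the package and `["dask_searchcv.methods"]` for each one that does not.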
Summary of question/issue: I wonder how to get modules on all the workers?
A method of sending the same command to all workers?
Passing a list of modules to load? e.g.
dask-ec2 install dask-searchcv numpy GPy
As a hack I could write a script that iterates through all the workers and runs the code I want on each...
What's the 'best' solution? It might be a feature that's needed?
I've documented my efforts here. It describes a few other work-arounds I've noticed I needed.
Yeah, you need packages installed on all workers for things to work.
A method of sending the same command to all workers?
You can do this with Client.run:
def install():
    import os
    os.system("conda install dask-searchcv -c conda-forge -y")  # or pip

client.run(install)  # Run on all workers
This is a common pattern I use when developing.
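A variant of the same pattern that reports which workers failed, since `os.system` discards that information once `client.run` returns. This is a sketch under assumptions, not part of the original answer: `build_pip_command`, `pip_install` and `install_everywhere` are hypothetical names, it uses pip rather than conda, and it assumes the functions are defined in the notebook or script itself so they are shipped to the workers along with the call.

```python
import subprocess
import sys


def build_pip_command(packages):
    """Build a pip command that targets the interpreter running this code."""
    return [sys.executable, "-m", "pip", "install", *packages]


def pip_install(packages):
    """Install `packages` in the current process's environment.

    Returns pip's exit code (0 means success).
    """
    result = subprocess.run(
        build_pip_command(packages), capture_output=True, text=True
    )
    return result.returncode


def install_everywhere(client, packages):
    """Hypothetical helper: install on all workers, return those that failed.

    Assumes `client` is a connected distributed.Client. Client.run returns
    a dict mapping worker address to the function's return value.
    """
    exit_codes = client.run(pip_install, packages)
    return [address for address, code in exit_codes.items() if code != 0]
```

Calling pip through `sys.executable -m pip` rather than a bare `pip` makes it more likely that the package lands in the environment the worker process is actually using.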
Passing a list of modules to load? e.g.
dask-ec2 install dask-searchcv numpy GPy
That might be nice - care to open an issue in the dask-ec2 repo? This is really an issue with setting up your cluster and not an issue with the distributed library specifically.
Thanks for the really helpful reply. Hopefully useful for others with the same problem.
Although it seems a bit clunky, I quite like the Client.run solution, as it means I can include in the notebook all the modules that are needed, so it doesn't need a special invocation of dask-ec2 for each project (each of which has different module requirements). Also, I imagine some modules might need additional commands run to set them up.
Thinking a little more, it might be cool if there was a lazy option where you could ask dask-ec2 to look at the local machine and duplicate the modules on the workers.
I'll hold off opening another issue - looks like they've got enough already :)
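The "lazy" idea above, mirroring the local machine's packages onto the workers, could be approximated by listing what is installed locally as pinned requirements and handing that list to whatever install step is used on the workers. The sketch below is only an illustration of that first half: `local_requirements` is a hypothetical helper, it only sees packages with Python packaging metadata (so conda-only or system packages are missed), and a pin that works locally may not be installable on a worker with a different platform.

```python
from importlib import metadata


def local_requirements(exclude=()):
    """List locally installed distributions as 'name==version' pins.

    `exclude` is an optional iterable of distribution names to leave out
    (compared case-insensitively).
    """
    skip = {name.lower() for name in exclude}
    pins = set()
    for dist in metadata.distributions():
        name = dist.metadata.get("Name")
        # Skip distributions with broken metadata and anything excluded.
        if name and name.lower() not in skip:
            pins.add(f"{name}=={dist.version}")
    return sorted(pins)
```

In practice one would probably filter this down to the handful of packages a project imports rather than replicate the whole environment.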
I came across this issue and while the suggested solution helps for a static set of workers, it won't help for an adaptive cluster.
Not sure when this feature was added but by now one can register a plugin to execute code on lifecycle events for workers: https://docs.dask.org/en/latest/setup/custom-startup.html#worker-lifecycle-plugins
So this is how we solved it (for our set of dependencies):
from dask.distributed import Client, Worker, WorkerPlugin
import os
from typing import List

class DependencyInstaller(WorkerPlugin):
    def __init__(self, dependencies: List[str]):
        # Quote each requirement so version specifiers survive the shell
        self._dependencies = " ".join(f"'{dep}'" for dep in dependencies)

    def setup(self, worker: Worker):
        # Called on each worker when the plugin is set up there;
        # the parameter is named `worker` so it can be passed by keyword
        os.system(f"pip install {self._dependencies}")

dependency_installer = DependencyInstaller([
    "pyarrow",
    "google-cloud-bigquery-storage==0.8",
    "google-cloud-bigquery==0.23"
])

client = Client()
client.register_worker_plugin(dependency_installer)
There was some effort around this here:
https://github.com/dask/distributed/pull/3216
(In reply to Swen Wenzel's comment above, Fri, Jul 3, 2020, 4:14 AM.)