Distributed: Installing modules on workers

Created on 22 Jun 2017 · 4 Comments · Source: dask/distributed

This might be related to #344. I'm using code examples here and here.
In both cases, running search.fit(digits.data, digits.target) produced this error:
distributed.utils - ERROR - No module named dask_searchcv.methods.

I checked that I could import dask_searchcv.methods locally (I had previously run conda install dask-searchcv -c conda-forge -y).

I realised the problem was that the module wasn't installed on the workers.

Temporary Solution

I connected to each worker and ran conda to install the required module:

    local$ dask-ec2 ssh 0
    dask1$ conda install dask-searchcv -c conda-forge -y
    dask1$ exit
    local$ dask-ec2 ssh 1
    dask1$ conda install dask-searchcv -c conda-forge -y

The original code now works.

Summary of question/issue: how do I get modules installed on all the workers?

Ideas

  • A method of sending the same command to all workers?
  • Passing a list of modules to load? e.g. dask-ec2 install dask-searchcv numpy GPy
  • I could maybe build an image that already contains the modules (with Anaconda etc. preinstalled), but that's awkward if I later realise I need another module.

As a hack, I could write a script that iterates through all the workers and runs the code I want on each one.
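A minimal sketch of that hack, assuming the workers are reachable over plain ssh under hostnames you supply. The hostnames and the helper names ssh_commands and run_on_all are hypothetical, and nothing here is part of dask-ec2. The sketch defaults to a dry run that only prints each command.

```python
import shlex
import subprocess
from typing import List, Sequence


def ssh_commands(hosts: Sequence[str], remote_command: str) -> List[List[str]]:
    """Build one `ssh HOST COMMAND` argument list per worker host."""
    return [["ssh", host, remote_command] for host in hosts]


def run_on_all(hosts: Sequence[str], remote_command: str, dry_run: bool = True) -> List[int]:
    """Run the same command on each host in turn and collect the exit codes.

    With dry_run=True nothing is executed: each command is printed and reported as 0.
    """
    codes = []
    for argv in ssh_commands(hosts, remote_command):
        if dry_run:
            print(shlex.join(argv))  # show exactly what would be run
            codes.append(0)
        else:
            codes.append(subprocess.run(argv).returncode)
    return codes
```

For example, run_on_all(["dask1", "dask2"], "conda install dask-searchcv -c conda-forge -y", dry_run=False) would install on two hypothetical hosts. Like any one-off loop, it only covers the hosts you list at the time you run it.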

What's the 'best' solution? Is this perhaps a missing feature?

Installation information

I've documented my efforts here. That write-up also describes a few other workarounds I found I needed.


Yeah, you need packages installed on all workers for things to work.

A method of sending the same command to all workers?

You can do this with Client.run:

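    def install():
        import os
        # Runs inside each worker process and shells out to the package manager
        os.system("conda install dask-searchcv -c conda-forge -y")  # or pip

    client.run(install)  # Run on all workers currently connected to 'client'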

This is a common pattern I use when developing.
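A slightly more defensive variant of the same Client.run pattern is sketched below. It is an assumption rather than the approach shown above, and the helper names are hypothetical. It calls pip through sys.executable so the packages land in the worker's own Python environment. It then returns pip's exit code, so the dictionary that Client.run returns ({worker_address: return_value}) shows which workers failed.

```python
import subprocess
import sys
from typing import Dict, List


def pip_install(packages: List[str]) -> int:
    """Install packages into the interpreter running this function; return pip's exit code."""
    # `sys.executable -m pip` targets the worker's own Python environment,
    # rather than whichever `pip` happens to be first on PATH.
    result = subprocess.run(
        [sys.executable, "-m", "pip", "install", *packages],
        capture_output=True,
        text=True,
    )
    return result.returncode


def failed_workers(results: Dict[str, int]) -> List[str]:
    """Given Client.run output ({worker_address: exit_code}), list workers where install failed."""
    return sorted(addr for addr, code in results.items() if code != 0)
```

Usage with an existing client: results = client.run(pip_install, ["dask-searchcv"]), then failed_workers(results).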

Passing a list of modules to load? e.g. dask-ec2 install dask-searchcv numpy GPy

That might be nice. Care to open an issue in the dask-ec2 repo? This is really an issue with setting up your cluster, not with the distributed library specifically.


Thanks for the really helpful reply. Hopefully useful for others with the same problem.

Although it seems a bit clunky, I quite like the Client.run solution because it lets me list all the required modules in the notebook itself. That way, each project (each with different module requirements) doesn't need its own special invocation of dask-ec2. I also imagine some modules might need additional setup commands to be run.

Thinking a little more, it might be useful to have a lazy option where you could ask dask-ec2 to inspect the local machine and replicate its modules on the workers.
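Nothing like that exists in dask-ec2 as far as this thread shows. As a rough sketch of the idea, under stated assumptions, the helpers below (all names hypothetical) compare the local environment with each worker's environment via Client.run. They then list the pinned pip requirements each worker would need to match. Blindly installing the result may not work, for example for conda-only or locally built packages.

```python
from importlib.metadata import distributions
from typing import Dict, List


def installed_packages() -> Dict[str, str]:
    """Return {distribution name (lower-cased): version} for the current environment."""
    packages: Dict[str, str] = {}
    for dist in distributions():
        name = dist.metadata.get("Name")
        if name:  # skip entries with broken or missing metadata
            packages[name.lower()] = dist.version
    return packages


def missing_requirements(local: Dict[str, str], remote: Dict[str, str]) -> List[str]:
    """Pinned pip requirements for local packages that are absent or at another version remotely."""
    return sorted(
        f"{name}=={version}"
        for name, version in local.items()
        if remote.get(name) != version
    )


def plan_installs(client) -> Dict[str, List[str]]:
    """For each worker, the requirements needed to match the local environment.

    `client` is assumed to be a connected dask.distributed.Client; Client.run
    returns {worker_address: return_value}.
    """
    local = installed_packages()
    return {
        addr: missing_requirements(local, remote)
        for addr, remote in client.run(installed_packages).items()
    }
```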

I'll hold off on opening another issue. It looks like they've got enough already :)

I came across this issue. The suggested solution works for a static set of workers, but it doesn't help with an adaptive cluster, because Client.run only reaches the workers connected when it is called, not workers added later.
I'm not sure when this feature was added, but you can now register a plugin that executes code on worker lifecycle events: https://docs.dask.org/en/latest/setup/custom-startup.html#worker-lifecycle-plugins
This is how we solved it (for our set of dependencies):

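    import os
    from typing import List

    from dask.distributed import Client, Worker, WorkerPlugin


    class DependencyInstaller(WorkerPlugin):
        """Installs pip packages on each worker when the plugin is set up on it."""

        def __init__(self, dependencies: List[str]):
            # Quote each requirement so specifiers such as "pkg>=1.0" are not
            # interpreted by the shell (e.g. ">" as a redirect)
            self._dependencies = " ".join(f"'{dep}'" for dep in dependencies)

        def setup(self, worker: Worker):
            # distributed calls setup(worker=...), so the parameter must be named "worker"
            os.system(f"pip install {self._dependencies}")


    dependency_installer = DependencyInstaller([
        "pyarrow",
        "google-cloud-bigquery-storage==0.8",
        "google-cloud-bigquery==0.23"
    ])
    client = Client()
    client.register_worker_plugin(dependency_installer)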
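A plugin's setup runs again for every worker that joins, so the pip install above repeats each time. One possible refinement, sketched here as an assumption rather than as part of the solution above, is to install only the requirements whose modules cannot already be found. The helper name requirements_to_install and the import-name-to-requirement mapping are hypothetical. Note that importlib.util.find_spec imports the parent packages of a dotted name as a side effect.

```python
import importlib.util
from typing import Dict, List


def requirements_to_install(modules: Dict[str, str]) -> List[str]:
    """Return the pip requirements whose import name cannot be found in this environment.

    `modules` maps an import name (e.g. "google.cloud.bigquery") to the pip
    requirement that provides it (e.g. "google-cloud-bigquery==0.23").
    """
    missing = []
    for import_name, requirement in modules.items():
        try:
            found = importlib.util.find_spec(import_name) is not None
        except ModuleNotFoundError:  # a parent package of a dotted name is missing
            found = False
        if not found:
            missing.append(requirement)
    return missing
```

Inside the plugin's setup, you could then call pip only when requirements_to_install(...) returns a non-empty list. Note that this skips packages that are present at the wrong version.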

There was some effort around this here:
https://github.com/dask/distributed/pull/3216

(Reply by email to Swen Wenzel's comment above, posted Fri, Jul 3, 2020 at 4:14 AM.)
