Kedro: Max Workers for Parallel Runner

Created on 6 Sep 2019  路  6Comments  路  Source: quantumblacklabs/kedro

Description

I am working with a very slow database connection to get data migrated to S3. The parallel runner allows me to get up to the number of cpu core connections running in parallel.

Context

I would like to be able to kick it up a notch while waiting on slow sql queries.

Possible Implementation

Parallel Runner Module

As shown int the ProcessPoolExecutor docs max_workers can be passed into the ProcessPoolExecutor

Change Line 34 in the parallel runner module to bring in Optional.

- from typing import Any, Dict
+ from typing import Any, Dict, Optional

Change Line 59-68 in the parallel runner module

-    def __init__(self):
+   def __init__(self, max_workers: Optional[int]=None):
        """Instantiates the runner by creating a Manager.
+    
+     Args:
          max_workers
        """
        self._manager = ParallelRunnerManager()
        self._manager.start()
+     self.max_workers = max_workers

Change line 170 in the parallel runner module to the following

```diff python
- with ProcessPoolExecutor() as pool:
+ with ProcessPoolExecutor(max_workers=self.max_workers) as pool:

### kedro_cli in template

add `MAX_WORKERS_HELP` to [kedro_cli line 91](https://github.com/quantumblacklabs/kedro/blob/861baa5ee8dd1c8ffce3ef83ae598fa38ecb55e6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/kedro_cli.py#L91)

``` diff
+
+MAX_WORKERS_HELP = """Maximum number of process to run in parallel. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
In the default template add a flag to the cli in [kedro_cli lines 102-147](https://github.com/quantumblacklabs/kedro/blob/861baa5ee8dd1c8ffce3ef83ae598fa38ecb55e6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/kedro_cli.py#L102-L147)

``` diff
@click.group(context_settings=CONTEXT_SETTINGS, name=__file__)
def cli():
    """Command line tools for manipulating a Kedro project."""


@cli.command()
@click.option("--from-nodes", type=str, default="", help=FROM_NODES_HELP)
@click.option("--to-nodes", type=str, default="", help=TO_NODES_HELP)
@click.option(
    "--node",
    "-n",
    "node_names",
    type=str,
    default=None,
    multiple=True,
    help=NODE_ARG_HELP,
)
@click.option(
    "--runner", "-r", type=str, default=None, multiple=False, help=RUNNER_ARG_HELP
)
@click.option("--parallel", "-p", is_flag=True, multiple=False, help=PARALLEL_ARG_HELP)
+ @click.option("--max-workers", type=int, default=None, multiple=False, help=MAX_WORKERS_HELP)
@click.option("--env", "-e", type=str, default=None, multiple=False, help=ENV_ARG_HELP)
@click.option("--tag", "-t", type=str, default=None, multiple=True, help=TAG_ARG_HELP)
- def run(tag, env, parallel, runner, node_names, to_nodes, from_nodes):
+ def run(tag, env, parallel, runner, max_workers, node_names, to_nodes, from_nodes):
    """Run the pipeline."""
    from {{cookiecutter.python_package}}.run import main
    from_nodes = [n for n in from_nodes.split(",") if n]
    to_nodes = [n for n in to_nodes.split(",") if n]

    if parallel and runner:
        raise KedroCliError(
            "Both --parallel and --runner options cannot be used together. "
            "Please use either --parallel or --runner."
        )
+
+  if runner and max_workers != None:
+     raise KedroCliError(
+        "Both --runner and --max-workers options cannot be used together."
+        "Please use either --parallel with --max-workers or --runner."

    if parallel:
        runner = "ParallelRunner"
    runner_class = load_obj(runner, "kedro.runner") if runner else SequentialRunner
+
+   runner_kwargs = {}
+   if parallel and max_workers !=None:
+      runner_kwargs['max_workers'] = max_workers

    main(
        tags=tag,
        env=env,
-         runner=runner_class(),
+         runner=runner_class(**runner_kwargs), # i am not familiar with load_obj, so I am not completely sure this is correct
        node_names=node_names,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
    )

Docs

04_create_pipelines.md line 727

- * `ParallelRunner` - runs your nodes in parallel; independent nodes are able to run at the same time, allowing you to take advantage of multiple CPU cores.
+ * `ParallelRunner` - runs your nodes in parallel; independent nodes are able to run at the same time, allowing you to take advantage of multiple CPU cores as set by `max_workers`.

05_nodes_and_pipelines.md line 526

+ > *Note: * the parallel runner can be called with --max-workers to set the maximum number of worker processes.

004_create_pipelines.md line 737
``` diff

  • > *Note: * the parallel runner can be called with --max-workers to set the maximum number of worker processes.
Feature Request

All 6 comments

Hi @WaylonWalker, that's great, thank you for your effort!
I agree it's very useful. The next step would be to have a separate runner type (gevent/eventlet based) for IO-heavy operations, but for now it should do the job.

Can you please send a PR with these changes?

Thanks @Flid I'll work on it.

@WaylonWalker
We had a quick conversation about this issue with the team. We are trying to limit the complexity of the tool public interface, and would prefer to not have another CLI parameter for the run command (I'm talking about --max-workers). It has lots of switches already, and this one new is not going to be widely used really.
At the same time, we consider the entire kedro_cli.py file to be a part of the "public interface", something the Kedro user can access and modify easily in their projects. That's why, if you just add it as a parameter to the ParallelRunner (max_workers, as you did already), then users can "hardcode" the parameter in the file themselves. It's not as nice for your case, but keeps it simple for the majority of users.
Sorry a bit of your work is wasted, but the rest is still very appreciated. It would be great if you send a pull request.

@Flid Thanks for the heads up. I'll make sure to revert that part. Our team will be using a custom cookiecutter based on kedro new with a few customizations for us. I can make the change there.

I was able to easily make the change, testing it has proven tricky as my current pipeline is in 14.3 and several things seemed broken when I tried to use my branch of kedro on it. Nothing major just need a bit of time to make the upgrade and test it out.

@WaylonWalker We have added this in https://github.com/quantumblacklabs/kedro/commit/4dc48e7b17104356934b5a7cb91c760a739fe44e (and we are releasing the new version soon). I am closing this issue but let us know if you have any questions :)

Thanks @921kiyo I am just now getting several of my pipelines up to the latest version so that I could test.

Was this page helpful?
0 / 5 - 0 ratings