Prefect: Local agent does not parallelize work

Created on 21 Apr 2020 · 3 Comments · Source: PrefectHQ/prefect

Description

We are currently switching our workflows from local flow execution with LocalDaskExecutor to a setup with Prefect server and local agents. One issue I encountered is that all my tasks run sequentially instead of being distributed across, e.g., multiple processes or threads. I even tried starting a second Local agent, but it doesn't pick up any work. IMHO the documentation does not make it obvious what the difference between Executors and Agents is and how they relate to each other.

The following image shows sequential execution of my tasks.
image

The dependency graph looks like this:
image

We ran our jobs like this before:

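from prefect.engine.executors import LocalDaskExecutor

# Run the flow in-process, spreading tasks over 15 worker processes
executor = LocalDaskExecutor(scheduler="processes", num_workers=15)
end_state = flow.run(executor=executor)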

Now we register the flow in the server like the following:

flow.save("<location>/flow.prefect")
flow_reloaded = flow.load("<location>/flow.prefect")
flow_reloaded.register()

Expected Behavior

What did you expect to happen instead?
That the local agent maximizes parallelism, or lets me specify, similar to LocalDaskExecutor, how many workers and which distribution mode to use (e.g. threads or processes).

Reproduction

Basically all flows which are run with Local agent

prefect server start
prefect agent start
<register flow>
<run flow from server UI>

Environment

prefect diagnostics

{
  "config_overrides": {},
  "env_vars": [
    "PREFECT__LOGGING__LEVEL"
  ],
  "system_information": {
    "platform": "macOS-10.15.4-x86_64-i386-64bit",
    "prefect_version": "0.10.2",
    "python_version": "3.8.1"
  }
}

All 3 comments

@ludwigm I noticed the same. After digging into the code, I realized that the Environment is not responsible for parallelizing tasks; it just calls the flow's runner. Rather, it is the engine's executor that is responsible for parallelization: https://github.com/PrefectHQ/prefect/tree/master/src/prefect/engine/executors

I managed to get proper parallel execution on Dask with the example shown below:

from prefect import Flow, task
from prefect.environments.execution.remote import RemoteEnvironment
import time

@task
def extract():
    return [1, 2, 3]


@task
def transform1(x):
    time.sleep(3)
    return [i * 9 for i in x]

@task
def transform2(x):
    time.sleep(5)
    return [i * 11 for i in x]

@task
def transform3(x):
    time.sleep(3)
    return [i * 13 for i in x]

@task
def transform4(x):
    time.sleep(5)
    return [i * 15 for i in x]

@task
def add(x, y):
    return x + y

@task
def load(y):
    print("Received y: {}".format(y))

env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": "tcp://dask-scheduler:8786"},
    labels=["dask"]
)

with Flow("ETL", environment=env) as flow:
    e = extract()
    t1 = transform1(e)
    t2 = transform2(e)
    t3 = transform3(e)
    t4 = transform4(e)
    a1 = add(t1, t2)
    a2 = add(t3, t4)
    a = add(a1, a2)
    l = load(a)

# Register once the flow is fully defined, outside the `with` block
flow.register()
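
To illustrate why the choice of executor matters for a flow shaped like this one, here is a minimal stdlib-only sketch. It is not Prefect code: `ThreadPoolExecutor` stands in for a parallel executor, and the names `SCALE`, `TRANSFORMS`, `run_sequential`, `run_parallel` and `timed` are made up for illustration. The four transforms only depend on `extract`, so a parallel executor can overlap their sleeps, while a sequential one pays for each in turn.

```python
import time
from concurrent.futures import ThreadPoolExecutor

SCALE = 0.01  # assumption: shrink the 3 s / 5 s sleeps so the sketch runs quickly

# (multiplier, sleep seconds) mirroring transform1..transform4 in the flow above
TRANSFORMS = [(9, 3), (11, 5), (13, 3), (15, 5)]


def transform(x, factor, seconds):
    time.sleep(seconds * SCALE)
    return [i * factor for i in x]


def run_sequential(data):
    """Run one transform at a time, like a non-parallel executor."""
    return [transform(data, f, s) for f, s in TRANSFORMS]


def run_parallel(data, num_workers=4):
    """Submit the independent transforms to a thread pool."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(transform, data, f, s) for f, s in TRANSFORMS]
        return [fut.result() for fut in futures]


def timed(runner, data):
    start = time.perf_counter()
    t1, t2, t3, t4 = runner(data)
    # add(add(t1, t2), add(t3, t4)); on lists, + concatenates
    result = (t1 + t2) + (t3 + t4)
    return result, time.perf_counter() - start


seq_result, seq_time = timed(run_sequential, [1, 2, 3])
par_result, par_time = timed(run_parallel, [1, 2, 3])
print(f"sequential: {seq_time:.3f}s, parallel: {par_time:.3f}s")
```

Both runs produce the same result; only the wall-clock time differs. Sequentially the sleeps add up (3 + 5 + 3 + 5 units), whereas in parallel the total is bounded by the slowest transform (about 5 units) plus overhead.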

Many thanks! The missing bit for me was that I can set an environment on the flow, where I can specify which executor to actually use. I verified it and can now do it like this:

from prefect.environments.execution.remote import RemoteEnvironment

flow = self._create_test_flow()

# Tell the flow which executor to use when an agent runs it
flow.environment = RemoteEnvironment(
    executor="prefect.engine.executors.LocalDaskExecutor",
    executor_kwargs={"scheduler": "processes", "num_workers": 15},
)

flow.save("<location>/flow.prefect")
flow_reloaded = flow.load("<location>/flow.prefect")
flow_reloaded.register()

It also results in the expected parallelism:
image

What helped me was this documentation page:
https://docs.prefect.io/orchestration/execution/overview.html#environments

So IMHO we could close the issue. A doc page describing how the terms Executor, Agent, Server and Environment relate to each other would be helpful, as it took me a while to figure out.

@ludwigm Glad you got it working as intended! We have some changes planned that should answer these questions more directly in the docs.
