I'm currently switching our workflows from local flow execution with LocalDaskExecutor to running them through Prefect Server with local agents. One issue I encountered is that all my tasks run sequentially instead of being distributed, e.g. over multiple processes or threads. I even tried starting a second Local agent, but it doesn't pick up any work. IMHO the documentation does not make it obvious what the difference between Executors and Agents is and how they relate to each other.
The following image shows sequential execution of my tasks.

The dependency graph looks like this:

We ran our jobs like this before:
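from prefect.engine.executors import LocalDaskExecutor

# Local Dask scheduler with a pool of 15 worker processes
executor = LocalDaskExecutor(scheduler="processes", num_workers=15)
end_state = flow.run(executor=executor)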
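For reference, num_workers bounds how many tasks the local Dask scheduler runs at once, and scheduler selects a thread or a process pool. The sketch below is only a rough stdlib analogue built on concurrent.futures, not Prefect or Dask code, and max_concurrency is a hypothetical helper: a pool with a single worker mirrors the strictly sequential runs described above, while a larger pool lets independent tasks overlap. Threads keep the example simple; scheduler="processes" would correspond to a process pool instead.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def max_concurrency(num_workers: int, num_tasks: int = 8, duration: float = 0.05) -> int:
    """Run num_tasks sleeping tasks on a pool of num_workers threads and
    return the highest number of tasks that were running at the same time."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task(_: int) -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(duration)  # stand-in for real work
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        list(pool.map(task, range(num_tasks)))  # wait for all tasks
    return peak


if __name__ == "__main__":
    print(max_concurrency(1))  # one worker: tasks run one after another
    print(max_concurrency(4))  # up to four tasks can overlap
```

The pool size is a hard upper bound on overlap; whether that bound is reached also depends on how long each task runs relative to scheduling overhead.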
Now we register the flow with the server as follows:
flow.save("<location>/flow.prefect")
flow_reloaded = flow.load(
"<location>/flow.prefect"
)
flow_reloaded.register()
What did you expect to happen instead?
That the local agent would maximize parallelism, or would let me specify, similar to LocalDaskExecutor, how many workers and which distribution mode (e.g. threads or processes) to use.
This happens with basically all flows that are run via the Local agent:
prefect server start
prefect agent start
<register flow>
<run flow from server UI>
prefect diagnostics
{
"config_overrides": {},
"env_vars": [
"PREFECT__LOGGING__LEVEL"
],
"system_information": {
"platform": "macOS-10.15.4-x86_64-i386-64bit",
"prefect_version": "0.10.2",
"python_version": "3.8.1"
}
}
@ludwigm I noticed the same. After digging into the code, I realized that the Environment is not responsible for parallelizing tasks; it just calls the flow's runner. It is the engine's executor that is responsible for parallelization: https://github.com/PrefectHQ/prefect/tree/master/src/prefect/engine/executors
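A toy model in plain Python can make this split concrete. It is not Prefect code: SequentialExecutor, run_flow, slow_multiply, concat and TASKS are all hypothetical names. The same runner executes a scaled-down copy of the ETL dependency graph twice, once with an executor that runs every task immediately and once with a concurrent.futures thread pool. Only the executor changes, yet the second run overlaps the four transforms.

```python
import time
from concurrent.futures import Executor, Future, ThreadPoolExecutor


class SequentialExecutor(Executor):
    """Hypothetical executor that runs each submitted callable immediately."""

    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future


def _call_with_deps(fn, deps):
    # Wait for the upstream results, then call the task with them.
    return fn(*[d.result() for d in deps])


def run_flow(tasks, executor):
    """Hypothetical runner: wires up dependencies, lets `executor` run tasks.

    `tasks` maps a name to (function, upstream names) in topological order.
    Submitting in that order means a FIFO thread pool never runs a task
    before its upstream tasks have been picked up, so the waits cannot deadlock.
    """
    futures = {}
    for name, (fn, upstream) in tasks.items():
        deps = [futures[u] for u in upstream]
        futures[name] = executor.submit(_call_with_deps, fn, deps)
    return {name: f.result() for name, f in futures.items()}


def slow_multiply(seconds, factor):
    """Build a task that sleeps, then multiplies every element by `factor`."""
    def task(xs):
        time.sleep(seconds)
        return [i * factor for i in xs]
    return task


def concat(x, y):
    return x + y  # like `add` in the ETL example: list + list concatenates


# Same shape as the ETL flow example (without load), sleeps scaled to 0.1 / 0.2 s.
TASKS = {
    "extract": (lambda: [1, 2, 3], []),
    "t1": (slow_multiply(0.1, 9), ["extract"]),
    "t2": (slow_multiply(0.2, 11), ["extract"]),
    "t3": (slow_multiply(0.1, 13), ["extract"]),
    "t4": (slow_multiply(0.2, 15), ["extract"]),
    "a1": (concat, ["t1", "t2"]),
    "a2": (concat, ["t3", "t4"]),
    "a": (concat, ["a1", "a2"]),
}


def timed(executor):
    """Run TASKS with `executor`; return the final result and elapsed seconds."""
    start = time.perf_counter()
    with executor:
        result = run_flow(TASKS, executor)["a"]
    return result, time.perf_counter() - start


if __name__ == "__main__":
    seq_result, seq_time = timed(SequentialExecutor())
    par_result, par_time = timed(ThreadPoolExecutor(max_workers=4))
    print(seq_result == par_result)
    print(f"sequential: {seq_time:.2f}s, thread pool: {par_time:.2f}s")
```

The sequential run takes at least the sum of the sleeps (0.6 s), while the thread pool finishes in roughly the length of the slowest chain (about 0.2 s), with identical results.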
I managed to get proper parallel execution on Dask with the example shown below:
from prefect import Flow, task
from prefect.environments.execution.remote import RemoteEnvironment
import time

@task
def extract():
    return [1, 2, 3]

@task
def transform1(x):
    time.sleep(3)
    return [i * 9 for i in x]

@task
def transform2(x):
    time.sleep(5)
    return [i * 11 for i in x]

@task
def transform3(x):
    time.sleep(3)
    return [i * 13 for i in x]

@task
def transform4(x):
    time.sleep(5)
    return [i * 15 for i in x]

@task
def add(x, y):
    return x + y

@task
def load(y):
    print("Received y: {}".format(y))

# The executor configured here decides how the flow's tasks are parallelized.
env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": "tcp://dask-scheduler:8786"},  # existing Dask scheduler
    labels=["dask"],  # agents need this label to pick up the flow
)

with Flow("ETL", environment=env) as flow:
    e = extract()
    # The four transforms depend only on `e`, so they can run in parallel.
    t1 = transform1(e)
    t2 = transform2(e)
    t3 = transform3(e)
    t4 = transform4(e)
    a1 = add(t1, t2)
    a2 = add(t3, t4)
    a = add(a1, a2)
    l = load(a)

flow.register()
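A quick way to see what parallelism should buy in this flow is to compare the total sleep time with the critical path of the dependency graph. The sketch below assumes that tasks without time.sleep take negligible time, and it keys the three add calls by their variable names a1, a2 and a (these keys are illustrative; they are not Prefect task names). Run sequentially, the sleeps add up to 16 s; with enough workers the flow can finish in about 5 s, the length of the longest chain extract → transform2 → add → add → load.

```python
from functools import lru_cache

# Sleep time (seconds) per task; tasks without time.sleep count as 0.
DURATIONS = {
    "extract": 0, "transform1": 3, "transform2": 5, "transform3": 3,
    "transform4": 5, "a1": 0, "a2": 0, "a": 0, "load": 0,
}
# Upstream dependencies, as wired inside the `with Flow("ETL", ...)` block.
UPSTREAM = {
    "extract": [],
    "transform1": ["extract"], "transform2": ["extract"],
    "transform3": ["extract"], "transform4": ["extract"],
    "a1": ["transform1", "transform2"],
    "a2": ["transform3", "transform4"],
    "a": ["a1", "a2"],
    "load": ["a"],
}


@lru_cache(maxsize=None)
def finish_time(name: str) -> int:
    """Earliest finish time of `name`, assuming unlimited workers."""
    start = max((finish_time(u) for u in UPSTREAM[name]), default=0)
    return start + DURATIONS[name]


sequential = sum(DURATIONS.values())              # one task at a time
parallel = max(finish_time(n) for n in UPSTREAM)  # longest dependency chain

if __name__ == "__main__":
    print(sequential, parallel)  # 16 5
```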
Many thanks! The missing piece for me was that I can set an environment on the flow and specify in it which executor to actually use. I verified it and can now do it like this:
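from prefect.environments.execution.remote import RemoteEnvironment

flow = self._create_test_flow()  # our own helper that builds the Flow
# Same executor settings as the earlier local run: 15 Dask worker processes
flow.environment = RemoteEnvironment(
    executor="prefect.engine.executors.LocalDaskExecutor",
    executor_kwargs={"scheduler": "processes", "num_workers": 15},
)
flow.save("<location>/flow.prefect")
flow_reloaded = flow.load(
    "<location>/flow.prefect"
)
flow_reloaded.register()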
It also results in the expected parallelism:

What helped me was this documentation page:
https://docs.prefect.io/orchestration/execution/overview.html#environments
So IMHO we can close the issue. It would help to have a doc page describing how the terms Executor, Agent, Server and Environment relate to each other, as that took me a while to figure out.
@ludwigm Glad you got it working as intended! We have some changes planned that should answer these questions more directly in the docs.