Prefect: LocalDaskExecutor(scheduler='threads') not running concurrently

Created on 7 Mar 2020 · 9 Comments · Source: PrefectHQ/prefect

Description

I would expect LocalDaskExecutor(scheduler='threads') to run independent tasks concurrently. However, that does not seem to be the case in my example below.

I'm not sure why. Perhaps each task only starts when compute is called inside wait? https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/executors/dask.py#L287

Reproduction

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor

@task
def task_sleep(seconds):
    import time
    print("start sleeping {}".format(seconds))
    time.sleep(seconds)
    print("end sleeping {}".format(seconds))
    return seconds

with Flow("dummy_sleep") as flow:
    task_sleep(seconds=10)
    task_sleep(seconds=12)

state = flow.run(executor=LocalDaskExecutor(scheduler='threads'))

which produces

[2020-03-07 05:34:35,250] INFO - prefect.FlowRunner | Beginning Flow run for 'dummy_sleep'
[2020-03-07 05:34:35,253] INFO - prefect.FlowRunner | Starting flow run.
[2020-03-07 05:34:35,301] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 10
end sleeping 10
[2020-03-07 05:34:45,316] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
[2020-03-07 05:34:45,325] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 12
end sleeping 12
[2020-03-07 05:34:57,342] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
[2020-03-07 05:34:57,343] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

Environment

prefect diagnostics
{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.3.0-1011-gcp-x86_64-with-Ubuntu-19.10-eoan",
    "prefect_version": "0.9.7",
    "python_version": "3.7.5"
  }
}
Label: bug

All 9 comments

This is a great callout, thank you. I believe what's happening is that we implicitly wait on each terminal task in sequence, evaluating all of its upstream tasks concurrently. However, a task only gets evaluated when a terminal task downstream of it is computed. Therefore, in your flow, the two terminal tasks are evaluated in sequence.
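A minimal, stdlib-only model of this explanation follows. It uses neither Prefect nor Dask, and all function names are hypothetical. Waiting on each terminal branch before starting the next one serializes independent branches, while submitting every branch first and only then waiting lets them overlap, even on the same thread pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def task_sleep(seconds):
    # Stand-in for the Prefect task in the reproduction (shorter sleeps).
    time.sleep(seconds)
    return seconds


def wait_terminal_tasks_in_sequence(pool, durations):
    # Each branch is submitted and immediately waited on before the next
    # one is submitted, so independent branches never overlap.
    results = []
    for d in durations:
        results.append(pool.submit(task_sleep, d).result())
    return results


def wait_all_terminal_tasks_at_once(pool, durations):
    # Submit every branch first, then wait: independent branches overlap.
    futures = [pool.submit(task_sleep, d) for d in durations]
    return [f.result() for f in futures]


def timed(fn, *args):
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        _, seq = timed(wait_terminal_tasks_in_sequence, pool, [0.5, 0.6])
        _, par = timed(wait_all_terminal_tasks_at_once, pool, [0.5, 0.6])
    print("one wait per branch: {:.1f}s".format(seq))  # about 0.5 + 0.6
    print("one wait overall:    {:.1f}s".format(par))  # about max(0.5, 0.6)
```

On a 2-thread pool the first pattern takes roughly the sum of the sleeps and the second roughly the longest one, which mirrors the 22 s run in the original report versus the 12 s run with the dummy terminal task.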

In the version below, I get the expected concurrency by adding a single dummy terminal task that runs after both sleep tasks.

We should think about ways to ensure the entire graph is computed at once (possibly by automatically adding a dummy node like this one at runtime). cc @cicdw


import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor

@task
def task_sleep(seconds):
    import time
    print("start sleeping {}".format(seconds))
    time.sleep(seconds)
    print("end sleeping {}".format(seconds))
    return seconds

with Flow("dummy_sleep") as flow:
    t1 = task_sleep(seconds=10)
    t2 = task_sleep(seconds=12)

    t3 = prefect.Task()
    t3.set_upstream(t1)
    t3.set_upstream(t2)

state = flow.run(executor=LocalDaskExecutor(scheduler='threads'))

Logs:


[2020-03-07 19:46:15,142] INFO - prefect.FlowRunner | Beginning Flow run for 'dummy_sleep'
[2020-03-07 19:46:15,144] INFO - prefect.FlowRunner | Starting flow run.
[2020-03-07 19:46:15,235] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 10
[2020-03-07 19:46:15,240] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 12
end sleeping 10
[2020-03-07 19:46:25,254] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
end sleeping 12
[2020-03-07 19:46:27,248] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
[2020-03-07 19:46:27,254] INFO - prefect.TaskRunner | Task 'Task': Starting task run...
[2020-03-07 19:46:27,257] INFO - prefect.TaskRunner | Task 'Task': finished task run for task with final state: 'Success'
[2020-03-07 19:46:27,258] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
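The idea of adding the dummy node automatically can be sketched on a plain dependency dict. This is a hypothetical helper, not Prefect's API: `add_sink` finds every terminal node (one that nothing depends on) and adds a single sink downstream of all of them, so computing the sink alone evaluates the whole graph in one pass, just as `t3` does in the flow above.

```python
def add_sink(upstream, sink="__sink__"):
    """Return a copy of the graph with one extra node after every terminal node.

    `upstream` maps each node to the set of nodes it depends on; it is a
    plain-dict stand-in for a flow's edges (hypothetical, not Prefect's API).
    """
    nodes = set(upstream)
    has_downstream = set()
    for deps in upstream.values():
        nodes |= set(deps)
        has_downstream |= set(deps)
    if sink in nodes:
        raise ValueError("sink name {!r} is already a node".format(sink))
    # Terminal nodes: nothing depends on them.
    terminal = nodes - has_downstream
    graph = {node: set(upstream.get(node, ())) for node in nodes}
    graph[sink] = terminal
    return graph


if __name__ == "__main__":
    # The reproduction flow: two independent sleep tasks, no edges.
    print(sorted(add_sink({"t1": set(), "t2": set()})["__sink__"]))
    # prints ['t1', 't2']
```

Because the sink depends on every terminal node, a scheduler that only computes what the requested node needs is forced to schedule all branches together.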

@jlowin Thanks for your insights and tips. I have been trying to implement a custom executor and thought it might help to share what I learned (e.g. for improving the docs, or for anyone else who wants to implement one). Here is a simple asynchronous executor I wrote:

import concurrent.futures
from contextlib import contextmanager

from prefect import task, Flow
from prefect.engine.executors.base import Executor
from prefect.engine.state import State


class ThreadExecutor(Executor):
    "An asynchronous executor for Prefect, backed by a thread pool"

    @contextmanager
    def start(self):
        try:
            with concurrent.futures.ThreadPoolExecutor() as pool_executor:
                self.pool_executor = pool_executor
                yield
        finally:
            self.pool_executor = None

    def submit(self, fn, *args, **kwargs):
        ## upstream_states can hold either futures or states.
        ## Wait on any futures and convert them to states here??
        ## Note: .result() blocks the caller until that upstream task finishes.
        kwargs['upstream_states'] = {
            edge: (value if isinstance(value, State) else value.result())
            for edge, value in kwargs['upstream_states'].items()
        }
        return self.pool_executor.submit(fn, *args, **kwargs)

    def map(self, fn, *args):
        raise NotImplementedError

    def wait(self, futures):
        # futures is a dict mapping each task to its future
        return {task: future.result() for task, future in futures.items()}

  1. The argument to wait is a dictionary mapping each task to its future, not a list of futures as I originally assumed.

  2. In submit, I had to modify kwargs['upstream_states'], replacing each value with value if isinstance(value, State) else value.result(), for it to work. I am not sure this is the right approach. It seems to me that the upstream futures should be waited on and resolved to States before submit is called. Is submit expected to modify kwargs['upstream_states']?

Thanks.
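One way to avoid blocking the caller in submit (point 2) is to resolve upstream futures inside the worker thread instead. The class below is a standalone, hypothetical sketch that mimics the start/submit/wait interface described above without subclassing Prefect's Executor, so it runs without Prefect installed. It assumes tasks are submitted upstream-first; under that assumption, blocking inside workers should not deadlock, because ThreadPoolExecutor hands out work in FIFO order, so any upstream future a worker waits on has already been picked up by another worker. The trade-off is that a waiting task occupies a pool thread.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager


class ThreadPoolSketchExecutor:
    """Hypothetical executor sketch; not Prefect's Executor API.

    `upstream_states` maps edges to either finished values (States, in
    Prefect) or Futures returned by earlier submit() calls.
    """

    def __init__(self, max_workers=None):
        self.max_workers = max_workers
        self.pool = None

    @contextmanager
    def start(self):
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            self.pool = pool
            try:
                yield
            finally:
                self.pool = None

    @staticmethod
    def _resolve(upstream_states):
        # Runs inside a worker thread, so waiting here does not block
        # the caller of submit().
        return {
            edge: value.result() if isinstance(value, Future) else value
            for edge, value in upstream_states.items()
        }

    def submit(self, fn, *args, upstream_states=None, **kwargs):
        def run():
            resolved = self._resolve(upstream_states or {})
            return fn(*args, upstream_states=resolved, **kwargs)

        # Returns immediately; upstream futures are awaited in the pool.
        return self.pool.submit(run)

    def wait(self, futures):
        # Same shape as above: a dict of task -> future.
        return {task: future.result() for task, future in futures.items()}
```

For example, submitting `a` and then `b` with `upstream_states={"edge_a": a}` returns both futures at once, and `b` only starts its real work after `a` has finished.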

Running your example from above, I'm unable to reproduce this on master (currently 0.10.0+297.g1babcb7f). I get the expected concurrency as-is:

$ python test.py
[2020-04-15 20:38:50,189] INFO - prefect.FlowRunner | Beginning Flow run for 'dummy_sleep'
[2020-04-15 20:38:50,193] INFO - prefect.FlowRunner | Starting flow run.
[2020-04-15 20:38:50,296] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 10
[2020-04-15 20:38:50,303] INFO - prefect.TaskRunner | Task 'task_sleep': Starting task run...
start sleeping 12
end sleeping 10
[2020-04-15 20:39:00,323] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
end sleeping 12
[2020-04-15 20:39:02,329] INFO - prefect.TaskRunner | Task 'task_sleep': finished task run for task with final state: 'Success'
[2020-04-15 20:39:02,329] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

@Marlin-Na, could you try rerunning on master to see if the issue still persists for you?

Hi @jcrist, the issue still persists on my side. I tested three versions: 0.10.2+47.g13977484 (current master), 0.10.2+21.g1babcb7f (your version), and 0.10.2 (PyPI release). Here are the versions of the other Python packages I have installed:

certifi==2020.4.5.1
chardet==3.0.4
click==7.1.1
cloudpickle==1.3.0
croniter==0.3.31
dask==2.14.0
distributed==2.14.0
docker==4.2.0
fsspec==0.7.2
HeapDict==1.0.1
idna==2.9
locket==0.2.0
marshmallow==3.5.1
marshmallow-oneofschema==2.0.1
msgpack==1.0.0
mypy-extensions==0.4.3
partd==1.1.0
pendulum==2.1.0
pkg-resources==0.0.0
prefect==0.10.2+21.g1babcb7f
psutil==5.7.0
python-box==4.2.2
python-dateutil==2.8.1
python-slugify==4.0.0
pytz==2019.3
pytzdata==2019.3
PyYAML==5.3.1
requests==2.23.0
ruamel.yaml==0.16.10
ruamel.yaml.clib==0.2.0
six==1.14.0
sortedcontainers==2.1.0
supervisor==4.1.0
tabulate==0.8.7
tblib==1.6.0
text-unidecode==1.3
toml==0.10.0
toolz==0.10.0
tornado==6.0.4
typing==3.7.4.1
typing-extensions==3.7.4.2
urllib3==1.25.9
websocket-client==0.57.0
zict==2.0.0

Hmmm. Can you run the following?

$ python -c "import dask.system;print(dask.system.cpu_count())"

@jcrist Ah, that's 1 (I am running on a Google VM), which explains the difference. However, I don't think concurrency should depend on the number of CPUs, e.g. for I/O-bound tasks or for tasks that call external tools.


By default, a local Dask scheduler sizes its worker pool from the number of available CPUs. Since you're using the local threaded scheduler, you can override this by passing num_workers=... to LocalDaskExecutor.

import prefect
from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor

@task
def task_sleep(seconds):
    import time
    print("start sleeping {}".format(seconds))
    time.sleep(seconds)
    print("end sleeping {}".format(seconds))
    return seconds

with Flow("dummy_sleep") as flow:
    task_sleep(seconds=10)
    task_sleep(seconds=12)

state = flow.run(executor=LocalDaskExecutor(scheduler='threads', num_workers=8))

If you were using the distributed scheduler (which also runs fine locally), the corresponding keywords would be n_workers (number of worker processes) and threads_per_worker (number of threads per worker).
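To see that the pool size, not the CPU count, is what limits concurrency for I/O-bound work, here is a stdlib-only sketch (not Dask's code; `run_with_pool_size` is a hypothetical helper). A single worker thread, which is what a 1-CPU default gives, runs the sleeps back to back, while a larger pool, analogous to num_workers=8, overlaps them.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def io_bound(seconds):
    # time.sleep releases the GIL, like waiting on I/O or a subprocess.
    time.sleep(seconds)
    return seconds


def run_with_pool_size(num_workers, durations):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = list(pool.map(io_bound, durations))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    print("os.cpu_count():", os.cpu_count())
    _, one = run_with_pool_size(1, [0.5, 0.6])    # like a 1-CPU default
    _, eight = run_with_pool_size(8, [0.5, 0.6])  # like num_workers=8
    print("1 thread:  {:.1f}s".format(one))
    print("8 threads: {:.1f}s".format(eight))
```

For CPU-bound pure-Python tasks the GIL would still serialize the threads, so raising the thread count mainly helps tasks that wait on I/O or external processes.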

@jcrist Thanks! This looks great.

Glad to help! Ok to close?
