Prefect: DaskExecutor failing to run flow

Created on 6 Oct 2020 · 4 Comments · Source: PrefectHQ/prefect

Description

I'm having problems running flows with the DaskExecutor. I created this standalone flow to demonstrate the issue:

from prefect import Flow
from prefect import Task
from prefect.engine.executors import DaskExecutor
from datetime import datetime
from time import sleep


class MockExtractUsers(Task):
    def run(self):
        sleep(0.5)
        return [{"name": "test", "id": i} for i in range(0, 180)]


class MockExtractUserAttribute(Task):
    def run(self, user):
        sleep(0.5)
        return {**user, **{"attribute": True}}


class TransformAddTimestamp(Task):
    def run(self, data):
        timestamp = str(datetime.now())
        return [{**item, **{"_timestamp": timestamp}} for item in data]


executor = DaskExecutor(cluster_kwargs={"processes": False}, debug=True)

with Flow("test_flow") as flow:
    mock_extract_users = MockExtractUsers()
    mock_extract_attrib = MockExtractUserAttribute()
    add_timestamp = TransformAddTimestamp()

    mock_extract_attrib.set_upstream(mock_extract_users, key="user", mapped=True)
    add_timestamp.set_upstream(mock_extract_attrib, key="data")

flow.run(executor=executor)
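
For reference, the result a successful run should produce can be checked without Prefect or Dask at all. The following is only a standard-library stand-in, assuming a plain thread pool in place of the DaskExecutor; the function names (`extract_users`, `extract_user_attribute`, `add_timestamp`, `run_pipeline`) are hypothetical and mirror the three tasks above with the sleeps removed.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime


def extract_users(n=180):
    # Same return shape as MockExtractUsers.run, without the sleep
    return [{"name": "test", "id": i} for i in range(n)]


def extract_user_attribute(user):
    # Same return shape as MockExtractUserAttribute.run, without the sleep
    return {**user, "attribute": True}


def add_timestamp(data):
    # Same return shape as TransformAddTimestamp.run
    timestamp = str(datetime.now())
    return [{**item, "_timestamp": timestamp} for item in data]


def run_pipeline(n=180, threads=16):
    users = extract_users(n)
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # pool.map returns results in input order, one per user
        mapped = list(pool.map(extract_user_attribute, users))
    # The reduce step receives the full list of mapped results
    return add_timestamp(mapped)


result = run_pipeline()
print(len(result), result[0]["id"], result[-1]["id"])  # prints: 180 0 179
```

This says nothing about why the DaskExecutor run fails; it only pins down the expected output (180 records, each with `attribute` and a shared `_timestamp`) to compare against.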

Soon after the mapped task begins I see the following message in the logs:

...
[2020-10-06 13:25:44] INFO - prefect.TaskRunner | Task 'MockExtractUserAttribute[12]': finished task run for task with final state: 'Success'
[2020-10-06 13:25:44] INFO - prefect.TaskRunner | Task 'MockExtractUserAttribute[9]': Starting task run...
[2020-10-06 13:25:44] INFO - prefect.TaskRunner | Task 'MockExtractUserAttribute[13]': Starting task run...
[2020-10-06 13:25:44] INFO - prefect.TaskRunner | Task 'MockExtractUserAttribute[14]': Starting task run...
Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at <filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 297, in _
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py", line 435, in wait_for
    await waiter
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 304, in _
    raise CommClosedError() from e
distributed.comm.core.CommClosedError

Once the mapped task is complete but before the add_timestamp task begins, I see the following messages (note debug=True on the DaskExecutor):

distributed.worker - WARNING -  Compute Failed
Function:  _maybe_run
args:      ('prefect-bc2e670c83ef48acb766d01ac0380d35', <function run_task at 0x7fb6e845ab00>)
kwargs:    {'task': <Task: TransformAddTimestamp>, 'state': None, 'upstream_states': {<Edge(key=data, mapped=False, flattened=False): MockExtractUserAttribute to TransformAddTimestamp>: <Mapped: "Ready to proceed with mapping.">}, 'context': {'config': <Box: {'debug': False, 'home_dir': '/Users/andy/.prefect', 'backend': 'cloud', 'server': {'host': 'http://localhost', 'port': 4200, 'host_port': 4200, 'endpoint': 'http://localhost:4200', 'database': {'host': 'localhost', 'port': 5432, 'host_port': 5432, 'name': 'prefect_server', 'username': 'prefect', 'password': 'test-password', 'connection_url': 'postgresql://prefect:test-password@localhost:5432/prefect_server', 'volume_path': '/Users/andy/.prefect/pg_data'}, 'graphql': {'host': '0.0.0.0', 'port': 4201, 'host_port': 4201, 'debug': False, 'path': '/graphql/'}, 'hasura': {'host': 'localhost', 'port': 3000, 'host_port': 3000, 'admin_secret': '', 'claims_namespace': 'hasura-claims', 'graphql_url': 'http://localhost:3000/v1alpha1/graphql', 'ws_url': 
Exception: AttributeError("'NoneType' object has no attribute 'result'")

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7fb708b33490>>, <Task finished coro=<SpecCluster._correct_state_internal() done, defined at <filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py:320> exception=OSError("Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time")>)
Traceback (most recent call last):
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
    _raise(error)
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<filepath>/.venv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "<filepath>/.venv/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 416, in __aexit__
    await self.close()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 401, in _close
    await self._correct_state()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 328, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/core.py", line 810, in send_recv_from_rpc
    comm = await self.live_comm()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/core.py", line 772, in live_comm
    **self.connection_args,
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
    _raise(error)
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time
[2020-10-06 13:26:16] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
  File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 267, in start
    yield
  File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 603, in get_flow_run_state
    for t in final_tasks
  File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 451, in wait
    return self.client.gather(futures)
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/client.py", line 1992, in gather
    asynchronous=asynchronous,
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/client.py", line 833, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "<filepath>/.venv/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/client.py", line 1851, in _gather
    raise exception.with_traceback(traceback)
  File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 70, in _maybe_run
    return fn(*args, **kwargs)
  File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 721, in run_task
    upstream_state.result = [s.result for s in upstream_state.map_states]
  File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 721, in <listcomp>
    upstream_state.result = [s.result for s in upstream_state.map_states]
AttributeError: 'NoneType' object has no attribute 'result'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 618, in get_flow_run_state
    assert isinstance(final_states, dict)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 269, in start
    self._post_start_yield()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 429, in __exit__
    super().__exit__(typ, value, traceback)
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 409, in __exit__
    return self.sync(self.__aexit__, typ, value, traceback)
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 183, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
    raise exc.with_traceback(tb)
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
    result[0] = yield future
  File "<filepath>/.venv/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "<filepath>/.venv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "<filepath>/.venv/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 416, in __aexit__
    await self.close()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 401, in _close
    await self._correct_state()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 328, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/core.py", line 810, in send_recv_from_rpc
    comm = await self.live_comm()
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/core.py", line 772, in live_comm
    **self.connection_args,
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
    _raise(error)
  File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time
[2020-10-06 13:26:16] ERROR - prefect.test_flow | Unexpected error occured in FlowRunner: OSError("Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time")

Other points of note:

  • Using the LocalDaskExecutor works without issue
  • Using DaskExecutor, if I remove the mapped step and point add_timestamp.set_upstream at mock_extract_users instead, it works fine.
  • In the example I made 180 mapped tasks. Setting a small number of mapped tasks (e.g. 10) works fine, but with larger numbers it fails. For me, it seems to fail reliably above roughly 20.
  • Setting the DaskExecutor to use a lower number of threads per worker (e.g. 8) results in the flow running fine: executor = DaskExecutor(cluster_kwargs={"processes": False, "n_workers": 1, "threads_per_worker": 8}, debug=True)
  • I'm running Prefect 0.13.9, but I see the same issue with 0.13.8. 0.13.7 runs fine 👍
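
As a stopgap, the thread-count workaround from the list above could be wrapped in a small helper. This is a minimal sketch, not anything Prefect or Dask provides: `local_cluster_kwargs` is a hypothetical name, and the default cap of 8 is simply the value that worked in this report, not a general recommendation.

```python
import os


def local_cluster_kwargs(requested_threads, max_threads=8):
    """Build cluster_kwargs for a single thread-based worker.

    max_threads=8 is an assumption taken from the configuration that
    ran cleanly in this report; adjust it for your own machine.
    """
    logical_cores = os.cpu_count() or 1
    # Never exceed the cap or the logical core count, and never drop below 1
    threads = max(1, min(requested_threads, max_threads, logical_cores))
    return {"processes": False, "n_workers": 1, "threads_per_worker": threads}


print(local_cluster_kwargs(16))
```

The returned dictionary has the same keys as the working configuration above, so it could be passed as DaskExecutor(cluster_kwargs=local_cluster_kwargs(16), debug=True).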

All 4 comments

Hi @andywaugh, thanks for the excellent issue report. I'm unable to reproduce your issue; I suspect this has something to do with your environment.

  • What version of Python are you using?
  • What versions of distributed and tornado do you have installed?
  • How many cores do you have on your machine, and how many dask threads result in it failing?
  • Does this issue happen every time?

Hey @jcrist, thanks for the swift response.

  • Python 3.7.7
  • distributed-2.28.0
  • tornado-6.0.4
  • 8 cores, but with hyperthreading, so 16 logical.
  • Setting threads_per_worker to 12 or lower seems to be reliably fine; >= 16 reliably fails. I have seen failures with settings between 12 and 15, but they're inconsistent.

Ah, with more threads running I'm able to reproduce. Thanks, I'll work to get this fixed. I believe this is the source of #3435; a reproducible example here is quite helpful (we didn't have one before).

Apologies for the delayed update here. A few assorted notes:

  • This should be improved in distributed 2.30.1, which was just released today. Issues will still show up (if you don't bump the default timeouts higher) with larger numbers of workers or in resource-starved environments, but the defaults should work much better for common situations. See https://github.com/dask/distributed/issues/4165 for more info.

  • When running with a LocalCluster with processes=False, you shouldn't create multiple workers, you should create one worker with multiple threads. Creating multiple thread-based workers each with one thread adds needless overhead with no benefit.

# Say you want to use a local dask cluster with 8 threads using
# the dask.distributed scheduler (so not the local threaded scheduler)

# Do this
executor = DaskExecutor(cluster_kwargs={"processes": False, "threads_per_worker": 8}, debug=True)

# Don't do this
executor = DaskExecutor(cluster_kwargs={"processes": False, "n_workers": 8}, debug=True)

  • When running in a single process with threads, I recommend using the LocalDaskExecutor() over the DaskExecutor() (with processes=False) unless you have good reasons to use the latter (access to the dashboard, etc.). The local dask scheduler is much lighter-weight and for Prefect workflows should perform the same while using fewer resources (not that the DaskExecutor uses a ton of resources either, but in severely resource-constrained environments this might matter). It's also a lot simpler to debug.
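
For the "bump the default timeouts higher" point in the first note, one possible approach is to set Dask's connect timeout through an environment variable before dask or distributed is imported. This is a sketch under assumptions: the thread does not name the setting, so the config key distributed.comm.timeouts.connect (written here in Dask's DASK_..._ double-underscore environment form) is my reading of the Dask configuration, the value "60s" is arbitrary, and `raise_dask_connect_timeout` is a hypothetical helper.

```python
import os

# Environment form of the Dask config key distributed.comm.timeouts.connect.
# Dask reads DASK_* variables when its config module is first imported, so
# this has to run before any "import dask" / "import distributed" / Prefect
# executor import in the same process.
CONNECT_TIMEOUT_ENV = "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT"


def raise_dask_connect_timeout(value="60s"):
    # "60s" is an arbitrary illustrative value, not one from this thread
    os.environ[CONNECT_TIMEOUT_ENV] = value
    return os.environ[CONNECT_TIMEOUT_ENV]


print(raise_dask_connect_timeout())
```

The same variable can be exported in the shell that launches the flow, which avoids any import-order concerns.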

Closing as resolved.
