I'm having problems running flows using the DaskExecutor. I have created this standalone flow to demonstrate the issue:
from datetime import datetime
from time import sleep

from prefect import Flow
from prefect import Task
from prefect.engine.executors import DaskExecutor


class MockExtractUsers(Task):
    def run(self):
        sleep(0.5)
        return [{"name": "test", "id": i} for i in range(0, 180)]


class MockExtractUserAttribute(Task):
    def run(self, user):
        sleep(0.5)
        return {**user, **{"attribute": True}}


class TransformAddTimestamp(Task):
    def run(self, data):
        timestamp = str(datetime.now())
        return [{**item, **{"_timestamp": timestamp}} for item in data]


executor = DaskExecutor(cluster_kwargs={"processes": False}, debug=True)

with Flow("test_flow") as flow:
    mock_extract_users = MockExtractUsers()
    mock_extract_attrib = MockExtractUserAttribute()
    add_timestamp = TransformAddTimestamp()
    # Map the attribute lookup over every user returned upstream.
    mock_extract_attrib.set_upstream(mock_extract_users, key="user", mapped=True)
    add_timestamp.set_upstream(mock_extract_attrib, key="data")

flow.run(executor=executor)
Soon after the mapped task begins I see the following message in the logs:
...
[2020-10-06 13:25:44] INFO - prefect.TaskRunner | Task 'MockExtractUserAttribute[12]': finished task run for task with final state: 'Success'
[2020-10-06 13:25:44] INFO - prefect.TaskRunner | Task 'MockExtractUserAttribute[9]': Starting task run...
[2020-10-06 13:25:44] INFO - prefect.TaskRunner | Task 'MockExtractUserAttribute[13]': Starting task run...
[2020-10-06 13:25:44] INFO - prefect.TaskRunner | Task 'MockExtractUserAttribute[14]': Starting task run...
Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at <filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 297, in _
handshake = await asyncio.wait_for(comm.read(), 1)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py", line 435, in wait_for
await waiter
concurrent.futures._base.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 304, in _
raise CommClosedError() from e
distributed.comm.core.CommClosedError
Once the mapped task is complete but before the add_timestamp task begins, I see the following messages (note debug=True on the DaskExecutor):
distributed.worker - WARNING - Compute Failed
Function: _maybe_run
args: ('prefect-bc2e670c83ef48acb766d01ac0380d35', <function run_task at 0x7fb6e845ab00>)
kwargs: {'task': <Task: TransformAddTimestamp>, 'state': None, 'upstream_states': {<Edge(key=data, mapped=False, flattened=False): MockExtractUserAttribute to TransformAddTimestamp>: <Mapped: "Ready to proceed with mapping.">}, 'context': {'config': <Box: {'debug': False, 'home_dir': '/Users/andy/.prefect', 'backend': 'cloud', 'server': {'host': 'http://localhost', 'port': 4200, 'host_port': 4200, 'endpoint': 'http://localhost:4200', 'database': {'host': 'localhost', 'port': 5432, 'host_port': 5432, 'name': 'prefect_server', 'username': 'prefect', 'password': 'test-password', 'connection_url': 'postgresql://prefect:test-password@localhost:5432/prefect_server', 'volume_path': '/Users/andy/.prefect/pg_data'}, 'graphql': {'host': '0.0.0.0', 'port': 4201, 'host_port': 4201, 'debug': False, 'path': '/graphql/'}, 'hasura': {'host': 'localhost', 'port': 3000, 'host_port': 3000, 'admin_secret': '', 'claims_namespace': 'hasura-claims', 'graphql_url': 'http://localhost:3000/v1alpha1/graphql', 'ws_url':
Exception: AttributeError("'NoneType' object has no attribute 'result'")
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7fb708b33490>>, <Task finished coro=<SpecCluster._correct_state_internal() done, defined at <filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py:320> exception=OSError("Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time")>)
Traceback (most recent call last):
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 322, in connect
_raise(error)
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<filepath>/.venv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "<filepath>/.venv/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 416, in __aexit__
await self.close()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 401, in _close
await self._correct_state()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 328, in _correct_state_internal
await self.scheduler_comm.retire_workers(workers=list(to_close))
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/core.py", line 810, in send_recv_from_rpc
comm = await self.live_comm()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/core.py", line 772, in live_comm
**self.connection_args,
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time
[2020-10-06 13:26:16] ERROR - prefect.FlowRunner | Unexpected error: OSError("Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time")
Traceback (most recent call last):
File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 267, in start
yield
File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 603, in get_flow_run_state
for t in final_tasks
File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 451, in wait
return self.client.gather(futures)
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/client.py", line 1992, in gather
asynchronous=asynchronous,
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/client.py", line 833, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "<filepath>/.venv/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/client.py", line 1851, in _gather
raise exception.with_traceback(traceback)
File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 70, in _maybe_run
return fn(*args, **kwargs)
File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 721, in run_task
upstream_state.result = [s.result for s in upstream_state.map_states]
File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 721, in <listcomp>
upstream_state.result = [s.result for s in upstream_state.map_states]
AttributeError: 'NoneType' object has no attribute 'result'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 618, in get_flow_run_state
assert isinstance(final_states, dict)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "<filepath>/.venv/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 269, in start
self._post_start_yield()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 429, in __exit__
super().__exit__(typ, value, traceback)
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 409, in __exit__
return self.sync(self.__aexit__, typ, value, traceback)
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 183, in sync
return sync(self.loop, func, *args, **kwargs)
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/utils.py", line 339, in sync
raise exc.with_traceback(tb)
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/utils.py", line 323, in f
result[0] = yield future
File "<filepath>/.venv/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "<filepath>/.venv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "<filepath>/.venv/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/cluster.py", line 416, in __aexit__
await self.close()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 401, in _close
await self._correct_state()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/deploy/spec.py", line 328, in _correct_state_internal
await self.scheduler_comm.retire_workers(workers=list(to_close))
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/core.py", line 810, in send_recv_from_rpc
comm = await self.live_comm()
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/core.py", line 772, in live_comm
**self.connection_args,
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 334, in connect
_raise(error)
File "<filepath>/.venv/lib/python3.7/site-packages/distributed/comm/core.py", line 275, in _raise
raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time
[2020-10-06 13:26:16] ERROR - prefect.test_flow | Unexpected error occured in FlowRunner: OSError("Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: Timed out trying to connect to 'inproc://192.168.1.102/8544/1' after 10 s: connect() didn't finish in time")
Other points of note:
executor = DaskExecutor(cluster_kwargs={"processes": False, "n_workers": 1, "threads_per_worker": 8}, debug=True)
Hi @andywaugh, thanks for the excellent issue report. I'm unable to reproduce your issue; I suspect this has something to do with your environment.
What versions of distributed and tornado do you have installed?
Hey @jcrist, thanks for the swift response.
Ah, with more threads running I'm able to reproduce. Thanks, I'll work to get this fixed. I believe this is the source of #3435; a reproducible example here is quite helpful (we didn't have one before).
Apologies for the delayed update here. A few assorted notes:
This should be improved in distributed 2.30.1, which was just released today. Issues will still show up (if you don't bump the default timeouts higher) with larger numbers of workers or in resource-starved environments, but the defaults should work much better for common situations. See https://github.com/dask/distributed/issues/4165 for more info.
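For anyone who does want to bump the timeouts, here is a minimal sketch of one way to do it, assuming Dask's usual convention that `DASK_`-prefixed environment variables map onto config keys (double underscores become dots, so the names below target `distributed.comm.timeouts.connect` and `distributed.comm.timeouts.tcp`). The helper name and the `30s`/`60s` values are hypothetical and only for illustration; pick values that suit your environment.

```python
import os

# Hypothetical override values; tune them for your machine.
DASK_TIMEOUT_OVERRIDES = {
    "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "30s",
    "DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP": "60s",
}


def apply_dask_timeout_overrides(overrides=DASK_TIMEOUT_OVERRIDES):
    """Export the overrides, keeping any value the user has already set."""
    for name, value in overrides.items():
        os.environ.setdefault(name, value)
    # Report the values that are actually in effect.
    return {name: os.environ[name] for name in overrides}


applied = apply_dask_timeout_overrides()
# Import prefect / dask only after this point: Dask collects environment
# variables into its config when it is first imported.
```

Setting the variables in the environment (rather than in-process config) means any worker processes started from this one should inherit them as well.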
When running with a LocalCluster with processes=False, you shouldn't create multiple workers; instead, create one worker with multiple threads. Creating multiple thread-based workers, each with one thread, adds needless overhead with no benefit.
# Say you want to use a local dask cluster with 8 threads using
# the dask.distributed scheduler (so not the local threaded scheduler)
# Do this
executor = DaskExecutor(cluster_kwargs={"processes": False, "threads_per_worker": 8}, debug=True)
# Don't do this
executor = DaskExecutor(cluster_kwargs={"processes": False, "n_workers": 8}, debug=True)
I'd generally recommend using LocalDaskExecutor() over the DaskExecutor() (with processes=False) unless you have good reasons to use the latter (access to the dashboard, etc.). The local dask scheduler is much lighter-weight and for Prefect workflows should perform the same while using fewer resources (not that the DaskExecutor uses a ton of resources either, but in severely resource-constrained environments this might matter). It's also a lot simpler to debug.
Closing as resolved.