Hello! I'm trying to run a Prefect flow with a DaskExecutor connected to a Dask cluster created with Dask Gateway (DG). This raises an SSL error, although the cause could be in my DG setup. Since DG is relatively new, I'm wondering whether it has been tested with Prefect.
Thanks!
What did you expect to happen instead? The flow should run as it normally does with a DaskExecutor.
from dask_gateway import Gateway
from prefect.engine.executors import DaskExecutor
from prefect import task, Flow
import datetime
import random
from time import sleep


@task
def inc(x):
    sleep(random.random() / 10)
    return x + 1


with Flow("dask-example") as flow:
    incs = inc.map(x=range(100))

gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(4)

# Example scheduler address from DG: 'gateway://dask-scheduler-proxy.<fqdn>:443/<hash from dg>'
executor = DaskExecutor(address=cluster.scheduler_address)
flow.run(executor=executor)
Error:
[2020-01-28 19:17:47,571] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
[2020-01-28 19:17:47,574] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 19:17:47,578] ERROR - prefect.FlowRunner | Unexpected error: TypeError('Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None')
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 400, in get_flow_run_state
    with executor.start():
  File "/opt/conda/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 75, in start
    with Client(self.address, **self.kwargs) as client:
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 728, in __init__
    self.start(timeout=timeout)
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 893, in start
    sync(self.loop, self._start, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 335, in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 319, in f
    result[0] = yield future
  File "/opt/conda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 986, in _start
    await self._ensure_connected(timeout=timeout)
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 1043, in _ensure_connected
    connection_args=self.connection_args,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/core.py", line 218, in connect
    quiet_exceptions=EnvironmentError,
  File "/opt/conda/lib/python3.7/site-packages/dask_gateway/comm.py", line 41, in connect
    "ssl.SSLContext, instead got %s" % ctx
TypeError: Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None
[2020-01-28 19:17:47,584] ERROR - prefect.Flow: dask-example | Unexpected error occured in FlowRunner: TypeError('Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None')
<Failed: "Unexpected error: TypeError('Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None')">
Environment: a Dask cluster running on Kubernetes, managed with Dask Gateway.
Update: the flow connects and runs successfully when the `cluster.security` attribute is passed to DaskExecutor as a keyword argument, which DaskExecutor forwards to the Dask `Client`:
executor = DaskExecutor(address=cluster.scheduler_address, security=cluster.security)
flow.run(executor=executor)
[2020-01-28 21:10:04,687] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
[2020-01-28 21:10:04,690] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 21:10:07,820] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
<Success: "All reference tasks succeeded.">
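Why passing `security` helps can be illustrated with a stdlib-only sketch. It assumes, based on the traceback, that DaskExecutor forwards its extra keyword arguments unchanged to the Dask `Client`, that the client derives its connection arguments from a security object (with no SSL context when nothing is configured), and that the gateway comm rejects any `ssl_context` that is not an `ssl.SSLContext`. Every name below (`MiniExecutor`, `fake_client_connect`, `NoTLSSecurity`, `GatewayLikeSecurity`, `gateway_connect_check`) is hypothetical and only mimics that behavior; none of it is the Prefect, distributed, or dask_gateway API.

```python
import ssl


def gateway_connect_check(connection_args):
    # Hypothetical stand-in for the type check in dask_gateway/comm.py
    # that raised the TypeError in the traceback.
    ctx = connection_args.get("ssl_context")
    if not isinstance(ctx, ssl.SSLContext):
        raise TypeError(
            "Gateway expects a `ssl_context` argument of type "
            "ssl.SSLContext, instead got %s" % ctx
        )
    return ctx


class NoTLSSecurity:
    # Hypothetical stand-in for a default security object with no TLS configured.
    def get_connection_args(self, role):
        return {"ssl_context": None, "require_encryption": False}


class GatewayLikeSecurity:
    # Hypothetical stand-in for `cluster.security`. A real gateway security
    # object carries the cluster's TLS credentials; this one only builds a
    # default client-side context so the type check can pass.
    def __init__(self):
        self._ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)

    def get_connection_args(self, role):
        return {"ssl_context": self._ctx, "require_encryption": True}


def fake_client_connect(address, security=None):
    # Hypothetical: mimics a client deriving connection_args from `security`,
    # falling back to a no-TLS default when none is given.
    if security is None:
        security = NoTLSSecurity()
    connection_args = security.get_connection_args("client")
    gateway_connect_check(connection_args)
    return address


class MiniExecutor:
    # Hypothetical: stores extra kwargs and forwards them unchanged to the
    # client, mirroring `Client(self.address, **self.kwargs)` in the traceback.
    def __init__(self, address, **kwargs):
        self.address = address
        self.kwargs = kwargs

    def start(self):
        return fake_client_connect(self.address, **self.kwargs)


if __name__ == "__main__":
    addr = "gateway://dask-scheduler-proxy.example:443/abc123"
    try:
        MiniExecutor(addr).start()
    except TypeError as exc:
        print("without security:", exc)
    print("with security:", MiniExecutor(addr, security=GatewayLikeSecurity()).start())
```

Under these assumptions, the bare address reproduces the "instead got None" error, while forwarding a security object that supplies an SSL context lets the check pass, which appears to match the difference between the two DaskExecutor calls above.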
Great! @cicdw, you may want to have someone add a note to the docs explaining how to do this.