Prefect: DaskExecutor.address doesn't work with Dask Gateway proxy

Created on 28 Jan 2020 · 2 Comments · Source: PrefectHQ/prefect

Description

Hello! I'm trying to run a Prefect flow with a DaskExecutor connected to a Dask cluster that was created through Dask Gateway (DG). The run fails with an SSL-related error, though that could be caused by my DG setup. DG is relatively new, so I'm wondering whether it has been tested with Prefect.

Thanks!

Expected Behavior

I expected the flow to run as it normally would with a DaskExecutor.

Reproduction

import random
from time import sleep

from dask_gateway import Gateway
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor


@task
def inc(x):
    # Simulate a small, variable amount of work per task.
    sleep(random.random() / 10)
    return x + 1


with Flow("dask-example") as flow:
    incs = inc.map(x=range(100))

# Create a cluster through Dask Gateway and request four workers.
gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(4)

# Example scheduler address from DG: 'gateway://dask-scheduler-proxy.<fqdn>:443/<hash from dg>'
# Only the address is passed here, so the Dask client receives no TLS settings.
executor = DaskExecutor(address=cluster.scheduler_address)
flow.run(executor=executor)

Error:

[2020-01-28 19:17:47,571] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
[2020-01-28 19:17:47,574] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 19:17:47,578] ERROR - prefect.FlowRunner | Unexpected error: TypeError('Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None')
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 400, in get_flow_run_state
    with executor.start():
  File "/opt/conda/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 75, in start
    with Client(self.address, **self.kwargs) as client:
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 728, in __init__
    self.start(timeout=timeout)
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 893, in start
    sync(self.loop, self._start, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 335, in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.7/site-packages/distributed/utils.py", line 319, in f
    result[0] = yield future
  File "/opt/conda/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 986, in _start
    await self._ensure_connected(timeout=timeout)
  File "/opt/conda/lib/python3.7/site-packages/distributed/client.py", line 1043, in _ensure_connected
    connection_args=self.connection_args,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/core.py", line 218, in connect
    quiet_exceptions=EnvironmentError,
  File "/opt/conda/lib/python3.7/site-packages/dask_gateway/comm.py", line 41, in connect
    "ssl.SSLContext, instead got %s" % ctx
TypeError: Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None
[2020-01-28 19:17:47,584] ERROR - prefect.Flow: dask-example | Unexpected error occured in FlowRunner: TypeError('Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None')
<Failed: "Unexpected error: TypeError('Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None')">
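
Reading the traceback, the executor calls Client(self.address, **self.kwargs) with no TLS settings, and the gateway:// connector then rejects ssl_context=None. A minimal sketch of a pre-flight check that would surface this before the flow starts. require_security is a hypothetical helper, not part of Prefect or Dask Gateway, and it assumes Gateway scheduler addresses always use the gateway:// scheme shown in the reproduction:

```python
from urllib.parse import urlparse


def require_security(address, security=None):
    """Hypothetical helper: build executor kwargs, rejecting gateway:// without TLS settings."""
    scheme = urlparse(address).scheme
    if scheme == "gateway" and security is None:
        # Assumption: every gateway:// connection needs a security object.
        raise ValueError(
            "gateway:// scheduler addresses need a `security` object, "
            "for example `cluster.security`"
        )
    kwargs = {"address": address}
    if security is not None:
        kwargs["security"] = security
    return kwargs
```

With this in place, the result could be unpacked into the executor, for example DaskExecutor(**require_security(cluster.scheduler_address, cluster.security)).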

Environment

Dask cluster running on Kubernetes managed with Dask Gateway.


All 2 comments

I connected and ran the Prefect flow successfully by passing the cluster.security attribute to DaskExecutor as a keyword argument:

executor = DaskExecutor(address=cluster.scheduler_address, security=cluster.security)
flow.run(executor=executor)

[2020-01-28 21:10:04,687] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
[2020-01-28 21:10:04,690] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 21:10:07,820] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
<Success: "All reference tasks succeeded.">
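
For reuse across flows, the two attributes used in the fix can be gathered in one place. This is only a sketch: gateway_executor_kwargs is a hypothetical name, and it assumes the cluster object exposes scheduler_address and security as in the snippet above.

```python
def gateway_executor_kwargs(cluster):
    """Hypothetical helper: collect what DaskExecutor needs for a Gateway-managed cluster."""
    return {
        # The gateway:// address of the scheduler behind the proxy.
        "address": cluster.scheduler_address,
        # TLS credentials that the executor forwards to the Dask client.
        "security": cluster.security,
    }


# Intended usage (needs a live Dask Gateway cluster, so it is not run here):
#   executor = DaskExecutor(**gateway_executor_kwargs(cluster))
#   flow.run(executor=executor)
```

Keeping the address and the security object together makes it harder to pass one without the other, which is the mistake that triggered the TypeError in the original report.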

Great! @cicdw You may want to have someone add a note to the docs on how to do this.
