Prefect: Timeout Error when mapping with the DaskKubernetesEnvironment

Created on 6 Apr 2020  路  5Comments  路  Source: PrefectHQ/prefect

Description

A clear description of the bug
I'm consistently seeing a TimeoutError related to mapping while using the DaskKubernetes environment. My flow that runs every 15 minutes fails about 10% of the time with the following error.

Task 'Get New Record Counts': unexpected error while running task: TimeoutError()
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 260, in run
    executor=executor,
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 767, in run_mapped_task
    run_fn, initial_states, range(len(map_upstream_states)), map_upstream_states
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 165, in map
    with worker_client(separate_thread=True) as client:
  File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.7/site-packages/distributed/worker_client.py", line 42, in worker_client
    client = get_client(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 3087, in get_client
    return worker._get_client(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 2987, in _get_client
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 723, in __init__
    self.start(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 896, in start
    sync(self.loop, self._start, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 991, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1071, in _ensure_connected
    msg = await asyncio.wait_for(comm.read(), timeout)
  File "/usr/local/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

Further, this task run doesn't retry as specified in the task decorator:

@task(
    name="Get New Record Counts",
    checkpoint=True,
    max_retries=5,
    retry_delay=timedelta(minutes=3),
    tags=["prefect_cloud_database"],
)
def get_new_record_counts(table_data, batch_size, batch_count, pg_connection_string):

Here's my environment configuration:

environment = DaskKubernetesEnvironment(
    max_workers=3, min_workers=3, labels=["prefect-data-warehouse"]
)

storage = Docker(
    prefect_version="0.9.2",
    # registry_url={staging execution},
    base_image="python:3.7",
    python_dependencies=[
        "gcsfs",
        "google-cloud-firestore",
        "google-cloud-bigquery",
        "google-cloud-storage",
        "pandas",
        "pendulum",
        "psycopg2",
        "sqlalchemy",
    ],
)

schedule = CronSchedule(cron="*/15 * * * *", start_date=pendulum.now(tz="US/Eastern"))

Link for the Prefect team: {our staging url}/prefect-staging/flow-run/7fe68e1c-bed1-4e19-8261-ba2f0b0fe35d?logId=e90ea4eb-d78a-42b4-90fd-5440def9da37

Expected Behavior

What did you expect to happen instead?
I expect either the task run to retry or the timeout not to happen.

Reproduction

A minimal example that exhibits the behavior.
https://github.com/PrefectHQ/flows/blob/master/error_reproduction/dask_kubernetes_timeout_error.py
Deployed to: {staging}/prefect-staging/flow/0a825c48-64fa-472a-b79e-a49718ccb7aa

Environment

Any additional information about your environment

Optionally run prefect diagnostics from the command line and paste the information here

environment

Most helpful comment

@lauralorenz The latest version definitely resolved the issue -chef's kiss-

Closing 馃帀

All 5 comments

@dylanbhughes Do you still see this occurring often?

@joshmeek I haven't updated my flow to the latest version because it's still depending on the cancellation API. Once I have a minute to update the Stop Data Warehouse Flows flow to kill the kube infrastructure I'll migrate up and report back. @cicdw says that the mapping refactor should address my problem.

Hi @dylanbhughes just double checking because we think you may have been working on the migration recently to let us know if this is still an issue :)

@lauralorenz The latest version definitely resolved the issue -chef's kiss-

Closing 馃帀

It seems like I am facing the same bug but on prefect version 0.12.6 - it would be great if someone can shed some light on why this bug might have been resolved - and how to debug this ...

Was this page helpful?
0 / 5 - 0 ratings

Related issues

jlowin picture jlowin  路  4Comments

ludwigm picture ludwigm  路  3Comments

fgblomqvist picture fgblomqvist  路  4Comments

dkapitan picture dkapitan  路  3Comments

ponggung picture ponggung  路  3Comments