Airflow: CeleryExecutor gevent/eventlet pools need monkey patching

Created on 31 Mar 2020 · 6 Comments · Source: apache/airflow

Apache Airflow version:
1.10.9

Kubernetes version (if you are using kubernetes) (use kubectl version):

Client Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.2", GitCommit:"c97fe5036ef3df2967d086711e6c0c405941e14b", GitTreeState:"clean", BuildDate:"2019-10-15T23:42:50Z", GoVersion:"go1.12.10", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"14+", GitVersion:"v1.14.10-gke.17", GitCommit:"bdceba0734835c6cb1acbd1c447caf17d8613b44", GitTreeState:"clean", BuildDate:"2020-01-17T23:10:13Z", GoVersion:"go1.12.12b4", Compiler:"gc", Platform:"linux/amd64"}

Note: the issue is not specific to k8s.

Environment:
Any. I was able to reproduce it using the CeleryExecutor docker-compose setup in the puckel repo (code version tagged 1.10.9).

What happened:
When the pool option in the [celery] section of airflow.cfg is set to eventlet or gevent, task instances get scheduled, queued, and picked up by the workers, but are never executed.
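For reference, the setting in question lives in the [celery] section of airflow.cfg. The sketch below reads it with Python's standard configparser, used here as a stand-in for Airflow's own conf object (an assumption, so the example runs without Airflow installed), and performs the same has_option/get checks that the worker command makes before handing the pool name to Celery. The helper name read_celery_pool is hypothetical.

```python
import configparser
from typing import Optional

# Hypothetical airflow.cfg excerpt; only the [celery] pool setting matters here.
AIRFLOW_CFG = """
[celery]
pool = eventlet
"""


def read_celery_pool(cfg_text: str) -> Optional[str]:
    """Return the configured Celery pool name, or None if it is not set.

    Uses stdlib configparser as a stand-in for Airflow's conf object.
    """
    conf = configparser.ConfigParser()
    conf.read_string(cfg_text)
    # has_option() returns False when the section itself is missing.
    if conf.has_option("celery", "pool"):
        return conf.get("celery", "pool")
    return None


print(read_celery_pool(AIRFLOW_CFG))  # eventlet
```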

What you expected to happen:
Task instances should be executed. The problem is that the application is not monkey-patched. Celery handles monkey patching by default, but not in all scenarios (e.g. only when Celery is invoked from the command line).
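Monkey patching here means replacing blocking standard-library functions (socket operations, time.sleep, threading primitives) with cooperative green versions at the module level. A patch only affects code that looks the function up after it has been applied, which is why eventlet and gevent need it to happen as early as possible. The stdlib-only sketch below imitates this with a hypothetical green_sleep swapped in for time.sleep; it illustrates the ordering issue and is not what eventlet or gevent actually install.

```python
import time
from time import sleep as early_bound_sleep  # bound before any patching happens

calls = []


def green_sleep(seconds):
    """Hypothetical cooperative replacement for time.sleep (only records the call)."""
    calls.append(seconds)


original_sleep = time.sleep
time.sleep = green_sleep  # the "monkey patch": swap the module attribute
try:
    # Looked up through the module at call time, so the patched version runs.
    time.sleep(0.01)
    patched_via_module = calls == [0.01]
    # The name bound before patching still points at the real, blocking function.
    early_binding_unpatched = early_bound_sleep is original_sleep
finally:
    time.sleep = original_sleep  # undo the patch so the demo leaves no side effects

print(patched_via_module, early_binding_unpatched)  # True True
```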

Airflow invokes Celery workers in Python via .run(). Unfortunately, this function does not handle monkey patching.

How to reproduce it:

  1. Clone puckel's docker-airflow:
    git clone git@github.com:puckel/docker-airflow.git
  2. Add the following line to the Dockerfile:
    RUN pip install eventlet
    Then rebuild the image:
    docker build --rm -t puckel/docker-airflow:1.10.9 .
  3. Set pool = eventlet in airflow.cfg (the file will be mounted by docker-compose).
  4. Spin up the CeleryExecutor docker-compose setup:
    docker-compose -f docker-compose-CeleryExecutor.yml up -d
  5. Navigate to http://localhost:8080 and run an example DAG.
  6. Notice that no task ever reaches the running state.

Solution:
Ideally this should be fixed in Celery, but in the meantime it would be good to have a workaround in Airflow as well. Here is the patch I applied to fix this (on Airflow 1.10.9):

--- cli.py  2020-03-27 17:05:45.000000000 -0400
+++ cli-new.py  2020-03-27 17:19:48.000000000 -0400
@@ -1098,7 +1098,10 @@
     }

     if conf.has_option("celery", "pool"):
-        options["pool"] = conf.get("celery", "pool")
+        pool = conf.get("celery", "pool")
+        options["pool"] = pool
+        from celery import maybe_patch_concurrency
+        maybe_patch_concurrency(['-P', pool])

     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations("worker",
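The patch passes ['-P', pool] because maybe_patch_concurrency inspects an argv-style list for the pool option, the same way it does when Celery is started from the command line. The sketch below is a simplified, stdlib-only approximation of that lookup-and-dispatch logic, written for illustration: the names find_pool_option and maybe_patch are hypothetical, and the patchers are injected callables rather than the real eventlet/gevent patch routines, so it runs without either library installed.

```python
from typing import Callable, Dict, List, Optional, Sequence


def find_pool_option(argv: Sequence[str],
                     short_opts: Sequence[str] = ("-P",),
                     long_opts: Sequence[str] = ("--pool",)) -> Optional[str]:
    """Return the pool named in an argv-style list, or None if absent.

    Accepts "-P eventlet", "--pool eventlet" and "--pool=eventlet".
    """
    for i, arg in enumerate(argv):
        name, sep, value = arg.partition("=")
        if name in long_opts:
            if sep:
                return value
            return argv[i + 1] if i + 1 < len(argv) else None
        if arg in short_opts:
            return argv[i + 1] if i + 1 < len(argv) else None
    return None


def maybe_patch(argv: Sequence[str],
                patchers: Dict[str, Callable[[], None]]) -> Optional[str]:
    """Hypothetical stand-in for celery.maybe_patch_concurrency.

    Runs the patcher registered for the requested pool (if any) and
    returns the pool name so callers can see what was selected.
    """
    pool = find_pool_option(argv)
    patcher = patchers.get(pool) if pool else None
    if patcher is not None:
        patcher()
    return pool


applied: List[str] = []
patchers = {
    # Real code would call eventlet.monkey_patch() / gevent.monkey.patch_all().
    "eventlet": lambda: applied.append("eventlet"),
    "gevent": lambda: applied.append("gevent"),
}

pool = "eventlet"  # e.g. the value read from [celery] pool in airflow.cfg
maybe_patch(["-P", pool], patchers)
print(applied)  # ['eventlet']
```

A pool without a registered patcher (such as prefork) is returned but left unpatched, which matches the idea that only the green-thread pools need monkey patching.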

All 6 comments

Thanks for opening your first issue here! Be sure to follow the issue template!

@aamangeldi would you like to open a PR with the suggested change? 🚀

@turbaszek Yup, can do.

@aamangeldi thank you!

I experienced the same issue using exactly the same setup (Airflow 1.10.9 with puckel/docker-airflow). @aamangeldi's patch fixes the issue for me.

Is there any schedule for bringing this upstream?

We can push that out in 1.10.11
