Apache Airflow version:
1.10.9
Kubernetes version (if you are using kubernetes) (use kubectl version):
Client Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.2", GitCommit:"c97fe5036ef3df2967d086711e6c0c405941e14b", GitTreeState:"clean", BuildDate:"2019-10-15T23:42:50Z", GoVersion:"go1.12.10", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"14+", GitVersion:"v1.14.10-gke.17", GitCommit:"bdceba0734835c6cb1acbd1c447caf17d8613b44", GitTreeState:"clean", BuildDate:"2020-01-17T23:10:13Z", GoVersion:"go1.12.12b4", Compiler:"gc", Platform:"linux/amd64"}
Note: the issue is not specific to k8s.
Environment:
Any. I was able to reproduce it using the CeleryExecutor docker-compose setup from the puckel/docker-airflow repo (code version tagged 1.10.9).
What happened:
When pool in the [celery] section of airflow.cfg is set to eventlet or gevent, task instances are scheduled, queued, and picked up by the workers, but not executed.
What you expected to happen:
Task instances should be executed. The problem is that the application is not monkey-patched. Celery handles monkey-patching by default, but only in some scenarios (e.g. when Celery is invoked from the command line; more info).
Airflow starts Celery workers from Python via .run(), and this function does not handle monkey-patching.
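To illustrate why starting the worker through .run() skips the patch, here is a simplified, stdlib-only model of the argv-driven lookup that Celery's command-line entry point relies on (the real function, used in the patch below, is celery.maybe_patch_concurrency). The names find_pool_option and maybe_patch are hypothetical stand-ins written for illustration, not Celery's API, and the patchers only record which pool was requested instead of calling eventlet/gevent.

```python
from typing import Callable, Dict, List, Optional


def find_pool_option(argv: List[str]) -> Optional[str]:
    """Return the pool named by -P/--pool in argv, or None if absent.

    Hypothetical, simplified model of how a CLI entry point can discover
    the pool early, before the worker (and its imports) start.
    """
    for i, arg in enumerate(argv):
        if arg.startswith("--pool"):
            name, sep, value = arg.partition("=")
            if name == "--pool":
                if sep:
                    return value
                if i + 1 < len(argv):
                    return argv[i + 1]
        elif arg == "-P" and i + 1 < len(argv):
            return argv[i + 1]
    return None


def maybe_patch(argv: List[str], patches: Dict[str, Callable[[], None]]) -> bool:
    """Run the patcher for the requested pool; return True if one ran."""
    pool = find_pool_option(argv)
    patcher = patches.get(pool) if pool is not None else None
    if patcher is None:
        return False
    patcher()
    return True


if __name__ == "__main__":
    applied: List[str] = []
    # Stand-ins for the real eventlet/gevent monkey-patching calls.
    patches = {
        "eventlet": lambda: applied.append("eventlet"),
        "gevent": lambda: applied.append("gevent"),
    }

    # `airflow worker`: the pool comes from airflow.cfg, not argv,
    # so nothing is patched before the worker starts.
    print(maybe_patch(["airflow", "worker"], patches))  # False

    # Passing the pool explicitly, as the proposed fix does.
    print(maybe_patch(["-P", "eventlet"], patches))  # True
    print(applied)  # ['eventlet']
```

The point of the model is that the decision is driven purely by argv, so any code path that never puts -P/--pool into argv never triggers the patch.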
How to reproduce it:
1. git clone git@github.com:puckel/docker-airflow.git
2. Add RUN pip install eventlet to the Dockerfile.
3. docker build --rm -t puckel/docker-airflow:1.10.9 .
4. Set pool = eventlet in the [celery] section of airflow.cfg (the file will be mounted by docker-compose).
5. docker-compose -f docker-compose-CeleryExecutor.yml up -d
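The airflow.cfg setting used in the reproduction can be sanity-checked on its own with the standard-library configparser. This is a sketch that assumes Airflow's config object answers has_option/get the way a plain ConfigParser does; Airflow's own parser builds on ConfigParser but also applies defaults and environment-variable overrides, which are not modelled here. read_celery_pool is a hypothetical helper.

```python
import configparser

# Fragment of airflow.cfg used in the reproduction; only the [celery]
# section matters here, other keys are omitted.
AIRFLOW_CFG = """
[celery]
pool = eventlet
"""


def read_celery_pool(text: str):
    """Return the [celery] pool value, or None when it is not set.

    Mirrors the conf.has_option("celery", "pool") / conf.get(...) check
    in the worker command, using a plain ConfigParser as a stand-in.
    """
    parser = configparser.ConfigParser()
    parser.read_string(text)
    if parser.has_option("celery", "pool"):
        return parser.get("celery", "pool")
    return None


pool = read_celery_pool(AIRFLOW_CFG)
print(pool)  # eventlet
# eventlet and gevent are the pools that need monkey-patching.
print(pool in {"eventlet", "gevent"})  # True
```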
Solution:
Ideally this should be fixed in Celery, but in the meantime it might be good to have a solution here as well. Here is a patch that I applied to solve this (on Airflow 1.10.9):
--- cli.py 2020-03-27 17:05:45.000000000 -0400
+++ cli-new.py 2020-03-27 17:19:48.000000000 -0400
@@ -1098,7 +1098,10 @@
     }
 
     if conf.has_option("celery", "pool"):
-        options["pool"] = conf.get("celery", "pool")
+        pool = conf.get("celery", "pool")
+        options["pool"] = pool
+        from celery import maybe_patch_concurrency
+        maybe_patch_concurrency(['-P', pool])
 
     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations("worker",
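The intent of the patch is that monkey-patching happens before the worker starts, and only when a pool is configured. Below is a dependency-injected sketch of that ordering; start_worker, patch_concurrency and run_worker are hypothetical names, with the real calls (celery.maybe_patch_concurrency and the worker's .run()) replaced by recorders so the sequence can be checked without Celery installed.

```python
import configparser
from typing import Callable, Dict, List


def start_worker(
    conf: configparser.ConfigParser,
    patch_concurrency: Callable[[List[str]], None],
    run_worker: Callable[..., None],
) -> Dict[str, str]:
    """Build worker options and patch concurrency *before* running the worker."""
    options: Dict[str, str] = {}
    if conf.has_option("celery", "pool"):
        pool = conf.get("celery", "pool")
        options["pool"] = pool
        # Same call shape as the patch: maybe_patch_concurrency(['-P', pool]).
        patch_concurrency(["-P", pool])
    # Stand-in for worker.run(**options); runs only after any patching.
    run_worker(**options)
    return options


events: List[str] = []
conf = configparser.ConfigParser()
conf.read_dict({"celery": {"pool": "gevent"}})

start_worker(
    conf,
    patch_concurrency=lambda argv: events.append("patch " + " ".join(argv)),
    run_worker=lambda **opts: events.append("run pool=" + opts["pool"]),
)
print(events)  # ['patch -P gevent', 'run pool=gevent']
```

Injecting the two calls keeps the sketch testable and makes the ordering requirement explicit: the patch must run before anything that starts the eventlet/gevent pool.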
Thanks for opening your first issue here! Be sure to follow the issue template!
@aamangeldi would you like to open a PR with the suggested change? 🚀
@turbaszek Yup, can do.
@aamangeldi thank you!
I experienced the same issue using exactly the same setup (Airflow 1.10.9 with puckel/docker-airflow). @aamangeldi's patch fixes the issue for me.
Is there any schedule for bringing this upstream?
We can push that out in 1.10.11