**Apache Airflow version**: 1.10.11

**Kubernetes version (if you are using kubernetes)** (use `kubectl version`): v1.15.11

**Environment**:
- **Kernel** (e.g. `uname -a`):

**What happened**:
The scheduler can't launch worker pods when `executor_config` defines pod resources for the `KubernetesExecutor`.
The error message:
```
[2020-07-14 08:00:13,853] {{kubernetes_executor.py:758}} INFO - Add task ('DAG_ID', 'TASK_ID', datetime.datetime(2020, 7, 14, 7, 0, tzinfo=<Timezone [UTC]>), 1) with command ['airflow', 'run', 'DAG_ID', 'TASK_ID', '2020-07-14T07:00:00+00:00', '--local', '--pool', 'rider_scoring', '-sd', '/opt/airflow/dags/dag.py'] with executor_config {'KubernetesExecutor': {'request_cpu': '200m', 'limit_cpu': '200m', 'request_memory': '500Mi', 'limit_memory': '500Mi'}}
[2020-07-14 08:00:13,854] {{scheduler_job.py:1383}} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1381, in _execute
    self._execute_helper()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1452, in _execute_helper
    if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1514, in _validate_and_run_task_instances
    self.executor.heartbeat()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
    self.trigger_tasks(open_slots)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/base_executor.py", line 154, in trigger_tasks
    executor_config=simple_ti.executor_config)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py", line 761, in execute_async
    kube_executor_config = PodGenerator.from_obj(executor_config)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/kubernetes/pod_generator.py", line 374, in from_obj
    return PodGenerator(**namespaced).gen_pod()
TypeError: __init__() got an unexpected keyword argument 'request_cpu'
```
**What you expected to happen**:
The scheduler should launch new worker pods.
**How to reproduce it**:
Define pod resources under the `KubernetesExecutor` key of `executor_config`:
```python
from airflow import DAG

default_args = {
    'executor_config': {
        'KubernetesExecutor': {
            'request_cpu': "200m",
            'limit_cpu': "200m",
            'request_memory': "500Mi",
            'limit_memory': "500Mi"
        }
    }
}

dag = DAG(
    dag_id='foo',
    default_args=default_args,
)
```
**Anything else we need to know**:

I think there are two issues:

1. `namespaced['resources'] = resources` gets lost; see the commit in 1.10.11.
2. `PodGenerator` receives the `namespaced` object as its keyword arguments, and `namespaced` holds the contents of `executor_config['KubernetesExecutor']`. As a result, `PodGenerator` receives `request_cpu`, `limit_cpu`, `request_memory`, and `limit_memory`, which are not valid parameters for `PodGenerator`. I believe we need to delete them from `namespaced`. Another option would be to instantiate `namespaced` as an empty dict and add only the parameters that `PodGenerator` accepts.

FYI @dimberman @kaxil
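A minimal, self-contained sketch of the failure and of the second option (keeping only the parameters `PodGenerator` accepts). To avoid depending on Airflow, `PodGenerator` is replaced by a hypothetical stand-in, `FakePodGenerator`, whose `__init__` accepts only three keyword arguments; the real class accepts many more. `filter_kwargs` is likewise a hypothetical helper that derives the allow-list from the constructor signature with `inspect.signature` instead of hard-coding it.

```python
import inspect


class FakePodGenerator:
    """Hypothetical stand-in for PodGenerator that accepts only a few kwargs."""

    def __init__(self, image=None, resources=None, annotations=None):
        self.image = image
        self.resources = resources
        self.annotations = annotations


def filter_kwargs(cls, config):
    """Keep only the keys that cls.__init__ accepts as keyword arguments."""
    params = inspect.signature(cls.__init__).parameters
    allowed = {name for name in params if name != "self"}
    return {key: value for key, value in config.items() if key in allowed}


namespaced = {
    "request_cpu": "200m",
    "limit_cpu": "200m",
    "request_memory": "500Mi",
    "limit_memory": "500Mi",
}

# Unpacking the raw dict reproduces the TypeError from the scheduler log.
try:
    FakePodGenerator(**namespaced)
except TypeError as exc:
    print(exc)

# Filtering first avoids the TypeError, but it also drops the resource
# settings, so they still have to be re-attached as a 'resources' entry.
generator = FakePodGenerator(**filter_kwargs(FakePodGenerator, namespaced))
print(generator.resources)
```

Filtering alone only fixes the crash: the requested CPU and memory are silently ignored, which is why the first issue (restoring `namespaced['resources']`) matters as well.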
Thanks for opening your first issue here! Be sure to follow the issue template!
@kaxil @dimberman I guess that calls for a rather fast 1.10.12?
But I think we should build more comprehensive tests first, and possibly do #9663. I am happy to work on that with you.
Yeah, we might need to. Let's test this today.
I tried with default settings yesterday and everything worked fine. Today I will try setting `executor_config`.
So the fix for this might just be:
```diff
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index e46407bd4..db380f271 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -371,6 +371,7 @@ class PodGenerator(object):
                 'iam.cloud.google.com/service-account': gcp_service_account_key
             })
 
+        namespaced['resources'] = resources
         return PodGenerator(**namespaced).gen_pod()
 
     @staticmethod
```
WDYT, @odracci?
Oh, I think we need to remove the params from `namespaced` too.
@kaxil Yes, we need to remove the params from `namespaced`.
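Combining the diff with the follow-up (also removing the params from `namespaced`), a minimal sketch of the intended behaviour. `extract_resources` and `LEGACY_RESOURCE_KEYS` are hypothetical names, and the sketch builds a plain nested dict in the same shape as the `resources` workaround posted in this thread, whereas the real fix in `pod_generator.py` would build a Kubernetes resource-requirements object.

```python
# Hypothetical mapping from the flat legacy keys to the nested resources layout.
LEGACY_RESOURCE_KEYS = {
    "request_cpu": ("requests", "cpu"),
    "request_memory": ("requests", "memory"),
    "limit_cpu": ("limits", "cpu"),
    "limit_memory": ("limits", "memory"),
}


def extract_resources(namespaced):
    """Return a copy of namespaced with the flat resource keys removed and
    folded into a nested 'resources' entry (hypothetical helper)."""
    namespaced = dict(namespaced)  # do not mutate the caller's executor_config
    resources = namespaced.pop("resources", None)
    nested = {"requests": {}, "limits": {}}
    for key, (section, field) in LEGACY_RESOURCE_KEYS.items():
        value = namespaced.pop(key, None)  # always drop the invalid kwarg
        if value is not None:
            nested[section][field] = value
    # An explicit 'resources' entry wins; otherwise fall back to the legacy keys.
    if resources is None and (nested["requests"] or nested["limits"]):
        resources = nested
    if resources is not None:
        namespaced["resources"] = resources
    return namespaced


config = {"request_cpu": "200m", "limit_cpu": "200m",
          "request_memory": "500Mi", "limit_memory": "500Mi"}
print(extract_resources(config))
# {'resources': {'requests': {'cpu': '200m', 'memory': '500Mi'}, 'limits': {'cpu': '200m', 'memory': '500Mi'}}}
```

Popping the keys unconditionally (even when an explicit `resources` entry is present) guarantees that nothing unexpected reaches `PodGenerator(**namespaced)`.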
I came across the same problem yesterday and used this as a workaround:
```python
default_args = {
    'executor_config': {
        'KubernetesExecutor': {
            'resources': {
                'limits': {
                    'cpu': "200m",
                    'memory': "500Mi",
                },
                'requests': {
                    'cpu': "200m",
                    'memory': "500Mi"
                }
            }
        }
    }
}
```
There is also the question of why we made a breaking change on a minor release:
```python
# fails in 1.10.11
from airflow.contrib.kubernetes.XXX import XXX
# new package location in 1.10.11
from airflow.kubernetes.XXX import XXX
```
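Until the import paths are settled, DAG code that must run on more than one 1.10.x release can try each location in turn. A minimal sketch using `importlib.import_module`; `import_first` is a hypothetical helper, and the `XXX` module and class names in the usage comment are the same placeholders used above, not real Airflow paths.

```python
import importlib


def import_first(module_paths, name):
    """Return attribute `name` from the first module in module_paths that
    imports and defines it (hypothetical compatibility helper)."""
    errors = []
    for path in module_paths:
        try:
            module = importlib.import_module(path)
        except ImportError as exc:  # includes ModuleNotFoundError
            errors.append(f"{path}: {exc}")
            continue
        if hasattr(module, name):
            return getattr(module, name)
        errors.append(f"{path}: no attribute {name!r}")
    raise ImportError(f"could not import {name!r}; tried: " + "; ".join(errors))


# Usage with the placeholder names from the comment above:
# XXX = import_first(["airflow.kubernetes.XXX", "airflow.contrib.kubernetes.XXX"], "XXX")
```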
I am not sure if it's related, but I also can no longer use `PodRuntimeInfoEnv`:
```python
from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

KubernetesPodOperator(
    ...,
    pod_runtime_info_envs=[
        PodRuntimeInfoEnv(
            name="POD_IP",
            field_path="status.podIP"
        )
    ]
)
```
This fails with:
```
[2020-08-04 00:52:30,004] {taskinstance.py:1150} ERROR - Invalid value for `field_path`, must not be `None`
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 979, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 291, in execute
    final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 374, in create_new_pod_for_operator
    self.volume_mounts # type: ignore
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/k8s_model.py", line 61, in append_to_pod
    new_pod = reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/k8s_model.py", line 61, in <lambda>
    new_pod = reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/pod_runtime_info_env.py", line 57, in attach_to_pod
    env = self.to_k8s_client_obj()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/pod_runtime_info_env.py", line 50, in to_k8s_client_obj
    self.field_path
  File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/client/models/v1_object_field_selector.py", line 52, in __init__
    self.field_path = field_path
  File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/client/models/v1_object_field_selector.py", line 98, in field_path
    raise ValueError("Invalid value for `field_path`, must not be `None`") # noqa: E501
ValueError: Invalid value for `field_path`, must not be `None`
```
The webserver is also broken: https://github.com/apache/airflow/pull/10015
> There is also the question of why we made a breaking change on a minor release:

It's a bug, and it will be fixed in the next release. See: https://github.com/apache/airflow/pull/10067
Hi everyone,
I apologize for all the bugs on 1.10.11. That was on me. There were a lot of major refactors that were necessary for new features/to remove some older issues and unfortunately the k8s API is pretty comprehensive so a few things fell through the cracks. We'll be releasing 1.10.12 VERY shortly and are close to fixing all noted bugs.
Nothing to apologize for, @dimberman. It was a much-needed refactor, and we were waiting for it to release the cncf.kubernetes backport packages. I'm keeping my fingers crossed. I'm also looking forward to finally adding more tests that cover the Kubernetes executor, to avoid regressions in the future.
I have a saying: the only sure way not to introduce bugs is not to do anything at all :). But doing nothing also means stagnating and falling behind :)
Thank you @potiuk :). Yes, with the Helm chart in CI we should be able to add much more comprehensive system tests soon!
And let's not forget that the newer way is more powerful, as it uses the k8s API directly :) So thanks, Daniel, for backporting that one.
Agreed, thank you @dimberman. Looking forward to using them in 1.10.12 :)
Fixed :) https://github.com/apache/airflow/pull/10084
Man, this was one hell of a chaos test, but I'm feeling pretty good about how many situations are handled now!