Airflow: Kubernetes executor is broken in Airflow 1.10.11

Created on 14 Jul 2020 · 17 Comments · Source: apache/airflow

Apache Airflow version: 1.10.11

Kubernetes version (if you are using kubernetes) (use kubectl version): v1.15.11

Environment:

  • Cloud provider or hardware configuration: AWS
  • OS (e.g. from /etc/os-release):
  • Kernel (e.g. uname -a):
  • Install tools:
  • Others:

**What happened**:

The scheduler can't launch pods if the `KubernetesExecutor` section of `executor_config` defines pod resources.
The error message:

```
[2020-07-14 08:00:13,853] {{kubernetes_executor.py:758}} INFO - Add task ('DAG_ID', 'TASK_ID', datetime.datetime(2020, 7, 14, 7, 0, tzinfo=<Timezone [UTC]>), 1) with command ['airflow', 'run', 'DAG_ID', 'TASK_ID', '2020-07-14T07:00:00+00:00', '--local', '--pool', 'rider_scoring', '-sd', '/opt/airflow/dags/dag.py'] with executor_config {'KubernetesExecutor': {'request_cpu': '200m', 'limit_cpu': '200m', 'request_memory': '500Mi', 'limit_memory': '500Mi'}}
[2020-07-14 08:00:13,854] {{scheduler_job.py:1383}} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1381, in _execute
    self._execute_helper()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1452, in _execute_helper
    if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1514, in _validate_and_run_task_instances
    self.executor.heartbeat()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
    self.trigger_tasks(open_slots)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/base_executor.py", line 154, in trigger_tasks
    executor_config=simple_ti.executor_config)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py", line 761, in execute_async
    kube_executor_config = PodGenerator.from_obj(executor_config)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/kubernetes/pod_generator.py", line 374, in from_obj
    return PodGenerator(**namespaced).gen_pod()
TypeError: __init__() got an unexpected keyword argument 'request_cpu'
```

**What you expected to happen**:

The scheduler should launch new worker pods.

**How to reproduce it**:

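Define pod resources in the `KubernetesExecutor` section of `executor_config`:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'start_date': datetime(2020, 7, 1),
    # Flat resource keys: in 1.10.11 these reach PodGenerator unchanged
    'executor_config': {
        'KubernetesExecutor': {
            'request_cpu': "200m",
            'limit_cpu': "200m",
            'request_memory': "500Mi",
            'limit_memory': "500Mi"
        }
    }
}
dag = DAG(
    dag_id='foo',
    default_args=default_args,
)

# Any task that the KubernetesExecutor schedules triggers the error
task = DummyOperator(task_id='bar', dag=dag)
```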

**Anything else we need to know**:

I think there are two issues:

  • When #6230 was backported to the 1.10.11 branch, the line `namespaced['resources'] = resources` got lost (see the commit in 1.10.11).
  • `PodGenerator` receives the `namespaced` dict as keyword arguments, and that dict contains the whole content of the `KubernetesExecutor` config.
    When resources are defined in `KubernetesExecutor`, `PodGenerator` receives `request_cpu`, `limit_cpu`, `request_memory` and `limit_memory`, which are not valid parameters for `PodGenerator`. I believe we need to delete them from `namespaced`. Another option would be to initialize `namespaced` as an empty dict and add only the parameters that `PodGenerator` accepts.
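The first option can be sketched in plain Python. This is a hypothetical helper, not Airflow's actual code: it assumes the four flat keys seen in the log above, moves them into the nested `resources` shape (`requests`/`limits`) used by the workaround later in this thread, and removes them so they never reach `PodGenerator(**namespaced)`.

```python
# Hypothetical helper (not part of Airflow): maps each flat executor_config key
# to its (section, resource) position in a Kubernetes resources dict.
FLAT_RESOURCE_KEYS = {
    'request_cpu': ('requests', 'cpu'),
    'request_memory': ('requests', 'memory'),
    'limit_cpu': ('limits', 'cpu'),
    'limit_memory': ('limits', 'memory'),
}


def fold_resource_keys(namespaced):
    """Return a copy of `namespaced` with flat resource keys moved under 'resources'.

    The input dict is not modified. Flat keys whose value is None are dropped.
    """
    cleaned = dict(namespaced)
    resources = {}
    for flat_key, (section, name) in FLAT_RESOURCE_KEYS.items():
        # pop() removes the key, so it is never passed as a keyword argument
        value = cleaned.pop(flat_key, None)
        if value is not None:
            resources.setdefault(section, {})[name] = value
    if resources:
        # An explicit nested 'resources' entry, if present, takes precedence
        cleaned.setdefault('resources', resources)
    return cleaned


config = {'request_cpu': '200m', 'limit_cpu': '200m',
          'request_memory': '500Mi', 'limit_memory': '500Mi'}
print(fold_resource_keys(config))
# {'resources': {'requests': {'cpu': '200m', 'memory': '500Mi'}, 'limits': {'cpu': '200m', 'memory': '500Mi'}}}
```

Whether the flat keys or an explicit `resources` entry should win when both are given is a design choice; this sketch lets the nested form win.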

FYI @dimberman @kaxil

Label: bug

All 17 comments

Thanks for opening your first issue here! Be sure to follow the issue template!

@kaxil @dimberman - I guess that calls for a rather fast 1.10.12?

But I think we should build more comprehensive tests in first, and possibly make #9663 happen. I am happy to work on that with you.

Yeah, we might need to. Let's test this today.

I tried with the default settings yesterday and everything worked fine. Today I will try with `executor_config` set.

So the fix for this might just be:

```diff
diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py
index e46407bd4..db380f271 100644
--- a/airflow/kubernetes/pod_generator.py
+++ b/airflow/kubernetes/pod_generator.py
@@ -371,6 +371,7 @@ class PodGenerator(object):
                 'iam.cloud.google.com/service-account': gcp_service_account_key
             })

+        namespaced['resources'] = resources
         return PodGenerator(**namespaced).gen_pod()

     @staticmethod
```

WDYT @odracci?

Oh, I think we need to remove the params from `namespaced` too.

@kaxil yes, we need to remove the params from namespaced
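The `TypeError` in the scheduler log comes from unpacking unknown keys into a constructor. The self-contained sketch below reproduces that with a hypothetical stand-in class (`FakePodGenerator`, not Airflow's `PodGenerator`). It also shows the issue's second option: pass only the parameters the constructor declares, found with the standard library's `inspect.signature`.

```python
import inspect


class FakePodGenerator:
    """Hypothetical stand-in for PodGenerator with a fixed set of keyword arguments."""

    def __init__(self, image=None, resources=None, labels=None):
        self.image = image
        self.resources = resources
        self.labels = labels


def allowed_kwargs(cls, kwargs):
    """Keep only the keys that cls.__init__ declares as named parameters."""
    params = inspect.signature(cls.__init__).parameters
    return {key: value for key, value in kwargs.items()
            if key in params and key != 'self'}


namespaced = {'image': 'my-image', 'request_cpu': '200m'}

try:
    FakePodGenerator(**namespaced)  # mirrors the scheduler's failing call
except TypeError as err:
    print(err)  # the message names the unexpected keyword 'request_cpu'

generator = FakePodGenerator(**allowed_kwargs(FakePodGenerator, namespaced))
print(generator.image)  # my-image
```

Silently dropping keys can hide typos in user configs, so an allowlist like this would probably be paired with translating the flat resource keys rather than simply discarding them.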

I came across the same problem yesterday and used this as a workaround:

```python
default_args = {
    'executor_config': {
        'KubernetesExecutor': {
            'resources': {
                'limits': {
                    'cpu': "200m",
                    'memory': "500Mi",
                },
                'requests': {
                    'cpu': "200m",
                    'memory': "500Mi",
                },
            }
        }
    }
}
```

There is also the question of why we made a breaking change on a minor release:

```python
# fails in 1.10.11
from airflow.contrib.kubernetes.XXX import XXX

# new package location in 1.10.11
from airflow.kubernetes.XXX import XXX
```
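Until the old `airflow.contrib` paths work again, a DAG file that has to run on both an older 1.10.x release and 1.10.11 could try one location and fall back to the other. Here is a minimal sketch using only `importlib`. `import_first` is a hypothetical helper, and the module paths you pass to it must match your installed Airflow version (the `XXX` in the comment above is left unfilled on purpose).

```python
import importlib


def import_first(attribute, *module_paths):
    """Return `attribute` from the first module in `module_paths` that provides it.

    Hypothetical compatibility helper: raises ImportError if none of the
    modules can be imported or none of them defines the attribute.
    """
    for path in module_paths:
        try:
            module = importlib.import_module(path)
        except ImportError:
            continue  # module missing in this version; try the next location
        if hasattr(module, attribute):
            return getattr(module, attribute)
    raise ImportError(
        '{} not found in any of: {}'.format(attribute, ', '.join(module_paths)))


# Demonstrated with the standard library: the first path does not exist
loads = import_first('loads', 'no_such_module_xyz', 'json')
```

A plain `try: ... except ImportError:` around two import statements does the same job for a single name; the helper only pays off when several moved names need the same treatment.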

I am not sure if it's related, but I can also no longer use `PodRuntimeInfoEnv`:

```python
from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

KubernetesPodOperator(
    ...,
    pod_runtime_info_envs=[
        PodRuntimeInfoEnv(
            name="POD_IP",
            field_path="status.podIP"
        )
    ]
)
```

This fails with:

```
[2020-08-04 00:52:30,004] {taskinstance.py:1150} ERROR - Invalid value for `field_path`, must not be `None`
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 979, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 291, in execute
    final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 374, in create_new_pod_for_operator
    self.volume_mounts  # type: ignore
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/k8s_model.py", line 61, in append_to_pod
    new_pod = reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/k8s_model.py", line 61, in <lambda>
    new_pod = reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/pod_runtime_info_env.py", line 57, in attach_to_pod
    env = self.to_k8s_client_obj()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/pod_runtime_info_env.py", line 50, in to_k8s_client_obj
    self.field_path
  File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/client/models/v1_object_field_selector.py", line 52, in __init__
    self.field_path = field_path
  File "/home/airflow/.local/lib/python3.6/site-packages/kubernetes/client/models/v1_object_field_selector.py", line 98, in field_path
    raise ValueError("Invalid value for `field_path`, must not be `None`")  # noqa: E501
ValueError: Invalid value for `field_path`, must not be `None`
```
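For comparison, the traceback shows `PodRuntimeInfoEnv.to_k8s_client_obj` building a `V1ObjectFieldSelector` from `self.field_path`. In other words, the object is meant to become a container env entry that reads a pod field through the Kubernetes Downward API (`valueFrom.fieldRef.fieldPath`). The hypothetical helper below builds that entry as a plain manifest dict and applies the same non-`None` check that fails above. This can help when comparing against the pod spec Airflow generates. It is a sketch, not Airflow's code.

```python
def downward_api_env(name, field_path):
    """Build a container env entry that reads a pod field via the Downward API.

    Hypothetical helper producing the manifest (camelCase) form of what
    PodRuntimeInfoEnv is meant to generate.
    """
    if field_path is None:
        # Same condition that makes V1ObjectFieldSelector raise in the traceback
        raise ValueError('Invalid value for `field_path`, must not be `None`')
    return {
        'name': name,
        'valueFrom': {'fieldRef': {'fieldPath': field_path}},
    }


print(downward_api_env('POD_IP', 'status.podIP'))
# {'name': 'POD_IP', 'valueFrom': {'fieldRef': {'fieldPath': 'status.podIP'}}}
```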

The webserver is also broken: https://github.com/apache/airflow/pull/10015

> There is also the question of why we made a breaking change on a minor release:

It's a bug, and it will be fixed in the next release. See: https://github.com/apache/airflow/pull/10067

Hi everyone,

I apologize for all the bugs on 1.10.11. That was on me. There were a lot of major refactors that were necessary for new features/to remove some older issues and unfortunately the k8s API is pretty comprehensive so a few things fell through the cracks. We'll be releasing 1.10.12 VERY shortly and are close to fixing all noted bugs.

Nothing to apologize for, @dimberman. It was a much-needed refactor, and we were waiting for it to release the cncf.kubernetes backport packages. I keep my fingers crossed. I'm also looking forward to finally adding more tests that cover the Kubernetes executor, to avoid regressions in the future.

I have this saying - the only sure way to not introduce bugs is not to do anything at all :). But doing nothing also means stagnating and falling behind :)

Thank you @potiuk :). Yes with the helm chart in the CI we should be able to add much more comprehensive system tests soon!

And not to forget the newer way is more powerful as it directly uses the k8s API :) -- so thanks Daniel for backporting that one

Agreed, thank you @dimberman. Looking forward to using them in 1.10.12 :)

fixed :) https://github.com/apache/airflow/pull/10084.

Man, this was one hell of a chaos test, but I'm feeling pretty good about how many situations are handled now!
