Apache Airflow version: 1.10.7, 1.10.10, 1.10.12
Environment:
uname -a): Debian
What happened:
Over the last several versions of Airflow, we've noticed that when a task is killed by SIGKILL (currently reported as Task exited with return code Negsignal.SIGKILL, previously as Task exited with return code -9), the failure email is sent but the on_failure_callback is not called.
This used to happen fairly often for us: we had tasks that consumed large amounts of memory, and occasionally too many of them ran on the same worker and were OOM killed. In those cases we received failure emails saying the task was detected as zombie, and the on_failure_callback was not called. We were hoping #7025 would resolve this with the most recent upgrade (and we've also taken steps to reduce our memory footprint), but it happened again recently.
What you expected to happen:
If a task fails (even if the cause of the failure is a lack of resources), I would expect the on_failure_callback to still be called.
How to reproduce it:
Example DAG setup:
CODE
# -*- coding: utf-8 -*-
"""
# POC: On Failure Callback for SIGKILL
"""
from datetime import datetime

import numpy as np
from airflow import DAG
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.operators.python_operator import PythonOperator


def on_failure_callback(context):
    # Airflow passes the task context as a single positional dict.
    print("===IN ON FAILURE CALLBACK===")
    print("Triggering another run of the task")
    trigger_dag("OOM_test_follower")


def high_memory_task():
    # Allocate large arrays forever so the worker's OOM killer steps in.
    l = []
    iteration = 0
    while True:
        print(f"Iteration: {iteration}")
        l.append(np.random.randint(1_000_000, size=(1000, 1000, 100)))
        iteration += 1


def failure_task():
    # Ordinary failure: raises inside the task process.
    raise ValueError("whoops")


def print_context(**context):
    print("This DAG was launched by the failure callback")
    print(context)


dag = DAG(
    dag_id="OOM_test",
    schedule_interval=None,
    catchup=False,
    default_args={
        "owner": "madison.bowden",
        "start_date": datetime(year=2019, month=7, day=1),
        "email": "your-email",
    },
)
with dag:
    PythonOperator(
        task_id="oom_task",
        python_callable=high_memory_task,
        on_failure_callback=on_failure_callback,
    )

failure_dag = DAG(
    dag_id="Failure_test",
    schedule_interval=None,
    catchup=False,
    default_args={
        "owner": "madison.bowden",
        "start_date": datetime(year=2019, month=7, day=1),
        "email": "your-email",
    },
)
with failure_dag:
    PythonOperator(
        task_id="failure_task",
        python_callable=failure_task,
        on_failure_callback=on_failure_callback,
    )

dag_follower = DAG(
    dag_id="OOM_test_follower",
    schedule_interval=None,
    catchup=False,
    default_args={
        "owner": "madison.bowden",
        "start_date": datetime(year=2019, month=7, day=1),
        "email": "your-email",
    },
)
with dag_follower:
    PythonOperator(
        task_id="oom_task_failure", python_callable=print_context, provide_context=True
    )
With the above example, the Failure_test DAG triggers a run of the OOM_test_follower DAG when it fails, as expected. The OOM_test DAG, when triggered, quickly runs out of memory and is killed, but it does not trigger a run of the OOM_test_follower DAG.
Anything else we need to know:
Related to #10917 ?
Good find! I think this issue is related, but not quite the same. This particular case is explicitly mentioned in @houqp's response here, and it looks like the change that would rectify this (moving callback handling to local_scheduler_job rather than local_task_job, I think) is out of scope for that particular change.
yeah, those are two different issues. @madison-ookla what executor are you using?
If you run into OOM, the raw task process will receive a SIGKILL instead of a SIGTERM, and SIGKILL cannot be captured and handled by the process itself. #7025 doesn't solve this problem because it only handles cases where the raw task process did not exit by itself or was killed by SIGKILL.
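To illustrate why the task process cannot react to an OOM kill, here is a minimal sketch. It is not Airflow code, it assumes a POSIX system, and the helper name `can_install_handler` is made up for this example. It checks whether the OS allows a handler to be registered for SIGTERM and for SIGKILL.

```python
import signal


def handler(signum, frame):
    """Placeholder handler; it can never run for SIGKILL."""
    print(f"received signal {signum}")


def can_install_handler(sig):
    """Return True if this process is allowed to register a handler for sig."""
    try:
        previous = signal.signal(sig, handler)
    except (OSError, ValueError):
        # The OS (or Python) refused to install a handler for this signal.
        return False
    if previous is not None:
        signal.signal(sig, previous)  # restore the earlier disposition
    return True


sigterm_catchable = can_install_handler(signal.SIGTERM)
sigkill_catchable = can_install_handler(signal.SIGKILL)
print("SIGTERM catchable:", sigterm_catchable)
print("SIGKILL catchable:", sigkill_catchable)
```

Because no handler can be registered for SIGKILL, any cleanup or callback that lives inside the killed process never gets a chance to run.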
I think to properly fix this bug, we will need to move the failure callback invocation into the caller of the raw task process, e.g. local_task_job. That way, we can just check the return code from the raw task and always invoke the failure callback if it's not 0, which should cover the SIGKILL case.
I will update my #10917 PR to cover this.
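The idea of checking the return code in the caller can be sketched roughly as follows. This is only an illustration under stated assumptions, not the actual local_task_job implementation: `run_with_failure_callback` is a hypothetical helper, and the child command simply sends SIGKILL to itself as a stand-in for an OOM-killed task on a POSIX system.

```python
import signal
import subprocess
import sys


def run_with_failure_callback(cmd, on_failure):
    """Run cmd as a child process and call on_failure from the parent if it fails."""
    rc = subprocess.run(cmd).returncode
    if rc != 0:
        # On POSIX, a negative return code means the child died from signal -rc.
        reason = signal.Signals(-rc).name if rc < 0 else f"exit code {rc}"
        on_failure(reason)
    return rc


# Hypothetical stand-in for an OOM-killed task: the child SIGKILLs itself.
child_cmd = [
    sys.executable,
    "-c",
    "import os, signal; os.kill(os.getpid(), signal.SIGKILL)",
]

failures = []
rc = run_with_failure_callback(child_cmd, failures.append)
print("return code:", rc)
print("callback received:", failures)
```

Since the callback runs in the parent, it fires no matter how the child died, and it is only invoked once per run.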
@houqp Totally agree with your assessment, and if you can incorporate that in #10917 that would be amazing! We definitely only want the callback issued once, and ideally issued regardless of what happens to the process (SIGKILL vs SIGTERM). FWIW we're using the CeleryExecutor.
Thanks a ton!