Apache Airflow version: 1.10.10
Kubernetes version (if you are using kubernetes): 1.14.10
Environment:
Kernel (e.g. uname -a): Any
What happened:
DAGs with ExternalTaskMarker do not clear the external tasks the second time "Clear" is used on the whole DAG.
What you expected to happen:
All external tasks should be cleared.
How to reproduce it:
Enable DAG serialization: store_serialized_dags = True
create example DAGs:
# Default arguments handed to the DAG constructor.
default_args = dict(
    owner='airflow',
    start_date=datetime(2018, 1, 1),
)
def hello_world_py(*args):
    """Print two fixed lines to stdout.

    Any positional arguments are accepted but ignored.
    """
    print('Hello World')
    print('This is DAG is dep}')
schedule = '@daily'
dag_id = 'dep_dag'
with DAG(dag_id=dag_id,
         schedule_interval=schedule,
         default_args=default_args) as dag:
    # Task that calls hello_world_py.
    t1 = PythonOperator(task_id='hello_world',
                        python_callable=hello_world_py,)
    # Sensor pointing at task 'parent_task' of DAG 'hello_world_2'.
    dep_1 = ExternalTaskSensor(task_id='child_task1',
                               external_dag_id='hello_world_2',
                               external_task_id='parent_task',
                               mode='reschedule')
    # The sensor is upstream of the hello_world task.
    dep_1 >> t1
def create_dag(dag_id, schedule, dag_number, default_args):
    """Build one 'hello_world' DAG that ends in an external-task marker.

    :param dag_id: id given to the new DAG
    :param schedule: value passed as the DAG's ``schedule_interval``
    :param dag_number: number forwarded to the ``hello_world`` callable
    :param default_args: default arguments passed to the DAG
    :return: the constructed DAG
    """
    dag = DAG(dag_id, schedule_interval=schedule,
              default_args=default_args)
    with dag:
        # dag_number is not a PythonOperator parameter, so it is forwarded to
        # the callable through op_args instead of as a stray keyword argument.
        t1 = PythonOperator(task_id='hello_world',
                            python_callable=hello_world_py,
                            op_args=[dag_number])
        # Marker pointing at task 'child_task1' of DAG 'dep_dag'.
        parent_task = SerializableExternalTaskMarker(task_id='parent_task',
                                                     external_dag_id='dep_dag',
                                                     external_task_id='child_task1')
        t1 >> parent_task
    return dag
# Create DAGs hello_world_1 .. hello_world_3.
for n in range(1, 4):
    dag_id = 'hello_world_{}'.format(n)
    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)}
    schedule = '@daily'
    dag_number = n
    # Bind each DAG to a module-level name equal to its dag_id
    # (presumably so Airflow's DAG discovery picks it up).
    globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
Anything else we need to know:
I think ExternalTaskMarker doesn't work because of serialization: after serialization, each task instance gets an operator field equal to 'SerializedBaseOperator', so the marker's logic doesn't work here.
To test ExternalTaskMarker with serialization you can use:
from airflow.sensors.external_task_sensor import ExternalTaskMarker
class FakeName(type):
    """Metaclass that lets a class body override the name the class is created with.

    If the class namespace defines ``__name__``, that value is used as the
    class name; otherwise the name from the ``class`` statement is kept.
    """

    def __new__(metacls, name, bases, namespace, **kw):
        # Prefer an explicit ``__name__`` from the class body over the declared name.
        name = namespace.get("__name__", name)
        return super().__new__(metacls, name, bases, namespace, **kw)
class SerializableExternalTaskMarker(ExternalTaskMarker, metaclass=FakeName):
    """ExternalTaskMarker variant that adds its marker attributes to the serialized fields.

    The FakeName metaclass creates this class under the name
    "ExternalTaskMarker" (taken from ``__name__`` below), presumably so the
    serialized task is still identified as an ExternalTaskMarker.
    """

    # The _serialized_fields are lazily loaded when get_serialized_fields() method is called
    __serialized_fields = None  # type: Optional[FrozenSet[str]]
    __name__ = "ExternalTaskMarker"

    @classmethod
    def get_serialized_fields(cls):
        """Return the parent's serialized fields plus the marker-specific ones.

        The set is computed on first call and cached on the class.
        """
        if not cls.__serialized_fields:
            cls.__serialized_fields = frozenset(
                ExternalTaskMarker.get_serialized_fields() | {
                    # "external_task_id" matches the constructor argument name;
                    # the misspelt "external_taskid" never matched an attribute.
                    "recursion_depth", "external_dag_id", "external_task_id", "execution_date"
                }
            )
        return cls.__serialized_fields
@kaxil
Thanks for opening your first issue here! Be sure to follow the issue template!
@kaxil
Thanks for creating this issue, taking a look at it now
I was able to reproduce it; I will fix it soon and release it with 1.10.13.