Airflow: ExternalTaskMarker doesn't work with store_serialized_dags

Created on 8 Sep 2020 · 4 Comments · Source: apache/airflow

Apache Airflow version: 1.10.10

Kubernetes version (if you are using kubernetes): 1.14.10

Environment:

  • Cloud provider or hardware configuration: Any, GCP
  • OS (e.g. from /etc/os-release): Ubuntu 1.10.10
  • Kernel (e.g. uname -a): Any
  • Install tools: Any
  • Others: NA

What happened:
DAGs containing an ExternalTaskMarker do not clear the external task when the whole DAG is cleared a second time.

What you expected to happen:

All external tasks should be cleared.

How to reproduce it:
Enable DAG serialization by setting store_serialized_dags = True in the [core] section of airflow.cfg.

Create the example DAGs:

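from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# SerializableExternalTaskMarker is the workaround class defined in the
# second snippet of this issue; it must be defined or imported before use.

default_args = {'owner': 'airflow',
                'start_date': datetime(2018, 1, 1)}


def hello_world_py(*args):
    print('Hello World')
    print('This is DAG dep')


schedule = '@daily'
dag_id = 'dep_dag'
with DAG(dag_id=dag_id,
         schedule_interval=schedule,
         default_args=default_args) as dag:
    t1 = PythonOperator(task_id='hello_world',
                        python_callable=hello_world_py)
    # Waits for parent_task in hello_world_2 on the same execution date
    dep_1 = ExternalTaskSensor(task_id='child_task1',
                               external_dag_id='hello_world_2',
                               external_task_id='parent_task',
                               mode='reschedule')
    dep_1 >> t1


def create_dag(dag_id, schedule, dag_number, default_args):
    dag = DAG(dag_id, schedule_interval=schedule,
              default_args=default_args)
    with dag:
        # dag_number is passed to the callable via op_args; PythonOperator
        # does not accept an arbitrary dag_number keyword argument
        t1 = PythonOperator(task_id='hello_world',
                            python_callable=hello_world_py,
                            op_args=[dag_number])
        # Clearing this marker should also clear dep_dag.child_task1
        parent_task = SerializableExternalTaskMarker(task_id='parent_task',
                                                     external_dag_id='dep_dag',
                                                     external_task_id='child_task1')
        t1 >> parent_task
    return dag


for n in range(1, 4):
    dag_id = 'hello_world_{}'.format(n)
    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)}
    schedule = '@daily'
    dag_number = n
    globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)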
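
  1. Run both DAGs (dep_dag and the hello_world_* DAGs with the marker).
  2. Wait until the first few DAG runs have completed.
  3. Clear the first DAG run in the DAG with the marker.
  4. Check that the external DAG was cleared for the same date.
  5. Mark this date as success in each DAG, or wait until the runs complete.
  6. Clear the DAG with the marker a second time for the same date.
  7. The ExternalTaskMarker does not clear the external task this time.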

Anything else we need to know:
I think ExternalTaskMarker does not work because of serialization: after serialization, each task instance gets its operator field set to 'SerializedBaseOperator', so the marker logic does not apply here.
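To make the hypothesis above concrete, here is a simplified, stdlib-only model of the failure. It assumes, as the report suggests, that the recursive clear decides whether to follow a marker by comparing the operator name stored on the task instance, and that this stored name is refreshed from whatever task object is loaded. All classes and functions here (Marker, GenericDeserialized, TaskInstanceRow, clear_targets) are hypothetical stand-ins, not Airflow's code.

```python
from dataclasses import dataclass
from typing import List, Tuple


class Operator:
    """Hypothetical base class for operator objects."""


class Marker(Operator):
    """Hypothetical stand-in for ExternalTaskMarker."""

    def __init__(self, external_dag_id: str, external_task_id: str) -> None:
        self.external_dag_id = external_dag_id
        self.external_task_id = external_task_id


class GenericDeserialized(Operator):
    """Hypothetical stand-in for SerializedBaseOperator: one generic class for all tasks."""

    def __init__(self, **attrs: str) -> None:
        # Copy the serialized attributes back onto the object.
        self.__dict__.update(attrs)


@dataclass
class TaskInstanceRow:
    """Hypothetical task-instance row that stores the operator's class name as a string."""

    dag_id: str
    task_id: str
    operator: str = ""

    def refresh_from_task(self, task: Operator) -> None:
        # Records the class name of whatever task object is loaded at this moment.
        self.operator = type(task).__name__


def clear_targets(ti: TaskInstanceRow, task: Operator) -> List[Tuple[str, str]]:
    """Return the external (dag_id, task_id) pairs a recursive clear would follow."""
    # Name-based check (assumed): only rows recorded as "Marker" are followed.
    if ti.operator == Marker.__name__:
        return [(task.external_dag_id, task.external_task_id)]
    return []


# First clear: the row was filled in from the real marker class.
ti = TaskInstanceRow("hello_world_1", "parent_task")
real_task = Marker("dep_dag", "child_task1")
ti.refresh_from_task(real_task)
first = clear_targets(ti, real_task)

# Second clear: the row was refreshed from the deserialized task, so the stored name changed.
restored_task = GenericDeserialized(external_dag_id="dep_dag", external_task_id="child_task1")
ti.refresh_from_task(restored_task)
second = clear_targets(ti, restored_task)

print(first)   # [('dep_dag', 'child_task1')]
print(second)  # []
```

In this model the external target is still present on the deserialized object; only the name check fails, which matches the symptom that the first clear works and the second does not.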

To test ExternalTaskMarker with serialization, you can use this workaround, which makes the subclass report its name as ExternalTaskMarker:

from typing import FrozenSet, Optional

from airflow.sensors.external_task_sensor import ExternalTaskMarker


class FakeName(type):
    """Metaclass that lets the class body override the class's __name__."""

    def __new__(metacls, name, bases, namespace, **kw):
        name = namespace.get("__name__", name)
        return super().__new__(metacls, name, bases, namespace, **kw)


class SerializableExternalTaskMarker(ExternalTaskMarker, metaclass=FakeName):
    # Report the class name as "ExternalTaskMarker" so name-based checks still match
    __name__ = "ExternalTaskMarker"

    # The __serialized_fields are lazily loaded when get_serialized_fields() is called
    __serialized_fields = None  # type: Optional[FrozenSet[str]]

    @classmethod
    def get_serialized_fields(cls):
        """Serialized ExternalTaskMarker contains exactly these fields."""
        if not cls.__serialized_fields:
            cls.__serialized_fields = frozenset(
                ExternalTaskMarker.get_serialized_fields() | {
                    "recursion_depth", "external_dag_id", "external_task_id", "execution_date"
                }
            )
        return cls.__serialized_fields
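The workaround combines two Python techniques: a metaclass that takes the class name from a __name__ entry in the class body, and a lazily computed, class-level frozenset of serialized fields. The stdlib-only sketch below isolates both, using hypothetical Marker and PatchedMarker classes in place of ExternalTaskMarker and SerializableExternalTaskMarker, and a made-up base field set instead of Airflow's real one.

```python
from typing import FrozenSet, Optional


class FakeName(type):
    """Metaclass: if the class body defines __name__, use it as the class name."""

    def __new__(metacls, name, bases, namespace, **kw):
        name = namespace.get("__name__", name)
        return super().__new__(metacls, name, bases, namespace, **kw)


class Marker:
    """Hypothetical stand-in for ExternalTaskMarker."""

    @classmethod
    def get_serialized_fields(cls) -> FrozenSet[str]:
        # Made-up base field set, for illustration only.
        return frozenset({"task_id", "owner"})


class PatchedMarker(Marker, metaclass=FakeName):
    """Hypothetical stand-in for SerializableExternalTaskMarker."""

    __name__ = "Marker"
    # Name-mangled to _PatchedMarker__serialized_fields; cached after first use.
    __serialized_fields = None  # type: Optional[FrozenSet[str]]

    @classmethod
    def get_serialized_fields(cls) -> FrozenSet[str]:
        if not cls.__serialized_fields:
            cls.__serialized_fields = frozenset(
                Marker.get_serialized_fields() | {"external_dag_id", "external_task_id"}
            )
        return cls.__serialized_fields


print(PatchedMarker.__name__)            # Marker
print(PatchedMarker.__qualname__)        # PatchedMarker
print(type(PatchedMarker()).__name__)    # Marker
```

Only __name__ changes: __qualname__ and name mangling still use the name written in the class statement, and the subclass keeps its own overrides. Any check that compares type(task).__name__ against "Marker" will therefore treat PatchedMarker instances as markers.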

@kaxil

serialization bug

All 4 comments

Thanks for opening your first issue here! Be sure to follow the issue template!

@kaxil

Thanks for creating this issue, taking a look at it now

I was able to reproduce it. I will fix it soon and release the fix with 1.10.13.

