Apache Airflow 1.10.10:
Kubernetes version (if you are using kubernetes) (use kubectl version):
Environment:
CENTOS_MANTISBT_PROJECT="CentOS-7"
CENTOS_MANTISBT_PROJECT_VERSION="7"
REDHAT_SUPPORT_PRODUCT="centos"
REDHAT_SUPPORT_PRODUCT_VERSION="7"
Kernel (e.g. uname -a):
Linux mid1-t029nifi-1 3.10.0-327.28.3.el7.x86_64 #1 SMP Thu Aug 18 19:05:49 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
Install tools:
pip, yum
Others:
What happened:
When DAG serialisation is active and I manually start operators one by one, the first one works fine, but the next one fails with this error:
Could not queue task instance for execution, dependencies not met: Trigger Rule: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'failed': Decimal('0'), 'upstream_failed': Decimal('0'), 'done': 0L, 'total': 1}, upstream_task_ids=set([u'query']
With DAG serialisation set to false, the problem does not arise.
Please note: _the scheduler works fine_.
What you expected to happen:
I expected to be able to manually start all of the DAG's tasks, from the first one to the last.
The code is not able to correctly find the status of the task upstream of the one I'm restarting.
If I start the first operator, everything works fine.
You can reproduce it following these steps:
op1 >> op2
Anything else we need to know:
This happens every time.
Mysql 5.7.x, Python 2.7
Interesting one!
Can you share your DAG please @ozw1z5rd
And also I would strongly suggest upgrading to Python 3
You must enable DAG serialisation to replicate my issue; without serialisation there is no issue on my company's system.
These are my settings (from the pilot installation):
min_serialized_dag_update_interval = 15
store_dag_code = True
max_num_rendered_ti_fields_per_task = 0 # this avoids the deadlock problem, which seems to affect only the MySQL engine
Any DAG is affected; my tests were on this specific one:
from builtins import range
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='example_sequence_restart',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
    tags=['example']
)

run_this_last = DummyOperator(
    task_id='run_this_last',
    dag=dag,
)

# [START howto_operator_bash]
run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)
# [END howto_operator_bash]

run_this >> run_this_last

task = BashOperator(
    task_id='start',
    bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
    dag=dag,
)
task >> run_this

# [START howto_operator_bash_template]
also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)
# [END howto_operator_bash_template]

also_run_this >> run_this_last
I have to say that after the database migration I changed the database a bit:
dag_tag: changed the constraint to
CONSTRAINT dag_tag_ibfk_1 FOREIGN KEY (dag_id) REFERENCES dag (dag_id) on delete cascade
rendered_task_instance_fields: changed execution_date from timestamp to timestamp(6)
execution_date timestamp(6)
task_fail: changed execution_date to timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
Before these changes (mostly the one on rendered_task_instance_fields), I was unable to manually trigger the same task twice and have both executions complete without errors. One completed; the other was unable to insert into rendered_task_instance_fields:
IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'PARTITIONADD-partition_add-2020-07-28 17:17:13' for key 'PRIMARY'")
[SQL: INSERT INTO rendered_task_instance_fields (dag_id, task_id, execution_date, rendered_fields) VALUES (%s, %s, %s, %s)]
[parameters: ('PARTITIONADD', 'partition_add', datetime.datetime(2020, 7, 28, 17, 17, 13, 315192), '{"hql": "\\n ALTER TABLE unifieddata_cat.transient_ww_eventsjson\\n ADD IF NOT EXISTS PARTITION( country = \'{country}\',year ... (158 characters truncated) ... e_url": "http://httpfs-preprod.hd.docomodigital.com:14000", "hdfs_path_pattern": "/Vault/Docomodigital/Preproduction/rawEvents/{country}/2020/07/28"}')]
(Background on this error at: http://sqlalche.me/e/gkpj)
After the change to execution_date, everything worked fine.
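One plausible reading of the IntegrityError above is that a key column stored without fractional seconds cannot tell apart two execution dates that fall within the same second. The sketch below only models that idea in plain Python: the dictionary stands in for the table, `second_precision` is a hypothetical helper that discards microseconds (MySQL's actual conversion rules may differ), and the second timestamp is invented for illustration.

```python
from datetime import datetime


def second_precision(ts):
    """Model a column with no fractional seconds (assumption: microseconds are dropped)."""
    return ts.replace(microsecond=0)


def full_precision(ts):
    """Model a timestamp(6) column: microseconds are kept."""
    return ts


def insert(table, key_fn, dag_id, task_id, execution_date):
    """Insert a row keyed on (dag_id, task_id, execution_date); reject duplicate keys."""
    key = (dag_id, task_id, key_fn(execution_date))
    if key in table:
        raise ValueError("Duplicate entry %r for key 'PRIMARY'" % (key,))
    table[key] = "rendered fields"


first = datetime(2020, 7, 28, 17, 17, 13, 315192)   # value from the error above
second = datetime(2020, 7, 28, 17, 17, 13, 900000)  # hypothetical later trigger, same second

# Whole-second key: the second insert collides with the first.
coarse = {}
insert(coarse, second_precision, "PARTITIONADD", "partition_add", first)
try:
    insert(coarse, second_precision, "PARTITIONADD", "partition_add", second)
    collided = False
except ValueError:
    collided = True

# Microsecond key: both rows fit.
fine = {}
insert(fine, full_precision, "PARTITIONADD", "partition_add", first)
insert(fine, full_precision, "PARTITIONADD", "partition_add", second)
```

With the whole-second key the second insert is rejected, while the microsecond key accepts both rows, which matches the behaviour reported before and after the column change.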
Definitely, Python 3 is one of the best choices you can make now @ozw1z5rd!
And also I would strongly suggest upgrading to Python 3
Yes, I agree. However, we need to convert our customization code to Python 3, so for the next few months, whether we like it or not, Python 2.7 will stay with us.
<advertisement>Do you need help with making the move faster :)? Maybe we can help :D </advertisement>
I got this error too, and fixed it:
vim airflow/serialization/serialized_objects.py +582
Replace:
dag.task_dict[task_id]._upstream_task_ids.add(task_id) # pylint: disable=protected-access
with:
dag.task_dict[task_id]._upstream_task_ids.add(task.task_id) # pylint: disable=protected-access
The operator was getting the wrong upstream_task_ids; adding the right upstream task ID makes it work fine.
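To see why that one-word change matters, here is a minimal sketch of the idea using toy classes, not Airflow's real ones. It assumes that only the downstream links are stored and that the upstream sets are rebuilt from them on load; `Task`, `rebuild_upstream`, and the `buggy` flag are hypothetical names for illustration only.

```python
class Task(object):
    """Toy stand-in for an operator: knows its id and its downstream task ids."""

    def __init__(self, task_id, downstream_task_ids=()):
        self.task_id = task_id
        self.downstream_task_ids = set(downstream_task_ids)
        self.upstream_task_ids = set()


def rebuild_upstream(task_dict, buggy):
    """Rebuild upstream sets from downstream links, as done after deserialisation."""
    for task in task_dict.values():
        for task_id in task.downstream_task_ids:
            # Buggy variant: records the downstream task's own id as its upstream.
            # Fixed variant: records the id of the task we are iterating over.
            parent_id = task_id if buggy else task.task_id
            task_dict[task_id].upstream_task_ids.add(parent_id)


def make_tasks():
    # Equivalent of: op1 >> op2
    return {"op1": Task("op1", ["op2"]), "op2": Task("op2")}


buggy_tasks = make_tasks()
rebuild_upstream(buggy_tasks, buggy=True)

fixed_tasks = make_tasks()
rebuild_upstream(fixed_tasks, buggy=False)
```

In the buggy variant op2 ends up listed as its own upstream task, so a dependency check such as 'all_success' would look at a task that has not succeeded yet; in the fixed variant op2's upstream is op1, as expected.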
This has been fixed in 1.10.11 - https://github.com/apache/airflow/pull/8775