Apache Airflow version: 1.10.10
Kubernetes version: v1.16.8
MySQL version: 5.7
What happened:
Airflow tasks fail with a deadlock when running a DAG with max_active_runs > 1 and concurrency > 1 while dag_serialization is enabled.
Logs
[2020-04-22 19:19:49,018] {taskinstance.py:1145} ERROR - (_mysql_exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: INSERT INTO rendered_task_instance_fields (dag_id, task_id, execution_date, rendered_fields) VALUES (%s, %s, %s, %s)]
[parameters: ('some_dag_v.0.0.1', 'some_task_id', datetime.datetime(2019, 12, 2, 0, 0), 'Some rendered fields (837 characters truncated)')]
(Background on this error at: http://sqlalche.me/e/e3q8)
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 590, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1002, in _run_raw_task
self.refresh_from_db(lock_for_update=True)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/contextlib.py", line 119, in __exit__
next(self.gen)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 45, in create_session
session.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1036, in commit
self.transaction.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 503, in commit
self._prepare_impl()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2496, in flush
self._flush(objects)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2637, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__
exc_value, with_traceback=exc_tb,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
flush_context.execute()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 984, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1103, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1288, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1482, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
raise exception
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 590, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
I encountered the same problem. Is this problem solved now?
The following two SQL statements report this error:
ERROR - (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
SQL: INSERT INTO rendered_task_instance_fields (dag_id, task_id, execution_date, rendered_fields) VALUES (%s, %s, %s, %s)
SQL: DELETE FROM rendered_task_instance_fields WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND (rendered_task_instance_fields.dag_id, rendered_task_instance_fields.task_id, rendered_task_instance_fields.execution_date) NOT IN (SELECT subq1.dag_id, subq1.task_id, subq1.execution_date
FROM (SELECT rendered_task_instance_fields.dag_id AS dag_id, rendered_task_instance_fields.task_id AS task_id, rendered_task_instance_fields.execution_date AS execution_date
FROM rendered_task_instance_fields
WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s ORDER BY rendered_task_instance_fields.execution_date DESC
LIMIT %s) AS subq1)
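Both error messages end with "try restarting transaction", so one possible workaround is to retry the whole transaction when MySQL returns 1205 or 1213. The following is only a minimal sketch under assumptions: `run_with_retry` and `txn` are hypothetical names, not part of Airflow, and `txn` is assumed to run one complete transaction and roll back on failure. Retrying hides the contention; it does not remove the cause.

```python
import time

# MySQL error codes seen in this thread:
# 1205 = lock wait timeout exceeded, 1213 = deadlock found.
RETRYABLE_MYSQL_CODES = {1205, 1213}


def run_with_retry(txn, max_attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call txn() and retry it when it fails with a retryable MySQL error.

    txn is a hypothetical callable that runs one whole transaction
    (statements plus commit) and rolls back before raising.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return txn()
        except Exception as exc:
            # SQLAlchemy wraps the driver error and exposes it as `.orig`;
            # a bare DB-API error is used directly.
            orig = getattr(exc, "orig", exc)
            code = orig.args[0] if orig.args else None
            retryable = isinstance(code, int) and code in RETRYABLE_MYSQL_CODES
            if not retryable or attempt == max_attempts:
                raise
            # Linear backoff before the next attempt.
            sleep(base_delay * attempt)
```

Any other exception, or a retryable one on the last attempt, is re-raised unchanged, so the task still fails visibly if the lock never clears.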
Same problem +1
Apache Airflow version: 1.10.11
Hi, I have the same issue.
I was looking at the models/renderedtifields.py file and noticed that
def delete_old_records(
contains a line that loads the number of rendered fields to keep:
num_to_keep=conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0)
and if this value is <= 0, the function returns without doing anything.
if num_to_keep <= 0:
return
Since the deadlock is between the insert and the delete on that table, setting max_num_rendered_ti_fields_per_task = 0 in the [core] config section might fix the issue.
Of course it does not work.
Using SHOW ENGINE INNODB STATUS I see queries like this:
DELETE FROM rendered_task_instance_fields
WHERE rendered_task_instance_fields.dag_id = 'PARTITIONADD'
AND rendered_task_instance_fields.task_id = 'partition_add'
AND (rendered_task_instance_fields.dag_id, rendered_task_instance_fields.task_id, rendered_task_instance_fields.execution_date) NOT IN (
SELECT subq1.dag_id, subq1.task_id, subq1.execution_date
FROM (
SELECT rendered_task_instance_fields.dag_id AS dag_id, rendered_task_instance_fields.task_id AS task_id,
rendered_task_instance_fields.execution_date AS execution_date
FROM rendered_task_instance_fields
WHERE rendered_task_instance_fields.dag_id = 'PARTITIONADD'
AND rendered_task_instance_fields.task_id = 'partition_add'
ORDER BY rendered_task_instance_fields.execution_date DESC
LIMIT 30
)
AS subq1
)
-----> Please note LIMIT 30
I found this code inside models/taskinstance.py
if STORE_SERIALIZED_DAGS:
RTIF.write(RTIF(ti=self, render_templates=False), session=session)
RTIF.delete_old_records(self.task_id, self.dag_id, session=session)
and it is the only place where delete_old_records is called, so ... it is weird, is it not?
Where on earth does that "30" come from?
I'll investigate further tomorrow...
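One plausible explanation for the "30": in a configparser-style `getint`, `fallback` only applies when the option is missing everywhere, so a value supplied by a default config wins over `fallback=0`. The sketch below uses the standard library `ConfigParser` as a stand-in. Treating Airflow's `conf` as behaving the same way, and the shipped default being 30, are both assumptions here, not something confirmed in this thread.

```python
from configparser import ConfigParser

OPTION = "max_num_rendered_ti_fields_per_task"


def num_to_keep(cfg_text):
    """Parse cfg_text and read the option the way the quoted line does."""
    parser = ConfigParser()
    parser.read_string(cfg_text)
    return parser.getint("core", OPTION, fallback=0)


# Option present (30 is an assumed default value): fallback is ignored.
with_default = num_to_keep("[core]\nmax_num_rendered_ti_fields_per_task = 30\n")

# Option missing entirely: only now does fallback=0 apply.
without_option = num_to_keep("[core]\n")

# Option explicitly set to 0: the case in which delete_old_records
# would return early.
explicit_zero = num_to_keep("[core]\nmax_num_rendered_ti_fields_per_task = 0\n")
```

If that assumption holds, the early return is only reached when the option is explicitly set to 0 or less in the active configuration.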
max_num_rendered_ti_fields_per_task = 0
seems to have fixed my problems. Of course, it can only be a temporary fix.
I moved the table cleaning to an external task.
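For anyone wanting to do the same, an external cleanup could look roughly like the sketch below. It is not the code used above: `prune_old_records` is a hypothetical helper, the schema is reduced to the four columns from the INSERT statement, and sqlite3 is used only so the example runs standalone. It finds the oldest row to keep first and then deletes with a plain range predicate instead of the NOT IN subquery. Whether that avoids the lock contention on MySQL has not been verified here.

```python
import sqlite3


def prune_old_records(conn, dag_id, task_id, num_to_keep):
    """Keep only the newest num_to_keep rows for one (dag_id, task_id).

    Returns the number of deleted rows.
    """
    if num_to_keep <= 0:
        return 0  # same convention as the guard quoted earlier: do nothing

    # Step 1: find the oldest execution_date that should survive.
    row = conn.execute(
        "SELECT execution_date FROM rendered_task_instance_fields "
        "WHERE dag_id = ? AND task_id = ? "
        "ORDER BY execution_date DESC LIMIT 1 OFFSET ?",
        (dag_id, task_id, num_to_keep - 1),
    ).fetchone()
    if row is None:
        return 0  # fewer than num_to_keep rows exist, nothing to delete

    # Step 2: delete everything older, without a subquery on the same table.
    cur = conn.execute(
        "DELETE FROM rendered_task_instance_fields "
        "WHERE dag_id = ? AND task_id = ? AND execution_date < ?",
        (dag_id, task_id, row[0]),
    )
    conn.commit()
    return cur.rowcount
```

Run from a separate scheduled job, this keeps the cleanup out of the task's own transaction, which is where the INSERT and DELETE were competing.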
In my case the issue is about concurrent DAGs; the insertion issue was fixed in Airflow 1.10.11 (fix PR).
Steps to reproduce:
If X tasks of a DAG run at the same time, (X - 1) of them hit the lock and only 1 of them succeeds!
[2020-08-17 10:42:30,177] {taskinstance.py:882} INFO -
--------------------------------------------------------------------------------
[2020-08-17 10:42:30,187] {taskinstance.py:901} INFO - Executing <Task(BranchPythonOperator): branch_task> on 2020-08-17T10:34:24.115521+00:00
[2020-08-17 10:42:30,190] {standard_task_runner.py:54} INFO - Started process 553 to run task
[2020-08-17 10:42:30,212] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'logo_detection_webhook', 'branch_task', '2020-08-17T10:34:24.115521+00:00', '--job_id', '169070', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/model_training/logo_detection_webhook.py', '--cfg_path', '/tmp/tmprvq51le4']
[2020-08-17 10:42:30,213] {standard_task_runner.py:78} INFO - Job 169070: Subtask branch_task
[2020-08-17 10:42:30,251] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: logo_detection_webhook.branch_task 2020-08-17T10:34:24.115521+00:00 [running]> 91a09a3997d3
[2020-08-17 10:43:21,389] {taskinstance.py:1150} ERROR - (_mysql_exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
[SQL: DELETE FROM rendered_task_instance_fields WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND (rendered_task_instance_fields.dag_id, rendered_task_instance_fields.task_id, rendered_task_instance_fields.execution_date) NOT IN (SELECT subq1.dag_id, subq1.task_id, subq1.execution_date
FROM (SELECT rendered_task_instance_fields.dag_id AS dag_id, rendered_task_instance_fields.task_id AS task_id, rendered_task_instance_fields.execution_date AS execution_date
FROM rendered_task_instance_fields
WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s ORDER BY rendered_task_instance_fields.execution_date DESC
LIMIT %s) AS subq1)]
[parameters: ('logo_detection_webhook', 'branch_task', 'logo_detection_webhook', 'branch_task', 30)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/usr/local/lib/python3.7/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
Is this issue still present in 1.10.12?
This has been fixed in 1.10.2.
https://github.com/apache/airflow/pull/9993/files
But I am not sure this works. I fixed it by changing the subquery in the delete.
I will mark it as fixed then @zorseti. I think we can always re-open in case we still see it happening :)