I'm using Airflow 1.10.9 with the Celery executor and Postgres. I need a nested loop in which the first level has 2 options and the second level iterates over 1800 options.
I found that if I go above 600 options in the second loop, I get a duplicate key value error. It seems that either two Postgres sessions are connecting or something else is triggering a duplicate insert of my first task.
Here is an example DAG that reproduces the issue when you raise the range for j from 10 to 1800.
The setup runs in Docker and uses this docker-compose file:
https://github.com/puckel/docker-airflow/blob/master/docker-compose-CeleryExecutor.yml
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, date, timedelta
from airflow.models import DAG
import random
import csv
from airflow.utils.dates import days_ago
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def return_branch(**kwargs):
    # Only branch_0 and branch_1 are created by the outer loop below.
    branches = ['branch_0', 'branch_1']
    return random.choice(branches)


with DAG(
    dag_id='branch_demo',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 11 * * *'
) as dag:
    kick_off_dag = BashOperator(task_id='run_this_first', bash_command='echo "first"')
    branching = BranchPythonOperator(
        task_id='branching',
        python_callable=return_branch,
        provide_context=True)
    kick_off_dag >> branching

    # Outer loop: 2 branches. Inner loop: 1800 tasks downstream of each branch.
    for i in range(2):
        d = BashOperator(task_id='branch_{0}'.format(i), bash_command='echo "job"')
        for j in range(1800):
            m = BashOperator(task_id='branch_{0}_{1}'.format(i, j), bash_command='echo "done"')
            d >> m
        branching >> d
Postgres log error
ERROR: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(run_this_first, branch_demo, 2020-04-07 18:16:14.566623+00) already exists.
STATEMENT: INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES ('run_this_first', 'branch_demo', '2020-04-07T18:16:14.566623+00:00'::timestamptz, NULL, NULL, NULL, NULL, 0, 1, '', 'root', NULL, 'default_pool', 'default', 3604, NULL, NULL, NULL, '\x80047d942e'::bytea)
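The guess that two sessions both try to insert the same task instance can be illustrated with a minimal sketch. It is not Airflow's scheduler code and it uses the standard-library sqlite3 module instead of Postgres. The table is a hypothetical stand-in reduced to the three key columns named in the error, the helper `missing` is made up for illustration, and the two "writers" are simulated on a single connection.

```python
import sqlite3

# Hypothetical, simplified stand-in for the task_instance table:
# only the three primary-key columns from the error message are kept.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_instance (
        task_id TEXT,
        dag_id TEXT,
        execution_date TEXT,
        PRIMARY KEY (task_id, dag_id, execution_date)
    )
    """
)

row = ("run_this_first", "branch_demo", "2020-04-07T18:16:14.566623+00:00")


def missing(connection, key):
    """Return True if no row with this primary key exists yet."""
    cur = connection.execute(
        "SELECT 1 FROM task_instance "
        "WHERE task_id = ? AND dag_id = ? AND execution_date = ?",
        key,
    )
    return cur.fetchone() is None


# Both writers check for the row before either one has inserted it.
writer_a_sees_missing = missing(conn, row)
writer_b_sees_missing = missing(conn, row)

errors = []
for sees_missing in (writer_a_sees_missing, writer_b_sees_missing):
    if sees_missing:
        try:
            conn.execute("INSERT INTO task_instance VALUES (?, ?, ?)", row)
            conn.commit()
        except sqlite3.IntegrityError as exc:
            # The second insert violates the primary key, which is the same
            # class of failure as the task_instance_pkey error above.
            errors.append(str(exc))
            conn.rollback()

row_count = conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0]
print(len(errors), row_count)
```

If this check-then-insert pattern is what happens here, the first insert succeeds and the second fails, which would match an error that names a row that "already exists".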
Traceback from scheduler logs:
Process DagFileProcessor102313-Process:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1226, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 854, in do_executemany
cursor.executemany(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(create_cluster, predict_model_training_combined-2, 2020-04-07 13:59:38.550927+00) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 157, in _run_file_processor
pickle_dags)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1293, in _process_dags
self._process_task_instances(dag, tis_out)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 778, in _process_task_instances
run.verify_integrity(session=session)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 400, in verify_integrity
session.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1036, in commit
self.transaction.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 503, in commit
self._prepare_impl()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2479, in flush
self._flush(objects)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2617, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
raise value
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2577, in _flush
flush_context.execute()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1084, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1226, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 854, in do_executemany
cursor.executemany(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(create_cluster, predict_model_training_combined-2, 2020-04-07 13:59:38.550927+00) already exists.
[SQL: INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(duration)s, %(state)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(job_id)s, %(pool)s, %(queue)s, %(priority_weight)s, %(operator)s, %(queued_dttm)s, %(pid)s, %(executor_config)s)]
[parameters: ({'task_id': 'create_cluster', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 7119, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458570>}, {'task_id': 'before_training', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 7118, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd6534582d0>}, {'task_id': 'after_training', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 1, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458540>}, {'task_id': 'remove_cluster', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 
'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 1, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458330>}, {'task_id': 'train_model_AGGH_eon', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 3, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458180>}, {'task_id': 'watch_training_AGGH_eon', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 2, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458360>}, {'task_id': 'train_model_AGGH_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 3, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458630>}, {'task_id': 'watch_training_AGGH_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 
13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 2, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd6534586f0>} ... displaying 10 of 7120 total bound parameter sets ... {'task_id': 'train_model_ZYYJ_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 3, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd652758660>}, {'task_id': 'watch_training_ZYYJ_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 2, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd6527586f0>})]
(Background on this error at: http://sqlalche.me/e/gkpj)
Process DagFileProcessor104982-Process:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1226, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 854, in do_executemany
cursor.executemany(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(create_cluster, predict_model_training_combined-4, 2020-04-07 14:41:02.111727+00) already exists.
NOTE: SequentialExecutor is not affected; the Local and Celery executors are. I'm trying Celery again with lower parallelism in the config.
Have you checked whether changing the max_tis_per_query option affects this problem?
https://airflow.readthedocs.io/en/latest/configurations-ref.html#max-tis-per-query
I tried changing it from 512 to 10000 and then to 0 (unlimited), and I still have the same problem.
I have no idea what might be causing this problem. Do you want to work on this ticket? Airflow is an Open Source project, so there is no paid technical support. Each problem is solved by other users who have a similar problem.
Thank you very much for reporting the error and the exact description.
Yes, I would like to. We are trying to implement Airflow for model training. It would work with SequentialExecutor, but that would take forever to get through this many tasks. I'm experimenting with adding a sleep task or two and adjusting concurrency per task. So far that makes the Postgres error go away, but tasks are stuck in the None state and are never queued, scheduled, or started.
If the tasks are in the None state, then the problem lies somewhere in this block of code.
https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L861-L863
The first line creates the task instance with state None.
The third line changes the state from None to SCHEDULED.
Please note that this happens outside the main scheduler process, so you have to look at the log files rather than the main scheduler output.
If you want to increase the log verbosity, run the following command:
export AIRFLOW__CORE__LOGGING_LEVEL=debug
Yeah, I have debug enabled. Nothing stands out.
I'll capture some debug logs and upload them here so you can check for anything out of the ordinary.
I discovered that 1.10.2 is not affected by my problem. I went back through several versions from 1.10.9, and each one had the same issue until I got to 1.10.2. With lower parallelism, 1.10.2 does not get the database error. At higher parallelism settings it still gets the error, but the tasks somehow still progress.
Attached are the logs from my attempt at running my test DAG, including the pkey failure error.
We hit a similar problem when using the REST API endpoint POST /api/experimental/dags/<DAG_ID>/dag_runs to trigger a DAG run.
There is a unique key on (dag_id, execution_date), and execution_date is a timestamp (i.e. ms-level resolution). When triggering multiple DAG runs programmatically, two DAG runs are likely to get the same execution_date, and a duplicate key error occurs.
I think execution_date is a decent but not a good choice for identifying a DAG run uniquely.
On the other hand, the MySQL id is more reasonable in certain scenarios.
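One possible client-side workaround for the collision described above is to give every programmatically triggered run its own execution_date. The sketch below only builds request bodies and does not call Airflow. The helper names `unique_execution_dates` and `build_payloads` are hypothetical, and the "execution_date" body field is an assumption that should be checked against the API documentation for your Airflow version. The one-second spacing is also an assumption, chosen so the values stay distinct even if sub-second precision is dropped somewhere.

```python
from datetime import datetime, timedelta, timezone


def unique_execution_dates(count, start, step=timedelta(seconds=1)):
    """Return `count` distinct timestamps, each `step` after the previous one."""
    return [start + i * step for i in range(count)]


def build_payloads(count, start):
    # "execution_date" as a request-body field is an assumption here;
    # verify it against the API documentation for your Airflow version.
    return [
        {"execution_date": d.isoformat()}
        for d in unique_execution_dates(count, start)
    ]


# Fixed start time so the example is deterministic.
start = datetime(2020, 4, 7, 18, 0, 0, tzinfo=timezone.utc)
payloads = build_payloads(5, start)

# Every payload carries a different execution_date, so no two runs
# would share the same (dag_id, execution_date) key.
distinct = {p["execution_date"] for p in payloads}
print(len(payloads), len(distinct))
```

This avoids the duplicate key on the client side only. It does not change how uniqueness of a DAG run is defined.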
As an update, latest 1.10.10 is still affected by this problem.