I am following the instructions here
https://airflow.readthedocs.io/en/stable/executor/debug.html
root@ip-172-16-101-169:/usr/local/airflow/dags/dbscore# cat tasks.py
import sys
import traceback
from datetime import datetime
from scripts.dbscore.data import load_data
# from scripts.dbscore.models import send_failure_info_by_email, send_trained_models_by_email
# from scripts.dbscore.train import XGBoostTraining
#from scripts.dbscore.utils import generate_s3_logs_dir
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from config.dbscore import config
dag = DAG(
    dag_id='sklearn-dag',
    start_date=datetime(2020, 1, 10),
    schedule_interval=None,
)
from airflow.operators.dummy_operator import DummyOperator
import os
import sys
def print_hello():
    print('hey')
    return sys.path, os.environ['PYTHONPATH'].split(os.pathsep)
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)
dummy_operator >> hello_operator
if __name__ == '__main__':
    dag.clear(reset_dag_runs=True)
    dag.run()
and then if I run this
AIRFLOW__CORE__EXECUTOR=DebugExecutor python dags/dbscore/tasks.py
I get the following output
(.venv) root@ip-172-16-101-169:/usr/local/airflow# AIRFLOW__CORE__EXECUTOR=DebugExecutor python dags/dbscore/tasks.py
/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/configuration.py:241: FutureWarning: The task_runner setting in [core] has the old default value of 'BashTaskRunner'. This value has been changed to 'StandardTaskRunner' in the running config, but please update your config before Apache Airflow 2.0.
FutureWarning
/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/configuration.py:631: DeprecationWarning: Specifying both AIRFLOW_HOME environment variable and airflow_home in the config file is deprecated. Please use only the AIRFLOW_HOME environment variable and remove the config file entry.
warnings.warn(msg, category=DeprecationWarning)
/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/config_templates/airflow_local_settings.py:159: DeprecationWarning: The elasticsearch_host option in [elasticsearch] has been renamed to host - the old setting has been used, but please update your config.
ELASTICSEARCH_HOST = conf.get('elasticsearch', 'HOST')
[2020-04-02 04:02:48,994] {settings.py:253} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=2000, pid=24900
[2020-04-02 04:02:49,372] {__init__.py:51} INFO - Using executor DebugExecutor
[2020-04-02 04:02:49,501] {base_executor.py:58} INFO - Adding to queue: ['<TaskInstance: sklearn-dag.dummy_task 2020-01-10 00:00:00+00:00 [queued]>']
[2020-04-02 04:02:54,410] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=sklearn-dag, task_id=dummy_task, execution_date=20200110T000000, start_date=20200402T040249, end_date=20200402T040254
Traceback (most recent call last):
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 81, in _run_task
self.change_state(key, State.SUCCESS)
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 148, in change_state
self.running.remove(key)
AttributeError: 'dict' object has no attribute 'remove'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "dags/dbscore/tasks.py", line 180, in <module>
dag.run()
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1399, in run
job.run()
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 221, in run
self._execute()
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 788, in _execute
session=session)
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 718, in _execute_for_run_dates
session=session)
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 593, in _process_backfill_task_instances
executor.heartbeat()
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 134, in heartbeat
self.sync()
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 71, in sync
task_succeeded = self._run_task(ti)
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 84, in _run_task
self.change_state(key, State.FAILED)
File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 148, in change_state
self.running.remove(key)
AttributeError: 'dict' object has no attribute 'remove'
Apache Airflow version: 1.10.9
Kubernetes version (if you are using kubernetes) (use kubectl version):
Environment:
uname -a): 4.15.0-1063-aws
The execution failed with a traceback in the console
What you expected to happen:
Execution to finish, so I can enable the IPDB session if necessary
cc @turbaszek ?
Will take a look at that!
I also ran into this problem. I made a hotfix but forgot to file a bug report:
def change_state(self, key, state):
    self.log.debug("Popping %s from executor task queue.", key)
    del self.running[key]
    self.event_buffer[key] = state
I haven't run into any problems since applying this fix.
@khyurri I think that changing remove to pop should help (Probably some types have changed). Would you like to take over this issue? Or should I prepare a fix? :)
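For illustration, here is a minimal sketch of what the pop-based variant could look like. MiniExecutor is a hypothetical stand-in, not Airflow's DebugExecutor, and the key tuple and the "command" value are made-up placeholders; the only assumption taken from the traceback is that self.running is a dict in 1.10.9.

```python
import logging


class MiniExecutor:
    """Hypothetical stand-in for an executor; this is not Airflow's class."""

    def __init__(self):
        self.log = logging.getLogger(__name__)
        # Assumption: a dict, as the AttributeError in the traceback implies.
        self.running = {}
        self.event_buffer = {}

    def change_state(self, key, state):
        self.log.debug("Popping %s from executor task queue.", key)
        # dict.pop with a default removes the key if present and does
        # nothing otherwise, so a repeated call does not raise KeyError.
        self.running.pop(key, None)
        self.event_buffer[key] = state


executor = MiniExecutor()
key = ("sklearn-dag", "dummy_task", "2020-01-10", 1)  # made-up key
executor.running[key] = "command"                     # made-up value
executor.change_state(key, "success")
```

Compared with del self.running[key], pop with a default also tolerates a second call for the same key, which is what happens in the traceback when the FAILED state is set from the exception handler.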
I can take it
@turbaszek Should I create a PR against v1-10-stable?
It looks like on master self.running has a different type (a set instead of a dict):
self.running: Set[TaskInstanceKeyType] = set()
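The type difference can be reproduced in plain Python without Airflow. This is only a sketch: the variable names and the "task-key" value are invented for the example.

```python
# A set supports .remove(), a dict does not.
running_as_set = {"task-key"}
running_as_dict = {"task-key": "command"}

# Works: sets have a remove() method.
running_as_set.remove("task-key")

# Fails: dicts have no remove() method, which matches the traceback.
try:
    running_as_dict.remove("task-key")
except AttributeError as exc:
    error_message = str(exc)

# For a dict, del (or pop) is the way to drop a key.
del running_as_dict["task-key"]
```

So code written for the set on master breaks once it runs against the dict on the 1.10 branch.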
@potiuk can you cherrypick this change to 1.10?
@khyurri feel free to open a PR to v1-10-test :)
FYI @kaxil @potiuk
Oh yes, can you please create a PR? I am planning to cut RCs today and am waiting for the DockerFile & Timezone PRs to get in.
Related Jira: https://issues.apache.org/jira/browse/AIRFLOW-6836
Fixed in 1.10.10