Airflow: Debug Executor fails to work

Created on 2 Apr 2020  ·  11 Comments  ·  Source: apache/airflow

I am following the instructions here
https://airflow.readthedocs.io/en/stable/executor/debug.html

root@ip-172-16-101-169:/usr/local/airflow/dags/dbscore# cat tasks.py 
import sys
import traceback
from datetime import datetime

from scripts.dbscore.data import load_data
# from scripts.dbscore.models import send_failure_info_by_email, send_trained_models_by_email
# from scripts.dbscore.train import XGBoostTraining
#from scripts.dbscore.utils import generate_s3_logs_dir

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from config.dbscore import config

dag = DAG(
    dag_id='sklearn-dag',
    start_date=datetime(2020, 1, 10),
    schedule_interval=None,
)

from airflow.operators.dummy_operator import DummyOperator
import os
import sys

def print_hello():
    print('hey')
    return sys.path, os.environ['PYTHONPATH'].split(os.pathsep)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

dummy_operator >> hello_operator

if __name__ == '__main__':
    dag.clear(reset_dag_runs=True)
    dag.run()

and then if I run this

AIRFLOW__CORE__EXECUTOR=DebugExecutor python dags/dbscore/tasks.py

I get the following output

(.venv) root@ip-172-16-101-169:/usr/local/airflow# AIRFLOW__CORE__EXECUTOR=DebugExecutor python dags/dbscore/tasks.py
/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/configuration.py:241: FutureWarning: The task_runner setting in [core] has the old default value of 'BashTaskRunner'. This value has been changed to 'StandardTaskRunner' in the running config, but please update your config before Apache Airflow 2.0.
  FutureWarning
/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/configuration.py:631: DeprecationWarning: Specifying both AIRFLOW_HOME environment variable and airflow_home in the config file is deprecated. Please use only the AIRFLOW_HOME environment variable and remove the config file entry.
  warnings.warn(msg, category=DeprecationWarning)
/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/config_templates/airflow_local_settings.py:159: DeprecationWarning: The elasticsearch_host option in [elasticsearch] has been renamed to host - the old setting has been used, but please update your config.
  ELASTICSEARCH_HOST = conf.get('elasticsearch', 'HOST')
[2020-04-02 04:02:48,994] {settings.py:253} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=2000, pid=24900
[2020-04-02 04:02:49,372] {__init__.py:51} INFO - Using executor DebugExecutor
[2020-04-02 04:02:49,501] {base_executor.py:58} INFO - Adding to queue: ['<TaskInstance: sklearn-dag.dummy_task 2020-01-10 00:00:00+00:00 [queued]>']
[2020-04-02 04:02:54,410] {taskinstance.py:1048} INFO - Marking task as SUCCESS.dag_id=sklearn-dag, task_id=dummy_task, execution_date=20200110T000000, start_date=20200402T040249, end_date=20200402T040254
Traceback (most recent call last):
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 81, in _run_task
    self.change_state(key, State.SUCCESS)
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 148, in change_state
    self.running.remove(key)
AttributeError: 'dict' object has no attribute 'remove'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "dags/dbscore/tasks.py", line 180, in <module>
    dag.run()
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1399, in run
    job.run()
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 221, in run
    self._execute()
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 788, in _execute
    session=session)
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 718, in _execute_for_run_dates
    session=session)
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 593, in _process_backfill_task_instances
    executor.heartbeat()
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 134, in heartbeat
    self.sync()
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 71, in sync
    task_succeeded = self._run_task(ti)
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 84, in _run_task
    self.change_state(key, State.FAILED)
  File "/usr/local/airflow/.venv/lib/python3.6/site-packages/airflow/executors/debug_executor.py", line 148, in change_state
    self.running.remove(key)
AttributeError: 'dict' object has no attribute 'remove'

Apache Airflow version: 1.10.9

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

  • Cloud provider or hardware configuration: AWS
  • OS (e.g. from /etc/os-release): Ubuntu 18.04.4 LTS
  • Kernel (e.g. uname -a): 4.15.0-1063-aws
  • Install tools:
  • Others:

What happened:

The execution failed with traceback in the console

What you expected to happen:

Execution to finish, so I can enable the IPDB session if necessary

bug

All 11 comments

Thanks for opening your first issue here! Be sure to follow the issue template!

cc @turbaszek ?

Will take a look at that!

I also ran into this problem. I made a hotfix but forgot to file a bug:

    def change_state(self, key, state):
        self.log.debug("Popping %s from executor task queue.", key)
        del self.running[key]
        self.event_buffer[key] = state

I haven't run into any problems since applying this fix.
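
For readers who want to see the failure and the hotfix in isolation, here is a minimal sketch. `StubExecutor` and its method names are hypothetical stand-ins, not Airflow code; the only assumption taken from the traceback is that `self.running` is a dict in 1.10.9.

```python
class StubExecutor:
    """Hypothetical stand-in for the executor; this is not Airflow code."""

    def __init__(self):
        # Assumption from the traceback: in 1.10.9 `running` is a dict.
        self.running = {}
        self.event_buffer = {}

    def change_state_broken(self, key, state):
        # Mirrors the failing call: dicts have no `remove` method.
        self.running.remove(key)
        self.event_buffer[key] = state

    def change_state_hotfix(self, key, state):
        # The hotfix from the comment above: delete the key from the dict.
        del self.running[key]
        self.event_buffer[key] = state


key = ("sklearn-dag", "dummy_task", "2020-01-10")
executor = StubExecutor()
executor.running[key] = "command"

try:
    executor.change_state_broken(key, "success")
except AttributeError as err:
    error_message = str(err)

# The key is still in `running`, so the hotfix variant can remove it.
executor.change_state_hotfix(key, "success")
```

Note that `del` raises `KeyError` if the key is missing, while `pop(key, None)` would not.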

@khyurri I think that changing remove to pop should help (Probably some types have changed). Would you like to take over this issue? Or should I prepare a fix? :)

I can take it
@turbaszek Should I create PR into v1-10-stable?

It looks like on master self.running has a different type (a set instead of a dict):

self.running: Set[TaskInstanceKeyType] = set()
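
The type difference can be illustrated with a small sketch. `discard_running` is a hypothetical helper, not part of Airflow; it only shows that a set and a dict need different removal calls, which is why code written against a set breaks against a dict.

```python
from typing import Dict, Hashable, Set, Union


def discard_running(running: Union[Dict[Hashable, object], Set[Hashable]],
                    key: Hashable) -> None:
    """Remove `key` from `running` whether it is a dict or a set.

    Hypothetical helper for illustration only.
    """
    if isinstance(running, dict):
        # dict has no remove(); pop with a default tolerates a missing key.
        running.pop(key, None)
    else:
        # set.discard tolerates a missing key, unlike set.remove.
        running.discard(key)


key = ("sklearn-dag", "dummy_task")

running_as_dict = {key: "command"}  # shape assumed for 1.10.9
running_as_set = {key}              # shape shown above for master

discard_running(running_as_dict, key)
discard_running(running_as_set, key)
```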

@potiuk can you cherrypick this change to 1.10?

@khyurri feel free to open PR to v1-10-test :)
FYI @kaxil @potiuk

Oh yes, can you please create a PR? I'm planning to cut RCs today and am waiting for the DockerFile & Timezone PRs to get in.

Fixed in 1.10.10
