Airflow: provide_context=True not working with PythonVirtualenvOperator

Created on 7 Apr 2020  ·  10 Comments  ·  Source: apache/airflow

Apache Airflow version: 1.10.9

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.14.8

Environment: Docker (Ubuntu 18.04 - Python 3.7)

  • Cloud provider or hardware configuration: Azure
  • OS (e.g. from /etc/os-release): Docker (Ubuntu 18.04 - Python 3.7)
  • Kernel (e.g. uname -a): Docker (Ubuntu 18.04 - Python 3.7)
  • Install tools: N/A
  • Others: N/A
What happened:

When we enable provide_context=True for PythonVirtualenvOperator, we get the error below.

[2020-04-07 15:08:51,940] {taskinstance.py:1128} ERROR - can't pickle module objects
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 297, in execute_callable
    self._write_args(input_filename)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 339, in _write_args
    pickle.dump(arg_dict, f)
TypeError: can't pickle module objects
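The same TypeError can be reproduced outside Airflow with the standard library alone. The sketch below assumes that the context dictionary passed to the operator contains at least one module object; the traceback suggests this but does not name the key, so `fake_context` and its keys are hypothetical stand-ins.

```python
import pickle
import sys

# Hypothetical stand-in for the task context: one plain value, one module object.
fake_context = {"run_id": "manual__2020-04-07", "some_module": sys}


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


error = try_pickle(fake_context)
print(error)  # exact wording of the message varies between Python versions
```

A dictionary of plain values pickles without error, so the failure comes from the module object and not from the dictionary itself.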

One way to work around this issue is to create your own CustomPythonVirtualenvOperator that overrides _write_args, although this should not be necessary. Feel free to use this if you're encountering the same issue:

import pickle

import dill
from airflow.operators.python_operator import PythonVirtualenvOperator


class CustomPythonVirtualenvOperator(PythonVirtualenvOperator):
    def _write_args(self, input_filename):
        # serialize args to file
        if self._pass_op_args():
            with open(input_filename, 'wb') as f:
                # we only need dag_run to access conf at run time,
                # so every other context entry is dropped before pickling
                arg_dict = {'args': self.op_args, 'kwargs': {'dag_run': self.op_kwargs['dag_run']}}
                if self.use_dill:
                    dill.dump(arg_dict, f)
                else:
                    pickle.dump(arg_dict, f)

What you expected to happen:

Ideally we should be able to use the context so we can run these tasks with run-time arguments via the CLI or the REST API.
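As a rough illustration of what "run-time arguments" means here, the sketch below shows a callable that reads the configuration attached to the triggering DAG run through `dag_run.conf`. The function name `read_runtime_args`, the `target_date` key, and the `SimpleNamespace` stand-in for a DagRun are all assumptions made for illustration; in a real task Airflow supplies the `dag_run` object.

```python
from types import SimpleNamespace


def read_runtime_args(**kwargs):
    """Return the run-time configuration passed when the DAG run was triggered."""
    dag_run = kwargs.get("dag_run")
    # conf is None when the run was not triggered with a configuration,
    # so fall back to an empty dict in that case.
    conf = getattr(dag_run, "conf", None) or {}
    return dict(conf)


# Stand-in for a DagRun, for illustration only.
fake_dag_run = SimpleNamespace(conf={"target_date": "2020-04-07"})
print(read_runtime_args(dag_run=fake_dag_run))
```

The fallback to an empty dict keeps the callable usable for scheduled runs, which carry no configuration.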

How to reproduce it:

from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow import DAG
import pickle
import dill

default_args = {
    'owner': 'Luis M',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue'
}
dag = DAG(
    'bug',
    default_args=default_args,
    description='bug',
    schedule_interval=timedelta(days=1))


class CustomPythonVirtualenvOperator(PythonVirtualenvOperator):
    def _write_args(self, input_filename):
        # serialize args to file
        if self._pass_op_args():
            with open(input_filename, 'wb') as f:
                arg_dict = ({'args': self.op_args, 'kwargs': {'dag_run': self.op_kwargs['dag_run']}})
                if self.use_dill:
                    dill.dump(arg_dict, f)
                else:
                    pickle.dump(arg_dict, f)


def passf(**kwargs):
    pass

def failf(**kwargs):
    pass

task1 = CustomPythonVirtualenvOperator(
        task_id='task1',
        python_callable=passf,
        python_version='3',
        dag=dag,
        provide_context=True
)

task2 = PythonVirtualenvOperator(
        task_id='task2',
        python_callable=failf,
        python_version='3',
        dag=dag,
        provide_context=True
)

Anything else we need to know:

If you run the DAG provided you should see task1 passing and task2 failing.

bug

All 10 comments

Thanks for opening your first issue here! Be sure to follow the issue template!

This problem seems indeed significant. I wonder if it appears in the master version. Have you tried to check which objects in the context are causing the problem? Maybe we can exclude one or two objects to restore the correct behavior of this option in Airflow 1.10.

This is an open source project, so there is no specific person who solves the issue. Would you like to take responsibility for it? I will gladly help and answer the questions if you want to solve this problem.

Contributor guide: https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst
Development environment: https://github.com/apache/airflow/blob/master/BREEZE.rst

The community is waiting for your contribution 🐈
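One way to approach the question above about which context objects cause the problem is to try pickling each entry separately and record the failures. This is only a sketch: the helper names `unpicklable_keys` and `picklable_subset` are hypothetical, and `sample` is a made-up dictionary whose key names are placeholders, not a real Airflow context.

```python
import pickle
import sys


def unpicklable_keys(mapping):
    """Return {key: error description} for every value pickle cannot serialize."""
    failures = {}
    for key, value in mapping.items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickle can raise several exception types
            failures[key] = f"{type(exc).__name__}: {exc}"
    return failures


def picklable_subset(mapping):
    """Return a copy of mapping without the entries that fail to pickle."""
    bad = unpicklable_keys(mapping)
    return {key: value for key, value in mapping.items() if key not in bad}


# Made-up stand-in for a context: two plain values and one module object.
sample = {"ds": "2020-04-07", "run_id": "manual__1", "some_module": sys}
print(unpicklable_keys(sample))
print(picklable_subset(sample))
```

Running the same check against the real context inside a task would show which keys need to be excluded before serialization.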

@mik-laj Thank you for the information. I will look into the root cause and create a PR. Could you please assign me to the issue?

I have not tested it on the master branch; I'll do so before proceeding with the PR.

@mik-laj Hi Kamil, I created PR #8256 to fix this issue on the v1-10-stable branch, and the CI tests are passing. There seems to be an issue with requirements, but I don't think that's related to this change. Could you let me know the next steps?

There were too many changes on the master branch, so I will revisit this bug there once 2.0 is out.

Is it right that this issue is not going to be fixed in 1.10.x?

As a workaround, one could use use_dill=True; dill is able to serialize modules.

@kaxil @potiuk can you help with that?

It was not marked for Milestone 1.10.12, I am afraid. I have marked it as such now. There is a good chance rc1 will be cancelled because of #10362, and if so, we might be able to add it.

Yeah, thanks, will include it. Planning to cut 1.10.12rc2 later tonight.

PR merged, will be part of 1.10.12rc2
