Airflow: Apache Beam 2.20.0 will break the DataFlowOperator

Created on 29 Apr 2020 · 4 comments · Source: apache/airflow

Apache Airflow version: 1.10.10

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

  • Cloud provider or hardware configuration: GCP
  • OS (e.g. from /etc/os-release): Ubuntu
  • Kernel (e.g. uname -a): 5.0.0-1034-gcp
  • Install tools: apache-beam (Python) on Google Cloud Dataflow
  • Others:

What happened: After upgrading Apache Beam for Python to the latest release, apache-beam==2.20.0, the DataFlowOperator always ends in a failed state, even though the Dataflow job itself runs fine. The cause is that the format of the monitoring-console URL that Beam logs has changed, so the Dataflow hook no longer extracts the correct job_id.

What you expected to happen: The Dataflow task should finish without errors.

The job_id is not retrieved correctly because the URL format changed.

How to reproduce it: Set up Airflow with Dataflow, install apache-beam 2.20.0 for Python, and run a Dataflow job.

Anything else we need to know: The Beam commit that changed the format is here: https://github.com/apache/beam/commit/d38e87b6180cb60752be24bdbbb4e17daf0fb3aa
The current regexp in Airflow is br'.*console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*' and is defined here for 1.10.10: https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/hooks/gcp_dataflow_hook.py#L145
Because the region_id now comes directly after jobs/, the regexp captures the region_id instead of the job_id.
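The mismatch can be reproduced with Python's re module alone. The sketch below applies the 1.10.10 pattern to two log lines. The URL layouts (.../dataflow/jobsDetail/locations/<region>/jobs/<job_id> before 2.20.0, .../dataflow/jobs/<region>/<job_id> from 2.20.0) are an assumed reading of the linked Beam commit, the region, job ID, and project are made-up values, and extract_job_id is a hypothetical helper rather than the hook's own function.

```python
import re

# Pattern used by the Airflow 1.10.10 Dataflow hook, as quoted in this issue.
# Note: the "|" characters inside the character class are literal, not alternation.
OLD_PATTERN = re.compile(
    br".*console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*")

# Assumed URL layouts with made-up region, job ID and project values.
# Before Beam 2.20.0: .../jobsDetail/locations/<region>/jobs/<job_id>
OLD_URL = (b"https://console.cloud.google.com/dataflow/jobsDetail"
           b"/locations/us-central1/jobs/2020-04-29_01_02_03-123456789"
           b"?project=my-project")
# From Beam 2.20.0: .../jobs/<region>/<job_id>
NEW_URL = (b"https://console.cloud.google.com/dataflow/jobs/us-central1"
           b"/2020-04-29_01_02_03-123456789?project=my-project")


def extract_job_id(pattern, line):
    """Hypothetical helper: return the first captured group as str, or None."""
    match = pattern.search(line)
    return match.group(1).decode() if match else None


print(extract_job_id(OLD_PATTERN, OLD_URL))  # 2020-04-29_01_02_03-123456789
print(extract_job_id(OLD_PATTERN, NEW_URL))  # us-central1 (the region, not the job ID)
```

In the new layout the only /jobs/ segment is followed by the region, and the capture group stops at the next slash, so the hook receives the region as if it were the job ID.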
I got it to work again by using the following regexp: br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*"
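The workaround can be checked the same way. This is a sketch under assumptions: the sample URLs use assumed layouts with made-up values, extract_job_id is a hypothetical helper, and EITHER_PATTERN is a hypothetical pattern (not part of Airflow or Beam) showing one way to accept both URL layouts.

```python
import re

# Workaround pattern from this issue: skip the segment after /jobs/ (the region).
WORKAROUND_PATTERN = re.compile(
    br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")

# Hypothetical pattern (not part of Airflow or Beam) that accepts both the
# pre-2.20.0 layout .../jobsDetail/locations/<region>/jobs/<job_id> and the
# 2.20.0 layout .../jobs/<region>/<job_id>.
EITHER_PATTERN = re.compile(
    br"console\.cloud\.google\.com/dataflow/"
    br"(?:jobsDetail/locations/[^/?]+/jobs|jobs/[^/?]+)/([A-Za-z0-9_-]+)")

# Assumed URL layouts with made-up region, job ID and project values.
OLD_URL = (b"https://console.cloud.google.com/dataflow/jobsDetail"
           b"/locations/us-central1/jobs/2020-04-29_01_02_03-123456789"
           b"?project=my-project")
NEW_URL = (b"https://console.cloud.google.com/dataflow/jobs/us-central1"
           b"/2020-04-29_01_02_03-123456789?project=my-project")


def extract_job_id(pattern, line):
    """Hypothetical helper: return the first captured group as str, or None."""
    match = pattern.search(line)
    return match.group(1).decode() if match else None


print(extract_job_id(WORKAROUND_PATTERN, NEW_URL))  # 2020-04-29_01_02_03-123456789
print(extract_job_id(WORKAROUND_PATTERN, OLD_URL))  # None
print(extract_job_id(EITHER_PATTERN, OLD_URL))      # 2020-04-29_01_02_03-123456789
print(extract_job_id(EITHER_PATTERN, NEW_URL))      # 2020-04-29_01_02_03-123456789
```

The workaround only matches the new layout, so on Beam versions older than 2.20.0 it would find no job ID at all. A pattern that names both layouts explicitly, like EITHER_PATTERN, avoids that, though it has only been checked against these sample strings, not real Dataflow output.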

Labels: bug, Google

All 4 comments

Thanks for opening your first issue here! Be sure to follow the issue template!

As a future improvement, would it be possible to have an integration test that runs the DataFlowOperator against Beam HEAD?

System tests are run manually, so they would not detect this problem. We hope to add system test support to CI soon. However, this is not a commercial project, so it is not easy: we need to solve many organizational, technical, and financial problems first.

I am working on a thorough modernization of the Dataflow integration, so future versions of Airflow will have better support for this service.
https://github.com/apache/airflow/pulls?q=is%3Apr+is%3Aopen+Dataflow+author%3Amik-laj
I have more changes waiting in a long queue.
