Airflow: Add on_kill method to DataprocSubmitJobOperator

Created on 18 Aug 2020 · 6 Comments · Source: apache/airflow

Description

This operator should implement on_kill method using cancel_job method of DataprocHook so in case of termination we cancel running job. This option probably should be configurable (for example cancel_on_kill) because of request_id parameter in a job definition: https://googleapis.dev/python/dataproc/latest/gapic/v1/api.html#google.cloud.dataproc_v1.JobControllerClient.submit_job

I'm happy to help with system test 👍

Use case / motivation

Remove dangling jobs when the operator is terminated.

Related Issues

https://github.com/apache/airflow/pull/6371
https://github.com/apache/airflow/pull/6371#issuecomment-590757917

Labels: providers, good first issue, feature, Google

All 6 comments

@edejong @jaketf @potiuk I would love to hear your opinion on this one 👍

Sounds like a good feature to have.
My only thought is that we should also add a similar on_kill method to the workflow template operators.

Dataproc jobs are kind of a wild wild west and may have significant side effects. From a documentation perspective, we should call out that on_kill simply kills the job and will not "roll back" changes that may already have occurred in external systems (GCS, Hive Metastore, BQ, Pub/Sub, etc.). Users should be careful to handle any such scenarios in the logic of their pipelines. This may seem obvious to us but may not be clear to users, especially by contrast with a BigQuery query job, where everything is controlled internally: if the job is cancelled, nothing happens, because all results are committed atomically.

A few examples:

  • Even before completing, a Spark driver could make arbitrary calls that mutate data on GCS or in a database (e.g. it could write some sort of lock file that ends up abandoned).
  • If you snipe a MapReduce job in the middle, any intermediate files already flushed to GCS will not get cleaned up.
  • A Hive job can contain multiple query statements (e.g. a CREATE TABLE and an INSERT INTO), which may leave behind a new, empty table in the Hive Metastore.
  • Sniping a Spark Streaming job subscribed to Pub/Sub may leave ACKed messages whose corresponding outputs were never committed.
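To make the Hive example concrete, here is a toy simulation, not Dataproc or Hive code: a "job" made of two statements is cancelled after the first one, and the effect of the statement that already ran simply stays in the external store. The names `ToyMetastore`, `run_statements`, `create_table` and `insert_into` are hypothetical and exist only for this illustration.

```python
# Toy model (not Dataproc code): statements take effect one at a time and
# cancellation between them does NOT undo what already ran.
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Set


@dataclass
class ToyMetastore:
    """Hypothetical stand-in for an external system such as the Hive Metastore."""
    tables: Set[str] = field(default_factory=set)
    rows: Dict[str, int] = field(default_factory=dict)


def run_statements(
    statements: List[Callable[[ToyMetastore], None]],
    store: ToyMetastore,
    cancel_after: int,
) -> int:
    """Run statements in order, stopping once `cancel_after` have executed.

    Returns how many statements ran. Nothing is rolled back.
    """
    executed = 0
    for statement in statements:
        if executed == cancel_after:
            break  # simulated kill -> cancel: no rollback of earlier effects
        statement(store)
        executed += 1
    return executed


def create_table(store: ToyMetastore) -> None:
    store.tables.add("sales")
    store.rows["sales"] = 0


def insert_into(store: ToyMetastore) -> None:
    store.rows["sales"] += 100


store = ToyMetastore()
ran = run_statements([create_table, insert_into], store, cancel_after=1)
print(ran, store.tables, store.rows)  # 1 {'sales'} {'sales': 0}
```

The empty `sales` table left behind is exactly the kind of side effect that cancelling the job does not clean up.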

I think we should limit on_kill to cancelling the job. Moreover, I think it should be configurable, to allow attaching to an existing job using request_id.
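One way to see why cancellation should be configurable: as we read the linked API docs, `request_id` acts as an idempotency token, so a second SubmitJob request carrying the same id returns the job created by the first request instead of starting a new one. The sketch below assumes exactly that behaviour and models it with a hypothetical in-memory `FakeJobController` (not the real google-cloud-dataproc client). If a killed attempt cancels the job, a retry that reuses the same `request_id` re-attaches to a cancelled job; with cancellation disabled, it re-attaches to the still-running one.

```python
import itertools
from typing import Dict, Optional


class FakeJobController:
    """Hypothetical in-memory stand-in for the Dataproc job controller.

    Models one assumed behaviour of submit_job: a repeated request with the
    same request_id returns the existing job rather than creating a new one.
    """

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.jobs: Dict[str, str] = {}         # job_id -> state
        self._by_request: Dict[str, str] = {}  # request_id -> job_id

    def submit_job(self, request_id: Optional[str] = None) -> str:
        if request_id is not None and request_id in self._by_request:
            return self._by_request[request_id]  # re-attach to existing job
        job_id = f"job-{next(self._ids)}"
        self.jobs[job_id] = "RUNNING"
        if request_id is not None:
            self._by_request[request_id] = job_id
        return job_id

    def cancel_job(self, job_id: str) -> None:
        self.jobs[job_id] = "CANCELLED"


def kill_then_retry(cancel_on_kill: bool) -> str:
    """Attempt 1 is killed; attempt 2 (a retry) reuses the same request_id."""
    client = FakeJobController()
    first = client.submit_job(request_id="stable-request-id")
    if cancel_on_kill:  # what on_kill would do when the option is enabled
        client.cancel_job(first)
    second = client.submit_job(request_id="stable-request-id")
    assert second == first  # the retry attached to the same job
    return client.jobs[second]


print(kill_then_retry(cancel_on_kill=True))   # CANCELLED
print(kill_then_retry(cancel_on_kill=False))  # RUNNING
```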

In the package airflow.providers.google.cloud.operators.dataproc, each DataprocSubmitJobOperator inherits from DataprocJobBaseOperator.

DataprocJobBaseOperator has the following implementation of the on_kill method (lines 984-992):

def on_kill(self):
    """
    Callback called when the operator is killed.
    Cancel any running job.
    """
    if self.dataproc_job_id:
        self.hook.cancel_job(
            project_id=self.project_id, job_id=self.dataproc_job_id, location=self.region
        )
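The excerpt above can be exercised outside Airflow with a stub. In this sketch, `RecordingHook` and `JobOperatorSketch` are hypothetical classes that carry only the attributes the quoted on_kill reads; the method body is the same logic. It shows that on_kill does nothing if the operator is killed before a job ID is known, and calls cancel_job once a job has been submitted.

```python
from typing import Any, Dict, List, Optional


class RecordingHook:
    """Hypothetical stand-in for DataprocHook that records cancel_job calls."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def cancel_job(self, **kwargs: Any) -> None:
        self.calls.append(kwargs)


class JobOperatorSketch:
    """Carries only the attributes the quoted on_kill uses."""

    def __init__(self, project_id: str, region: str) -> None:
        self.project_id = project_id
        self.region = region
        self.hook = RecordingHook()
        self.dataproc_job_id: Optional[str] = None  # set once a job is submitted

    def on_kill(self) -> None:
        # Same logic as the excerpt: cancel only if a job was submitted.
        if self.dataproc_job_id:
            self.hook.cancel_job(
                project_id=self.project_id, job_id=self.dataproc_job_id, location=self.region
            )


op = JobOperatorSketch(project_id="my-project", region="europe-west1")
op.on_kill()  # killed before submission: nothing to cancel
op.dataproc_job_id = "job-123"
op.on_kill()  # killed while the job runs: cancel_job is called once
print(op.hook.calls)
```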

@turbaszek wrote:

This operator should implement on_kill method using cancel_job method of DataprocHook so in case of termination we cancel running job.

Considering the present implementation of the on_kill method of DataprocJobBaseOperator, isn't this already done?
The code calls DataprocHook's cancel_job method when on_kill runs and the dataproc_job_id property is set.

If I understand this correctly, the only thing that method lacks is:

This option probably should be configurable (for example cancel_on_kill) because of request_id parameter in a job definition

Thus the following code might do the job:

def on_kill(self, cancel_on_kill=True, request_id=None):
    """
    Callback called when the operator is killed.
    Cancel any running job.

    Parameters:
        cancel_on_kill (bool): Cancel the job if True, defaults to True
        request_id (str): Job ID to cancel, defaults to the dataproc_job_id property
    """
    if not cancel_on_kill:
        return
    # Prefer an explicitly passed ID, otherwise fall back to the submitted job's ID
    job_id = request_id or self.dataproc_job_id
    if job_id:
        self.hook.cancel_job(
            project_id=self.project_id, job_id=job_id, location=self.region
        )

Did I understand this issue correctly? If not, I guess calling the hook's cancel_job method is not enough, and I will investigate further.

DataprocSubmitJobOperator has no relation with DataprocJobBaseOperator.

DataprocJobBaseOperator is used by the "old" operators such as DataprocSubmitPigJobOperator, DataprocSubmitHiveJobOperator, etc., which are deprecated in favor of the generic DataprocSubmitJobOperator.

However, you are right that the on_kill logic currently exists in the old operators and may be reused in DataprocSubmitJobOperator.on_kill 😉

"Thus the following code might do the job:"

Looks good, but let's remove request_id, as I'm still not sure how it works.
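One detail worth noting for the final shape: Airflow calls a task's on_kill() with no arguments, so a `cancel_on_kill` parameter on the method itself could never be set by a DAG author. The sketch below, following the suggestion to drop request_id, moves the flag to the constructor and reads it from `self`. `StubHook` and `SubmitJobOperatorSketch` are hypothetical, and their method signatures are simplified; they are not DataprocHook's or DataprocSubmitJobOperator's actual signatures (the real hook returns a Job object, not a string).

```python
from typing import Any, Dict, List, Optional


class StubHook:
    """Hypothetical stand-in for DataprocHook (simplified submit + cancel)."""

    def __init__(self) -> None:
        self.cancelled: List[str] = []

    def submit_job(self, job: Dict[str, Any], project_id: str, location: str) -> str:
        return "job-42"  # a real hook would call the Dataproc API here

    def cancel_job(self, job_id: str, project_id: str, location: str) -> None:
        self.cancelled.append(job_id)


class SubmitJobOperatorSketch:
    """Sketch only: cancel_on_kill is a constructor option, not an on_kill argument."""

    def __init__(self, job: Dict[str, Any], project_id: str, location: str,
                 cancel_on_kill: bool = True) -> None:
        self.job = job
        self.project_id = project_id
        self.location = location
        self.cancel_on_kill = cancel_on_kill
        self.hook = StubHook()
        self.job_id: Optional[str] = None

    def execute(self, context: Any = None) -> str:
        # Remember the submitted job's ID so on_kill can cancel it later.
        self.job_id = self.hook.submit_job(
            job=self.job, project_id=self.project_id, location=self.location
        )
        return self.job_id

    def on_kill(self) -> None:
        # on_kill() receives no arguments, so the flag must live on self.
        if self.job_id and self.cancel_on_kill:
            self.hook.cancel_job(
                job_id=self.job_id, project_id=self.project_id, location=self.location
            )


op = SubmitJobOperatorSketch(job={}, project_id="p", location="us-central1")
op.execute()
op.on_kill()
print(op.hook.cancelled)  # ['job-42']

keep = SubmitJobOperatorSketch(job={}, project_id="p", location="us-central1",
                               cancel_on_kill=False)
keep.execute()
keep.on_kill()
print(keep.hook.cancelled)  # []
```

Keeping the default at `True` preserves the behaviour of the old operators, while `cancel_on_kill=False` leaves the job running for cases where something else should re-attach to it.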
