Description
This operator should implement an on_kill method that uses the cancel_job method of DataprocHook, so that a running job is cancelled when the task is terminated. This behaviour should probably be configurable (for example via cancel_on_kill) because of the request_id parameter in a job definition: https://googleapis.dev/python/dataproc/latest/gapic/v1/api.html#google.cloud.dataproc_v1.JobControllerClient.submit_job
I'm happy to help with the system test 👍
Use case / motivation
Remove dangling jobs when operator is terminated.
Related Issues
https://github.com/apache/airflow/pull/6371
https://github.com/apache/airflow/pull/6371#issuecomment-590757917
@edejong @jaketf @potiuk I would love to hear your opinion on this one 👍
Sounds like a good feature to have.
My only thought is that we should also add similar on_kill method for workflow template operators.
Dataproc jobs are kind of a wild wild west and may have significant side effects. From a documentation perspective we should call out that on_kill simply kills the job but will not "roll back" changes in external systems (GCS, Hive Metastore, BQ, pubsub, etc) that may have occurred. Users should be careful to handle any such scenarios in the logic of their pipelines. This may seem obvious to us but may not be clear to users (if we contrast to a BigQuery query job where everything is controlled internally and if a job is cancelled nothing happens because all results are committed atomically).
A Few examples
I think we should limit the on_kill to canceling the job. What is more, I think it should be configurable to allow attaching to an existing job using request_id
In package airflow.providers.google.cloud.operators.dataproc each DataprocSubmitJobOperator inherits from DataprocJobBaseOperator.
DataprocJobBaseOperator has the following implementation of on_kill method (lines 984-992):
def on_kill(self):
    """
    Callback called when the operator is killed.
    Cancel any running job.
    """
    if self.dataproc_job_id:
        self.hook.cancel_job(
            project_id=self.project_id, job_id=self.dataproc_job_id, location=self.region
        )
@turbaszek wrote:
This operator should implement on_kill method using cancel_job method of DataprocHook so in case of termination we cancel running job.
Considering the present implementation of the on_kill method of DataprocJobBaseOperator, isn't this already done?
The code calls the cancel_job method of DataprocHook when on_kill runs and the dataproc_job_id property is set.
If I understand this correctly, the only thing that method lacks is:
This option probably should be configurable (for example cancel_on_kill) because of request_id parameter in a job definition
Thus the following code might do the job:
def on_kill(self, cancel_on_kill=True, request_id=None):
    """
    Callback called when the operator is killed.
    Cancel any running job.

    Parameters:
        cancel_on_kill (bool): Cancel the job if True, defaults to True
        request_id (str): Job ID to cancel, defaults to the dataproc_job_id property
    """
    if not cancel_on_kill:
        return
    # Prefer the explicitly passed ID and fall back to the operator's own job ID
    job_id = request_id or self.dataproc_job_id
    if job_id:
        self.hook.cancel_job(
            project_id=self.project_id, job_id=job_id, location=self.region
        )
Did I understand this issue correctly? If not I guess calling hook's cancel_job method is not enough and I will investigate it further.
DataprocSubmitJobOperator has no relation with DataprocJobBaseOperator.
DataprocJobBaseOperator is used by "old" operators like DataprocSubmitPigJobOperator, DataprocSubmitHiveJobOperator etc. that are deprecated in favor of the generic operator DataprocSubmitJobOperator.
However, you are right that the logic of on_kill currently exists in the old operators and may be reused in DataprocSubmitJobOperator.on_kill 😉
Thus the following code might do the job:
Looks good but let's remove the request_id as I'm still not sure how it works
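For illustration, here is a rough sketch of how that could look with request_id dropped. It assumes the flag lives on the operator's constructor rather than on on_kill itself, since Airflow invokes on_kill() without arguments, so parameters on the method would never be set by the framework. FakeDataprocHook and SubmitJobOperatorSketch are hypothetical stand-ins so the snippet runs without Airflow or GCP; only the cancel_job keyword arguments are taken from the existing DataprocJobBaseOperator code quoted above.

```python
from typing import List, Optional


class FakeDataprocHook:
    """Hypothetical stand-in for DataprocHook: records cancel requests, makes no API calls."""

    def __init__(self) -> None:
        self.cancelled: List[str] = []

    def cancel_job(self, project_id: str, job_id: str, location: str) -> None:
        # Same keyword arguments as the cancel_job call quoted earlier in the thread
        self.cancelled.append(job_id)


class SubmitJobOperatorSketch:
    """Hypothetical, simplified operator; not the real DataprocSubmitJobOperator."""

    def __init__(
        self,
        project_id: str,
        region: str,
        hook: FakeDataprocHook,
        cancel_on_kill: bool = True,  # assumed name, as proposed in the issue
    ) -> None:
        self.project_id = project_id
        self.region = region
        self.hook = hook
        self.cancel_on_kill = cancel_on_kill
        self.job_id: Optional[str] = None  # would be set once the job is submitted

    def on_kill(self) -> None:
        """Cancel the submitted job, if there is one and cancellation is enabled."""
        if self.job_id and self.cancel_on_kill:
            self.hook.cancel_job(
                project_id=self.project_id, job_id=self.job_id, location=self.region
            )


hook = FakeDataprocHook()
op = SubmitJobOperatorSketch(project_id="my-project", region="europe-west1", hook=hook)
op.job_id = "job-123"  # pretend execute() already submitted the job
op.on_kill()
print(hook.cancelled)  # ['job-123']
```

With cancel_on_kill=False the job is left running, which is what someone re-attaching to an existing job would want. As noted earlier in the thread, cancelling only stops the job; it does not roll back anything the job already wrote to external systems.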