Airflow: BigQueryHook refactor + deterministic BQ Job ID

Created on 18 May 2020 · 14 Comments · Source: apache/airflow

Description

Looking at the code, it seems that a lot of the logic in the BQ hook is already implemented in the Google API Python library. This includes job polling, a nicer way to use job configs and, of course, also the validations that we currently do manually.
It would be nice to make use of these and simplify the code.

My idea is to refactor the run_ methods to take the Google job config and a deterministic job ID.
This would help when a pod dies for any reason: we'd resume polling the async job that was previously started (I apologize for the crappy explanation).

See my hacky spike below:

This is the job ID definition, for reference:
job_id = re.sub(r"[^0-9a-zA-Z_\-]+", "-", f"{self.dag_id}_{self.task_id}_{context['execution_date'].isoformat()}__try_0")

And here is roughly how run_query would work:

def run_query(self, job_id: str, job_config: QueryJobConfig, sql: str, destination_dataset_table: str = None) -> str:
        def _recurse(job_id: str):
            [j, try_num] = job_id.split("__try_")
            new_job_id = f"{j}__try_{int(try_num) + 1}"
            return run_query(new_job_id, job_config, sql)

        def run_query(job_id: str, job_config: QueryJobConfig, sql: str):
            if not self.project_id:
                raise ValueError("The project_id should be set")

            if destination_dataset_table is not None:
                job_config.destination = TableReference.from_string(destination_dataset_table, self.project_id)

            try:
                job: QueryJob = self.client.get_job(job_id, self.project_id)
                if job.state == 'RUNNING':
                    if job.query != sql:
                        job.cancel()
                        self.log.info(f"Job {job_id} found, but sql is different. "
                                      f"Cancelling the current job and starting a new one")
                        return _recurse(job_id)
                    self.log.info(f"Job {job_id} still running, re-starting to poll.")
                    return job.result()
                else:
                    self.log.info(f"Job {job_id} already executed once. Restarting")
                    return _recurse(job_id)
            except NotFound:
                self.log.info(f"Job {job_id} not found, starting a new job.")
            job: QueryJob = self.client.query(sql, job_config, job_id, project=self.project_id)
            self.log.info(f"Running Job {job_id}...")
            return job.result()

        return run_query(job_id, job_config, sql)

The encoded __try_ suffix is not the Airflow try number but a secondary counter for the case where the task is cleared, since BQ job IDs are unique keys and can't be reused.
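
To make the suffix handling concrete, here is a minimal sketch of how the `__try_N` counter could be bumped when an existing job ID can't be reused. The helper name `next_try_job_id` is hypothetical, and starting at `__try_0` when no counter is present is an assumption, not something the spike above does.

```python
def next_try_job_id(job_id: str) -> str:
    """Return the job ID with its trailing ``__try_N`` counter incremented.

    Hypothetical helper: BigQuery job IDs cannot be reused, so a cleared
    task needs a fresh ID that is still derived from the original one.
    """
    base, sep, try_num = job_id.rpartition("__try_")
    if not sep or not try_num.isdigit():
        # Assumption: an ID without a counter gets one starting at 0.
        return f"{job_id}__try_0"
    return f"{base}__try_{int(try_num) + 1}"


first = "my_dag_my_task_2020-05-18T00-00-00__try_0"
second = next_try_job_id(first)
```

Using `rpartition` rather than `split` means only the last `__try_` occurrence is treated as the counter, so a DAG or task name that happens to contain `__try_` would not break the parsing.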

Use case / motivation
Trying to use the functionalities in the google cloud library rather than re-implementing them ourselves.
This would allow us to pass through a Deterministic Job ID too, useful for picking up jobs which are still running in case a pod dies.
Related Issues

feature Google

All 14 comments

Thanks for opening your first issue here! Be sure to follow the issue template!

Hi @albertocalderari, I'm currently working on a refactor of the BQ integration. I decided to abandon the custom "run" methods and use an insert_job method, which will accept a job_id:

https://github.com/PolideaInternal/airflow/blob/71328b19be73ff0e4820be1ee25867eb6a578820/airflow/providers/google/cloud/hooks/bigquery.py#L1432-L1441

My idea was to use the methods exposed by google.cloud.bigquery.Client along with google.cloud.bigquery.{JobType}JobConfig rather than using dictionaries.
Doing this we can ditch most of the custom validation we have in the hook and make the code a bit slimmer.

I'd build the job configs in the operator itself and mark run_with_configuration as deprecated.

I like the idea of having only one method to execute any kind of job, though; we could add the recursive logic in there.

What do you think @turbaszek?

My idea was to use the methods exposed by google.cloud.bigquery.Client along with google.cloud.bigquery.{JobType}JobConfig rather than using dictionaries.

I see your point; however, this will make the migration harder. On the bright side, we are ditching the custom validation. If you take a look at the insert_job method, you will see that I'm constructing an XJobConfig:
https://github.com/apache/airflow/blob/375d1ca229464617780623c61c6e8a1bf570c87f/airflow/providers/google/cloud/hooks/bigquery.py#L1418-L1434
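
For readers who don't follow the link, here is a rough sketch of how a dict-based configuration could be dispatched to a job type. It is not the linked code: the function name `detect_job_type` is hypothetical, and the only thing taken as given is that the BigQuery REST job configuration names the job type with one of the top-level keys `load`, `copy`, `extract` or `query`.

```python
from typing import Any, Dict

# Top-level keys of the BigQuery REST job configuration that name the job type.
SUPPORTED_JOB_TYPES = ("load", "copy", "extract", "query")


def detect_job_type(configuration: Dict[str, Any]) -> str:
    """Return the job type implied by a REST-style configuration dict.

    Hypothetical helper, written only to illustrate the dispatch step.
    """
    for job_type in SUPPORTED_JOB_TYPES:
        if job_type in configuration:
            return job_type
    raise ValueError(
        f"Unknown job type. Supported types: {', '.join(SUPPORTED_JOB_TYPES)}"
    )


configuration = {"query": {"query": "SELECT 1", "useLegacySql": False}}
job_type = detect_job_type(configuration)
```

Once the type is known, the matching config class from the client library could presumably be built from the same dict, so the caller still only ever passes plain dictionaries.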

@turbaszek Hey, sorry, work was hell this past week and I did not find the time or the energy to reply :(
I have noticed that insert_job does that, and that we'd be building the config from the dictionary that is passed in. Still, going all in on {JobType}JobConfig would make the method signatures a lot friendlier to the eye. I do agree that the migration would be harder, but it could be done in two steps:

  • 1. Create a new method used only by the operators and mark the old ones as deprecated
  • 2. Remove the deprecated methods
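
Step 1 could look roughly like the sketch below. `HookSketch` is a hypothetical stand-in, not the real BigQueryHook, and the method bodies are placeholders; only the pattern of keeping the old method as a thin wrapper that emits a `DeprecationWarning` is the point.

```python
import warnings
from typing import Any, Dict, Optional


class HookSketch:
    """Hypothetical stand-in for the hook; only the deprecation pattern matters."""

    def insert_job(
        self, configuration: Dict[str, Any], job_id: Optional[str] = None
    ) -> Dict[str, Any]:
        # Placeholder body: a real hook would submit the job to BigQuery here.
        return {"configuration": configuration, "job_id": job_id}

    def run_with_configuration(self, configuration: Dict[str, Any]) -> Dict[str, Any]:
        # Old entry point kept for one release cycle, delegating to the new one.
        warnings.warn(
            "run_with_configuration is deprecated; use insert_job instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.insert_job(configuration)
```

Step 2 is then just deleting `run_with_configuration` once users have had time to migrate.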

If I can't sell this to you with this, I'll go your way and see how it comes :)

Summoning @edejong to hear his opinion :)

I should really check GitHub more often, I only saw the notification now.

Let me know if I understand the question correctly: should the BigQueryHook's interface rely on classes from the Google API client library, or should all data be passed in as dictionaries?

I think it's one thing to have the Airflow hooks/operators coupled to the BigQuery REST interface which I guess is what you get passing in the config in a Python dict. This allows you to translate online examples very easily to a DAG.

But it's a much bigger step to rely on the Google client library in the API because that introduces a tight coupling to this specific library. It would only look good if the Airflow code can stay 100% agnostic about what is passed to the library. Can we guarantee that, even for the future? And does that align with other GCP products?

So my personal opinion is stick with the dict :)

As for generating a job id for all job types, I agree that would be a very good move. Without it you would have to wait for a response to even have something to check up on after. That works most of the time, but in cases where it goes wrong it makes it harder to troubleshoot.

I love the suggested job ID string. One small thing I would change is to add a prefix such as airflow_ or even just af_, to make these even easier to spot in Stackdriver, for example. I would generate a well-defined job ID string every time one wasn't provided by the user.

See https://cloud.google.com/bigquery/docs/running-jobs#generate-jobid for recommendations.
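
A minimal sketch of that fallback, assuming a hypothetical helper `default_job_id` and the `airflow` prefix suggested above. The sanitising regex is the one from the issue description; BigQuery job IDs may only contain letters, numbers, underscores and dashes.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def default_job_id(
    dag_id: str,
    task_id: str,
    execution_date: datetime,
    job_id: Optional[str] = None,
    prefix: str = "airflow",
) -> str:
    """Return the user's job ID if given, else a prefixed deterministic one.

    Hypothetical helper: the name, signature and prefix are illustrative only.
    """
    if job_id:
        return job_id
    raw = f"{prefix}_{dag_id}_{task_id}_{execution_date.isoformat()}"
    # Replace every run of characters BigQuery does not accept with a dash.
    return re.sub(r"[^0-9a-zA-Z_\-]+", "-", raw)


generated = default_job_id(
    "my_dag", "load", datetime(2020, 6, 23, tzinfo=timezone.utc)
)
```

Because the ID depends only on the DAG, the task and the execution date, a restarted worker would compute the same string and could look the job up again.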

So my personal opinion is stick with the dict :)

Same here. And what is more... a dict is JSON serializable, so it can be used as a template field!
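
To illustrate why a plain dict works well here, the sketch below walks a nested configuration and fills in placeholders in every string it finds. This is only a stdlib stand-in for Airflow's Jinja rendering, not how Airflow does it; the `render` function and the table name are made up for the example.

```python
import json
import re
from typing import Any, Dict

PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value: Any, context: Dict[str, Any]) -> Any:
    """Recursively substitute ``{{ name }}`` placeholders in nested strings."""
    if isinstance(value, str):
        return PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    if isinstance(value, list):
        return [render(item, context) for item in value]
    return value  # numbers, booleans and None pass through unchanged


configuration = {
    "query": {
        "query": "SELECT * FROM dataset.events WHERE day = '{{ ds }}'",
        "useLegacySql": False,
    }
}
rendered = render(configuration, {"ds": "2020-06-23"})
# The result is still plain data, so it round-trips through JSON.
round_tripped = json.loads(json.dumps(rendered))
```

A client-library config object would need custom handling to be walked like this, whereas a dict needs none.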

@albertocalderari should we consider this issue as resolved?

@turbaszek That's only half of the issue, the other half is implementing the deterministic job id.
So as far as I understand, operators will interact with the hook using a JSON config and then we'll build the job from it.
As soon as you finish your refactor, I'll start implementing the feature :)

@albertocalderari I think I may be missing something. What do you understand by deterministic job_id?

@turbaszek In case of downscale or pod dying you'd want to check if the job is still running, hence the need of having a job id derived from the task name and execution date.
We had several instances where the job was still running and a new one was started, generating extra costs for no reason.

If you want we can have a call and I can explain better.

@turbaszek In case of downscale or pod dying you'd want to check if the job is still running, hence the need of having a job id derived from the task name and execution date.

Now I got it. So if we use the execution date in the job_id, like this: airflow_task_id_20200623T000000+0000, then in case of failure re-running the DAG will make the operator reattach to the existing job. I like it. @edejong WDYT?

@turbaszek yeah, sort of, but it's not that simple. I'd really rather have a quick call; are you on Airflow's Slack?
