Airflow has SQLSensor and BranchPythonOperator; it seems that the logic of both could be combined to create a SQLBranchOperator.
SQLSensor takes a single SQL query and waits for a condition on its result. It could be copied and changed so that, instead of waiting (like a sensor), it simply returns true or false. Depending on the value returned by the query, the operator would follow the chosen branch.
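To make the sensor-versus-branch distinction concrete, here is a minimal sketch in plain Python, assuming an in-memory sqlite3 database as a stand-in for an Airflow connection. The helper names (`query_is_true`, `wait_until_true`, `choose_branch`) are hypothetical and are not Airflow APIs: the sensor-style helper re-runs the query until its first cell is truthy, while the branch-style helper evaluates the query once and picks a path.

```python
import sqlite3
import time
from typing import List


def query_is_true(conn: sqlite3.Connection, sql: str) -> bool:
    """Run the query once and treat the first cell of the first row as a boolean."""
    row = conn.execute(sql).fetchone()
    # No rows, or a falsy first cell (0, NULL, empty string), counts as False.
    return bool(row and row[0])


def wait_until_true(conn: sqlite3.Connection, sql: str,
                    poke_interval: float = 1.0, timeout: float = 10.0) -> None:
    """Sensor-style (hypothetical): keep re-running the query until it is true or we time out."""
    deadline = time.monotonic() + timeout
    while not query_is_true(conn, sql):
        if time.monotonic() >= deadline:
            raise TimeoutError("condition was not met before the timeout")
        time.sleep(poke_interval)


def choose_branch(conn: sqlite3.Connection, sql: str,
                  if_true: List[str], if_false: List[str]) -> List[str]:
    """Branch-style (hypothetical): evaluate the query exactly once and pick a path."""
    return if_true if query_is_true(conn, sql) else if_false


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER)")
conn.execute("INSERT INTO orders VALUES (1)")
print(choose_branch(conn, "SELECT COUNT(*) > 0 FROM orders", ["process"], ["skip"]))
# prints ['process']
```

The shared `query_is_true` step is the "SQL part" both classes would have in common; only what happens with its result differs.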
Thanks for opening your first issue here! Be sure to follow the issue template!
Hey! I would like to work on this
Please do!
Hey @pujaji, just wondering if you got a chance to work on this?
Hi @potiuk, sorry to bother you. I am just wondering what would be a good way to write unit tests for this type of SQL-related operator. I looked at the existing unit test for MsSqlOperator and found that it only initializes the MsSqlOperator, calls the get_hook() method, and asserts on hook.__class__.
Thanks.
I would assume the tests should be similar to those for SQLSensor, which already supports various database backends in a single sensor.
The SQL part itself (taking a query and running it) is the same for SQLSensor and SQLBranchOperator; the difference is in what happens after the query runs and returns a value.
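For the unit-testing question above, one option is to stub out the database entirely and assert only on the post-query decision. The sketch below is an assumption about how such a test could look: `select_branch` is a hypothetical stand-in for the operator's logic, and the hook is a `unittest.mock.MagicMock` whose `get_first` method (Airflow's DB-API hooks do provide a `get_first`) is made to return a fixed row.

```python
import unittest
from typing import List, Optional, Sequence
from unittest import mock


def select_branch(hook, sql: str, if_true: List[str], if_false: List[str]) -> List[str]:
    """Hypothetical operator logic: fetch the first row and branch on its first cell."""
    row: Optional[Sequence] = hook.get_first(sql)
    return if_true if row and row[0] else if_false


class SelectBranchTest(unittest.TestCase):
    def _hook_returning(self, row):
        # The mock replaces the real database hook, so no database is needed.
        hook = mock.MagicMock()
        hook.get_first.return_value = row
        return hook

    def test_true_result_follows_true_branch(self):
        hook = self._hook_returning((1,))
        self.assertEqual(select_branch(hook, "SELECT 1", ["a"], ["b"]), ["a"])
        # The query text should be passed through to the hook unchanged.
        hook.get_first.assert_called_once_with("SELECT 1")

    def test_false_result_follows_false_branch(self):
        hook = self._hook_returning((0,))
        self.assertEqual(select_branch(hook, "SELECT 0", ["a"], ["b"]), ["b"])

    def test_empty_result_follows_false_branch(self):
        hook = self._hook_returning(None)
        self.assertEqual(select_branch(hook, "SELECT 1 WHERE 1 = 0", ["a"], ["b"]), ["b"])
```

Saved as, for example, `test_select_branch.py`, this runs with `python -m unittest test_select_branch`. Because the query itself is mocked, the same tests apply regardless of which database backend the real hook targets.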
Yep. Thanks @jeffolsi for the explanation. I think this is about right.
Thank you both!
Hello @samuelkhtu, please don't bother working on this. Please continue to finish this.
regards,
Puja
Thanks @pujaji, sounds like you've got this. I will circle back in a few days. Can't wait to see this new operator!
@samuelkhtu No, Samuel, you have got me wrong. I have stopped working on this. I urge you to complete it and present it to the community.
Hi @pujaji, I am sorry I misunderstood you. Sounds good. I will give this a try!
No regrets! Happy coding-contributing!! :+1:
Hey @jeffolsi, quick question for you. In the existing Airflow BranchPythonOperator, the Python callable returns a task_id or a list of task_ids that selects the branch(es) to follow.
I am just wondering if you would like to use the SQL query to select the branches as well?
For example, the SQL query "SELECT 'branch_a', 'branch_b'" would return 2 columns, and the SQLBranchOperator would follow both branch_a and branch_b (branch_a and branch_b being task_ids within the DAG).
Or do you expect the SQL query to return multiple rows, with each row representing a task_id within the DAG?
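The two result shapes floated in this question can be contrasted with a small sqlite3 sketch. Both helpers are hypothetical and only illustrate the options being asked about: one reads task_ids from the columns of a single row, the other reads them from the first column of every row.

```python
import sqlite3
from typing import List


def task_ids_from_columns(conn: sqlite3.Connection, sql: str) -> List[str]:
    """Option 1 (hypothetical): one row, each column holds a task_id."""
    row = conn.execute(sql).fetchone()
    return [str(value) for value in row] if row else []


def task_ids_from_rows(conn: sqlite3.Connection, sql: str) -> List[str]:
    """Option 2 (hypothetical): many rows, the first column of each row holds a task_id."""
    return [str(row[0]) for row in conn.execute(sql).fetchall()]


conn = sqlite3.connect(":memory:")
print(task_ids_from_columns(conn, "SELECT 'branch_a', 'branch_b'"))
# prints ['branch_a', 'branch_b']
print(task_ids_from_rows(conn, "SELECT 'branch_a' UNION ALL SELECT 'branch_b' ORDER BY 1"))
# prints ['branch_a', 'branch_b']
```

Either shape puts DAG structure (task_ids) inside the SQL, which is the coupling the reply below argues against.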
Thanks
@samuelkhtu the query returns only true/false.
Think of the query as the equivalent of the Python callable: it returns only true or false, and that value decides which branch to follow.
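If the query result plays the role of the callable's return value, the operator needs a rule for turning whatever the database returns into true or false. The function below is one possible rule, written as an assumption rather than a description of any merged operator: booleans pass through, integers are compared to zero, a small hypothetical set of strings is recognized case-insensitively, and anything else (including NULL) is rejected rather than guessed.

```python
from typing import Any

# Hypothetical accepted spellings; a real operator might choose a different set.
TRUE_STRINGS = {"true", "t", "yes", "y", "on", "1"}
FALSE_STRINGS = {"false", "f", "no", "n", "off", "0"}


def to_bool(value: Any) -> bool:
    """Coerce a single query result into True/False, failing loudly on anything ambiguous."""
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return value
    if isinstance(value, int):
        return value != 0
    if isinstance(value, str):
        normalized = value.strip().lower()
        if normalized in TRUE_STRINGS:
            return True
        if normalized in FALSE_STRINGS:
            return False
    raise ValueError(f"cannot interpret query result {value!r} as true/false")
```

Raising on unrecognized values means a typo in the query (for example, selecting the wrong column) fails the task instead of silently taking the false branch.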
I don't think it's a good idea to embed the task_ids in the SQL.
Thank you @jeffolsi. I see. I guess what you are looking for is slightly different from the Python version. Yes, the SQL query can decide True/False, but we still need to let the operator know which "branch" or path to follow, right? How about the following?
BranchSqlOperator(
    conn_id: str,
    sql: str,
    parameters: Optional[Union[Mapping, Iterable]],
    follow_task_ids_if_true: List[str],  # task ids to follow if the SQL returns True
    follow_task_ids_if_false: List[str],  # task ids to follow if the SQL returns False
)
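A minimal sketch of how the proposed signature could drive the branching decision, assuming a plain Python class with no Airflow dependency: `BranchSqlSketch` is hypothetical, an open sqlite3 connection stands in for `conn_id`, and the first cell of the first row is read with `bool()` as a simplification. The returned list plays the same role as the task_id(s) returned by BranchPythonOperator's callable.

```python
import sqlite3
from typing import List, Mapping, Optional, Sequence, Union


class BranchSqlSketch:
    """Hypothetical, Airflow-free stand-in for the proposed BranchSqlOperator."""

    def __init__(
        self,
        conn: sqlite3.Connection,
        sql: str,
        follow_task_ids_if_true: List[str],
        follow_task_ids_if_false: List[str],
        parameters: Optional[Union[Mapping, Sequence]] = None,
    ) -> None:
        self.conn = conn
        self.sql = sql
        self.follow_task_ids_if_true = follow_task_ids_if_true
        self.follow_task_ids_if_false = follow_task_ids_if_false
        self.parameters = parameters

    def choose_branch(self) -> List[str]:
        # Run the query once; sqlite3 accepts a sequence or a mapping of bind parameters.
        row = self.conn.execute(self.sql, self.parameters or ()).fetchone()
        if row is None:
            raise ValueError("query returned no rows")
        # Simplification: any truthy first cell counts as True.
        if bool(row[0]):
            return self.follow_task_ids_if_true
        return self.follow_task_ids_if_false


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE loads (rows_loaded INTEGER)")
conn.execute("INSERT INTO loads VALUES (42)")
op = BranchSqlSketch(conn, "SELECT rows_loaded > ? FROM loads",
                     ["transform"], ["alert"], parameters=(0,))
print(op.choose_branch())
# prints ['transform']
```

Raising on an empty result, rather than quietly following the false branch, is a design choice here: it keeps "no data" distinct from "condition is false". Either behavior is defensible.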
@samuelkhtu exactly what I was thinking about.
Great. Thank you @jeffolsi !
Code and tests are complete for this item. I am working on the PR.
Quick update. The PR is under review.
Hi @potiuk, could you help by commenting on a question raised by @eladkal in this PR? https://github.com/apache/airflow/pull/8942
I believe this PR is ready to go otherwise.
Thanks!
Hello @jeffolsi, the new operator is in. Maybe we can close this issue?
Hi @mik-laj, could you help close this item? This operator has been merged to master (https://github.com/apache/airflow/pull/8942). Thanks.
Closed. For the future: it's enough to add "Closes #ISSUE" to the commit message so that the issue is closed automatically on merge :)
Thank you for the tips!