Re: How do you branch your code with BigQuery?
Hi

I have created a custom plugin that allows you to branch on the results of a BigQuery query. The code for it is at
https://github.com/JohnLewisandPartners/custom-airflow-plugins/blob/master/bq_branch/plugins/bq_branch.py

The version in master only works on Airflow 1.10, but there is a branch called airflow_1.9 that also contains the latest BigQuery hook and so works on Airflow 1.9.

The query you run must return true for all columns, the same as for the BigQuery check operator, so you may need to rewrite your queries to do this.

On Sun, 14 Oct 2018 at 14:16, airflowuser wrote:

> I believe this is quite a common case when working with data:
>
> if something: do A
> else: do B
>
> In code, BranchPythonOperator is the solution.
>
> But when working on Google Cloud there is no way to do this. All existing operators are designed to continue or fail on comparison with a specific value: BigQueryValueCheckOperator with pass_value=500 will continue if 500 is returned and fail in any other case. The same goes for all other CheckOperators. You must know the value in advance for this to work, and it's not an actual branch but more of a way to stop the workflow if an unexpected result has been found.
>
> But how do you handle a scenario where you want to do A or B based on a condition from a query result? Nothing needs to fail; it is just a simple branch.
>
> XCom could solve it, but there is no support for XCom yet:
> https://stackoverflow.com/questions/52801318/airflow-how-to-push-xcom-value-from-bigqueryoperator
>
> Say, for example, the query represents the number of frauds. If it's <1000 you want to email specific users (EmailOperator); if it's >=1000 you want to run another operator and continue the workflow.
>
> Any thoughts on the matter will be appreciated.

--
Anthony Brown
Data Engineer BI Team - John Lewis
Tel : 0787 215 7305

**
This email is confidential and may contain copyright material of the John Lewis Partnership. If you are not the intended recipient, please notify us immediately and delete all copies of this message. (Please note that it is your responsibility to scan this message for viruses). Email to and from the John Lewis Partnership is automatically monitored for operational and lawful business reasons.
**
John Lewis plc
Registered in England 233462
Registered office 171 Victoria Street London SW1E 5NN
Websites: https://www.johnlewis.com http://www.waitrose.com https://www.johnlewisfinance.com http://www.johnlewispartnership.co.uk
**
Re: How do you branch your code with BigQuery?
I do intend to create a PR (when I get the chance) to get this into the main Airflow repo.

If anybody has any comments about this before I do, please let me know.

On Mon, 15 Oct 2018 at 10:33, airflowuser wrote:

> Awesome!
> I think this would be a fine addition to the BigQuery operators if you ever think about submitting this as a PR to Airflow master.
>
> cheers

--
Anthony Brown
Data Engineer BI Team - John Lewis
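For readers who want to see the shape of the idea without installing a plugin, here is a minimal sketch of branching on a BigQuery result with the stock BranchPythonOperator. It is not the code from the plugin linked in this thread. It assumes Airflow 1.10 import paths, a connection named bigquery_default, and a hypothetical table `my_project.my_dataset.frauds`; the task ids, the DAG id and the `use_legacy_sql` argument on the hook are likewise assumptions for illustration. The branch decision follows the check-operator rule mentioned above (every column of the first row must be truthy).

```python
# Hypothetical query: a single boolean column that is true when there are
# at least 1000 frauds. The table name is made up for this sketch.
FRAUD_SQL = "SELECT COUNT(*) >= 1000 FROM `my_project.my_dataset.frauds`"


def choose_branch(row, if_true, if_false):
    """Return if_true when every column of row is truthy, else if_false.

    An empty or missing row counts as false, mirroring how the check
    operators treat a query that returns nothing.
    """
    if not row:
        return if_false
    return if_true if all(bool(col) for col in row) else if_false


def build_dag():
    """Wire the decision into a DAG (imports are local so that
    choose_branch can be tested without an Airflow installation)."""
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.hooks.bigquery_hook import BigQueryHook
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    def branch_on_fraud_count():
        # Assumption: the hook in your Airflow version accepts use_legacy_sql.
        hook = BigQueryHook(bigquery_conn_id="bigquery_default",
                            use_legacy_sql=False)
        row = hook.get_first(FRAUD_SQL)  # first result row, or None
        return choose_branch(row, "continue_workflow", "email_users")

    dag = DAG("bq_branch_sketch", start_date=datetime(2018, 10, 1),
              schedule_interval=None)
    branch = BranchPythonOperator(task_id="branch_on_fraud_count",
                                  python_callable=branch_on_fraud_count,
                                  dag=dag)
    # Placeholders standing in for an EmailOperator and the real next step.
    email_users = DummyOperator(task_id="email_users", dag=dag)
    continue_workflow = DummyOperator(task_id="continue_workflow", dag=dag)
    branch.set_downstream([email_users, continue_workflow])
    return dag
```

In a DAG file, assigning `dag = build_dag()` at module level would let the scheduler pick it up. Keeping the decision in a plain function means the branching rule can be unit tested on its own.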
Re: Mocking airflow (similar to moto for AWS)
> > > is creates an SQLite just for the test so you can add variables and connections needed for the test without affecting the real Airflow installation.
> > >
> > > The first thing I realized is that this didn't work if the imports were outside the context manager, since airflow.configuration and airflow.settings perform all the initialization when they are imported, so the AIRFLOW_HOME variable is already set to the real installation before getting inside the context manager.
> > >
> > > The workaround for this was to reload those modules and this works for the tests I have written. However, when I tried to use it for something more complex (I have a plugin that I'm importing) I noticed that inside the operator in this plugin, AIRFLOW_HOME is still set to the real installation, not the temporary one for the test. I thought this must be related to the imports but I haven't been able to figure out a way to fix the issue. I tried patching some methods but I must have been missing something because the database initialization failed.
> > >
> > > Does anyone have an idea on the best way to mock/patch airflow so that EVERYTHING that is executed inside the context manager uses the temporary installation?
> > >
> > > PS: This is my current attempt which works for the tests I defined but not for external plugins:
> > > https://github.com/biellls/airflow_testing
> > >
> > > For an example on how it works:
> > > https://github.com/biellls/airflow_testing/blob/master/tests/mock_airflow_test.py
> >
> > --
> > *Jarek Potiuk, Principal Software Engineer*
> > Mobile: +48 660 796 129

--
Anthony Brown
Data Engineer BI Team - John Lewis
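The import-order problem described in the quoted message can be illustrated with a small context manager. This is a sketch under assumptions, not the code from the airflow_testing repository: instead of reloading airflow.configuration and airflow.settings individually, it drops every cached `airflow` module from `sys.modules` so that any import inside the block is initialized against the temporary AIRFLOW_HOME. The function name `temporary_airflow_home` is made up for this example, and the approach has not been verified against the plugin case from the thread.

```python
import contextlib
import os
import shutil
import sys
import tempfile


def _forget_airflow_modules():
    """Remove cached airflow modules so the next import re-runs their
    import-time initialization (which reads AIRFLOW_HOME)."""
    cached = [name for name in sys.modules
              if name == "airflow" or name.startswith("airflow.")]
    for name in cached:
        del sys.modules[name]


@contextlib.contextmanager
def temporary_airflow_home():
    """Point AIRFLOW_HOME at a throwaway directory for the duration of
    the block, then restore the previous value and delete the directory."""
    previous = os.environ.get("AIRFLOW_HOME")
    tmp_dir = tempfile.mkdtemp(prefix="airflow_test_")
    os.environ["AIRFLOW_HOME"] = tmp_dir
    _forget_airflow_modules()
    try:
        yield tmp_dir
    finally:
        if previous is None:
            os.environ.pop("AIRFLOW_HOME", None)
        else:
            os.environ["AIRFLOW_HOME"] = previous
        # Forget modules initialized against the temporary home as well.
        _forget_airflow_modules()
        shutil.rmtree(tmp_dir, ignore_errors=True)
```

Names bound before entering the block (for example an operator class imported at the top of a test file or inside a plugin module) still point at the old module objects, so airflow and any plugin modules that import it would have to be imported inside the `with` block for this to have an effect.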
error handling flow in DAG
Hi

I am coding various data flows and one of the requirements we have is to run some error tasks when certain tasks fail. These error tasks are specific to the task that failed and are not generic to the whole DAG.

So for instance, if I have a DAG that runs the following tasks

task_1 > task_2 > task_3

If task_1 fails, then I want to run task_1_failure_a ---> task_1_failure_b
If task_2 fails, I do not need to do anything specific
If task_3 fails, I need to run task_3_failure_a ---> task_3_failure_b

I already have a generic on_failure_callback defined on all tasks that handles alerting, but I am stuck on the best way of handling a failure flow for tasks.

The ways I have come up with of handling this are:

1. Have a branch operator between each task with trigger_rule set to all_done. The branch operator would then decide whether to go to the next (success) task, or to go down the failure branch.
2. Put the failure tasks in a separate DAG with no schedule. Have a different on_failure_callback for each task that would trigger the failure DAG for that task and then do my generic error handling.

Does anybody have any thoughts on which of the above two approaches would be best, or can anybody suggest an alternative way of doing this?

Thanks

--
Anthony Brown
Data Engineer BI Team - John Lewis
Tel : 0787 215 7305
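The first approach in the message above (a branch operator with trigger_rule set to all_done after a task) could look roughly like the following for task_1 only. This is a sketch under assumptions, not a recommendation from the thread: it uses Airflow 1.10 import paths, DummyOperator placeholders for the real tasks, and the hypothetical names `pick_next_task`, `branch_after_task_1` and `error_flow_sketch`.

```python
def pick_next_task(upstream_state, success_task_id, failure_task_id):
    """Go down the failure branch only when the upstream task itself
    failed; any other state continues with the normal flow."""
    if upstream_state == "failed":
        return failure_task_id
    return success_task_id


def build_dag():
    """Build task_1 -> branch -> (task_2 | task_1_failure_a -> task_1_failure_b).

    Imports are local so pick_next_task can be tested without Airflow.
    """
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    dag = DAG("error_flow_sketch", start_date=datetime(2018, 10, 1),
              schedule_interval=None)

    def branch_after_task_1(**context):
        # Look up how task_1 ended in this DAG run.
        state = context["dag_run"].get_task_instance("task_1").state
        return pick_next_task(state, "task_2", "task_1_failure_a")

    task_1 = DummyOperator(task_id="task_1", dag=dag)
    branch_1 = BranchPythonOperator(
        task_id="branch_after_task_1",
        python_callable=branch_after_task_1,
        provide_context=True,
        trigger_rule="all_done",  # run whether task_1 succeeded or failed
        dag=dag,
    )
    task_2 = DummyOperator(task_id="task_2", dag=dag)
    task_1_failure_a = DummyOperator(task_id="task_1_failure_a", dag=dag)
    task_1_failure_b = DummyOperator(task_id="task_1_failure_b", dag=dag)

    task_1.set_downstream(branch_1)
    branch_1.set_downstream([task_2, task_1_failure_a])
    task_1_failure_a.set_downstream(task_1_failure_b)
    return dag
```

The generic on_failure_callback on task_1 is unaffected by this wiring, since it fires when task_1 itself fails; the branch only decides which path runs afterwards. A similar branch would be needed after task_3, with some task to continue to on success.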