Re: How do you branch your code with BigQuery?

2018-10-15 Thread Anthony Brown
Hi
   I have created a custom plugin that allows you to branch on the results
of a BigQuery query. The code for it is at
https://github.com/JohnLewisandPartners/custom-airflow-plugins/blob/master/bq_branch/plugins/bq_branch.py.
The version in master only works on Airflow 1.10, but there is a branch
called airflow_1.9 that also contains the latest BigQuery hook and so works
on Airflow 1.9.

   The query you run must return true for all columns - the same rule the
BigQuery check operator applies - so you may need to rewrite your queries
accordingly.
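For anybody who wants the gist without reading the plugin: the core decision can be sketched roughly like this. This is just an illustration, not the plugin's actual API (the function and argument names here are made up); in the real plugin the first row would come from a BigQuery hook rather than being passed in.

```python
# Sketch of the branch decision: fetch the first row of the query result
# and pick a downstream task id depending on whether every column in that
# row is truthy (the same rule the BigQuery check operator applies).

def choose_branch(first_row, task_id_if_true, task_id_if_false):
    """Return the downstream task id based on the query's first row."""
    if all(bool(value) for value in first_row):
        return task_id_if_true
    return task_id_if_false

# In a BranchPythonOperator-style callable, first_row would be fetched via
# a BigQuery hook; here we just exercise the decision logic.
print(choose_branch([1, True, "yes"], "all_good", "needs_attention"))  # all_good
print(choose_branch([1, 0, "yes"], "all_good", "needs_attention"))     # needs_attention
```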



On Sun, 14 Oct 2018 at 14:16, airflowuser
 wrote:

> I believe this is quite a common case when working with data.
>
> If something: do A
> else: do B
>
> In Python code, BranchPythonOperator is the solution.
>
> But when working on Google Cloud there is no way to do this.
> All existing operators are designed to continue or fail based on a
> comparison with a specific value: BigQueryValueCheckOperator with
> pass_value=500 will continue if 500 is returned and fail in any other
> case. The same goes for all the other check operators. You must know the
> value in advance for this to work, and it's not an actual branch but more
> of a way to stop the workflow if an unexpected result has been found.
>
> But how do you handle a scenario where you want to do A or B based on a
> condition from a query result? Nothing needs to fail - it's just a simple
> branch.
>
> XCom could solve it, but there is no XCom support yet:
>
> https://stackoverflow.com/questions/52801318/airflow-how-to-push-xcom-value-from-bigqueryoperator
>
> Say, for example, the query returns the number of frauds: if it's <1000
> you want to email specific users (EmailOperator), and if it's >=1000 you
> want to run another operator and continue the workflow.
>
> Any thoughts on the matter will be appreciated.
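For what it's worth, the fraud-count scenario above boils down to a branch callable like this (the threshold constant and the task ids are made up for illustration; the count itself would come from the BigQuery query):

```python
FRAUD_THRESHOLD = 1000  # cut-off from the example above

def pick_fraud_branch(fraud_count):
    """Choose the downstream task id based on the fraud count from the query."""
    if fraud_count < FRAUD_THRESHOLD:
        return "email_specific_users"  # would point at an EmailOperator task
    return "continue_workflow"         # would point at the next operator

print(pick_fraud_branch(999))   # email_specific_users
print(pick_fraud_branch(1000))  # continue_workflow
```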



-- 
-- 

Anthony Brown
Data Engineer BI Team - John Lewis
Tel : 0787 215 7305
**
This email is confidential and may contain copyright material of the John Lewis 
Partnership. 
If you are not the intended recipient, please notify us immediately and delete 
all copies of this message. 
(Please note that it is your responsibility to scan this message for viruses). 
Email to and from the
John Lewis Partnership is automatically monitored for operational and lawful 
business reasons.
**

John Lewis plc
Registered in England 233462
Registered office 171 Victoria Street London SW1E 5NN
 
Websites: https://www.johnlewis.com 
http://www.waitrose.com 
https://www.johnlewisfinance.com
http://www.johnlewispartnership.co.uk
 
**


Re: How do you branch your code with BigQuery?

2018-10-15 Thread Anthony Brown
I do intend to create a PR (when I get the chance) to get this into the
main Airflow repo.

If anybody has any comments about this before I do, please let me know


On Mon, 15 Oct 2018 at 10:33, airflowuser
 wrote:

> Awesome!
> I think this would be a fine addition to the BigQuery operators if you
> ever think about submitting a PR for this to Airflow master.
>
> cheers
>



Re: Mocking airflow (similar to moto for AWS)

2018-10-18 Thread Anthony Brown
> > > [...] this creates an SQLite database just for the test, so you can
> > > add variables and connections needed for the test without affecting
> > > the real Airflow installation.
> > >
> > > The first thing I realized is that this didn't work if the imports
> > > were outside the context manager, since airflow.configuration and
> > > airflow.settings perform all the initialization when they are
> > > imported, so the AIRFLOW_HOME variable is already set to the real
> > > installation before getting inside the context manager.
> > >
> > > The workaround for this was to reload those modules, and this works
> > > for the tests I have written. However, when I tried to use it for
> > > something more complex (I have a plugin that I'm importing) I noticed
> > > that inside the operator in this plugin, AIRFLOW_HOME is still set to
> > > the real installation, not the temporary one for the test. I thought
> > > this must be related to the imports but I haven't been able to figure
> > > out a way to fix the issue. I tried patching some methods but I must
> > > have been missing something because the database initialization failed.
> > >
> > > Does anyone have an idea on the best way to mock/patch airflow so that
> > > EVERYTHING that is executed inside the context manager uses the
> > > temporary installation?
> > >
> > > PS: This is my current attempt, which works for the tests I defined
> > > but not for external plugins:
> > > https://github.com/biellls/airflow_testing
> > >
> > > For an example of how it works:
> > > https://github.com/biellls/airflow_testing/blob/master/tests/mock_airflow_test.py
> >
> > --
> > *Jarek Potiuk, Principal Software Engineer*
> > Mobile: +48 660 796 129

-- 

Anthony Brown
Data Engineer BI Team - John Lewis
Tel : 0787 215 7305


error handling flow in DAG

2018-10-01 Thread Anthony Brown
Hi
   I am coding various data flows, and one of the requirements we have is
to run some error tasks when certain tasks fail. These error tasks are
specific to the task that failed and are not generic to the whole DAG.

   So for instance if I have a DAG that runs the following tasks

task_1 >> task_2 >> task_3

   If task_1 fails, then I want to run

task_1_failure_a ---> task_1_failure_b

   If task_2 fails, I do not need to do anything specific, but if task_3
fails, I need to run

task_3_failure_a ---> task_3_failure_b

   I already have a generic on_failure_callback defined on all tasks that
handles alerting, but am stuck on the best way of handling a failure flow
for tasks.

   The ways I have come up with of handling this are:

1. Have a branch operator between each task with trigger_rule set to
   all_done. The branch operator would then decide whether to go to the
   next (success) task or to go down the failure branch.

2. Put the failure tasks in a separate DAG with no schedule. Have a
   different on_failure_callback for each task that would trigger the
   failure DAG for that task and then do my generic error handling.
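The first option boils down to a branch callable that inspects the state of the task before it. A rough sketch of just the decision logic, with the upstream state passed in as a plain string rather than fetched from Airflow's DagRun, and task ids taken from the example above:

```python
def choose_after_task_1(task_1_state):
    """Decide where to go after task_1.

    Assumes the branch task itself runs with trigger_rule='all_done',
    so it executes whether task_1 succeeded or failed.
    """
    if task_1_state == "success":
        return "task_2"            # carry on down the happy path
    return "task_1_failure_a"      # head down task_1's failure branch

print(choose_after_task_1("success"))  # task_2
print(choose_after_task_1("failed"))   # task_1_failure_a
```

In a real DAG this function would be the python_callable of a branch operator sitting between task_1 and task_2, looking the upstream state up from the current DagRun.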

   Does anybody have any thoughts on which of the above two approaches
would be best, or can anybody suggest an alternative way of doing this?

Thanks

-- 

Anthony Brown
Data Engineer BI Team - John Lewis
Tel : 0787 215 7305