Airflow Logging

2016-10-13 Thread Maycock, Luke
Hi All,

We (owlabs - fork: https://github.com/owlabs/incubator-airflow) have a high 
level design for how to improve the logging throughout the Airflow code to be 
more consistent, maintainable and extensible. We'd really appreciate any 
feedback on the design.

Design for Consolidating Logging Setup:

 *   Create a class in utils/logging.py to instantiate and handle setup of
an "airflow" logger object (see the sketch below). This allows us to easily
find all setup code for further modification and extension in the future.
 *   This class would be where any general logging configuration (from
airflow.cfg) would be applied.
 *   Instantiate this class in this file, so that importing it allows easy 
control of the logging setup from anywhere else in the application.
 *   Move all general logging setup to this class, with easy-to-call
control methods to turn forms of logging on and off (e.g.
console/debugFile/errorFile).
 *   Move any other helper functions related to logging that are still
required into utils/logging.py so that we can easily find them for
modification and extension in the future.
 *   Ensure that all logging throughout the application is done via the 
"airflow" logger object, or by a child of this logger object. This allows us to:
     *   Leave all settings of the root logger object alone, so that we are
friendly to any parent or child processes that are not part of the
application, allowing them to safely manage their own overriding logging
setup.
     *   Modify the settings of logging throughout the entire application
via a single, simple point of control.
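
To make the above concrete, here is a minimal sketch of what such a setup
class could look like (class and method names are illustrative assumptions,
not part of the design):

import logging


class AirflowLoggerSetup(object):
    """Instantiates and configures the shared "airflow" logger (sketch)."""

    def __init__(self):
        # All application code logs via this logger or a child of it
        # (e.g. logging.getLogger('airflow.scheduler')); the root logger
        # is deliberately left untouched.
        self.logger = logging.getLogger('airflow')
        self.logger.setLevel(logging.INFO)
        self._console_handler = None

    def enable_console(self, level=logging.INFO):
        # Turn console logging on (idempotent).
        if self._console_handler is None:
            handler = logging.StreamHandler()
            handler.setLevel(level)
            handler.setFormatter(logging.Formatter(
                '%(asctime)s %(name)s %(levelname)s - %(message)s'))
            self.logger.addHandler(handler)
            self._console_handler = handler

    def disable_console(self):
        # Turn console logging off.
        if self._console_handler is not None:
            self.logger.removeHandler(self._console_handler)
            self._console_handler = None


# Instantiated at import time so that importing the module gives a single
# point of control, e.g.:  from airflow.utils.logging import logger_setup
logger_setup = AirflowLoggerSetup()
logger_setup.enable_console()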

Cheers,
Luke Maycock
OLIVER WYMAN
luke.mayc...@affiliate.oliverwyman.com
www.oliverwyman.com





Re: +1 on PRs!

2016-10-13 Thread Arthur Wiedmer
Another way to do this would be to use the reactions on Github.

Use the 👍 to upvote an issue or PR you care about, and reserve the +1s for
committer code review.

On top of that, it makes it easier to tally the votes.

Best,
Arthur

On Tue, Oct 11, 2016 at 2:31 PM, siddharth anand  wrote:

> Thanks for clarifying that for the community, Jakob. It's not always clear
> to a submitter whether they have received a +1 from a committer or not,
> which can lead to confusion as to whether they have received one and are
> just waiting for a merge.
>
> I think the Jira "vote" feature avoids this confusion and feel that is a
> better way forward. It has the added benefit of capturing votes for
> abandoned or closed-but-not-merged PRs.
>
> -s
>
> On Tue, Oct 11, 2016 at 1:19 PM, Jakob Homan  wrote:
>
> > Be sure to distinguish between +1 == I want this, and +1 == I approve
> > this change to be merged into the codebase.
> >
> > Non-committers are welcome and encouraged to review patches, including
> > providing +1s.  The patch can't be merged on the basis of that
> > non-committer +1 only (a committer's +1 is still required), but this
> > is good experience and contribution from the non-committer.
> >
> >
> > On 11 October 2016 at 12:16, Laura Lorenz 
> > wrote:
> > > I started to do this and totally got myself stuck answering bug reports
> > 😅🙃
> > >
> > > On Mon, Oct 10, 2016 at 11:33 PM, siddharth anand 
> > wrote:
> > >
> > >> Great idea, Max.
> > >>
> > >> There is also a vote feature on JIRAs. Sometimes PRs get abandoned,
> > whereas
> > >> the JIRA tends to stick around longer, sometimes even without an
> owner.
> > I'm
> > >> not sure which is the best way, but I completely agree with the
> > sentiment.
> > >>
> > >> -s
> > >>
> > >> On Mon, Oct 10, 2016 at 6:01 PM, Maxime Beauchemin <
> > >> maximebeauche...@gmail.com> wrote:
> > >>
> > >> > Airflowers,
> > >> >
> > >> > I would love if people could write simple `+1` comments on the PRs
> > they
> > >> > care about.
> > >> >
> > >> > It's motivating for contributors to see that people want the
> features
> > >> they
> > >> > work on, and it can help committers prioritize which PRs to review
> and
> > >> > release first.
> > >> >
> > >> > It's also a great way to keep a pulse on the project, see what is
> > coming
> > >> > up, and to start getting involved. Of course more involved feedback
> > >> > (reaction icons, comments, review) are also very welcomed.
> > >> >
> > >> > See you on Github!
> > >> >
> > >> > Max
> > >> >
> > >>
> >
>


scheduler questions

2016-10-13 Thread Boris Tyukin
Hello all, and thanks for such an amazing project! I have been evaluating
Airflow, spent a few days reading about it and playing with it, and I have
a few questions that I am struggling to understand.

Let's say I have a simple DAG that runs once a day and it is doing a full
reload of tables from the source database so the process is not incremental.

Let's consider this scenario:

Day 1 - OK

Day 2 - airflow scheduler or server with airflow is down for some reason (or
DAG is paused)

Day 3 - still down (or DAG is paused)

Day 4 - server is up and now needs to run missing jobs.


How can I make Airflow run only the Day 4 job and not backfill Day 2 and 3?


I tried depends_on_past = True but it does not seem to do the trick.


I also found this in a roadmap doc, but it seems it has not made it into a
release yet:


 Only Run Latest - Champion : Sid

• For cases where we need to only run the latest in a series of task
instance runs and mark the others as skipped. For example, we may have a job
to execute a DB snapshot every day. If the DAG is paused for 5 days and
then unpaused, we don’t want to run all 5, just the latest. With this
feature, we will provide “cron” functionality for task scheduling that is
not related to ETL.


My second question: what if I have another DAG that does incremental loads
from a source table:


Day 1 - OK, loaded new/changed data for previous day

Day 2 - source system is down (or DAG is paused), Airflow DagRun failed

Day 3 - source system is down (or DAG is paused), Airflow DagRun failed

Day 4 - source system is up, Airflow Dagrun succeeded


My problem (unless I am missing something) is that Airflow on Day 4 would
use the execution time from Day 3, so the interval for the incremental load
would start from the last run (which failed). My hope is that it would use
the last _successful_ run, so that on Day 4 it would go back to Day 1. Is it
possible to achieve this?

I am aware of the manual backfill command in the CLI, but I am not sure I
want to use it due to all the issues and inconsistencies I've read about.

Thanks!


Re: +1 on PRs!

2016-10-13 Thread Maxime Beauchemin
Good idea! Well even writing a comment that says "happy to see this
feature" or something of that nature does the trick. We just have to be
careful with +1, which apparently maps to the more common "LGTM" under the
Apache umbrella.

Personally I'd rather do as much as possible on Github, to reduce the
notification channels and simplify our workflows.

I remember a thread about moving out of Jira and into GH Issues. What was
the final word on that?

Max



Re: scheduler questions

2016-10-13 Thread Joseph Napolitano
Hi Boris,

To answer the first question, the backfill command has a flag to mark jobs
as successful without running them.  Take care to align the start and end
times precisely as needed.  As an example, for a job that runs daily at 7am:

airflow backfill -s 2016-10-07T07 -e 2016-10-10T07 my-dag-name -m

The "-m" parameter tells Airflow to mark it successful without running it.




-- 
*Joe Napolitano *| Sr. Data Engineer
www.blueapron.com | 5 Crosby Street, New York, NY 10013


Re: scheduler questions

2016-10-13 Thread Ben Tallman
Boris -

We have a pull request open which causes the scheduler to not backfill on a
per-DAG basis. This is designed for exactly this situation. Basically, the
scheduler will skip intervals and jump to the last one in the list if this
flag is set. If this is important to you, please vote for it.

https://github.com/apache/incubator-airflow/pull/1830

For instance:

dag = DAG(
    "test_dag_id_here",
    backfill=False,
    ...
)



Thanks,
Ben

--
ben tallman | apigee | m: +1.503.680.5709 | o: +1.503.608.7552
twitter: @anonymousmanage | @apigee





Re: scheduler questions

2016-10-13 Thread siddharth anand
Boris,

*Question 1*
Only_Run_Latest is in master -
https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
That will solve your problem.

Releases come out once a quarter, sometimes once every 2 quarters, so I
would recommend that you run off master or off your own fork.

You could also achieve this yourself with the following code snippet. It
uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
being executed is not the current one. It will work for any schedule. The
code below has essentially been implemented in the LatestOnlyOperator (in
the commit above) for convenience.

from datetime import datetime
import logging

from airflow.operators import ShortCircuitOperator


def skip_to_current_job(ds, **kwargs):
    # The latest DagRun is the one whose schedule window contains "now".
    now = datetime.now()
    left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
    right_window = kwargs['dag'].following_schedule(left_window)
    logging.info('Left Window {}, Now {}, Right Window {}'.format(
        left_window, now, right_window))
    if not now <= right_window:
        logging.info('Not latest execution, skipping downstream.')
        return False
    return True


# Assumes `dag` is defined earlier in the file.
t0 = ShortCircuitOperator(
    task_id='short_circuit_if_not_current',
    provide_context=True,
    python_callable=skip_to_current_job,
    dag=dag)
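
For reference, a minimal sketch of wiring in the LatestOnlyOperator from
that master commit (the import path, dag id, and task ids here are
assumptions - check them against your checkout):

from datetime import datetime

from airflow import DAG
from airflow.operators import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

dag = DAG('daily_snapshot',
          start_date=datetime(2016, 10, 1),
          schedule_interval='@daily')

# Any run that is not the latest scheduled interval skips everything
# downstream of this operator.
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

snapshot = BashOperator(task_id='db_snapshot',
                        bash_command='echo taking snapshot',
                        dag=dag)
snapshot.set_upstream(latest_only)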


-s




Re: scheduler questions

2016-10-13 Thread siddharth anand
*Question 2*
You can use depends_on_past=True. Then, future dag runs won't be scheduled
until the past one succeeds. I specify it as shown below:

default_args = {
    'owner': 'sanand',
    'depends_on_past': True,
    'pool': 'ep_data_pipeline',
    'start_date': START_DATE,
    'email': [import_ep_pipeline_alert_email_dl],
    'email_on_failure': import_airflow_enable_notifications,
    'email_on_retry': import_airflow_enable_notifications,
    'retries': 3,
    'retry_delay': timedelta(seconds=30),
    'priority_weight': import_airflow_priority_weight}

dag = DAG(DAG_NAME, schedule_interval='@hourly', default_args=default_args,
          sla_miss_callback=sla_alert_func)



I also use retries to sidestep intermittent issues. If you need to retry a
failed dag run, you can clear that dag run in the UI (or CLI) and the
scheduler will rerun it.
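
For example, clearing a single day's failed run from the CLI so the
scheduler reruns it (the dag id and dates here are illustrative):

airflow clear my_dag_id -s 2016-10-12 -e 2016-10-12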



Re: +1 on PRs!

2016-10-13 Thread siddharth anand
These are all good ideas and any would work. Our committer list is small
enough that no committer would merge without either +1'ing it
himself/herself or checking that another did.

Sure, there could be a corner case where a contributor +1'd a PR before
being promoted to a committer, and then another committer might confuse that
+1 for a "committer +1" and accidentally merge it, but I don't think we
need to worry about those special cases.

If you like a PR/JIRA, then don't worry. Make your voice heard and we
(committers) will do the right thing.

-s



Re: scheduler questions

2016-10-13 Thread Boris Tyukin
Thanks so much, Sid! Just a follow-up question on "Only_Run_Latest" - will
it work with depends_on_past = True, or will it assume that the DAG uses
False?



Re: scheduler questions

2016-10-13 Thread Boris Tyukin
So for my second scenario, I think I would still need to run the missing
days' jobs one by one (by clearing the failed ones), and I understand this
is the recommended approach, as I gathered from Maxime's video.

But sometimes it is more efficient to combine all missing day runs into one,
so I would use a window for the incremental process from the last successful
job (Day 1) to the current run (Day 4) - then it would be only one DagRun to
catch up, not 3. Does that make sense? Is it possible?


Re: scheduler questions

2016-10-13 Thread siddharth anand
Yes! It does work with depends_on_past=True.
-s



Re: scheduler questions

2016-10-13 Thread siddharth anand
If you use depends_on_past=True, it won't proceed to the next DAG Run if
the previous DAG Run failed. If Day 2 fails, Day 3 won't run.

-s



Re: scheduler questions

2016-10-13 Thread Boris Tyukin
This is not what I see, actually. I posted my test DAG and a screenshot
below.

It does create DagRuns on subsequent runs - I modeled that scenario by
commenting out one bash command and uncommenting another one with exit 1.

It does not create task instances for the subsequent failed DAG runs, but it
does create DagRuns, and the first successful run after the failed ones does
not have the execution timestamp from the last successful run.


[image: Inline image 1]


Here is my test DAG:



from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator

# Determine schedule:
dag_schedule_interval = timedelta(seconds=60)
dag_start_date = datetime.now() - dag_schedule_interval


default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': dag_start_date,
    # 'start_date': datetime(2016, 10, 11, 17, 0),
    'email': ['airf...@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'only_run_latest': True,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# Change version number if schedule needs to be changed:
dag = DAG(
    'pipeline1_v8', default_args=default_args,
    schedule_interval=dag_schedule_interval)

dag.doc_md = __doc__

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='t1',
    bash_command='echo execution ts {{ ts }} & echo 1',
    # bash_command='exit 1',
    dag=dag)


Re: Next Airflow meet-up

2016-10-13 Thread George Leslie-Waksman
Can someone add "gwax" to the editor list for Confluence so I can add
Clover Health as the host of the subsequent meetup?

I assume early December is too soon; how do folks feel about mid-January?

--George Leslie-Waksman

On Wed, Oct 12, 2016 at 12:22 AM Alex Van Boxel  wrote:

> @bolke: it's the "Google Expert summit". It's a closed conference I'm
> afraid. Will work at my abstract this evening.
>
> On Wed, Oct 12, 2016 at 2:56 AM siddharth anand  wrote:
>
> > Wonderful!
> >
> > We have 3 speakers already. Paul, Alex, can you send a short talk summary
> > to Chris as soon as you can.
> >
> > -s
> >
> > On Tue, Oct 11, 2016 at 5:29 PM, Gurer Kiratli <
> > gurer.kira...@airbnb.com.invalid> wrote:
> >
> > > just talked with Paul Yang. He would like to have a 15 minute
> > > presentation/talk.
> > >
> > > Cheers,
> > >
> > > Gurer
> > >
> > > On Tue, Oct 11, 2016 at 2:44 PM, Chris Riccomini <
> criccom...@apache.org>
> > > wrote:
> > >
> > > > > Chris, I'm assuming that you will have one speaker from WePay and
> > Alex,
> > > > correct? So, you are only looking for one additional speaker?
> > > >
> > > > Correct.
> > > >
> > > > On Tue, Oct 11, 2016 at 2:36 PM, siddharth anand 
> > > > wrote:
> > > > > Chris, I'm assuming that you will have one speaker from WePay and
> > Alex,
> > > > > correct? So, you are only looking for one additional speaker?
> > > > >
> > > > > Bolke,
> > > > > QCon SF is Nov 7-9, though i suspect it's not close enough in time
> to
> > > > work
> > > > > for you.
> > > > > -s
> > > > >
> > > > > On Tue, Oct 11, 2016 at 1:46 PM, Chris Riccomini <
> > > criccom...@apache.org>
> > > > > wrote:
> > > > >>
> > > > >> I think Alex will be attending a conference in SF on the 14-15.
> Not
> > > > >> sure which one, though.
> > > > >>
> > > > >> On Tue, Oct 11, 2016 at 12:53 PM, Bolke de Bruin <
> bdbr...@gmail.com
> > >
> > > > >> wrote:
> > > > >> > Are there any conferences around this time? Then I might be able
> > to
> > > > join
> > > > >> > as well.
> > > > >> >
> > > > >> >
> > > > >> >> Op 11 okt. 2016, om 17:53 heeft Chris Riccomini <
> > > > criccom...@apache.org>
> > > > >> >> het volgende geschreven:
> > > > >> >>
> > > > >> >> @gurer, does anyone from AirBNB want to speak? :)
> > > > >> >>
> > > > >> >> On Tue, Oct 11, 2016 at 8:51 AM, Gurer Kiratli
> > > > >> >>  wrote:
> > > > >> >>> Thank you Chris!!
> > > > >> >>>
> > > > >> >>> On Tue, Oct 11, 2016 at 8:46 AM, Chris Riccomini
> > > > >> >>> 
> > > > >> >>> wrote:
> > > > >> 
> > > > >>  Hey all,
> > > > >> 
> > > > >>  I've booked the meetup!
> > > > >> 
> > > > >> 
> > > > >> 
> > > > >>  http://www.meetup.com/Bay-Area-Apache-Airflow-Incubating-Meetup/events/234778571/
> > > > >> 
> > > > >>  November 16, 2016, 6:30 PM
> > > > >>  350 Convention Way, Redwood City, CA
> > > > >> 
> > > > >>  See you then!
> > > > >> 
> > > > >>  Cheers,
> > > > >>  Chris
> > > > >> 
> > > > >>  On Fri, Oct 7, 2016 at 1:08 PM, Chris Riccomini
> > > > >>  
> > > > >>  wrote:
> > > > >> > @sid Awesome! Will create the meetup event on Monday. Added
> to
> > > > TODO.
> > > > >> > :)
> > > > >> >
> > > > >> > @all please email if you'd like to speak. Will do three 15m
> > > talks
> > > > >> > with
> > > > >> > 5m q&a. Nice short-ish talks.
> > > > >> >
> > > > >> > On Fri, Oct 7, 2016 at 11:24 AM, siddharth anand <
> > > > san...@apache.org>
> > > > >> > wrote:
> > > > >> >> Great! I'm looking forward to the talk and to the meet-up.
> If
> > > you
> > > > >> >> are
> > > > >> >> interested in speaking at the WePay November meet-up,
> respond
> > > to
> > > > >> >> this
> > > > >> >> list.
> > > > >> >> Chris, can you go ahead and create the meet-up event?
> > > > >> >>
> > > > >> >> George, if you are still interested in a 2017 talk, go
> ahead
> > > and
> > > > >> >> add
> > > > >> >> your
> > > > >> >> name to the sign-up list. That will give you some time to
> > look
> > > > for
> > > > >> >> guest
> > > > >> >> speakers.
> > > > >> >>
> > > > >> >> -s
> > > > >> >>
> > > > >> >> On Fri, Oct 7, 2016 at 10:11 AM, Chris Riccomini
> > > > >> >> 
> > > > >> >> wrote:
> > > > >> >>
> > > > >> >>> @Alex, that would be awesome!
> > > > >> >>>
> > > > >> >>> On Fri, Oct 7, 2016 at 10:05 AM, Alex Van Boxel <
> > > > a...@vanboxel.be>
> > > > >> >>> wrote:
> > > > >>  I don't want to impose, but yes. Normally, like 95%
> chance
> > > > that I
> > > > >>  can
> > > > >> >>> make
> > > > >>  it. The thing is that I got a go to come but I'm waiting
> > for
> > > > >>  people
> > > > >>  to
> > > > >> >>> make
> > > > >>  the travel arrangements. Maybe I can give the story of
> how
> > > > we're
> > > > >>  moving
> > > > >>  from Luigi to Airflow :-)
> > > > >> >>

Re: scheduler questions

2016-10-13 Thread siddharth anand
I can't see an image.

We run most of our dags with depends_on_past=True.

If you want to chain your dag runs, such as not starting the first task of
a dag run until the last task of the previous dag run completes, you can use
an external task sensor. The external task sensor would be the first task in
the dag and would depend on the last task in the same dag from the previous
dag run. This is strict dag chaining.
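
A minimal sketch of that pattern (the import path, dag id, and task ids are
assumptions - adjust to your setup; note that the very first run has no
previous run to wait on):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG('chained_dag',
          start_date=datetime(2016, 10, 1),
          schedule_interval=timedelta(days=1))

# First task: wait for the last task of this same dag's previous run.
wait_for_previous = ExternalTaskSensor(
    task_id='wait_for_previous_run',
    external_dag_id='chained_dag',
    external_task_id='load',
    execution_delta=timedelta(days=1),  # look back one schedule interval
    dag=dag)

load = BashOperator(task_id='load', bash_command='echo load', dag=dag)
load.set_upstream(wait_for_previous)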

If you just don't want the same task in the subsequent dag run to get
scheduled unless the first task completes, depends_on_past=True helps
there. This is more a cascading effect in the tree view.
-s

On Thu, Oct 13, 2016 at 12:41 PM, Boris Tyukin 
wrote:

> This is not what I see, actually. I posted my test DAG and a screenshot
> below.
>
> It does create DagRuns on subsequent runs - I modeled that scenario by
> commenting out one bash command and uncommenting another one that runs
> exit 1.
>
> It does not create task instances for the subsequent failed DAG runs, but
> it does create DagRuns, and the first successful run after the failed ones
> does not carry the execution timestamp from the last successful run.
>
>
> [image: Inline image 1]
>
>
> here is my test DAG
>
>
>
> from datetime import datetime, timedelta
>
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
>
> # Determine schedule:
> dag_schedule_interval = timedelta(seconds=60)
> dag_start_date = datetime.now() - dag_schedule_interval
>
>
> default_args = {
> 'owner': 'airflow',
> 'depends_on_past': True,
> 'start_date': dag_start_date,
> # 'start_date': datetime(2016, 10, 11, 17, 0),
> 'email': ['airf...@airflow.com'],
> 'email_on_failure': False,
> 'email_on_retry': False,
> 'retries': 0,
> 'retry_delay': timedelta(seconds=20),
> 'only_run_latest': True,
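> # note: 'only_run_latest' is not a standard default_args key in Airflow;
> # the supported route is the LatestOnlyOperator discussed below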
> # 'queue': 'bash_queue',
> # 'pool': 'backfill',
> # 'priority_weight': 10,
> # 'end_date': datetime(2016, 1, 1),
> }
>
> # Change version number if schedule needs to be changed:
> dag = DAG(
> 'pipeline1_v8', default_args=default_args, schedule_interval=dag_
> schedule_interval)
>
> dag.doc_md = __doc__
>
> # t1, t2 and t3 are examples of tasks created by instantiating operators
> t1 = BashOperator(
> task_id='t1',
> bash_command='echo execution ts {{ ts }} & echo 1',
> # bash_command='exit 1',
> dag=dag)
>
> On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand 
> wrote:
>
>> If you use depends_on_past=True, it won't proceed to the next DAG Run if
>> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
>>
>> -s
>>
>> On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand 
>> wrote:
>>
>> > Yes! It does work with Depends_on_past=True.
>> > -s
>> >
>> > On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin 
>> > wrote:
>> >
>> >> thanks so much, Sid! just a follow-up question on "Only_Run_Latest" -
>> >> will it work with depends_on_past = True? or does it assume the DAG
>> >> uses False?
>> >>
>> >> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand 
>> >> wrote:
>> >>
>> >> > Boris,
>> >> >
>> >> > *Question 1*
>> >> > Only_Run_Latest is in master -
>> >> > https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
>> >> > That will solve your problem.
>> >> >
>> >> > Releases come out once a quarter, sometimes once every two quarters,
>> >> > so I would recommend that you run off master or off your own fork.
>> >> >
>> >> > You could also achieve this yourself with the following code
>> >> > snippet. It uses a ShortCircuitOperator that will skip downstream
>> >> > tasks if the DagRun being executed is not the current one. It will
>> >> > work for any schedule. The code below has essentially been
>> >> > implemented in the LatestOnlyOperator above for convenience.
>> >> >
>> >> > def skip_to_current_job(ds, **kwargs):
>> >> >
>> >> > now = datetime.now()
>> >> >
>> >> > left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
>> >> >
>> >> > right_window = kwargs['dag'].following_schedule(left_window)
>> >> >
>> >> > logging.info(('Left Window {}, Now {}, Right Window {}'
>> >> > ).format(left_window,now,right_window))
>> >> >
>> >> > if not now <= right_window:
>> >> >
>> >> > logging.info('Not latest execution, skipping downstream.')
>> >> >
>> >> > return False
>> >> >
>> >> > return True
>> >> >
>> >> >
>> >> > t0 = ShortCircuitOperator(
>> >> >
>> >> >   task_id = 'short_circuit_if_not_current',
>> >> >
>> >> >   provide_context = True,
>> >> >
>> >> >   python_callable = skip_to_current_job,
>> >> >
>> >> >   dag = dag
>> >> >
>> >> > )
>> >> >
>> >> >
>> >> > -s
>> >> >
>> >> >
>> >> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin > >
>> >> > wrote:
>> >> >
>> >> > > Hello all and thanks for such an amazing project! I have been
>> >> > > evaluating Airflow and spent a few days reading about it and
>> >> > > playing with it, and I have a few questions that I struggle to
>> >> > > understand.
>> >> > >
>> >> > > Let's say
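
For reference, here is a cleaned-up, self-contained version of the
ShortCircuitOperator snippet quoted above (the stray line wrap and missing
quote are fixed). This is a sketch only: the dag definition is illustrative
and the import paths follow the 2016-era Airflow layout.

    import logging
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python_operator import ShortCircuitOperator

    dag = DAG('latest_only_example', start_date=datetime(2016, 10, 1),
              schedule_interval=timedelta(hours=1))

    def skip_to_current_job(ds, **kwargs):
        # The current DagRun is the one whose schedule window contains now.
        now = datetime.now()
        left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
        right_window = kwargs['dag'].following_schedule(left_window)
        logging.info('Left Window {}, Now {}, Right Window {}'.format(
            left_window, now, right_window))
        if not now <= right_window:
            # Returning False makes ShortCircuitOperator skip all
            # downstream tasks for this (stale) DagRun.
            logging.info('Not latest execution, skipping downstream.')
            return False
        return True

    t0 = ShortCircuitOperator(
        task_id='short_circuit_if_not_current',
        provide_context=True,
        python_callable=skip_to_current_job,
        dag=dag)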

Re: scheduler questions

2016-10-13 Thread Boris Tyukin
you rock, Sid! thanks for taking your time explaining it for me


A question/poll on the TaskInstance data model...

2016-10-13 Thread Ben Tallman
I (and Apigee) would like to have the DAG Graph paint old DagRuns based on
the tasks (and ids) that actually ran, and not based off of the current DAG
from the DagBag. In order to do that, I need to be able to reconstruct the
graph of a DagRun, and one way is from the TaskInstance table. However,
that table doesn't actually contain the links between tasks, though it
could.

Does anyone feel strongly against storing upstream and downstream task_ids
in the taskinstance table as a first step? Our goal is NOT to be able to
rerun the past, but to be able to see the past (and provide links to the
taskinstance details and logs).
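
Purely as an illustration of the proposal, and not the actual Airflow model
(the table and column names below are invented), the first step might look
like denormalizing the run-time edges onto the task instance rows:

    from sqlalchemy import Column, DateTime, String, Text
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class TaskInstanceSketch(Base):
        __tablename__ = 'task_instance_sketch'
        task_id = Column(String(250), primary_key=True)
        dag_id = Column(String(250), primary_key=True)
        execution_date = Column(DateTime, primary_key=True)
        # Proposed additions: the neighbor task ids as they were when this
        # instance ran, e.g. stored as comma-separated lists. With these
        # columns, the graph view can repaint an old DagRun even after the
        # DAG file has changed.
        upstream_task_ids = Column(Text)
        downstream_task_ids = Column(Text)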

Thanks,
Ben

--
ben tallman | apigee
m: +1.503.680.5709 | o: +1.503.608.7552 | twitter @anonymousmanage @apigee