Re: scheduler questions

2016-10-17 Thread Maycock, Luke
This is great! I have also upvoted 
https://github.com/apache/incubator-airflow/pull/1830.


If I would like to get feedback on a design or if I have an Airflow question, 
is emailing this dev list the correct way to go?


Cheers,
Luke Maycock
OLIVER WYMAN
luke.mayc...@affiliate.oliverwyman.com
www.oliverwyman.com




From: siddharth anand <san...@apache.org>
Sent: 13 October 2016 18:11
To: dev@airflow.incubator.apache.org
Subject: Re: scheduler questions

Boris,

*Question 1*
Only_Run_Latest is in master -
https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
That will solve your problem.

Releases come out once a quarter, sometimes once every two quarters, so I would
recommend that you run off master or off your own fork.

You could also achieve this yourself with the following code snippet. It
uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
being executed is not the current one. It will work for any schedule. The
code below has essentially been implemented in the LatestOnlyOperator above
for convenience.

import logging
from datetime import datetime

# Import path may vary by Airflow version.
from airflow.operators.python_operator import ShortCircuitOperator


def skip_to_current_job(ds, **kwargs):
    """Skip downstream tasks unless this DagRun is the current one."""
    now = datetime.now()

    # The schedule window for this execution_date: [left_window, right_window).
    # If "now" is past right_window, a newer run exists and this one is stale.
    left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
    right_window = kwargs['dag'].following_schedule(left_window)

    logging.info('Left Window {}, Now {}, Right Window {}'.format(
        left_window, now, right_window))

    if not now <= right_window:
        logging.info('Not latest execution, skipping downstream.')
        return False

    return True


t0 = ShortCircuitOperator(
    task_id='short_circuit_if_not_current',
    provide_context=True,
    python_callable=skip_to_current_job,
    dag=dag,
)
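
For reference, a minimal sketch of using the operator from the commit above
(assuming it is importable as LatestOnlyOperator; the exact import path and the
task names here are illustrative):

from airflow.operators.latest_only_operator import LatestOnlyOperator

# Skips everything downstream unless this DagRun is the most recent one.
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
latest_only.set_downstream(t1)  # t1: whatever task should only run for the latest schedule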


-s


On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin <bo...@boristyukin.com> wrote:

> Hello all and thanks for such an amazing project! I have been evaluating
> Airflow and spent a few days reading about it and playing with it and I
> have a few questions that I struggle to understand.
>
> Let's say I have a simple DAG that runs once a day and it is doing a full
> reload of tables from the source database so the process is not
> incremental.
>
> Let's consider this scenario:
>
> Day 1 - OK
>
> Day 2 - airflow scheduler or server with airflow is down for some reason (or
> the DAG is paused)
>
> Day 3 - still down (or the DAG is paused)
>
> Day 4 - server is up and now needs to run missing jobs.
>
>
> How can I make Airflow run only the Day 4 job and not backfill Days 2 and 3?
>
>
> I tried depends_on_past = True but it does not seem to do the trick.
>
>
> I also found this in a roadmap doc, but it seems it has not made it into a
> release yet:
>
>
>  Only Run Latest - Champion : Sid
>
> • For cases where we need to only run the latest in a series of task
> instance runs and mark the others as skipped. For example, we may have job
> to execute a DB snapshot every day. If the DAG is paused for 5 days and
> then unpaused, we don’t want to run all 5, just the latest. With this
> feature, we will provide “cron” functionality for task scheduling that is
> not related to ETL
>
>
> My second question, what if I have another DAG that does incremental loads
> from a source table:
>
>
> Day 1 - OK, loaded new/changed data for previous day
>
> Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
>
> Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
>
> Day 4 - source system is up, Airflow Dagrun succeeded
>
>
> My problem (unless I am missing something) is that Airflow on Day 4 would use
> the execution time from Day 3, so the interval for the incremental load would
> start from the last run (which failed). My hope is that it would use the last
> _successful_ run, so on Day 4 it would go back to Day 1. Is it possible to
> achieve this?
>
> I am aware of the manual backfill command via the CLI, but I am not sure I
> want to use it due to all the issues and inconsistencies I've read about it.
>
> Thanks!
>




Re: scheduler questions

2016-10-13 Thread Boris Tyukin
you rock, Sid! thanks for taking the time to explain it to me

On Thu, Oct 13, 2016 at 6:10 PM, siddharth anand  wrote:

> I can't see an image.
>
> We run most of our dags with depends_on_past=True.
>
> If you want to chain your dag runs, such as not starting the first task of
> your dag run start until the last task of your previous dag runs completes,
> you can use an external task sensor. The external task sensor would be the
> first task in the dag and would depend on the last task in the same dag
> from the previous dag run. This is strict dag chaining.
>
> If you just don't want the same task in the subsequent dag run to get
> scheduled unless the first task completes, depends_on_past=True helps
> there. This is more a cascading effect in the tree view.
> -s
>
> On Thu, Oct 13, 2016 at 12:41 PM, Boris Tyukin 
> wrote:
>
> > This is not what I see actually. I posted below my test DAG and a
> > screenshot.
> >
> > It does create DAGRuns on subsequent runs - I modeled that scenario by
> > commenting one bash command and uncommenting another one with Exit 1.
> >
> > it does not create Task Instances on subsequent failed DAGs but it does
> > create DAGRuns and the first successful run after failed ones would not
> > have execution timestamp from last successful run
> >
> >
> > [image: Inline image 1]
> >
> >
> > here is my test DAG
> >
> >
> >
> > from datetime import datetime, timedelta
> >
> > # Determine schedule:
> > dag_schedule_interval = timedelta(seconds=60)
> > dag_start_date = datetime.now() - dag_schedule_interval
> >
> >
> > default_args = {
> > 'owner': 'airflow',
> > 'depends_on_past': True,
> > 'start_date': dag_start_date,
> > # 'start_date': datetime(2016, 10, 11, 17, 0),
> > 'email': ['airf...@airflow.com'],
> > 'email_on_failure': False,
> > 'email_on_retry': False,
> > 'retries': 0,
> > 'retry_delay': timedelta(seconds=20),
> > ,'only_run_latest'=True,
> > # 'queue': 'bash_queue',
> > # 'pool': 'backfill',
> > # 'priority_weight': 10,
> > # 'end_date': datetime(2016, 1, 1),
> > }
> >
> > # Change version number if schedule needs to be changed:
> > dag = DAG(
> > 'pipeline1_v8', default_args=default_args, schedule_interval=dag_
> > schedule_interval)
> >
> > dag.doc_md = __doc__
> >
> > # t1, t2 and t3 are examples of tasks created by instantiating operators
> > t1 = BashOperator(
> > task_id='t1',
> > bash_command='echo execution ts {{ ts }} & echo 1',
> > # bash_command='exit 1',
> > dag=dag)
> >
> > On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand 
> > wrote:
> >
> >> If you use depends_on_past=True, it won't proceed to the next DAG Run if
> >> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
> >>
> >> -s
> >>
> >> On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand 
> >> wrote:
> >>
> >> > Yes! It does work with Depends_on_past=True.
> >> > -s
> >> >
> >> > On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin  >
> >> > wrote:
> >> >
> >> >> thanks so much, Sid! just a follow up question on "Only_Run_Latest" -
> >> will
> >> >> it work with depend_on_past = True? or it will assume that DAG is
> used
> >> >> False?
> >> >>
> >> >> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand 
> >> >> wrote:
> >> >>
> >> >> > Boris,
> >> >> >
> >> >> > *Question 1*
> >> >> > Only_Run_Latest is in master -
> >> >> > https://github.com/apache/incubator-airflow/commit/
> >> >> > edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> >> >> > That will solve your problem.
> >> >> >
> >> >> > Releases come out one a quarter sometimes once every 2 quarters,
> so I
> >> >> would
> >> >> > recommend that you run off master or off your own fork.
> >> >> >
> >> >> > You could also achieve this yourself with the following code
> >> snippet. It
> >> >> > uses a ShortCircuitOperator that will skip downstream tasks if the
> >> >> DagRun
> >> >> > being executed is not the current one. It will work for any
> schedule.
> >> >> The
> >> >> > code below has essentially been implemented in the
> LatestOnlyOperator
> >> >> above
> >> >> > for convenience.
> >> >> >
> >> >> > def skip_to_current_job(ds, **kwargs):
> >> >> >
> >> >> > now = datetime.now()
> >> >> >
> >> >> > left_window = kwargs['dag'].following_schedu
> >> le(kwargs['execution_
> >> >> > date'])
> >> >> >
> >> >> > right_window = kwargs['dag'].following_schedule(left_window)
> >> >> >
> >> >> > logging.info(('Left Window {}, Now {}, Right Window {}'
> >> >> > ).format(left_window,now,right_window))
> >> >> >
> >> >> > if not now <= right_window:
> >> >> >
> >> >> > logging.info('Not latest execution, skipping downstream.')
> >> >> >
> >> >> > return False
> >> >> >
> >> >> > return True
> >> >> >
> >> >> >
> >> >> > t0 = ShortCircuitOperator(
> >> >> >
> >> >> >   task_id = 'short_circuit_if_not_current,
> >> >> 

Re: scheduler questions

2016-10-13 Thread siddharth anand
I can't see an image.

We run most of our dags with depends_on_past=True.

If you want to chain your dag runs, such as not starting the first task of
a dag run until the last task of the previous dag run completes,
you can use an external task sensor. The external task sensor would be the
first task in the dag and would depend on the last task in the same dag
from the previous dag run. This is strict dag chaining.
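
A minimal sketch of that chaining (assuming the sensor is importable as
ExternalTaskSensor; the dag/task ids and the one-day delta are illustrative):

from datetime import timedelta

from airflow.operators.sensors import ExternalTaskSensor

# First task of the DAG: wait for the last task of the *previous* run of
# this same DAG. execution_delta points one schedule interval back.
wait_for_previous_run = ExternalTaskSensor(
    task_id='wait_for_previous_run',
    external_dag_id='my_dag',           # same dag_id as this DAG
    external_task_id='final_task',      # last task in the DAG
    execution_delta=timedelta(days=1),  # one schedule interval back
    dag=dag)

# wait_for_previous_run.set_downstream(first_real_task)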

If you just don't want the same task in the subsequent dag run to get
scheduled unless it completed in the previous dag run, depends_on_past=True helps
there. This is more a cascading effect in the tree view.
-s

On Thu, Oct 13, 2016 at 12:41 PM, Boris Tyukin 
wrote:

> This is not what I see actually. I posted below my test DAG and a
> screenshot.
>
> It does create DAGRuns on subsequent runs - I modeled that scenario by
> commenting one bash command and uncommenting another one with Exit 1.
>
> it does not create Task Instances on subsequent failed DAGs but it does
> create DAGRuns and the first successful run after failed ones would not
> have execution timestamp from last successful run
>
>
> [image: Inline image 1]
>
>
> here is my test DAG
>
>
>
> from datetime import datetime, timedelta
>
> # Determine schedule:
> dag_schedule_interval = timedelta(seconds=60)
> dag_start_date = datetime.now() - dag_schedule_interval
>
>
> default_args = {
> 'owner': 'airflow',
> 'depends_on_past': True,
> 'start_date': dag_start_date,
> # 'start_date': datetime(2016, 10, 11, 17, 0),
> 'email': ['airf...@airflow.com'],
> 'email_on_failure': False,
> 'email_on_retry': False,
> 'retries': 0,
> 'retry_delay': timedelta(seconds=20),
> ,'only_run_latest'=True,
> # 'queue': 'bash_queue',
> # 'pool': 'backfill',
> # 'priority_weight': 10,
> # 'end_date': datetime(2016, 1, 1),
> }
>
> # Change version number if schedule needs to be changed:
> dag = DAG(
> 'pipeline1_v8', default_args=default_args, schedule_interval=dag_
> schedule_interval)
>
> dag.doc_md = __doc__
>
> # t1, t2 and t3 are examples of tasks created by instantiating operators
> t1 = BashOperator(
> task_id='t1',
> bash_command='echo execution ts {{ ts }} & echo 1',
> # bash_command='exit 1',
> dag=dag)
>
> On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand 
> wrote:
>
>> If you use depends_on_past=True, it won't proceed to the next DAG Run if
>> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
>>
>> -s
>>
>> On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand 
>> wrote:
>>
>> > Yes! It does work with Depends_on_past=True.
>> > -s
>> >
>> > On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin 
>> > wrote:
>> >
>> >> thanks so much, Sid! just a follow up question on "Only_Run_Latest" -
>> will
>> >> it work with depend_on_past = True? or it will assume that DAG is used
>> >> False?
>> >>
>> >> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand 
>> >> wrote:
>> >>
>> >> > Boris,
>> >> >
>> >> > *Question 1*
>> >> > Only_Run_Latest is in master -
>> >> > https://github.com/apache/incubator-airflow/commit/
>> >> > edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
>> >> > That will solve your problem.
>> >> >
>> >> > Releases come out one a quarter sometimes once every 2 quarters, so I
>> >> would
>> >> > recommend that you run off master or off your own fork.
>> >> >
>> >> > You could also achieve this yourself with the following code
>> snippet. It
>> >> > uses a ShortCircuitOperator that will skip downstream tasks if the
>> >> DagRun
>> >> > being executed is not the current one. It will work for any schedule.
>> >> The
>> >> > code below has essentially been implemented in the LatestOnlyOperator
>> >> above
>> >> > for convenience.
>> >> >
>> >> > def skip_to_current_job(ds, **kwargs):
>> >> >
>> >> > now = datetime.now()
>> >> >
>> >> > left_window = kwargs['dag'].following_schedu
>> le(kwargs['execution_
>> >> > date'])
>> >> >
>> >> > right_window = kwargs['dag'].following_schedule(left_window)
>> >> >
>> >> > logging.info(('Left Window {}, Now {}, Right Window {}'
>> >> > ).format(left_window,now,right_window))
>> >> >
>> >> > if not now <= right_window:
>> >> >
>> >> > logging.info('Not latest execution, skipping downstream.')
>> >> >
>> >> > return False
>> >> >
>> >> > return True
>> >> >
>> >> >
>> >> > t0 = ShortCircuitOperator(
>> >> >
>> >> >   task_id = 'short_circuit_if_not_current,
>> >> >
>> >> >   provide_context = True,
>> >> >
>> >> >   python_callable = skip_to_current_job,
>> >> >
>> >> >   dag = dag
>> >> >
>> >> > )
>> >> >
>> >> >
>> >> > -s
>> >> >
>> >> >
>> >> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin > >
>> >> > wrote:
>> >> >
>> >> > > Hello all and thanks for such an amazing project! I have been
>> >> evaluating
>> >> > > Airflow and spent a few days reading about 

Re: scheduler questions

2016-10-13 Thread Boris Tyukin
This is not what I see actually. I posted below my test DAG and a
screenshot.

It does create DAGRuns on subsequent runs - I modeled that scenario by
commenting one bash command and uncommenting another one with Exit 1.

It does not create Task Instances for the subsequent failed DAG runs, but it
does create DAGRuns, and the first successful run after the failed ones would
not have the execution timestamp of the last successful run.


[image: Inline image 1]


here is my test DAG



from datetime import datetime, timedelta

# Airflow imports (paths may vary by version)
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Determine schedule:
dag_schedule_interval = timedelta(seconds=60)
dag_start_date = datetime.now() - dag_schedule_interval


default_args = {
'owner': 'airflow',
'depends_on_past': True,
'start_date': dag_start_date,
# 'start_date': datetime(2016, 10, 11, 17, 0),
'email': ['airf...@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(seconds=20),
'only_run_latest': True,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

# Change version number if schedule needs to be changed:
dag = DAG(
'pipeline1_v8', default_args=default_args,
schedule_interval=dag_schedule_interval)

dag.doc_md = __doc__

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='t1',
bash_command='echo execution ts {{ ts }} & echo 1',
# bash_command='exit 1',
dag=dag)

On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand  wrote:

> If you use depends_on_past=True, it won't proceed to the next DAG Run if
> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
>
> -s
>
> On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand 
> wrote:
>
> > Yes! It does work with Depends_on_past=True.
> > -s
> >
> > On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin 
> > wrote:
> >
> >> thanks so much, Sid! just a follow up question on "Only_Run_Latest" -
> will
> >> it work with depend_on_past = True? or it will assume that DAG is used
> >> False?
> >>
> >> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand 
> >> wrote:
> >>
> >> > Boris,
> >> >
> >> > *Question 1*
> >> > Only_Run_Latest is in master -
> >> > https://github.com/apache/incubator-airflow/commit/
> >> > edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> >> > That will solve your problem.
> >> >
> >> > Releases come out one a quarter sometimes once every 2 quarters, so I
> >> would
> >> > recommend that you run off master or off your own fork.
> >> >
> >> > You could also achieve this yourself with the following code snippet.
> It
> >> > uses a ShortCircuitOperator that will skip downstream tasks if the
> >> DagRun
> >> > being executed is not the current one. It will work for any schedule.
> >> The
> >> > code below has essentially been implemented in the LatestOnlyOperator
> >> above
> >> > for convenience.
> >> >
> >> > def skip_to_current_job(ds, **kwargs):
> >> >
> >> > now = datetime.now()
> >> >
> >> > left_window = kwargs['dag'].following_schedule(kwargs['execution_
> >> > date'])
> >> >
> >> > right_window = kwargs['dag'].following_schedule(left_window)
> >> >
> >> > logging.info(('Left Window {}, Now {}, Right Window {}'
> >> > ).format(left_window,now,right_window))
> >> >
> >> > if not now <= right_window:
> >> >
> >> > logging.info('Not latest execution, skipping downstream.')
> >> >
> >> > return False
> >> >
> >> > return True
> >> >
> >> >
> >> > t0 = ShortCircuitOperator(
> >> >
> >> >   task_id = 'short_circuit_if_not_current,
> >> >
> >> >   provide_context = True,
> >> >
> >> >   python_callable = skip_to_current_job,
> >> >
> >> >   dag = dag
> >> >
> >> > )
> >> >
> >> >
> >> > -s
> >> >
> >> >
> >> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin 
> >> > wrote:
> >> >
> >> > > Hello all and thanks for such an amazing project! I have been
> >> evaluating
> >> > > Airflow and spent a few days reading about it and playing with it
> and
> >> I
> >> > > have a few questions that I struggle to understand.
> >> > >
> >> > > Let's say I have a simple DAG that runs once a day and it is doing a
> >> full
> >> > > reload of tables from the source database so the process is not
> >> > > incremental.
> >> > >
> >> > > Let's consider this scenario:
> >> > >
> >> > > Day 1 - OK
> >> > >
> >> > > Day 2 - airflow scheduler or server with airflow is down for some
> >> reason
> >> > > ((or
> >> > > DAG is paused)
> >> > >
> >> > > Day 3 - still down(or DAG is paused)
> >> > >
> >> > > Day 4 - server is up and now needs to run missing jobs.
> >> > >
> >> > >
> >> > > How can I make airflow to run only Day 4 job and not backfill Day 2
> >> and
> >> > 3?
> >> > >
> >> > >
> >> > > I tried to do depend_on_past = True but it does not seem to do this
> >> > trick.
> >> > >
> >> > >
> >> > > I also found in a roadmap doc 

Re: scheduler questions

2016-10-13 Thread siddharth anand
If you use depends_on_past=True, it won't proceed to the next DAG Run if
the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
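
In DAG terms that is just the depends_on_past flag in default_args (a minimal
sketch; the dag id and dates are illustrative):

from datetime import datetime

from airflow import DAG

default_args = {
    'owner': 'airflow',
    # A task instance only runs if the same task succeeded in the previous DAG Run.
    'depends_on_past': True,
    'start_date': datetime(2016, 10, 1),
}

dag = DAG('my_dag', schedule_interval='@daily', default_args=default_args)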

-s

On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand  wrote:

> Yes! It does work with Depends_on_past=True.
> -s
>
> On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin 
> wrote:
>
>> thanks so much, Sid! just a follow up question on "Only_Run_Latest" - will
>> it work with depend_on_past = True? or it will assume that DAG is used
>> False?
>>
>> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand 
>> wrote:
>>
>> > Boris,
>> >
>> > *Question 1*
>> > Only_Run_Latest is in master -
>> > https://github.com/apache/incubator-airflow/commit/
>> > edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
>> > That will solve your problem.
>> >
>> > Releases come out one a quarter sometimes once every 2 quarters, so I
>> would
>> > recommend that you run off master or off your own fork.
>> >
>> > You could also achieve this yourself with the following code snippet. It
>> > uses a ShortCircuitOperator that will skip downstream tasks if the
>> DagRun
>> > being executed is not the current one. It will work for any schedule.
>> The
>> > code below has essentially been implemented in the LatestOnlyOperator
>> above
>> > for convenience.
>> >
>> > def skip_to_current_job(ds, **kwargs):
>> >
>> > now = datetime.now()
>> >
>> > left_window = kwargs['dag'].following_schedule(kwargs['execution_
>> > date'])
>> >
>> > right_window = kwargs['dag'].following_schedule(left_window)
>> >
>> > logging.info(('Left Window {}, Now {}, Right Window {}'
>> > ).format(left_window,now,right_window))
>> >
>> > if not now <= right_window:
>> >
>> > logging.info('Not latest execution, skipping downstream.')
>> >
>> > return False
>> >
>> > return True
>> >
>> >
>> > t0 = ShortCircuitOperator(
>> >
>> >   task_id = 'short_circuit_if_not_current,
>> >
>> >   provide_context = True,
>> >
>> >   python_callable = skip_to_current_job,
>> >
>> >   dag = dag
>> >
>> > )
>> >
>> >
>> > -s
>> >
>> >
>> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin 
>> > wrote:
>> >
>> > > Hello all and thanks for such an amazing project! I have been
>> evaluating
>> > > Airflow and spent a few days reading about it and playing with it and
>> I
>> > > have a few questions that I struggle to understand.
>> > >
>> > > Let's say I have a simple DAG that runs once a day and it is doing a
>> full
>> > > reload of tables from the source database so the process is not
>> > > incremental.
>> > >
>> > > Let's consider this scenario:
>> > >
>> > > Day 1 - OK
>> > >
>> > > Day 2 - airflow scheduler or server with airflow is down for some
>> reason
>> > > ((or
>> > > DAG is paused)
>> > >
>> > > Day 3 - still down(or DAG is paused)
>> > >
>> > > Day 4 - server is up and now needs to run missing jobs.
>> > >
>> > >
>> > > How can I make airflow to run only Day 4 job and not backfill Day 2
>> and
>> > 3?
>> > >
>> > >
>> > > I tried to do depend_on_past = True but it does not seem to do this
>> > trick.
>> > >
>> > >
>> > > I also found in a roadmap doc this but seems it is not made to the
>> > release
>> > > yet:
>> > >
>> > >
>> > >  Only Run Latest - Champion : Sid
>> > >
>> > > • For cases where we need to only run the latest in a series of task
>> > > instance runs and mark the others as skipped. For example, we may have
>> > job
>> > > to execute a DB snapshot every day. If the DAG is paused for 5 days
>> and
>> > > then unpaused, we don’t want to run all 5, just the latest. With this
>> > > feature, we will provide “cron” functionality for task scheduling
>> that is
>> > > not related to ETL
>> > >
>> > >
>> > > My second question, what if I have another DAG that does incremental
>> > loads
>> > > from a source table:
>> > >
>> > >
>> > > Day 1 - OK, loaded new/changed data for previous day
>> > >
>> > > Day 2 - source system is down (or DAG is paused), Airflow DagRun
>> failed
>> > >
>> > > Day 3 - source system is down (or DAG is paused), Airflow DagRun
>> failed
>> > >
>> > > Day 4 - source system is up, Airflow Dagrun succeeded
>> > >
>> > >
>> > > My problem (unless I am missing something), Airflow on Day 4 would use
>> > > execution time from Day 3, so the interval for incremental load would
>> be
>> > > since the last run (which was Failed). My hope it would use the last
>> > > _successful_ run so on Day 4 it would go back to Day 1. Is it
>> possible to
>> > > achieve this?
>> > >
>> > > I am aware of a manual backfill command via CLI but I am not sure I
>> want
>> > to
>> > > use due to all the issues and inconsistencies I've read about it.
>> > >
>> > > Thanks!
>> > >
>> >
>>
>
>


Re: scheduler questions

2016-10-13 Thread siddharth anand
Yes! It does work with Depends_on_past=True.
-s

On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin 
wrote:

> thanks so much, Sid! just a follow up question on "Only_Run_Latest" - will
> it work with depends_on_past = True? or will it assume that the DAG uses
> False?
>
> On Thu, Oct 13, 2016 at 1:11 PM, siddharth anand 
> wrote:
>
> > Boris,
> >
> > *Question 1*
> > Only_Run_Latest is in master -
> > https://github.com/apache/incubator-airflow/commit/
> > edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> > That will solve your problem.
> >
> > Releases come out one a quarter sometimes once every 2 quarters, so I
> would
> > recommend that you run off master or off your own fork.
> >
> > You could also achieve this yourself with the following code snippet. It
> > uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
> > being executed is not the current one. It will work for any schedule. The
> > code below has essentially been implemented in the LatestOnlyOperator
> above
> > for convenience.
> >
> > def skip_to_current_job(ds, **kwargs):
> >
> > now = datetime.now()
> >
> > left_window = kwargs['dag'].following_schedule(kwargs['execution_
> > date'])
> >
> > right_window = kwargs['dag'].following_schedule(left_window)
> >
> > logging.info(('Left Window {}, Now {}, Right Window {}'
> > ).format(left_window,now,right_window))
> >
> > if not now <= right_window:
> >
> > logging.info('Not latest execution, skipping downstream.')
> >
> > return False
> >
> > return True
> >
> >
> > t0 = ShortCircuitOperator(
> >
> >   task_id = 'short_circuit_if_not_current,
> >
> >   provide_context = True,
> >
> >   python_callable = skip_to_current_job,
> >
> >   dag = dag
> >
> > )
> >
> >
> > -s
> >
> >
> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin 
> > wrote:
> >
> > > Hello all and thanks for such an amazing project! I have been
> evaluating
> > > Airflow and spent a few days reading about it and playing with it and I
> > > have a few questions that I struggle to understand.
> > >
> > > Let's say I have a simple DAG that runs once a day and it is doing a
> full
> > > reload of tables from the source database so the process is not
> > > incremental.
> > >
> > > Let's consider this scenario:
> > >
> > > Day 1 - OK
> > >
> > > Day 2 - airflow scheduler or server with airflow is down for some
> reason
> > > ((or
> > > DAG is paused)
> > >
> > > Day 3 - still down(or DAG is paused)
> > >
> > > Day 4 - server is up and now needs to run missing jobs.
> > >
> > >
> > > How can I make airflow to run only Day 4 job and not backfill Day 2 and
> > 3?
> > >
> > >
> > > I tried to do depend_on_past = True but it does not seem to do this
> > trick.
> > >
> > >
> > > I also found in a roadmap doc this but seems it is not made to the
> > release
> > > yet:
> > >
> > >
> > >  Only Run Latest - Champion : Sid
> > >
> > > • For cases where we need to only run the latest in a series of task
> > > instance runs and mark the others as skipped. For example, we may have
> > job
> > > to execute a DB snapshot every day. If the DAG is paused for 5 days and
> > > then unpaused, we don’t want to run all 5, just the latest. With this
> > > feature, we will provide “cron” functionality for task scheduling that
> is
> > > not related to ETL
> > >
> > >
> > > My second question, what if I have another DAG that does incremental
> > loads
> > > from a source table:
> > >
> > >
> > > Day 1 - OK, loaded new/changed data for previous day
> > >
> > > Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
> > >
> > > Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
> > >
> > > Day 4 - source system is up, Airflow Dagrun succeeded
> > >
> > >
> > > My problem (unless I am missing something), Airflow on Day 4 would use
> > > execution time from Day 3, so the interval for incremental load would
> be
> > > since the last run (which was Failed). My hope it would use the last
> > > _successful_ run so on Day 4 it would go back to Day 1. Is it possible
> to
> > > achieve this?
> > >
> > > I am aware of a manual backfill command via CLI but I am not sure I
> want
> > to
> > > use due to all the issues and inconsistencies I've read about it.
> > >
> > > Thanks!
> > >
> >
>


Re: scheduler questions

2016-10-13 Thread Boris Tyukin
So for my second scenario, I think I would still need to run the missing days'
jobs one by one (by clearing the failed ones), and I understand this is the
recommended approach, as I figured from Maxime's video.

But sometimes it is more efficient to combine all the missing day runs into one,
so I would use a window for the incremental process from the last successful
job (Day 1) to the current run (Day 4) - then it would be only one DagRun, not 3,
to catch up. Does that make sense? Is it possible?
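
One rough way to approximate that window (a sketch only, not an out-of-the-box
feature; it assumes the metadata DB can be queried through airflow.models, and
the helper name is illustrative):

from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

def last_successful_execution_date(dag_id):
    """Return the execution_date of the most recent successful DagRun, or None."""
    session = settings.Session()
    try:
        last_run = (session.query(DagRun)
                    .filter(DagRun.dag_id == dag_id,
                            DagRun.state == State.SUCCESS)
                    .order_by(DagRun.execution_date.desc())
                    .first())
        return last_run.execution_date if last_run else None
    finally:
        session.close()

# Inside the incremental-load task, the window could then be
# [last_successful_execution_date(dag.dag_id), execution_date].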

On Thu, Oct 13, 2016 at 1:16 PM, siddharth anand  wrote:

> *Question 2*
> You can use depends_on_past=True. Then, future dag runs won't be scheduled
> until the past ones succeed, which I specify as shown below:
>
> default_args = {
> 'owner': 'sanand',
> 'depends_on_past': True,
> 'pool': 'ep_data_pipeline',
> 'start_date': START_DATE,
> 'email': [import_ep_pipeline_alert_email_dl],
> 'email_on_failure': import_airflow_enable_notifications,
> 'email_on_retry': import_airflow_enable_notifications,
> 'retries': 3,
> 'retry_delay': timedelta(seconds=30),
> 'priority_weight': import_airflow_priority_weight}
>
> dag = DAG(DAG_NAME, schedule_interval='@hourly', default_args=default_args,
> sla_miss_callback=sla_alert_func)
>
>
>
> I also use retries to sidestep intermittent issues. If you need to retry a
> failed dag run, you can clear that dag run in the UI (or CLI) and the
> scheduler will rerun it.
>
> On Thu, Oct 13, 2016 at 10:11 AM, siddharth anand 
> wrote:
>
> > Boris,
> >
> > *Question 1*
> > Only_Run_Latest is in master - https://github.com/apache/
> > incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a. That
> > will solve your problem.
> >
> > Releases come out one a quarter sometimes once every 2 quarters, so I
> > would recommend that you run off master or off your own fork.
> >
> > You could also achieve this yourself with the following code snippet. It
> > uses a ShortCircuitOperator that will skip downstream tasks if the DagRun
> > being executed is not the current one. It will work for any schedule. The
> > code below has essentially been implemented in the LatestOnlyOperator
> above
> > for convenience.
> >
> > def skip_to_current_job(ds, **kwargs):
> >
> > now = datetime.now()
> >
> > left_window = kwargs['dag'].following_schedule(kwargs['execution_
> date'
> > ])
> >
> > right_window = kwargs['dag'].following_schedule(left_window)
> >
> > logging.info(('Left Window {}, Now {}, Right Window {}'
> > ).format(left_window,now,right_window))
> >
> > if not now <= right_window:
> >
> > logging.info('Not latest execution, skipping downstream.')
> >
> > return False
> >
> > return True
> >
> >
> > t0 = ShortCircuitOperator(
> >
> >   task_id = 'short_circuit_if_not_current,
> >
> >   provide_context = True,
> >
> >   python_callable = skip_to_current_job,
> >
> >   dag = dag
> >
> > )
> >
> >
> > -s
> >
> >
> > On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin 
> > wrote:
> >
> >> Hello all and thanks for such an amazing project! I have been evaluating
> >> Airflow and spent a few days reading about it and playing with it and I
> >> have a few questions that I struggle to understand.
> >>
> >> Let's say I have a simple DAG that runs once a day and it is doing a
> full
> >> reload of tables from the source database so the process is not
> >> incremental.
> >>
> >> Let's consider this scenario:
> >>
> >> Day 1 - OK
> >>
> >> Day 2 - airflow scheduler or server with airflow is down for some reason
> >> ((or
> >> DAG is paused)
> >>
> >> Day 3 - still down(or DAG is paused)
> >>
> >> Day 4 - server is up and now needs to run missing jobs.
> >>
> >>
> >> How can I make airflow to run only Day 4 job and not backfill Day 2 and
> 3?
> >>
> >>
> >> I tried to do depend_on_past = True but it does not seem to do this
> trick.
> >>
> >>
> >> I also found in a roadmap doc this but seems it is not made to the
> release
> >> yet:
> >>
> >>
> >>  Only Run Latest - Champion : Sid
> >>
> >> • For cases where we need to only run the latest in a series of task
> >> instance runs and mark the others as skipped. For example, we may have
> job
> >> to execute a DB snapshot every day. If the DAG is paused for 5 days and
> >> then unpaused, we don’t want to run all 5, just the latest. With this
> >> feature, we will provide “cron” functionality for task scheduling that
> is
> >> not related to ETL
> >>
> >>
> >> My second question, what if I have another DAG that does incremental
> loads
> >> from a source table:
> >>
> >>
> >> Day 1 - OK, loaded new/changed data for previous day
> >>
> >> Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
> >>
> >> Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
> >>
> >> Day 4 - source system is up, Airflow Dagrun succeeded
> >>
> >>
> >> My problem (unless I am missing something), Airflow on Day 4 would use
> >> 

Re: scheduler questions

2016-10-13 Thread Joseph Napolitano
Hi Boris,

To answer the first question, the backfill command has a flag to mark jobs
as successful without running them.  Take care to align the start and end
times precisely as needed.  As an example, for a job that runs daily at 7am:

airflow backfill -s 2016-10-07T07 -e 2016-10-10T07 my-dag-name -m

The "-m" parameter tells Airflow to mark it successful without running it.

On Thu, Oct 13, 2016 at 10:46 AM, Boris Tyukin 
wrote:

> Hello all and thanks for such an amazing project! I have been evaluating
> Airflow and spent a few days reading about it and playing with it and I
> have a few questions that I struggle to understand.
>
> Let's say I have a simple DAG that runs once a day and it is doing a full
> reload of tables from the source database so the process is not
> incremental.
>
> Let's consider this scenario:
>
> Day 1 - OK
>
> Day 2 - airflow scheduler or server with airflow is down for some reason
> ((or
> DAG is paused)
>
> Day 3 - still down(or DAG is paused)
>
> Day 4 - server is up and now needs to run missing jobs.
>
>
> How can I make airflow to run only Day 4 job and not backfill Day 2 and 3?
>
>
> I tried to do depend_on_past = True but it does not seem to do this trick.
>
>
> I also found in a roadmap doc this but seems it is not made to the release
> yet:
>
>
>  Only Run Latest - Champion : Sid
>
> • For cases where we need to only run the latest in a series of task
> instance runs and mark the others as skipped. For example, we may have job
> to execute a DB snapshot every day. If the DAG is paused for 5 days and
> then unpaused, we don’t want to run all 5, just the latest. With this
> feature, we will provide “cron” functionality for task scheduling that is
> not related to ETL
>
>
> My second question, what if I have another DAG that does incremental loads
> from a source table:
>
>
> Day 1 - OK, loaded new/changed data for previous day
>
> Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
>
> Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
>
> Day 4 - source system is up, Airflow Dagrun succeeded
>
>
> My problem (unless I am missing something), Airflow on Day 4 would use
> execution time from Day 3, so the interval for incremental load would be
> since the last run (which was Failed). My hope it would use the last
> _successful_ run so on Day 4 it would go back to Day 1. Is it possible to
> achieve this?
>
> I am aware of a manual backfill command via CLI but I am not sure I want to
> use due to all the issues and inconsistencies I've read about it.
>
> Thanks!
>



-- 
*Joe Napolitano *| Sr. Data Engineer
www.blueapron.com | 5 Crosby Street, New York, NY 10013


scheduler questions

2016-10-13 Thread Boris Tyukin
Hello all and thanks for such an amazing project! I have been evaluating
Airflow and spent a few days reading about it and playing with it and I
have a few questions that I struggle to understand.

Let's say I have a simple DAG that runs once a day and it is doing a full
reload of tables from the source database so the process is not incremental.

Let's consider this scenario:

Day 1 - OK

Day 2 - airflow scheduler or server with airflow is down for some reason (or
the DAG is paused)

Day 3 - still down (or the DAG is paused)

Day 4 - server is up and now needs to run missing jobs.


How can I make Airflow run only the Day 4 job and not backfill Days 2 and 3?


I tried depends_on_past = True but it does not seem to do the trick.


I also found this in a roadmap doc, but it seems it has not made it into a
release yet:


 Only Run Latest - Champion : Sid

• For cases where we need to only run the latest in a series of task
instance runs and mark the others as skipped. For example, we may have job
to execute a DB snapshot every day. If the DAG is paused for 5 days and
then unpaused, we don’t want to run all 5, just the latest. With this
feature, we will provide “cron” functionality for task scheduling that is
not related to ETL


My second question, what if I have another DAG that does incremental loads
from a source table:


Day 1 - OK, loaded new/changed data for previous day

Day 2 - source system is down (or DAG is paused), Airflow DagRun failed

Day 3 - source system is down (or DAG is paused), Airflow DagRun failed

Day 4 - source system is up, Airflow Dagrun succeeded


My problem (unless I am missing something) is that Airflow on Day 4 would use
the execution time from Day 3, so the interval for the incremental load would
start from the last run (which failed). My hope is that it would use the last
_successful_ run, so on Day 4 it would go back to Day 1. Is it possible to
achieve this?

I am aware of the manual backfill command via the CLI, but I am not sure I want
to use it due to all the issues and inconsistencies I've read about it.

Thanks!