Re: scheduler questions
This is great! I have also upvoted https://github.com/apache/incubator-airflow/pull/1830. If I would like to get feedback on a design, or if I have an Airflow question, is emailing this dev list the correct way to go?

Cheers,
Luke Maycock
OLIVER WYMAN
luke.mayc...@affiliate.oliverwyman.com
www.oliverwyman.com

From: siddharth anand <san...@apache.org>
Sent: 13 October 2016 18:11
To: dev@airflow.incubator.apache.org
Subject: Re: scheduler questions

Boris,

*Question 1*
Only_Run_Latest is in master -
https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
That will solve your problem.

Releases come out once a quarter, sometimes once every 2 quarters, so I would
recommend that you run off master or off your own fork.

You could also achieve this yourself with the following code snippet. It uses
a ShortCircuitOperator that will skip downstream tasks if the DagRun being
executed is not the current one. It will work for any schedule. The code below
has essentially been implemented in the LatestOnlyOperator above for
convenience.

def skip_to_current_job(ds, **kwargs):
    now = datetime.now()
    left_window = kwargs['dag'].following_schedule(kwargs['execution_date'])
    right_window = kwargs['dag'].following_schedule(left_window)
    logging.info('Left Window {}, Now {}, Right Window {}'.format(
        left_window, now, right_window))
    if not now <= right_window:
        logging.info('Not latest execution, skipping downstream.')
        return False
    return True

t0 = ShortCircuitOperator(
    task_id='short_circuit_if_not_current',
    provide_context=True,
    python_callable=skip_to_current_job,
    dag=dag)

-s

On Thu, Oct 13, 2016 at 7:46 AM, Boris Tyukin <bo...@boristyukin.com> wrote:

> Hello all and thanks for such an amazing project! I have been evaluating
> Airflow and spent a few days reading about it and playing with it, and I
> have a few questions that I struggle to understand.
>
> Let's say I have a simple DAG that runs once a day and it is doing a full
> reload of tables from the source database, so the process is not
> incremental.
>
> Let's consider this scenario:
>
> Day 1 - OK
>
> Day 2 - airflow scheduler or server with airflow is down for some reason (or
> DAG is paused)
>
> Day 3 - still down (or DAG is paused)
>
> Day 4 - server is up and now needs to run missing jobs.
>
> How can I make Airflow run only the Day 4 job and not backfill Day 2 and 3?
>
> I tried depend_on_past = True but it does not seem to do the trick.
>
> I also found this in a roadmap doc, but it seems it has not made it into a
> release yet:
>
> Only Run Latest - Champion : Sid
>
> • For cases where we need to only run the latest in a series of task
> instance runs and mark the others as skipped. For example, we may have a job
> to execute a DB snapshot every day. If the DAG is paused for 5 days and
> then unpaused, we don't want to run all 5, just the latest. With this
> feature, we will provide "cron" functionality for task scheduling that is
> not related to ETL.
>
> My second question: what if I have another DAG that does incremental loads
> from a source table:
>
> Day 1 - OK, loaded new/changed data for previous day
>
> Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
>
> Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
>
> Day 4 - source system is up, Airflow DagRun succeeded
>
> My problem (unless I am missing something): Airflow on Day 4 would use the
> execution time from Day 3, so the interval for the incremental load would be
> since the last run (which failed). My hope is that it would use the last
> _successful_ run, so on Day 4 it would go back to Day 1. Is it possible to
> achieve this?
>
> I am aware of a manual backfill command via the CLI, but I am not sure I
> want to use it due to all the issues and inconsistencies I've read about it.
>
> Thanks!
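The left_window/right_window check in Sid's snippet above can be exercised outside Airflow. Below is a minimal standalone sketch of the same logic, with `following_schedule` stubbed as adding a fixed timedelta (an assumption for illustration; real Airflow schedules can also be cron expressions):

```python
from datetime import datetime, timedelta

def is_latest_run(execution_date, schedule_interval, now=None):
    """Return True only for the DagRun whose schedule window contains 'now',
    mirroring the left_window/right_window check in Sid's snippet."""
    if now is None:
        now = datetime.now()
    left_window = execution_date + schedule_interval    # following_schedule(execution_date)
    right_window = left_window + schedule_interval      # following_schedule(left_window)
    return now <= right_window

# A daily schedule: a run from 3 days ago is stale; yesterday's run is current.
day = timedelta(days=1)
now = datetime(2016, 10, 13, 12, 0)
print(is_latest_run(datetime(2016, 10, 10), day, now))  # False - stale run
print(is_latest_run(datetime(2016, 10, 12), day, now))  # True - latest run
```

Hooked up to a ShortCircuitOperator as in Sid's snippet, a False return skips all downstream tasks for the stale DagRun.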
Re: scheduler questions
you rock, Sid! thanks for taking the time to explain it to me

On Thu, Oct 13, 2016 at 6:10 PM, siddharth anand wrote:
> I can't see an image.
>
> We run most of our dags with depends_on_past=True.
> [...]
Re: scheduler questions
I can't see an image.

We run most of our dags with depends_on_past=True.

If you want to chain your dag runs, such as not starting the first task of
your dag run until the last task of your previous dag run completes, you can
use an external task sensor. The external task sensor would be the first task
in the dag and would depend on the last task in the same dag from the
previous dag run. This is strict dag chaining.

If you just don't want the same task in the subsequent dag run to get
scheduled unless the first task completes, depends_on_past=True helps there.
This is more a cascading effect in the tree view.
-s

On Thu, Oct 13, 2016 at 12:41 PM, Boris Tyukin wrote:
> This is not what I see actually. I posted below my test DAG and a
> screenshot.
> [...]
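The "strict dag chaining" Sid describes can be pictured without Airflow. A toy sketch (all names hypothetical) of the gate an external task sensor provides - run N's first task waits until run N-1's last task has completed:

```python
def ready_to_start(run_id, completed):
    """The sensor gate: run N's first task may start only once run N-1's
    last task has completed (strict dag chaining)."""
    if run_id == 0:
        return True                      # the very first run has nothing to wait on
    return ('last_task', run_id - 1) in completed

# Run 0 has finished its last task; run 1 hasn't started yet.
completed = {('last_task', 0)}
print(ready_to_start(1, completed))      # True: run 0 finished, run 1 may start
print(ready_to_start(2, completed))      # False: run 1 hasn't finished
```

In real Airflow this gate is an ExternalTaskSensor placed as the DAG's first task, pointed at the same DAG's final task with a one-interval offset.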
Re: scheduler questions
This is not what I see actually. I posted below my test DAG and a screenshot.

It does create DAGRuns on subsequent runs - I modeled that scenario by
commenting out one bash command and uncommenting another one with exit 1.

It does not create Task Instances on subsequent failed DAGs, but it does
create DAGRuns, and the first successful run after the failed ones would not
have the execution timestamp from the last successful run.

[image: Inline image 1]

here is my test DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Determine schedule:
dag_schedule_interval = timedelta(seconds=60)
dag_start_date = datetime.now() - dag_schedule_interval

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': dag_start_date,
    # 'start_date': datetime(2016, 10, 11, 17, 0),
    'email': ['airf...@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(seconds=20),
    'only_run_latest': True,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# Change version number if schedule needs to be changed:
dag = DAG(
    'pipeline1_v8', default_args=default_args,
    schedule_interval=dag_schedule_interval)

dag.doc_md = __doc__

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='t1',
    bash_command='echo execution ts {{ ts }} && echo 1',
    # bash_command='exit 1',
    dag=dag)

On Thu, Oct 13, 2016 at 1:37 PM, siddharth anand wrote:
> If you use depends_on_past=True, it won't proceed to the next DAG Run if
> the previous DAG Run failed. If Day 2 fails, Day 3 won't run.
> [...]
Re: scheduler questions
If you use depends_on_past=True, it won't proceed to the next DAG Run if the
previous DAG Run failed. If Day 2 fails, Day 3 won't run.

-s

On Thu, Oct 13, 2016 at 10:34 AM, siddharth anand wrote:
> Yes! It does work with depends_on_past=True.
> [...]
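The depends_on_past=True behavior Sid describes can be sketched as a toy simulation (a hypothetical helper, not Airflow's actual scheduler code): scheduling walks forward day by day and stops at the first failure, because each run depends on the previous one succeeding.

```python
def runs_scheduled(outcomes):
    """With depends_on_past=True: walk day-by-day outcomes ('ok' or 'fail')
    and return the days that actually get scheduled. Scheduling stops at the
    first failure, since each run depends on the previous run succeeding."""
    scheduled = []
    for day, outcome in enumerate(outcomes, start=1):
        scheduled.append(day)
        if outcome == 'fail':
            break                # Day N failed -> Day N+1 is never scheduled
    return scheduled

print(runs_scheduled(['ok', 'fail', 'ok', 'ok']))  # [1, 2]: Day 3 onward blocked
```

Clearing the failed Day 2 run (UI or CLI) is what un-blocks the chain and lets Day 3 get scheduled.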
Re: scheduler questions
Yes! It does work with depends_on_past=True.
-s

On Thu, Oct 13, 2016 at 10:28 AM, Boris Tyukin wrote:
> thanks so much, Sid! just a follow-up question on "Only_Run_Latest" - will
> it work with depends_on_past = True? or will it assume that it is set to
> False?
> [...]
Re: scheduler questions
so for my second scenario, I think I would still need to run the missing
days' jobs one by one (by clearing the failed ones), and I understand this is
the recommended approach, as I figured from Maxime's video. But sometimes it
is more efficient to combine all missing day runs into one, so I would use a
window for the incremental process from the last successful job (Day 1) to
the current run (Day 4) - so it would be only one DagRun to catch up, not 3.
Does it make sense? Is it possible?

On Thu, Oct 13, 2016 at 1:16 PM, siddharth anand wrote:

> *Question 2*
> You can use depends_on_past=True. Then, future dag runs won't be scheduled
> until past ones succeed. I specify it as shown below:
>
> default_args = {
>     'owner': 'sanand',
>     'depends_on_past': True,
>     'pool': 'ep_data_pipeline',
>     'start_date': START_DATE,
>     'email': [import_ep_pipeline_alert_email_dl],
>     'email_on_failure': import_airflow_enable_notifications,
>     'email_on_retry': import_airflow_enable_notifications,
>     'retries': 3,
>     'retry_delay': timedelta(seconds=30),
>     'priority_weight': import_airflow_priority_weight}
>
> dag = DAG(DAG_NAME, schedule_interval='@hourly',
>     default_args=default_args, sla_miss_callback=sla_alert_func)
>
> I also use retries to sidestep intermittent issues. If you need to retry a
> failed dag run, you can clear that dag run in the UI (or CLI) and the
> scheduler will rerun it.
>
> On Thu, Oct 13, 2016 at 10:11 AM, siddharth anand wrote:
> > Boris,
> >
> > *Question 1*
> > Only_Run_Latest is in master -
> > https://github.com/apache/incubator-airflow/commit/edf033be65b575f44aa221d5d0ec9ecb6b32c67a.
> > That will solve your problem.
> > [...]
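Boris's desired windowing - anchoring the incremental load at the last *successful* run - can be computed from the run history directly. A sketch with hypothetical structures (Airflow's templated interval is keyed to the previous scheduled run, not the previous successful one, so this bookkeeping would have to live in the DAG itself):

```python
from datetime import date

def incremental_window(history, current_exec_date):
    """history: list of (execution_date, state) pairs. Return (start, end)
    where start is the execution_date of the most recent successful run, so
    a catch-up after failures covers the whole gap in one load."""
    successes = [d for d, state in history if state == 'success']
    start = max(successes) if successes else None
    return start, current_exec_date

history = [(date(2016, 10, 1), 'success'),
           (date(2016, 10, 2), 'failed'),
           (date(2016, 10, 3), 'failed')]
print(incremental_window(history, date(2016, 10, 4)))
# (date(2016, 10, 1), date(2016, 10, 4)): Day 4 loads everything since Day 1
```

In a real DAG the history lookup would query Airflow's metadata (e.g. past DagRun states) rather than a hand-built list.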
Re: scheduler questions
Hi Boris,

To answer the first question, the backfill command has a flag to mark jobs as successful without running them. Take care to align the start and end times precisely with the schedule. As an example, for a job that runs daily at 7am:

airflow backfill -s 2016-10-07T07 -e 2016-10-10T07 my-dag-name -m

The "-m" parameter tells Airflow to mark it successful without running it.

On Thu, Oct 13, 2016 at 10:46 AM, Boris Tyukin <bo...@boristyukin.com> wrote:
> Hello all and thanks for such an amazing project! [...]
>
> How can I make Airflow run only the Day 4 job and not backfill Days 2
> and 3?
> [...]

--
Joe Napolitano | Sr. Data Engineer
www.blueapron.com | 5 Crosby Street, New York, NY 10013
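To make the "align the start and end times precisely" point concrete, here is a small sketch that enumerates the execution dates a backfill over the example range would cover. The helper name and the fixed daily interval are illustrative, not part of Airflow.

```python
# Sketch: enumerating the scheduled execution dates covered by a backfill
# over a daily 07:00 schedule, using the -s/-e bounds from the command
# above. missed_run_dates() is an illustrative helper, not Airflow API.
from datetime import datetime, timedelta


def missed_run_dates(start, end, interval=timedelta(days=1)):
    # Every scheduled execution_date from start through end, inclusive.
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


runs = missed_run_dates(datetime(2016, 10, 7, 7), datetime(2016, 10, 10, 7))
for run in runs:
    print(run.isoformat())
# Prints the four 07:00 execution dates from 2016-10-07 to 2016-10-10.
```

If the bounds are off by even an hour relative to the schedule, the backfill can pick up one run too few or too many, which is why the -s/-e values in the command mirror the 7am schedule exactly.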
scheduler questions
Hello all, and thanks for such an amazing project! I have been evaluating Airflow, spent a few days reading about it and playing with it, and I have a few questions that I struggle to understand.

Let's say I have a simple DAG that runs once a day and does a full reload of tables from the source database, so the process is not incremental. Let's consider this scenario:

Day 1 - OK
Day 2 - airflow scheduler or the server with Airflow is down for some reason (or the DAG is paused)
Day 3 - still down (or DAG is paused)
Day 4 - server is up and now needs to run missing jobs.

How can I make Airflow run only the Day 4 job and not backfill Days 2 and 3?

I tried depends_on_past = True but it does not seem to do the trick.

I also found this in a roadmap doc, but it seems it has not made it into a release yet:

Only Run Latest - Champion: Sid
• For cases where we need to only run the latest in a series of task instance runs and mark the others as skipped. For example, we may have a job to execute a DB snapshot every day. If the DAG is paused for 5 days and then unpaused, we don't want to run all 5, just the latest. With this feature, we will provide "cron" functionality for task scheduling that is not related to ETL.

My second question: what if I have another DAG that does incremental loads from a source table?

Day 1 - OK, loaded new/changed data for the previous day
Day 2 - source system is down (or DAG is paused), Airflow DagRun failed
Day 3 - source system is down (or DAG is paused), Airflow DagRun failed
Day 4 - source system is up, Airflow DagRun succeeded

My problem (unless I am missing something): Airflow on Day 4 would use the execution time from Day 3, so the interval for the incremental load would be since the last run (which failed). My hope is that it would use the last _successful_ run, so on Day 4 it would go back to Day 1. Is it possible to achieve this?
I am aware of the manual backfill command in the CLI, but I am not sure I want to use it due to all the issues and inconsistencies I've read about.

Thanks!