[jira] [Comment Edited] (AIRFLOW-47) ExternalTaskSensor causes scheduling dead lock

2016-05-18 Thread Amikam Snir (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-47?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15286981#comment-15286981
 ] 

Amikam Snir edited comment on AIRFLOW-47 at 5/19/16 5:44 AM:
-

[~criccomini], the scheduler's algorithm should use something like a topological 
sort as part of its decision about which task/operator should run (it should take 
the node (task) with incoming degree zero). In this case there are sub-graphs, 
and the scheduler doesn't take them into consideration. This would reduce the 
severity of the problem, moving it from dead-lock to starvation (because you 
would still need to handle [this 
case|https://issues.apache.org/jira/browse/AIRFLOW-47?focusedCommentId=15278649=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15278649]).
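
A minimal sketch of that Kahn-style selection, assuming a flat list of task names 
and (dependency, dependent) edges rather than Airflow's real scheduler structures; 
the task names and the cross-DAG edge are borrowed from the DAGs in the 
description purely for illustration:

{code:title=topo_select_sketch.py|borderStyle=solid}
from collections import defaultdict, deque

def runnable_order(tasks, edges):
    """Kahn-style selection: repeatedly pick nodes (tasks) with incoming degree
    zero, so cross-DAG dependencies such as an ExternalTaskSensor edge are
    satisfied before the dependent task is picked."""
    indegree = dict.fromkeys(tasks, 0)
    downstream = defaultdict(list)
    for dep, task in edges:              # edge = (upstream dependency, dependent task)
        indegree[task] += 1
        downstream[dep].append(task)

    ready = deque(t for t in tasks if indegree[t] == 0)
    order = []
    while ready:
        t = ready.popleft()
        order.append(t)
        for nxt in downstream[t]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:       # all of its dependencies are done
                ready.append(nxt)
    return order

# Illustrative: the hourly task is an incoming edge of the daily sensor,
# so it is selected first even though the two tasks live in different DAGs.
print(runnable_order(
    ['1_hourly_agg.print_hourly1', '2_daily_agg.evening_hours_sensor'],
    [('1_hourly_agg.print_hourly1', '2_daily_agg.evening_hours_sensor')]))
{code}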



was (Author: asnir):
The scheduler's algorithm should use topological sorting as part of its 
decision about which task/operator should run. In this case there are 
sub-graphs, and the scheduler doesn't take them into consideration. This would 
reduce the severity of the problem, moving it from dead-lock to starvation 
(because you would still need to handle [this 
case|https://issues.apache.org/jira/browse/AIRFLOW-47?focusedCommentId=15278649=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15278649]).


> ExternalTaskSensor causes scheduling dead lock
> --
>
> Key: AIRFLOW-47
> URL: https://issues.apache.org/jira/browse/AIRFLOW-47
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators, scheduler
>Affects Versions: Airflow 1.7.0
> Environment: CentOS 6.5
> Airflow 1.7.0 with SequentialExecutor
>Reporter: Hila Visan
> Attachments: screenshot-1.png
>
>
> We are trying to use 'ExternalTaskSensor' to coordinate between a daily DAG 
> and an hourly DAG (daily DAGs depend on hourly).
> Relevant code: 
> *Daily DAG definition:*
> {code:title=2_daily_dag.py|borderStyle=solid}
> default_args = {
> …
> 'start_date': datetime(2016, 4, 2),
> …
> }
> dag = DAG(dag_id='2_daily_agg', default_args=default_args, 
> schedule_interval="@daily")
> ext_dep = ExternalTaskSensor(
> external_dag_id='1_hourly_agg',
> external_task_id='print_hourly1',
> task_id='evening_hours_sensor',
> dag=dag)
> {code}
> *Hourly DAG definition:*
> {code:title=1_hourly_dag.py|borderStyle=solid}
> default_args = {
> …
> 'start_date': datetime(2016, 4, 1),
> …
> }
> dag = DAG(dag_id='1_hourly_agg', default_args=default_args, 
> schedule_interval="@hourly")
> t1 = BashOperator(
> task_id='print_hourly1',
> bash_command='echo hourly job1',
> dag=dag)
> {code}
> The hourly dag was executed twice for the following execution dates:
> 04-01T00:00:00
> 04-01T01:00:00
> Then the daily dag was executed, and it is still running.
> According to the logs, the daily dag is waiting for the hourly dag to complete:
> {noformat}
> [2016-05-04 06:01:20,978] {models.py:1041} INFO - 
> Executing on 2016-04-03 
> 00:00:00
> [2016-05-04 06:01:20,984] {sensors.py:188} INFO - Poking for 
> 1_hourly_agg.print_hourly1 on 2016-04-02 00:00:00 ... 
> [2016-05-04 06:02:21,053] {sensors.py:188} INFO - Poking for 
> 1_hourly_agg.print_hourly1 on 2016-04-02 00:00:00 ... 
> {noformat}
> How can I solve this dead-lock?
> In addition, I didn't understand whether it means that the daily dag depends only 
> on the "last" hourly dag of the same day (23:00-24:00)? 
> What happens if the hourly dag of another hour fails?
> Thanks a lot! 
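
On the last questions above: ExternalTaskSensor with its defaults 
(execution_delta=None) pokes the external task at the same execution date as the 
waiting run, so the daily run waits for exactly one hourly run, not the whole day. 
A minimal sketch, assuming the intent is to wait for the last hourly run of the 
same day and to bound the wait so a missing run fails the sensor rather than 
holding a worker slot indefinitely; the offset and timeout values are illustrative:

{code:title=2_daily_dag.py (sketch)|borderStyle=solid}
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor  # Airflow 1.7.x import path

default_args = {
    'start_date': datetime(2016, 4, 2),
}

dag = DAG(dag_id='2_daily_agg', default_args=default_args,
          schedule_interval='@daily')

# ExternalTaskSensor pokes at execution_date - execution_delta, so a negative
# delta points the daily run (00:00) at the 23:00 hourly run of the same day.
ext_dep = ExternalTaskSensor(
    task_id='evening_hours_sensor',
    external_dag_id='1_hourly_agg',
    external_task_id='print_hourly1',
    execution_delta=timedelta(hours=-23),  # illustrative offset
    poke_interval=60,
    timeout=6 * 60 * 60,   # give up after 6 hours instead of waiting forever
    dag=dag)
{code}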





[jira] [Comment Edited] (AIRFLOW-47) ExternalTaskSensor causes scheduling dead lock

2016-05-10 Thread Amikam Snir (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-47?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277884#comment-15277884
 ] 

Amikam Snir edited comment on AIRFLOW-47 at 5/10/16 1:47 PM:
-

[~hilaviz], please open a new issue or edit the description. 
The problem is that you got a dead-lock. 
The daily DAG instances consume all the resources. The daily depends on the 
hourly, but the resources (e.g. workers) are already occupied by the daily 
instances. 
A task that waits for something to happen (e.g. an external task) should signal 
the scheduler, voluntarily giving up its turn. These tasks should move to a 
waiting queue, similar to the OS scheduling-queues concept.
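
A toy sketch of that ready/waiting-queue idea, in the spirit of the OS analogy 
rather than any Airflow API: a task whose external condition is not yet met 
moves to the waiting queue and gives its worker slot back:

{code:title=waiting_queue_sketch.py|borderStyle=solid}
from collections import deque, namedtuple

# Toy model only (not an Airflow API): a "task" exposes a readiness check and a body.
Task = namedtuple('Task', ['name', 'is_ready', 'run'])

def schedule(tasks, slots):
    """Tasks whose external condition is not met move to a waiting queue and
    give their worker slot back, so other work proceeds instead of dead-locking."""
    ready, waiting = deque(tasks), deque()
    while ready or waiting:
        # Move waiting tasks back once their condition holds (like an OS wait queue).
        for _ in range(len(waiting)):
            t = waiting.popleft()
            (ready if t.is_ready() else waiting).append(t)
        progressed = False
        for _ in range(min(slots, len(ready))):
            t = ready.popleft()
            if t.is_ready():
                t.run()                 # holds a slot only while actually running
                progressed = True
            else:
                waiting.append(t)       # voluntarily yields the slot
        if not progressed and not ready:
            break                       # remaining tasks are blocked: starvation, not dead-lock
    return list(waiting)                # tasks still waiting on their external condition

# Illustrative use: an "hourly" task and a "daily sensor" that is only ready after it.
done = []
hourly = Task('hourly', lambda: True, lambda: done.append('hourly'))
daily = Task('daily_sensor', lambda: 'hourly' in done, lambda: done.append('daily'))
print(schedule([daily, hourly], slots=1), done)   # -> [] ['hourly', 'daily']
{code}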



was (Author: asnir):
[~hilaviz], please open a new issue or edit the description. 
The problem is that you got a dead-lock. 
The daily DAG instances consume all the resources. The daily depends on the 
hourly, but the resources (e.g. workers) are already occupied by the daily 
instances. 
A DAG that waits for something to happen (e.g. an external DAG) should signal 
the scheduler, voluntarily giving up its turn. 
Those instances should move to a waiting queue, similar to the OS 
scheduling-queues concept.




[jira] [Comment Edited] (AIRFLOW-47) ExternalTaskSensor causes scheduling dead lock

2016-05-10 Thread Amikam Snir (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-47?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15277884#comment-15277884
 ] 

Amikam Snir edited comment on AIRFLOW-47 at 5/10/16 10:07 AM:
--

[~hilaviz], please open a new issue or edit the description. 
The problem is that you got a dead-lock. 
The daily DAG instances consume all the resources. The daily depends on the 
hourly, but the resources (e.g. workers) are already occupied by the daily 
instances. 
A DAG that waits for something to happen (e.g. an external DAG) should signal 
the scheduler, voluntarily giving up its turn. 
Those instances should move to a waiting queue, similar to the OS 
scheduling-queues concept.



was (Author: asnir):
@hilaviz, please open a new issue or edit the description. 
The problem is that you got a dead-lock. 
The daily DAG instances consume all the resources. The daily depends on the 
hourly, but the resources (e.g. workers) are already occupied by the daily 
instances. 
A DAG that waits for something to happen (e.g. an external DAG) should signal 
the scheduler, voluntarily giving up its turn. 
Those instances should move to a waiting queue, similar to the OS 
scheduling-queues concept.




[jira] [Comment Edited] (AIRFLOW-47) ExternalTaskSensor causes scheduling dead lock

2016-05-04 Thread Hila Visan (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-47?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15271869#comment-15271869
 ] 

Hila Visan edited comment on AIRFLOW-47 at 5/5/16 4:26 AM:
---

Unfortunately this workaround is only partially working...
I also tried configuring Airflow with the CeleryExecutor to check whether it 
solves the problem - it doesn't.

h6. This is my use case:
I changed the _'start_date'_ parameter of both dags to 1/4/16.
The hourly dag was executed for each hour from 1/4/16 00:00 until 2/4/16 8:00.
The daily dag was executed twice, for 1/4/16 00:00 and 2/4/16 00:00, and 
currently there are 16 jobs in the running state for the execution dates 
3/4/16-18/4/16 (00:00), but each of them is waiting for a specific hourly dag 
to run.
*I have only 16 executors and now the system is stuck.*

Does that mean we can use ExternalTaskSensor only for the current date, and 
can't perform retroactive processing?

Does Airflow offer another operator that can handle the dependency chain 
hourly_dag->daily_dag->weekly_dag->monthly_dag?

Thanks
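
One possible mitigation for the stuck backfill, sketched under the assumption 
that the DAG-level max_active_runs parameter (or the core setting 
max_active_runs_per_dag, which plays the same role globally) is available in the 
Airflow version in use: cap how many daily runs are active at once, so the 
backfilled sensor runs for 3/4-18/4 cannot occupy all 16 slots simultaneously. 
The cap value below is illustrative:

{code:title=2_daily_dag.py (backfill cap sketch)|borderStyle=solid}
from datetime import datetime
from airflow import DAG

default_args = {
    'start_date': datetime(2016, 4, 2),
}

# Illustrative cap: at most 4 daily runs (and therefore at most 4 waiting
# sensors) are active at any time, leaving slots free for the hourly DAG.
dag = DAG(
    dag_id='2_daily_agg',
    default_args=default_args,
    schedule_interval='@daily',
    max_active_runs=4)   # assumes this DAG parameter exists in your Airflow version
{code}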


was (Author: hilaviz):
Unfortunately this workaround is only partially working...
I also tried configuring Airflow with the CeleryExecutor to check whether it 
solves the problem - it doesn't.

h6. This is my use case:
I changed the _'start_date'_ parameter of both dags to 1/4/16.
The hourly dag was executed for each hour from 1/4/16 00:00 until 2/4/16 8:00 
(28 runs in total).
The daily dag was executed twice, for 1/4/16 00:00 and 2/4/16 00:00, and 
currently there are 16 jobs in the running state for the execution dates 
3/4/16-18/4/16 (00:00), but each of them is waiting for a specific hourly dag 
to run.
*I have only 16 executors and now the system is stuck.*

Does that mean we can use ExternalTaskSensor only for the current date, and 
can't perform retroactive processing?

Does Airflow offer another operator that can handle the dependency chain 
hourly_dag->daily_dag->weekly_dag->monthly_dag?

Thanks




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)