[jira] [Commented] (AIRFLOW-1419) Trigger Rule not respected downstream of BranchPythonOperator

2018-11-15 Thread Conrad Lee (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688014#comment-16688014
 ] 

Conrad Lee commented on AIRFLOW-1419:
-

[~xnuinside] thanks for having a look.  Also thanks for finding the bug in the 
example code–I've fixed that.

 

I'm not sure this should be closed though.  As I recall, before 1.8.2, no dummy 
operator was required at all, because task-skip propagated differently.  When 
1.8.2 came along, all of a sudden a dummy was necessary – the question is 
whether this is desired.

I much preferred the previous behavior–why should a dummy operator be 
necessary at all?  If one of the child tasks has a trigger rule that stops the 
propagation of task-skipping (such as ALL_DONE), then IMHO it should never be 
skipped.
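
For reference, here is a minimal sketch of the dummy-operator workaround I mean 
(my reading of the pattern being discussed; the extra task id is illustrative): 
the branch chooses between work_op and a no-op pass_op, so confluence_op is no 
longer a direct child of the branch and its ALL_DONE rule can take effect.

{code:java}
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id='branch_skip_workaround',   # illustrative dag_id
    default_args={'owner': 'airflow',
                  'start_date': airflow.utils.dates.days_ago(2)},
    schedule_interval='@daily')

branch_op = BranchPythonOperator(
    task_id='branch_op',
    python_callable=lambda: 'work_op',
    dag=dag)

work_op = DummyOperator(task_id='work_op', dag=dag)
pass_op = DummyOperator(task_id='pass_op', dag=dag)  # the extra dummy task
confluence_op = DummyOperator(
    task_id='confluence_op',
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag)

# The branch now selects between work_op and pass_op; both feed confluence_op.
branch_op.set_downstream([work_op, pass_op])
work_op.set_downstream(confluence_op)
pass_op.set_downstream(confluence_op)
{code}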

> Trigger Rule not respected downstream of BranchPythonOperator
> -
>
> Key: AIRFLOW-1419
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1419
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: 1.8.2
>Reporter: Conrad Lee
>Priority: Major
>
> Let's consider the following DAG:
> {noformat}
>            _______________
>           /               \
> branch_op                 confluence_op
>           \____work_op____/
> {noformat}
> This is implemented in the following code:
> {code:java}
> import airflow
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.trigger_rule import TriggerRule
> from airflow.models import DAG
> args = {
> 'owner': 'airflow',
> 'start_date': airflow.utils.dates.days_ago(2)
> }
> dag = DAG(
> dag_id='branch_skip_problem',
> default_args=args,
> schedule_interval="@daily")
> branch_op = BranchPythonOperator(
> task_id='branch_op',
> python_callable=lambda: 'work_op',
> dag=dag)
> work_op = DummyOperator(task_id='work_op', dag=dag)
> confluence_op = DummyOperator(task_id='confluence_op', dag=dag, 
> trigger_rule=TriggerRule.ALL_DONE)
> branch_op.set_downstream(confluence_op)
> branch_op.set_downstream(work_op)
> work_op.set_downstream(confluence_op)
> {code}
> Note that branch_op is a BranchPythonOperator, work_op and confluence_op are 
> DummyOperators, and that confluence_op has its trigger_rule set to ALL_DONE.
> In DAG runs where branch_op chooses to activate work_op as its child, 
> confluence_op never runs. This doesn't seem right, because confluence_op has 
> two parents and a trigger_rule set so that it'll run as soon as all of its 
> parents are done (whether or not they are skipped).
> I know this example seems contrived and that in practice there are better 
> ways of conditionally executing work_op. However, this is the minimal code to 
> illustrate the problem. You can imagine this problem cropping up in practice 
> where originally there was a good reason to use the BranchPythonOperator, and 
> then time passes and someone modifies one of the branches so that it no 
> longer contains any tasks of its own, thus resembling the example.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-3316) GCS to BQ operator leaves schema_fields variable unset when autodetect=True

2018-11-08 Thread Conrad Lee (JIRA)
Conrad Lee created AIRFLOW-3316:
---

 Summary: GCS to BQ operator leaves schema_fields variable unset 
when autodetect=True
 Key: AIRFLOW-3316
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3316
 Project: Apache Airflow
  Issue Type: Bug
  Components: operators
Affects Versions: 1.10.1
Reporter: Conrad Lee
Assignee: Conrad Lee


When I use the GoogleCloudStorageToBigQueryOperator to load data from Parquet 
into BigQuery, I leave the schema_fields argument set to 'None' and set 
autodetect=True.
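
For context, a minimal sketch of the call pattern I mean (the bucket, object 
paths, and table name below are placeholders, not values from this report):

{code:java}
import airflow
from airflow.models import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

dag = DAG(
    dag_id='gcs_to_bq_autodetect_example',   # illustrative dag_id
    default_args={'owner': 'airflow',
                  'start_date': airflow.utils.dates.days_ago(2)},
    schedule_interval='@daily')

# schema_fields is deliberately left as None; autodetect asks BigQuery to
# infer the schema from the Parquet files themselves.
load_parquet = GoogleCloudStorageToBigQueryOperator(
    task_id='load_parquet',
    bucket='my-bucket',                                          # placeholder
    source_objects=['path/to/part-*.parquet'],                   # placeholder
    destination_project_dataset_table='project.dataset.table',   # placeholder
    source_format='PARQUET',
    schema_fields=None,
    autodetect=True,
    dag=dag)
{code}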

 

This causes the following error: 

 
{code:java}
[2018-11-08 09:42:03,690] {models.py:1736} ERROR - local variable 
'schema_fields' referenced before assignment
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1633, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/gcs/plugins/bq_operator_updated.py", line 2018, in execute
    schema_fields=schema_fields
UnboundLocalError: local variable 'schema_fields' referenced before assignment
{code}
 

The problem is that this set of checks, in which the schema_fields variable is 
set, neglects to cover all the cases:
{code:java}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)

        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')

else:
    schema_fields = self.schema_fields

{code}
After the `elif` we need to handle the case where autodetect is set to True.  
This can be done by simply adding two lines:
{code:java}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)

        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')
    else:
        schema_fields = None
else:
    schema_fields = self.schema_fields
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-1428) DagRun deadlocks when all tasks' dependencies have skipped state

2017-07-19 Thread Conrad Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Conrad Lee updated AIRFLOW-1428:

Description: 
In controlling which tasks are executed in a DagRun, it's common for tasks to 
skip themselves, e.g., by raising an AirflowSkipException.  One controls how 
skips propagate using trigger rules.  It is currently unclear to me how to 
propagate skipped states without causing the DagRun to deadlock.

Consider the following simple example

{code}
from airflow.exceptions import AirflowSkipException
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

# (a DAG object named `dag` is assumed to be defined in the usual way)

def raise_skip():
    raise AirflowSkipException

skip_op = PythonOperator(
    task_id='skip_op',
    python_callable=raise_skip,
    dag=dag)

skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag)

skip_op.set_downstream(skipped_child_op)
{code}

When I run the code above, the DagRun deadlocks.  I have dug into why:
* The deadlock is detected by DagRun.update_state 
[here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293].
  
* That's raised because {{no_dependencies_met}} is {{True}}, when it should be 
{{False}}
* no_dependencies_met is True because when you call 
{{skipped_child_op.get_failed_dep_statuses()}}, it returns 
[TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 
'all_success' requires all upstream tasks to have succeeded, but found 1 
non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 
0, 'upstream_failed': 0, 'done': 1}, 
upstream_task_ids=['metas_update_skipper']")]

So basically, because {{skipped_child_op}}'s trigger rule is {{all_success}} 
and its parent is skipped, the scheduler can't find any tasks that are ready 
for execution.  The DAG is then marked as failed (due to deadlock).  I have 
looked for a trigger rule under which skipped parents are not treated as 
failures, but none seems to exist.  I don't want to use {{ALL_DONE}} because I 
want {{skipped_child_op}} to be skipped.  Thus, there seems to be no way for me 
to properly implement this very simple DAG.
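
A minimal sketch of how the failing dependency status can be inspected (the 
execution date below is a placeholder, and an initialized Airflow metadata DB 
with one run of the example DAG is assumed):

{code}
# Sketch only: ask the dependency checker why skipped_child_op is not runnable.
import datetime
from airflow.models import TaskInstance

execution_date = datetime.datetime(2017, 7, 18)  # placeholder date
ti = TaskInstance(task=skipped_child_op, execution_date=execution_date)
for dep_status in ti.get_failed_dep_statuses():
    print(dep_status.dep_name, dep_status.passed, dep_status.reason)
{code}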

It seems that the Airflow community has gone back and forth on how to handle 
skipped tasks--see 
[AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], 
[AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and 
[AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719].  There seems 
to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but 
nobody has implemented it.
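
To make that proposal concrete, here is a rough sketch (hypothetical, not 
existing Airflow code) of the semantics such a rule could have, expressed 
against the upstream_tasks_state counters quoted above:

{code}
# Hypothetical "all_success_or_skipped" check, not part of Airflow.
def all_success_or_skipped_met(upstream_tasks_state, upstream_task_count):
    successes = upstream_tasks_state['successes']
    skipped = upstream_tasks_state['skipped']
    # met once every upstream task has finished as either success or skipped
    return successes + skipped == upstream_task_count
{code}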

  was:
In controlling which tasks are executed in a DagRun, it's common for tasks to 
skip themselves, e.g., by raising an AirflowSkipException.  One controls how 
skips propagate using trigger rules.It is currently unclear to me how to 
propagate skipped states without causing the DagRun to deadlock.

Consider the following simple example

{code}
def raise_skip():
raise AirflowSkipException

skip_op = PythonOperator(
task_id='skip_op',
python_callable=raise_skip,
dag=dag)

skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag)

skip_op.set_downstream(skipped_child_op)
{code}

When I run the code above, the DagRun deadlocks.  I have dug into why:
* The deadlock is detected by DagRun.update_state 
[here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293].
  
* That's raised because {{no_dependencies_met}} is {{True}}, when it should be 
{{False}}
* no_dependencies_met is True because when you call 
{{skipped_child_op.get_failed_dep_statuses()}}, it returns 
[TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 
'all_success' requires all upstream tasks to have succeeded, but found 1 
non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 
0, 'upstream_failed': 0, 'done': 1}, 
upstream_task_ids=['metas_update_skipper']")]

So basically, because {{skipped_child_op}}'s trigger rule is {{all_success}} 
and its parent is skipped, the scheduler can't find any tasks that are ready 
for execution.  The Dag is then marked as failed (due to deadlock).  I have 
looked for a trigger rule that would cause skipped parents to be not failed, 
but that doesn't seem to exist.  I don't want to use {{ALL_DONE}} because I 
want {{skipped_child_op}} to be skipped.  Thus, there seems to be no way for me 
to properly implement this very simply DAG.

It seems that the airflow community has gone back and forth on how to handle 
skipped tasks--see 
[AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], 
[AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and 
[AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719].  There seems 
to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but 
nobody has implemented it.


> DagRun deadlocks when all tasks' dependencies have skipped state
> 
>
> Key: AIRFLOW-1428
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1428
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DagRun, dependencies
>Affects Versions: 1.8.2
> Environment: LocalExecutor with postgres
>Reporter: Conrad Lee
