[jira] [Commented] (AIRFLOW-1419) Trigger Rule not respected downstream of BranchPythonOperator
[ https://issues.apache.org/jira/browse/AIRFLOW-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16688014#comment-16688014 ] Conrad Lee commented on AIRFLOW-1419: - [~xnuinside] thanks for having a look. Also thanks for finding the bug in the example code -- I've fixed that. I'm not sure this should be closed, though. As I recall, before 1.8.2 no dummy operator was required at all, because task skips propagated differently. When 1.8.2 came along, a dummy suddenly became necessary -- the question is whether this is desired. I much preferred the previous behavior: why should a dummy operator be necessary at all? If one of the child tasks has a trigger rule that stops the propagation of task-skipping (such as ALL_DONE), then IMHO it should never be skipped.

> Trigger Rule not respected downstream of BranchPythonOperator
> -------------------------------------------------------------
>
> Key: AIRFLOW-1419
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1419
> Project: Apache Airflow
> Issue Type: Bug
> Affects Versions: 1.8.2
> Reporter: Conrad Lee
> Priority: Major
>
> Let's consider the following DAG:
> {noformat}
>             __________
>            /          \
>   branch_op            confluence_op
>            \__work_op__/
> {noformat}
> This is implemented in the following code:
> {code:java}
> import airflow
> from airflow.operators.python_operator import BranchPythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.utils.trigger_rule import TriggerRule
> from airflow.models import DAG
>
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(2)
> }
>
> dag = DAG(
>     dag_id='branch_skip_problem',
>     default_args=args,
>     schedule_interval="@daily")
>
> branch_op = BranchPythonOperator(
>     task_id='branch_op',
>     python_callable=lambda: 'work_op',
>     dag=dag)
>
> work_op = DummyOperator(task_id='work_op', dag=dag)
> confluence_op = DummyOperator(task_id='confluence_op', dag=dag,
>                               trigger_rule=TriggerRule.ALL_DONE)
>
> branch_op.set_downstream(confluence_op)
> branch_op.set_downstream(work_op)
> work_op.set_downstream(confluence_op)
> {code}
> Note that branch_op is a BranchPythonOperator, work_op and confluence_op are DummyOperators, and that confluence_op has its trigger_rule set to ALL_DONE.
> In dag runs where branch_op chooses to activate work_op as its child, confluence_op never runs. This doesn't seem right, because confluence_op has two parents and a trigger_rule set so that it'll run as soon as all of its parents are done (whether or not they are skipped).
> I know this example seems contrived and that in practice there are better ways of conditionally executing work_op. However, this is the minimal code to illustrate the problem. You can imagine that this problem might actually creep up in practice: originally there was a good reason to use the BranchPythonOperator, then time passes and someone modifies one of the branches so that it doesn't really contain any children anymore, thus resembling the example.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
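To make the disputed skip-propagation behavior concrete, here is a minimal plain-Python sketch of how the two trigger rules in question evaluate a task's upstream states. This is an illustration, not Airflow's actual implementation; the function name and state strings are assumptions for the example:

```python
# Illustrative sketch of trigger-rule evaluation (NOT Airflow's real code).
# A 'skipped' parent counts as done, but not as a success.

def can_run(trigger_rule, upstream_states):
    """Return True if a task with the given trigger rule may run."""
    if trigger_rule == 'all_success':
        return all(s == 'success' for s in upstream_states)
    if trigger_rule == 'all_done':
        return all(s in ('success', 'skipped', 'failed')
                   for s in upstream_states)
    raise ValueError('unknown trigger rule: %s' % trigger_rule)

# confluence_op under ALL_DONE: even if one parent was skipped,
# it should be allowed to run once both parents are done...
print(can_run('all_done', ['success', 'skipped']))     # True
# ...whereas the default ALL_SUCCESS would block it:
print(can_run('all_success', ['success', 'skipped']))  # False
```

Under this reading, a confluence_op with ALL_DONE should fire regardless of which branch was skipped, which is why the reporter argues the extra dummy operator should not be needed.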
[jira] [Created] (AIRFLOW-3316) GCS to BQ operator leaves schema_fields operator unset when autodetect=True
Conrad Lee created AIRFLOW-3316:
-----------------------------------
Summary: GCS to BQ operator leaves schema_fields operator unset when autodetect=True
Key: AIRFLOW-3316
URL: https://issues.apache.org/jira/browse/AIRFLOW-3316
Project: Apache Airflow
Issue Type: Bug
Components: operators
Affects Versions: 1.10.1
Reporter: Conrad Lee
Assignee: Conrad Lee

When I use the GoogleCloudStorageToBigQueryOperator to load data from Parquet into BigQuery, I leave the schema_fields argument set to 'None' and set autodetect=True. This causes the following error:
{code:java}
[2018-11-08 09:42:03,690] {models.py:1736} ERROR - local variable 'schema_fields' referenced before assignment
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1633, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/gcs/plugins/bq_operator_updated.py", line 2018, in execute
    schema_fields=schema_fields
UnboundLocalError: local variable 'schema_fields' referenced before assignment
{code}
The problem is that this set of checks, in which the schema_fields variable is set, neglects to cover all the cases:
{code:java}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)
        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')
else:
    schema_fields = self.schema_fields
{code}
After the `elif` we need to handle the case where autodetect is set to True. This can be done by simply adding two lines:
{code:java}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)
        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')
    else:
        schema_fields = None
else:
    schema_fields = self.schema_fields
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
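The control flow of the proposed fix can be sketched as a standalone function — a hypothetical helper for illustration, not the operator's actual API — covering all four cases, including the previously missing autodetect one:

```python
# Hypothetical standalone version of the patched schema-resolution logic.
def resolve_schema_fields(schema_fields, schema_object, autodetect,
                          source_format, download=None):
    """Return the schema to use, or None to let BigQuery autodetect it."""
    if not schema_fields:
        if schema_object and source_format != 'DATASTORE_BACKUP':
            # In the operator, this is a download of the schema JSON from GCS.
            return download(schema_object)
        elif schema_object is None and autodetect is False:
            raise ValueError('At least one of `schema_fields`, '
                             '`schema_object`, or `autodetect` must be '
                             'passed.')
        else:
            # The previously uncovered case: autodetect=True, no explicit
            # schema. Without this branch, schema_fields stays unbound.
            return None
    else:
        return schema_fields

# autodetect=True with no schema no longer hits UnboundLocalError:
print(resolve_schema_fields(None, None, True, 'PARQUET'))  # None
```

With the missing `else` in place, every path through the function binds a value, which is exactly what the two-line patch restores in the operator.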
[jira] [Updated] (AIRFLOW-1428) DagRun deadlocks when all tasks' dependencies have skipped state
[ https://issues.apache.org/jira/browse/AIRFLOW-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Conrad Lee updated AIRFLOW-1428:

Description:

In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules. It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example:
{code}
def raise_skip():
    raise AirflowSkipException

skip_op = PythonOperator(
    task_id='skip_op',
    python_callable=raise_skip,
    dag=dag)

skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag)

skip_op.set_downstream(skipped_child_op)
{code}
When I run the code above, the DagRun deadlocks. I have dug into why:
* The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293].
* That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}}.
* {{no_dependencies_met}} is True because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")]

So basically, because {{skipped_child_op}}'s trigger rule is {{all_success}} and its parent is skipped, the scheduler can't find any tasks that are ready for execution. The DagRun is then marked as failed (due to deadlock). I have looked for a trigger rule that would cause skipped parents to be treated as not failed, but that doesn't seem to exist. I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simple DAG.

It seems that the airflow community has gone back and forth on how to handle skipped tasks -- see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but nobody has implemented it.

> DagRun deadlocks when all tasks' dependencies have skipped state
> ----------------------------------------------------------------
>
> Key: AIRFLOW-1428
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1428
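The proposed but never-implemented `ALL_SUCCESS_OR_SKIPPED` rule could behave as in the plain-Python sketch below. This is an illustration of the idea, not Airflow code; the function name and state strings are assumptions. The child runs only when every parent succeeded, and is itself skipped (rather than deadlocking the DagRun) when parents merely skipped:

```python
# Illustrative sketch of the proposed ALL_SUCCESS_OR_SKIPPED trigger rule
# (plain Python, not Airflow's implementation).

def evaluate_all_success_or_skipped(upstream_states):
    """Return the child's resolution: 'run', 'skipped', or 'blocked'."""
    if all(s == 'success' for s in upstream_states):
        return 'run'
    if all(s in ('success', 'skipped') for s in upstream_states):
        # Parents finished without failure, but at least one skipped:
        # propagate the skip instead of leaving the child unrunnable.
        return 'skipped'
    # A failed or upstream_failed parent blocks the child as usual.
    return 'blocked'

# skip_op raised AirflowSkipException, so skipped_child_op's only parent
# is skipped -- the child is skipped too, and no deadlock is detected:
print(evaluate_all_success_or_skipped(['skipped']))  # 'skipped'
```

This matches the behavior the reporter wants: the skip propagates to skipped_child_op without the scheduler concluding that no task can ever run.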
[jira] [Updated] (AIRFLOW-1428) DagRun deadlocks when all tasks' dependencies have skipped state
[ https://issues.apache.org/jira/browse/AIRFLOW-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Conrad Lee updated AIRFLOW-1428: Description: In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules.It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example {code} def raise_skip(): raise AirflowSkipException skip_op = PythonOperator( task_id='skip_op', python_callable=raise_skip, dag=dag) skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag) skip_op.set_downstream(skipped_child_op) {code} When I run the code above, the DagRun deadlocks. I have dug into why: * The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293]. * That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}} * no_dependencies_met is True because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")] So basically, because {{skipped_child_op}}'s trigger rule is {{all_success}} and its parent is skipped, the scheduler can't find any tasks that are ready for execution. The Dag is then marked as failed (due to deadlock). I have looked for a trigger rule that would cause skipped parents to be not failed, but that doesn't seem to exist. I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simply DAG. 
It seems that the airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but nobody has implemented it. was: In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules.It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example {code} def raise_skip(): raise AirflowSkipException skip_op = PythonOperator( task_id='skip_op', python_callable=raise_skip, dag=dag) skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag) skip_op.set_downstream(skipped_child_op) {code} When I run the code above, the DagRun deadlocks. I have dug into why: * The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293]. * That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}} * no_dependencies_met is True because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")] So basically, because {{skipped_child_op}}'s trigger rule is {{all_success}} and its parent is skipped, the can't find any tasks that are ready for execution. The Dag is then marked as failed (due to deadlock). 
I have looked for a trigger rule that would cause skipped parents to be not failed, but that doesn't seem to exist. I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simply DAG. It seems that the airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but nobody has implemented it. > DagRun deadlocks when all tasks' dependencies have skipped state > > > Key: AIRFLOW-1428 > URL:
[jira] [Updated] (AIRFLOW-1428) DagRun deadlocks when all tasks' dependencies have skipped state
[ https://issues.apache.org/jira/browse/AIRFLOW-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Conrad Lee updated AIRFLOW-1428: Description: In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules.It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example {code} def raise_skip(): raise AirflowSkipException skip_op = PythonOperator( task_id='skip_op', python_callable=raise_skip, dag=dag) skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag) skip_op.set_downstream(skipped_child_op) {code} When I run the code above, the DagRun deadlocks. I have dug into why: * The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293]. * That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}} * no_dependencies_met is True because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")] So basically, because {{skipped_child_op}}'s trigger rule is {{all_success}} and its parent is skipped, the can't find any tasks that are ready for execution. The Dag is then marked as failed (due to deadlock). I have looked for a trigger rule that would cause skipped parents to be not failed, but that doesn't seem to exist. I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simply DAG. 
It seems that the airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but nobody has implemented it. was: In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules.It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example {code} def raise_skip(): raise AirflowSkipException skip_op = PythonOperator( task_id='skip_op', python_callable=raise_skip, dag=dag) skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag) skip_op.set_downstream(skipped_child_op) {code} When I run the code above, the DagRun deadlocks. I have dug into why: * The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293]. * That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}} * no_dependencies_met is True because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")] So basically, because {{skipped_child_op}}'s parent is skipped, it is considered failed. I have looked for a trigger rule that would cause skipped parents to be not failed, but that doesn't seem to exist. 
I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simply DAG. It seems that the airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but nobody has implemented it. > DagRun deadlocks when all tasks' dependencies have skipped state > > > Key: AIRFLOW-1428 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1428 > Project: Apache Airflow > Issue Type: Bug > Components: DagRun,
[jira] [Updated] (AIRFLOW-1428) DagRun deadlocks when all tasks' dependencies have skipped state
[ https://issues.apache.org/jira/browse/AIRFLOW-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Conrad Lee updated AIRFLOW-1428: Description: In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules.It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example {code} def raise_skip(): raise AirflowSkipException skip_op = PythonOperator( task_id='skip_op', python_callable=raise_skip, dag=dag) skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag) skip_op.set_downstream(skipped_child_op) {code} When I run the code above, the DagRun deadlocks. I have dug into why: * The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293]. * That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}} * no_dependencies_met is True because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")] So basically, because {{skipped_child_op}}'s parent is skipped, it is considered failed. I have looked for a trigger rule that would cause skipped parents to be not failed, but that doesn't seem to exist. I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simply DAG. 
It seems that the airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but nobody has implemented it. was: In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules.It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example {code} def raise_skip(): raise AirflowSkipException skip_op = PythonOperator( task_id='skip_op', python_callable=raise_skip, dag=dag) skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag) skip_op.set_downstream(skipped_child_op) {code} When I run the code above, the DagRun deadlocks. I have dug into why: * The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293]. * That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}} * no_dependencies_met is True because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")] So basically, because `skipped_child_op`'s parent is skipped, it is considered failed. I have looked for a trigger rule that would cause skipped parents to be not failed, but that doesn't seem to exist. 
I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simply DAG. It seems that the airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but nobody has implemented it. > DagRun deadlocks when all tasks' dependencies have skipped state > > > Key: AIRFLOW-1428 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1428 > Project: Apache Airflow > Issue Type: Bug > Components: DagRun, dependencies >Affects Versions: 1.8.2 > Environment: LocalExecutor with postgres >Reporter: Conrad Lee >
[jira] [Updated] (AIRFLOW-1428) DagRun deadlocks when all tasks' dependencies have skipped state
[ https://issues.apache.org/jira/browse/AIRFLOW-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Conrad Lee updated AIRFLOW-1428: Description: In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules.It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example {code} def raise_skip(): raise AirflowSkipException skip_op = PythonOperator( task_id='skip_op', python_callable=raise_skip, dag=dag) skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag) skip_op.set_downstream(skipped_child_op) {code} When I run the code above, the DagRun deadlocks. I have dug into why: * The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293]. * That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}} * no_dependencies_met is True because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")] So basically, because `skipped_child_op`'s parent is skipped, it is considered failed. I have looked for a trigger rule that would cause skipped parents to be not failed, but that doesn't seem to exist. I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simply DAG. 
It seems that the airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but nobody has implemented it. was: In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g., by raising an AirflowSkipException. One controls how skips propagate using trigger rules.It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example {code} def raise_skip(): raise AirflowSkipException skip_op = PythonOperator( task_id='skip_op', python_callable=raise_skip, dag=dag) skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag) skip_op.set_downstream(skipped_child_op) {code} When I run the code above, the DagRun deadlocks. I have dug into why: * The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293]. * That's raised because {{no_dependencies_met}} is {{True}}, when it should be {{False}} * no_dependencies_met is True because when you call {{skipped_child_op.get_failed_dep_statuses()}}, it returns {{ [TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")] }} So basically, because `skipped_child_op`'s parent is skipped, it is considered failed. I have looked for a trigger rule that would cause skipped parents to be not failed, but that doesn't seem to exist. 
I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simply DAG. It seems that the airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called `ALL_SUCCESS_OR_SKIPPED`, but nobody has implemented it. > DagRun deadlocks when all tasks' dependencies have skipped state > > > Key: AIRFLOW-1428 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1428 > Project: Apache Airflow > Issue Type: Bug > Components: DagRun, dependencies >Affects Versions: 1.8.2 > Environment: LocalExecutor with postgres >Reporter: Conrad Lee >
[jira] [Updated] (AIRFLOW-1428) DagRun deadlocks when all tasks' dependencies have skipped state
[ https://issues.apache.org/jira/browse/AIRFLOW-1428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Conrad Lee updated AIRFLOW-1428:

Description: In controlling which tasks are executed in a DagRun, it's common for tasks to skip themselves, e.g. by raising an AirflowSkipException. One controls how skips propagate using trigger rules. It is currently unclear to me how to propagate skipped states without causing the DagRun to deadlock. Consider the following simple example:
{code}
def raise_skip():
    raise AirflowSkipException

skip_op = PythonOperator(
    task_id='skip_op',
    python_callable=raise_skip,
    dag=dag)

skipped_child_op = DummyOperator(task_id='skipped_child_op', dag=dag)

skip_op.set_downstream(skipped_child_op)
{code}
When I run the code above, the DagRun deadlocks. I have dug into why:
* The deadlock is detected by DagRun.update_state [here|https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L4290-L4293].
* It's raised because {{no_dependencies_met}} is {{True}} when it should be {{False}}.
* {{no_dependencies_met}} is {{True}} because {{skipped_child_op.get_failed_dep_statuses()}} returns {{[TIDepStatus(dep_name='Trigger Rule', passed=False, reason="Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'skipped': 1, 'failed': 0, 'upstream_failed': 0, 'done': 1}, upstream_task_ids=['metas_update_skipper']")]}}

So basically, because {{skipped_child_op}}'s parent is skipped, it is considered failed. I have looked for a trigger rule that would cause skipped parents to be treated as not failed, but that doesn't seem to exist. I don't want to use {{ALL_DONE}} because I want {{skipped_child_op}} to be skipped. Thus, there seems to be no way for me to properly implement this very simple DAG.

It seems that the Airflow community has gone back and forth on how to handle skipped tasks--see [AIRFLOW-983|https://issues.apache.org/jira/browse/AIRFLOW-983], [AIRFLOW-992|https://issues.apache.org/jira/browse/AIRFLOW-992], and [AIRFLOW-719|https://issues.apache.org/jira/browse/AIRFLOW-719]. There seems to be support for adding a trigger rule called {{ALL_SUCCESS_OR_SKIPPED}}, but nobody has implemented it.

> DagRun deadlocks when all tasks' dependencies have skipped state
>
> Key: AIRFLOW-1428
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1428
> Project: Apache Airflow
> Issue Type: Bug
> Components: DagRun, dependencies
> Affects Versions: 1.8.2
> Environment: LocalExecutor with postgres
> Reporter: Conrad Lee
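To make the trigger-rule comparison above concrete, here is a minimal plain-Python sketch (not Airflow code; the {{dependencies_met}} helper and the {{all_success_or_skipped}} rule are hypothetical) of how each rule would treat a single skipped parent:

```python
def dependencies_met(trigger_rule, upstream_states):
    """Return True if a task's dependencies are satisfied under the given rule.

    A toy model of the scheduler's trigger-rule check; upstream_states is a
    list of terminal states for the task's parents.
    """
    total = len(upstream_states)
    successes = sum(1 for s in upstream_states if s == 'success')
    skipped = sum(1 for s in upstream_states if s == 'skipped')
    done = sum(1 for s in upstream_states
               if s in ('success', 'skipped', 'failed', 'upstream_failed'))

    if trigger_rule == 'all_success':
        # A skipped parent counts as a non-success, so the check never passes.
        return successes == total
    if trigger_rule == 'all_done':
        # Passes, but then the child *runs* rather than being skipped.
        return done == total
    if trigger_rule == 'all_success_or_skipped':
        # Hypothetical rule discussed in the linked issues: skipped parents
        # would not block the child.
        return successes + skipped == total
    raise ValueError('unknown trigger rule: %s' % trigger_rule)


# skipped_child_op's situation: one parent, and it was skipped.
print(dependencies_met('all_success', ['skipped']))             # False
print(dependencies_met('all_done', ['skipped']))                # True
print(dependencies_met('all_success_or_skipped', ['skipped']))  # True
```

Under {{all_success}} the check is simply never satisfied, which is why the scheduler concludes no dependencies can ever be met.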
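The deadlock detection itself can be illustrated with a toy scheduler loop (again plain Python, not the actual DagRun.update_state implementation; all names here are illustrative): when no unfinished task makes progress in a pass, the run is declared deadlocked.

```python
def run_dag(children, trigger_rules, initial_states):
    """Toy scheduler: children maps task -> downstream tasks; returns final
    states, or 'deadlock' when no unfinished task can ever become runnable."""
    parents = {t: [] for t in trigger_rules}
    for task, downstream in children.items():
        for d in downstream:
            parents[d].append(task)

    states = dict(initial_states)  # None marks an unfinished task
    while any(s is None for s in states.values()):
        progressed = False
        for task, state in states.items():
            if state is not None:
                continue
            upstream = [states[p] for p in parents[task]]
            if any(s is None for s in upstream):
                continue  # some parent hasn't finished yet
            rule = trigger_rules[task]
            if rule == 'all_success' and all(s == 'success' for s in upstream):
                states[task] = 'success'
                progressed = True
            elif rule == 'all_done':
                states[task] = 'success'
                progressed = True
            # under 'all_success', a skipped parent is simply never satisfied
        if not progressed:
            return 'deadlock'  # mirrors the check in DagRun.update_state
    return states


# skip_op raised AirflowSkipException; skipped_child_op has the default rule.
result = run_dag(
    children={'skip_op': ['skipped_child_op'], 'skipped_child_op': []},
    trigger_rules={'skip_op': 'all_success', 'skipped_child_op': 'all_success'},
    initial_states={'skip_op': 'skipped', 'skipped_child_op': None},
)
print(result)  # prints: deadlock
```

Switching the child's rule to {{all_done}} avoids the deadlock in this model, but at the cost of the child running instead of being skipped, which is exactly the trade-off described above.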