[jira] [Commented] (AIRFLOW-2145) Deadlock after clearing a running task
[ https://issues.apache.org/jira/browse/AIRFLOW-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560442#comment-16560442 ]

Abdul Nimeri commented on AIRFLOW-2145:
---

Opened a PR: https://github.com/apache/incubator-airflow/pull/3657

> Deadlock after clearing a running task
> --
>
> Key: AIRFLOW-2145
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2145
> Project: Apache Airflow
> Issue Type: Bug
> Affects Versions: 1.9.0
> Reporter: George Roldugin
> Priority: Minor
> Attachments: image-2018-02-23-18-59-11-828.png,
> image-2018-02-23-19-00-37-741.png, image-2018-02-23-19-00-55-630.png,
> image-2018-02-23-19-01-45-012.png, image-2018-02-23-19-01-57-498.png,
> image-2018-02-23-19-02-18-837.png
>
>
> TL;DR The essence of the issue is that whenever a currently running task is cleared, the dagrun enters a deadlocked state and fails.
>
> We see this in production with Celery executors and {{TimeDeltaSensor}}, and I've been able to reproduce it locally with both {{TimeDeltaSensor}} and {{WebHDFSSensor}}.
> Here's the minimal example:
> {code:java}
> from datetime import datetime, timedelta
> import airflow
> from airflow.operators.sensors import TimeDeltaSensor
> from airflow.operators.dummy_operator import DummyOperator
> with airflow.DAG(
>         'foo',
>         schedule_interval='@daily',
>         start_date=datetime(2018, 1, 1)) as dag:
>     wait_for_upstream_sla = TimeDeltaSensor(
>         task_id="wait_for_upstream_sla",
>         delta=timedelta(days=365*10)
>     )
>     do_work = DummyOperator(task_id='do_work')
>     dag >> wait_for_upstream_sla >> do_work
> {code}
>
> Sequence of actions, relevant DEBUG level logs, and some UI screenshots
> {code:java}
> airflow clear foo -e 2018-02-22 --no_confirm && airflow backfill foo -s 2018-02-22 -e 2018-02-22{code}
> {code:java}
> [2018-02-23 17:17:45,983] {__init__.py:45} INFO - Using executor SequentialExecutor
> [2018-02-23 17:17:46,069] {models.py:189} INFO - Filling up the DagBag from /Users/grol/Drive/dev/reporting/dags
> ...
> [2018-02-23 17:17:47,563] {jobs.py:2180} DEBUG - Task instance to run
> > state scheduled
> ...
> {code}
> !image-2018-02-23-18-59-11-828.png|width=418,height=87!
> Now we clear all DAG's tasks externally:
> {code:java}
> airflow clear foo -e 2018-02-22 --no_confirm
> {code}
> This causes the following:
> {code:java}
> [2018-02-23 17:17:55,258] {base_task_runner.py:98} INFO - Subtask: [2018-02-23 17:17:55,258] {sensors.py:629} INFO - Checking if the time (2018-02-23 16:19:00) has come
> [2018-02-23 17:17:58,844] {jobs.py:184} DEBUG - [heart] Boom.
> [2018-02-23 17:18:03,848] {jobs.py:184} DEBUG - [heart] Boom.
> [2018-02-23 17:18:08,856] {jobs.py:2585} WARNING - State of this instance has been externally set to shutdown. Taking the poison pill.
> [2018-02-23 17:18:08,874] {helpers.py:266} DEBUG - There are no descendant processes to kill
> [2018-02-23 17:18:08,875] {jobs.py:184} DEBUG - [heart] Boom.
> [2018-02-23 17:18:08,900] {helpers.py:266} DEBUG - There are no descendant processes to kill
> [2018-02-23 17:18:08,922] {helpers.py:266} DEBUG - There are no descendant processes to kill
> [2018-02-23 17:18:09,005] {sequential_executor.py:47} ERROR - Failed to execute task Command 'airflow run foo wait_for_upstream_sla 2018-02-22T00:00:00 --local -sd DAGS_FOLDER/foo.py' returned non-zero exit status 1.
> [2018-02-23 17:18:09,012] {jobs.py:2004} DEBUG - Executor state: failed task
> >
> [2018-02-23 17:18:09,018] {models.py:4584} INFO - Updating state for foo @ 2018-02-22 00:00:00: backfill_2018-02-22T00:00:00, externally triggered: False> considering 2 task(s)
> [2018-02-23 17:18:09,021] {models.py:1215} DEBUG - 2018-02-22 00:00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2018-02-23 17:18:09,021] {models.py:1215} DEBUG - 2018-02-22 00:00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
> [2018-02-23 17:18:09,027] {models.py:1215} DEBUG - 2018-02-22 00:00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': 0, 'successes': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, upstream_task_ids=['wait_for_upstream_sla']
> [2018-02-23 17:18:09,029] {models.py:4643} INFO - Deadlock; marking run
> triggered: False> failed
> [2018-02-23 17:18:09,045] {jobs.py:2125} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
> [2018-02-23 17:18:09,045] {jobs.py:2129} DEBUG - Finished dag
[jira] [Commented] (AIRFLOW-2145) Deadlock after clearing a running task
[ https://issues.apache.org/jira/browse/AIRFLOW-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560427#comment-16560427 ]

Abdul Nimeri commented on AIRFLOW-2145:
---

I think I ran into the same issue. In my case, it was because the upstream task moved into the `shutdown` state while it was being cleared, and then the dag run believed it was deadlocked because:

* the only "unfinished" task is the downstream one (e.g. `do_work`)
* none of the "unfinished" tasks have their dependencies met (because the upstream task isn't done)

I think the bug is that `shutdown` isn't included as an "unfinished" state (see https://github.com/apache/incubator-airflow/blob/c7a472ed6b0d8a4720f57ba1140c8cf665757167/airflow/utils/state.py#L110) and is therefore considered a "finished" state. That doesn't seem right, considering you'd never want a DAG to be considered complete while you have an instance that's `shutdown` (and might have some retries left).
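The deadlock condition described in the comment on AIRFLOW-2145 (a `shutdown` task not counting as "unfinished") can be illustrated with a small toy model. This is only a sketch, not Airflow's code: the state sets, the `is_deadlocked` helper, and the simplified all_success-style dependency rule are all assumptions made for illustration.

```python
# Toy model of the deadlock check; NOT Airflow's actual implementation.
# State names mirror Airflow's, but the sets and helper are hypothetical.

UNFINISHED_WITHOUT_SHUTDOWN = {None, "queued", "scheduled", "running", "up_for_retry"}
UNFINISHED_WITH_SHUTDOWN = UNFINISHED_WITHOUT_SHUTDOWN | {"shutdown"}


def is_deadlocked(task_states, upstream, unfinished_states):
    """Return True when unfinished tasks exist but none of them can run."""
    unfinished = [t for t, s in task_states.items() if s in unfinished_states]
    if not unfinished:
        return False

    def deps_met(task):
        # Simplified all_success rule: every upstream task must have succeeded.
        return all(task_states[u] == "success" for u in upstream.get(task, []))

    return not any(deps_met(t) for t in unfinished)


# The situation from the report: the sensor was cleared while running.
task_states = {"wait_for_upstream_sla": "shutdown", "do_work": None}
upstream = {"do_work": ["wait_for_upstream_sla"]}

deadlocked_before = is_deadlocked(task_states, upstream, UNFINISHED_WITHOUT_SHUTDOWN)
deadlocked_after = is_deadlocked(task_states, upstream, UNFINISHED_WITH_SHUTDOWN)
print(deadlocked_before, deadlocked_after)
```

In this toy model, treating `shutdown` as unfinished means the cleared sensor itself is still a candidate to run, so the run is no longer declared deadlocked. Whether the real dependency checks behave the same way for a `shutdown` task instance is not confirmed here.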
[jira] [Commented] (AIRFLOW-2777) dag.sub_dag(...) speedups
[ https://issues.apache.org/jira/browse/AIRFLOW-2777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551422#comment-16551422 ]

Abdul Nimeri commented on AIRFLOW-2777:
---

https://github.com/apache/incubator-airflow/pull/3621

> dag.sub_dag(...) speedups
> -
>
> Key: AIRFLOW-2777
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2777
> Project: Apache Airflow
> Issue Type: Improvement
> Reporter: Abdul Nimeri
> Assignee: Abdul Nimeri
> Priority: Minor
>
> dag.sub_dag(...) currently works by first deep copying the entire dag, and then filtering down to the appropriate tasks. That can be slow, since deep copying a big dag takes a while. Specifically, copying over all the tasks is the bottleneck.
> This can be a lot faster by instead copying over only the filtered-down tasks.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (AIRFLOW-2777) dag.sub_dag(...) speedups
Abdul Nimeri created AIRFLOW-2777:
-

Summary: dag.sub_dag(...) speedups
Key: AIRFLOW-2777
URL: https://issues.apache.org/jira/browse/AIRFLOW-2777
Project: Apache Airflow
Issue Type: Improvement
Reporter: Abdul Nimeri
Assignee: Abdul Nimeri

dag.sub_dag(...) currently works by first deep copying the entire dag, and then filtering down to the appropriate tasks. That can be slow, since deep copying a big dag takes a while. Specifically, copying over all the tasks is the bottleneck.

This can be a lot faster by instead copying over only the filtered-down tasks.
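The difference between "deep copy everything, then filter" and "filter, then deep copy only what is kept" from AIRFLOW-2777 can be sketched with toy classes. The `Task` and `Dag` classes and both helper functions below are hypothetical stand-ins, not Airflow's API, and the sketch makes no claim about how the linked PR implements the change.

```python
import copy


class Task:
    """Toy task; the payload stands in for whatever makes copying expensive."""

    def __init__(self, task_id, payload_size=0):
        self.task_id = task_id
        self.payload = list(range(payload_size))


class Dag:
    """Toy DAG holding tasks by id (hypothetical, not airflow.DAG)."""

    def __init__(self, dag_id, tasks=None):
        self.dag_id = dag_id
        self.task_dict = {t.task_id: t for t in (tasks or [])}


def sub_dag_copy_all(dag, keep):
    """Deep copy the whole DAG, then drop the tasks that were not selected."""
    clone = copy.deepcopy(dag)
    clone.task_dict = {tid: t for tid, t in clone.task_dict.items() if keep(tid)}
    return clone


def sub_dag_copy_filtered(dag, keep):
    """Select tasks first and deep copy only those."""
    clone = Dag(dag.dag_id)
    clone.task_dict = {
        tid: copy.deepcopy(t) for tid, t in dag.task_dict.items() if keep(tid)
    }
    return clone


dag = Dag("big", [Task("t%d" % i, payload_size=100) for i in range(50)])
keep = lambda tid: tid in {"t1", "t2"}

slow = sub_dag_copy_all(dag, keep)        # copies all 50 tasks, keeps 2
fast = sub_dag_copy_filtered(dag, keep)   # copies only the 2 kept tasks
print(sorted(slow.task_dict), sorted(fast.task_dict))
```

Both helpers return the same selection, but the second one never copies the discarded tasks. The toy tasks hold no references to each other or to the DAG; real tasks do, so a per-task deep copy in real code would need care to avoid pulling in the rest of the graph.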
[jira] [Commented] (AIRFLOW-2776) Tree view JSON is unnecessarily large
[ https://issues.apache.org/jira/browse/AIRFLOW-2776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551356#comment-16551356 ]

Abdul Nimeri commented on AIRFLOW-2776:
---

https://github.com/apache/incubator-airflow/pull/3620

> Tree view JSON is unnecessarily large
> -
>
> Key: AIRFLOW-2776
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2776
> Project: Apache Airflow
> Issue Type: Improvement
> Reporter: Abdul Nimeri
> Assignee: Abdul Nimeri
> Priority: Minor
>
> The tree view generates JSON that can be massive for bigger DAGs: some of our tree views at Stripe have tens of MB of JSON.
> The [generated JSON is prettified|https://github.com/apache/incubator-airflow/blob/52c745da71a6da798f7322956967b5e818b56e48/airflow/www/views.py#L1480], which both takes more CPU time during serialization and slows down everything else that uses it. We patched this on Stripe's fork by removing all whitespace and saw a large speedup for bigger tree views. Considering the JSON is only meant to be used programmatically, this is probably an easy win.
[jira] [Created] (AIRFLOW-2776) Tree view JSON is unnecessarily large
Abdul Nimeri created AIRFLOW-2776:
-

Summary: Tree view JSON is unnecessarily large
Key: AIRFLOW-2776
URL: https://issues.apache.org/jira/browse/AIRFLOW-2776
Project: Apache Airflow
Issue Type: Improvement
Reporter: Abdul Nimeri
Assignee: Abdul Nimeri

The tree view generates JSON that can be massive for bigger DAGs: some of our tree views at Stripe have tens of MB of JSON.

The [generated JSON is prettified|https://github.com/apache/incubator-airflow/blob/52c745da71a6da798f7322956967b5e818b56e48/airflow/www/views.py#L1480], which both takes more CPU time during serialization and slows down everything else that uses it. We patched this on Stripe's fork by removing all whitespace and saw a large speedup for bigger tree views. Considering the JSON is only meant to be used programmatically, this is probably an easy win.
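The size difference between prettified and whitespace-free JSON from AIRFLOW-2776 can be seen with the standard library alone. This is a minimal sketch: the `tree` structure is made up, and `indent=4` is an assumption about what "prettified" means here rather than a quote from the Airflow view.

```python
import json

# Made-up stand-in for tree view data: nested nodes with repeated small dicts.
tree = {
    "name": "root",
    "children": [
        {
            "name": "task_%d" % i,
            "instances": [{"state": "success", "try_number": 1}] * 3,
        }
        for i in range(100)
    ],
}

# Prettified output: newlines plus indentation at every nesting level.
pretty = json.dumps(tree, indent=4)

# Compact output: no indentation and no spaces after ',' or ':'.
compact = json.dumps(tree, separators=(",", ":"))

print(len(pretty), len(compact))
```

Both strings decode to the same data, so a consumer that parses the JSON programmatically loses nothing with the compact form. The actual savings depend on how deeply nested the real tree is.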