[jira] [Updated] (AIRFLOW-2145) Deadlock after clearing a running task
[ https://issues.apache.org/jira/browse/AIRFLOW-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

George Roldugin updated AIRFLOW-2145:
-------------------------------------
    Priority: Minor  (was: Major)

Deadlock after clearing a running task
--------------------------------------

                 Key: AIRFLOW-2145
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2145
             Project: Apache Airflow
          Issue Type: Bug
    Affects Versions: 1.9.0
            Reporter: George Roldugin
            Priority: Minor
         Attachments: image-2018-02-23-18-59-11-828.png, image-2018-02-23-19-00-37-741.png, image-2018-02-23-19-00-55-630.png, image-2018-02-23-19-01-45-012.png, image-2018-02-23-19-01-57-498.png, image-2018-02-23-19-02-18-837.png

TL;DR The essence of the issue is that whenever a currently running task is cleared, the dag run enters a deadlocked state and fails.

We see this in production with Celery executors and {{TimeDeltaSensor}}, and I've been able to reproduce it locally with both {{TimeDeltaSensor}} and {{WebHDFSSensor}}.

Here's the minimal example:

{code:java}
from datetime import datetime, timedelta

import airflow
from airflow.operators.sensors import TimeDeltaSensor
from airflow.operators.dummy_operator import DummyOperator

with airflow.DAG(
        'foo',
        schedule_interval='@daily',
        start_date=datetime(2018, 1, 1)) as dag:

    wait_for_upstream_sla = TimeDeltaSensor(
        task_id="wait_for_upstream_sla",
        delta=timedelta(days=365*10)
    )

    do_work = DummyOperator(task_id='do_work')

    dag >> wait_for_upstream_sla >> do_work
{code}

Sequence of actions, relevant DEBUG-level logs, and some UI screenshots:

{code:java}
airflow clear foo -e 2018-02-22 --no_confirm && airflow backfill foo -s 2018-02-22 -e 2018-02-22
{code}

{code:java}
[2018-02-23 17:17:45,983] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-02-23 17:17:46,069] {models.py:189} INFO - Filling up the DagBag from /Users/grol/Drive/dev/reporting/dags
...
[2018-02-23 17:17:47,563] {jobs.py:2180} DEBUG - Task instance to run state scheduled
...
{code}

!image-2018-02-23-18-59-11-828.png|width=418,height=87!

Now we clear all DAG's tasks externally:

{code:java}
airflow clear foo -e 2018-02-22 --no_confirm
{code}

This causes the following:

{code:java}
[2018-02-23 17:17:55,258] {base_task_runner.py:98} INFO - Subtask: [2018-02-23 17:17:55,258] {sensors.py:629} INFO - Checking if the time (2018-02-23 16:19:00) has come
[2018-02-23 17:17:58,844] {jobs.py:184} DEBUG - [heart] Boom.
[2018-02-23 17:18:03,848] {jobs.py:184} DEBUG - [heart] Boom.
[2018-02-23 17:18:08,856] {jobs.py:2585} WARNING - State of this instance has been externally set to shutdown. Taking the poison pill.
[2018-02-23 17:18:08,874] {helpers.py:266} DEBUG - There are no descendant processes to kill
[2018-02-23 17:18:08,875] {jobs.py:184} DEBUG - [heart] Boom.
[2018-02-23 17:18:08,900] {helpers.py:266} DEBUG - There are no descendant processes to kill
[2018-02-23 17:18:08,922] {helpers.py:266} DEBUG - There are no descendant processes to kill
[2018-02-23 17:18:09,005] {sequential_executor.py:47} ERROR - Failed to execute task Command 'airflow run foo wait_for_upstream_sla 2018-02-22T00:00:00 --local -sd DAGS_FOLDER/foo.py' returned non-zero exit status 1.
[2018-02-23 17:18:09,012] {jobs.py:2004} DEBUG - Executor state: failed task
[2018-02-23 17:18:09,018] {models.py:4584} INFO - Updating state for foo @ 2018-02-22 00:00:00: backfill_2018-02-22T00:00:00, externally triggered: False> considering 2 task(s)
[2018-02-23 17:18:09,021] {models.py:1215} DEBUG - 2018-02-22 00:00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2018-02-23 17:18:09,021] {models.py:1215} DEBUG - 2018-02-22 00:00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
[2018-02-23 17:18:09,027] {models.py:1215} DEBUG - 2018-02-22 00:00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': 0, 'successes': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, upstream_task_ids=['wait_for_upstream_sla']
[2018-02-23 17:18:09,029] {models.py:4643} INFO - Deadlock; marking run triggered: False> failed
[2018-02-23 17:18:09,045] {jobs.py:2125} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2018-02-23 17:18:09,045] {jobs.py:2129} DEBUG - Finished dag run loop iteration. Remaining tasks [ [scheduled]>]
[2018-02-23 17:18:09,045] {jobs.py:2160} DEBUG - *** Clearing out not_ready list ***
[2018-02-23 17:18:09,048] {jobs.py:2180} DEBUG - Task instance to run state None
[2018-02-23 17:18:09,049] {jobs.py:2186} WARNING - FIXME: task instance {} state was set to None externally. This should not happen
[2018-02-23 17:18:09,053] {models.py:1215} DEBUG - dependency 'Task Instance State' PASSED: True, Task state scheduled was valid.
[2018-02-23 17:18:09,053] {models.py:1215} DEBUG - dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2018-02-23 17:18:09,056] {models.py:1215} DEBUG - dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2018-02-23 17:18:09,056] {models.py:1215} DEBUG - dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2018-02-23 17:18:09,061] {models.py:1215} DEBUG - dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all
{code}
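For what it's worth, my reading of the "Deadlock; marking run ... failed" line above: once the running sensor is cleared and reaped, the executor reports it as failed, and when the dag run state is re-evaluated there are still unfinished task instances but none of them passes its dependency checks ({{do_work}} fails the 'all_success' trigger rule because its upstream never succeeded), so the run is declared deadlocked and marked failed. Below is a minimal, hypothetical sketch of that condition; it is not the actual Airflow code, the names are mine, and it only models the single 'all_success' trigger rule seen in the logs:

{code:java}
# Illustrative sketch only -- NOT the Airflow source. It mimics the condition
# that appears to drive "Deadlock; marking run ... failed": unfinished task
# instances remain, but none of them is currently runnable.
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# States treated as "done" for the purposes of this sketch.
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


@dataclass
class TI:
    """Hypothetical stand-in for a task instance row."""
    task_id: str
    state: Optional[str]
    upstream_ids: List[str] = field(default_factory=list)


def all_success_met(ti: TI, by_id: Dict[str, TI]) -> bool:
    # The only dependency modelled here: the 'all_success' trigger rule
    # requires every upstream task instance to be in state 'success'.
    return all(by_id[u].state == "success" for u in ti.upstream_ids)


def run_is_deadlocked(tis: List[TI]) -> bool:
    by_id = {ti.task_id: ti for ti in tis}
    unfinished = [ti for ti in tis if ti.state not in FINISHED]
    # Deadlock: unfinished work remains, but nothing is currently runnable.
    return bool(unfinished) and not any(all_success_met(ti, by_id) for ti in unfinished)


# The situation from the logs: the cleared sensor was reaped and reported
# failed by the executor, while do_work sits in state None after the clear.
sensor = TI("wait_for_upstream_sla", state="failed")
do_work = TI("do_work", state=None, upstream_ids=["wait_for_upstream_sla"])

print(run_is_deadlocked([sensor, do_work]))  # True -> the run gets marked failed
{code}

Under this reading, the cleared sensor never gets a chance to be re-queued before the deadlock check fires, which is why clearing a running task fails the whole run.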