[ https://issues.apache.org/jira/browse/AIRFLOW-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711972#comment-16711972 ]
Gabriel Silk commented on AIRFLOW-3418:
---------------------------------------

I'm seeing this issue as well, and I would reiterate how critical it is. It's currently breaking our production clusters.

> Task stuck in running state, unable to clear
> --------------------------------------------
>
> Key: AIRFLOW-3418
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3418
> Project: Apache Airflow
> Issue Type: Bug
> Components: worker
> Affects Versions: 1.10.1
> Reporter: James Meickle
> Priority: Critical
>
> One of our tasks (a custom operator that sleep-waits until NYSE market close) got stuck in a "running" state in the metadata db without making any progress. This is what it looked like in the logs:
> {code:java}
> [2018-11-29 00:01:14,064] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close [2018-11-29 00:01:14,063] {{cli.py:484}} INFO - Running <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 [running]> on host airflow-core-i-0a53cac37067d957d.dlg.fnd.dynoquant.com
> [2018-11-29 06:03:57,643] {{models.py:1355}} INFO - Dependencies not met for <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2018-11-29 06:03:57,644] {{models.py:1355}} INFO - Dependencies not met for <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2018-11-29 00:01:10.876344+00:00.
> [2018-11-29 06:03:57,646] {{logging_mixin.py:95}} INFO - [2018-11-29 06:03:57,646] {{jobs.py:2614}} INFO - Task is not able to be run
> {code}
> Seeing this state, we attempted to "clear" it in the web UI.
> This yielded a complex backtrace:
> {code:java}
> Traceback (most recent call last):
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1982, in wsgi_app
>     response = self.full_dispatch_request()
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1614, in full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1517, in handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/_compat.py", line 33, in reraise
>     raise value
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1612, in full_dispatch_request
>     rv = self.dispatch_request()
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1598, in dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask_appbuilder/security/decorators.py", line 26, in wraps
>     return f(self, *args, **kwargs)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/decorators.py", line 55, in wrapper
>     return f(*args, **kwargs)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/views.py", line 837, in clear
>     include_upstream=upstream)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 4011, in sub_dag
>     dag = copy.deepcopy(self)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166, in deepcopy
>     y = copier(memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 3996, in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166, in deepcopy
>     y = copier(memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 2740, in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 218, in _deepcopy_list
>     y.append(deepcopy(a, memo))
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 174, in deepcopy
>     rv = reductor(4)
> TypeError: cannot serialize '_io.TextIOWrapper' object
> {code}
> After browsing through Airflow's code I had a suspicion that this was simply the "clear" code in the UI not handling some property on one of our operators. I instead used the Browse feature to edit the metadata state db directly.
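As context for the TypeError at the bottom of that backtrace: copy.deepcopy falls back to the pickle protocol for attribute types it has no dedicated copier for, and open file handles (_io.TextIOWrapper) cannot be pickled, so any operator or DAG attribute holding one fails exactly this way. A minimal sketch outside Airflow (the SleepWaitOperator class and its log_handle attribute are hypothetical, for illustration only):

```python
import copy
import os

class SleepWaitOperator:
    """Hypothetical stand-in for a custom operator; not Airflow code."""
    def __init__(self):
        # An open file handle makes the instance un-deepcopy-able,
        # because deepcopy falls back to the pickle protocol for
        # attribute types it has no dedicated copier for.
        self.log_handle = open(os.devnull, "w")

op = SleepWaitOperator()
try:
    copy.deepcopy(op)
except TypeError as exc:
    # Python 3.5 reports: cannot serialize '_io.TextIOWrapper' object
    # (newer versions say "cannot pickle" instead of "cannot serialize")
    print(type(exc).__name__, exc)
```

The frames at models.py:3996 and models.py:2740 in the backtrace are Airflow's own __deepcopy__ hooks recursing into attribute values, which would explain why an unpicklable attribute anywhere on a DAG or operator surfaces only when the UI's "clear" action deep-copies the DAG.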
> This did result in a status change: the task was set to "up_for_retry", and the same logfile now had additional contents:
> {code:java}
> [2018-11-29 14:18:11,390] {{logging_mixin.py:95}} INFO - [2018-11-29 14:18:11,390] {{jobs.py:2695}} WARNING - State of this instance has been externally set to failed. Taking the poison pill.
> [2018-11-29 14:18:11,399] {{helpers.py:240}} INFO - Sending Signals.SIGTERM to GPID 5287
> [2018-11-29 14:18:11,399] {{models.py:1636}} ERROR - Received SIGTERM. Terminating subprocesses.
> [2018-11-29 14:18:11,418] {{models.py:1760}} ERROR - Task received SIGTERM signal
> Traceback (most recent call last):
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1654, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py", line 78, in execute
>     sleep(self.poke_interval)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1638, in signal_handler
>     raise AirflowException("Task received SIGTERM signal")
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-11-29 14:18:11,420] {{models.py:1783}} INFO - Marking task as UP_FOR_RETRY
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close Traceback (most recent call last):
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/bin/airflow", line 32, in <module>
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     args.func(args)
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/cli.py", line 74, in wrapper
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     return f(*args, **kwargs)
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py", line 490, in run
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     _run(args, dag, ti)
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py", line 406, in _run
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     pool=args.pool,
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/db.py", line 74, in wrapper
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     return func(*args, **kwargs)
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1654, in _run_raw_task
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     result = task_copy.execute(context=context)
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py", line 78, in execute
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     sleep(self.poke_interval)
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1638, in signal_handler
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     raise AirflowException("Task received SIGTERM signal")
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-11-29 14:18:11,693] {{helpers.py:230}} INFO - Process psutil.Process(pid=5287 (terminated)) (5287) terminated with exit code 1
> [2018-11-29 14:18:11,694] {{logging_mixin.py:95}} INFO - [2018-11-29 14:18:11,693] {{jobs.py:2627}} INFO - Task exited with return code 0
> {code}
> The log line about "not able to be run" comes from jobs.py, and it's unclear to me why this would be called in this case (two workers grabbing the same message...?) or why the task would just hang in a "running" state: https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/jobs.py#L2614
> We had not previously observed any of this behavior. We had just upgraded to 1.10.1 earlier this week.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)