[ https://issues.apache.org/jira/browse/AIRFLOW-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711972#comment-16711972 ]

Gabriel Silk commented on AIRFLOW-3418:
---------------------------------------

I'm seeing this issue as well, and I want to reiterate how critical it is: 
it's currently breaking our production clusters.
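For anyone trying to reproduce the "clear" failure outside Airflow: the last frame of the traceback (rv = reductor(4)) is copy.deepcopy falling back to the pickle protocol, which refuses to serialize an open file object. A minimal sketch of that failure mode follows; the FileHoldingOperator class and its log_handle attribute are hypothetical, standing in for a custom operator that keeps an open _io.TextIOWrapper as an instance attribute:

```python
import copy
import os
import tempfile

# A scratch file to hold open, mimicking an operator-owned log handle.
fd, path = tempfile.mkstemp()
os.close(fd)

class FileHoldingOperator:
    """Hypothetical stand-in for a custom operator that keeps an open
    file handle (an _io.TextIOWrapper) as an instance attribute."""
    def __init__(self, path):
        self.log_handle = open(path, "w")  # an _io.TextIOWrapper

op = FileHoldingOperator(path)
try:
    # DAG.sub_dag() calls copy.deepcopy(self); deepcopy recurses into the
    # operator's __dict__ and falls back to pickle for the file object,
    # which raises TypeError.
    copy.deepcopy(op)
    msg = None
except TypeError as exc:
    msg = str(exc)  # mentions '_io.TextIOWrapper' (exact wording varies by Python version)
finally:
    op.log_handle.close()
    os.remove(path)

print(msg)
```

On Python 3.5 the message reads "cannot serialize '_io.TextIOWrapper' object", matching the traceback below; newer versions say "cannot pickle" instead.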

> Task stuck in running state, unable to clear
> --------------------------------------------
>
>                 Key: AIRFLOW-3418
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3418
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: worker
>    Affects Versions: 1.10.1
>            Reporter: James Meickle
>            Priority: Critical
>
> One of our tasks (a custom operator that sleep-waits until NYSE market close) 
> got stuck in a "running" state in the metadata db without making any 
> progress. This is what it looked like in the logs:
> {code:java}
> [2018-11-29 00:01:14,064] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close [2018-11-29 00:01:14,063] {{cli.py:484}} INFO - Running 
> <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 
> [running]> on host airflow-core-i-0a53cac37067d957d.dlg.fnd.dynoquant.com
> [2018-11-29 06:03:57,643] {{models.py:1355}} INFO - Dependencies not met for 
> <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 
> [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' 
> state which is not a valid state for execution. The task must be cleared in 
> order to be run.
> [2018-11-29 06:03:57,644] {{models.py:1355}} INFO - Dependencies not met for 
> <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 
> [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is 
> already running, it started on 2018-11-29 00:01:10.876344+00:00.
> [2018-11-29 06:03:57,646] {{logging_mixin.py:95}} INFO - [2018-11-29 
> 06:03:57,646] {{jobs.py:2614}} INFO - Task is not able to be run
> {code}
> Seeing this state, we attempted to "clear" it in the web UI. This yielded a 
> complex backtrace:
> {code:java}
> Traceback (most recent call last):
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", 
> line 1982, in wsgi_app
>     response = self.full_dispatch_request()
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", 
> line 1614, in full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", 
> line 1517, in handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/_compat.py",
>  line 33, in reraise
>     raise value
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", 
> line 1612, in full_dispatch_request
>     rv = self.dispatch_request()
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", 
> line 1598, in dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask_appbuilder/security/decorators.py",
>  line 26, in wraps
>     return f(self, *args, **kwargs)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/decorators.py",
>  line 55, in wrapper
>     return f(*args, **kwargs)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/views.py",
>  line 837, in clear
>     include_upstream=upstream)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
>  line 4011, in sub_dag
>     dag = copy.deepcopy(self)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166, 
> in deepcopy
>     y = copier(memo)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
>  line 3996, in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, 
> in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, 
> in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166, 
> in deepcopy
>     y = copier(memo)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
>  line 2740, in __deepcopy__
>     setattr(result, k, copy.deepcopy(v, memo))
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, 
> in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, 
> in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, 
> in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, 
> in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, 
> in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, 
> in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, 
> in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, 
> in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, 
> in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, 
> in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, 
> in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, 
> in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, 
> in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, 
> in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, 
> in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, 
> in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, 
> in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, 
> in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, 
> in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 218, 
> in _deepcopy_list
>     y.append(deepcopy(a, memo))
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, 
> in deepcopy
>     y = _reconstruct(x, rv, 1, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, 
> in _reconstruct
>     state = deepcopy(state, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, 
> in deepcopy
>     y = copier(x, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, 
> in _deepcopy_dict
>     y[deepcopy(key, memo)] = deepcopy(value, memo)
>   File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 174, 
> in deepcopy
>     rv = reductor(4)
> TypeError: cannot serialize '_io.TextIOWrapper' object
> {code}
> After browsing through Airflow's code, I suspected that this was simply the 
> "clear" code in the UI failing to handle some property on one of our 
> operators. I instead used the Browse feature to edit the metadata state db 
> directly. This did change the status: the task was set to "up_for_retry", 
> and the same logfile now had additional contents:
> {code:java}
> [2018-11-29 14:18:11,390] {{logging_mixin.py:95}} INFO - [2018-11-29 
> 14:18:11,390] {{jobs.py:2695}} WARNING - State of this instance has been 
> externally set to failed. Taking the poison pill.
> [2018-11-29 14:18:11,399] {{helpers.py:240}} INFO - Sending Signals.SIGTERM 
> to GPID 5287
> [2018-11-29 14:18:11,399] {{models.py:1636}} ERROR - Received SIGTERM. 
> Terminating subprocesses.
> [2018-11-29 14:18:11,418] {{models.py:1760}} ERROR - Task received SIGTERM 
> signal
> Traceback (most recent call last):
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
>  line 1654, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py",
>  line 78, in execute
>     sleep(self.poke_interval)
>   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
>  line 1638, in signal_handler
>     raise AirflowException("Task received SIGTERM signal")
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-11-29 14:18:11,420] {{models.py:1783}} INFO - Marking task as 
> UP_FOR_RETRY
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close Traceback (most recent call last):
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close   File "/home/airflow/virtualenvs/airflow/bin/airflow", 
> line 32, in <module>
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close     args.func(args)
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/cli.py",
>  line 74, in wrapper
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close     return f(*args, **kwargs)
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py",
>  line 490, in run
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close     _run(args, dag, ti)
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py",
>  line 406, in _run
> [2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close     pool=args.pool,
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/db.py",
>  line 74, in wrapper
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close     return func(*args, **kwargs)
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
>  line 1654, in _run_raw_task
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close     result = task_copy.execute(context=context)
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py",
>  line 78, in execute
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close     sleep(self.poke_interval)
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close   File 
> "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
>  line 1638, in signal_handler
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close     raise AirflowException("Task received SIGTERM signal")
> [2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: 
> Subtask after_close airflow.exceptions.AirflowException: Task received 
> SIGTERM signal
> [2018-11-29 14:18:11,693] {{helpers.py:230}} INFO - Process 
> psutil.Process(pid=5287 (terminated)) (5287) terminated with exit code 1
> [2018-11-29 14:18:11,694] {{logging_mixin.py:95}} INFO - [2018-11-29 
> 14:18:11,693] {{jobs.py:2627}} INFO - Task exited with return code 0
> {code}
> The log line about "not able to be run" comes from jobs.py, and it's unclear 
> to me why that code path would be hit here (two workers grabbing the same 
> message...?) or why the task would just hang in a "running" state: 
> https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/jobs.py#L2614
> We had not previously observed any of this behavior; we had only upgraded to 
> 1.10.1 earlier this week.
>  
>  
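For context on the second log excerpt above: the "poison pill" works by installing a SIGTERM handler that raises AirflowException, which interrupts the sensor's sleep() and unwinds execute() so retry handling can run. A minimal sketch of that mechanism in plain Python (POSIX only; TaskTerminated is a hypothetical stand-in for airflow.exceptions.AirflowException):

```python
import os
import signal
import time

class TaskTerminated(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""

def signal_handler(signum, frame):
    # Mirrors the handler raised at models.py:1638 in the traceback:
    # convert SIGTERM into an exception inside the running task.
    raise TaskTerminated("Task received SIGTERM signal")

signal.signal(signal.SIGTERM, signal_handler)

try:
    os.kill(os.getpid(), signal.SIGTERM)  # the "poison pill"
    time.sleep(10)  # stands in for the sensor's sleep(self.poke_interval)
    result = None
except TaskTerminated as exc:
    result = str(exc)

print(result)  # Task received SIGTERM signal
```

This is why the log shows the sensor dying inside sleep(self.poke_interval): the exception surfaces wherever the process happens to be when the signal lands.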



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
