[2/4] incubator-airflow git commit: [AIRFLOW-779] Task should fail with specific message when deleted
[AIRFLOW-779] Task should fail with specific message when deleted Testing Done: - Killed a task while it was running using the task instances UI, verified behavior is the same as before, and logging worked Closes #2006 from saguziel/aguziel-terminate- nonexistent Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92215875 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92215875 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92215875 Branch: refs/heads/v1-8-test Commit: 9221587514e2a0155cdced2d3ae50129b0793a10 Parents: 241fd27 Author: Alex GuzielAuthored: Fri Jan 20 14:32:29 2017 -0800 Committer: Dan Davydov Committed: Fri Jan 20 14:32:31 2017 -0800 -- airflow/jobs.py | 16 +--- 1 file changed, 5 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92215875/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index f1de333..350c6d4 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -2016,10 +2016,6 @@ class LocalTaskJob(BaseJob): self.pickle_id = pickle_id self.mark_success = mark_success -# terminating state is used so that a job don't try to -# terminate multiple times -self.terminating = False - # Keeps track of the fact that the task instance has been observed # as running at least once self.was_running = False @@ -2083,17 +2079,16 @@ class LocalTaskJob(BaseJob): def heartbeat_callback(self, session=None): """Self destruct task if state has been moved away from running externally""" -if self.terminating: -# task is already terminating, let it breathe -return - # Suicide pill TI = models.TaskInstance ti = self.task_instance new_ti = session.query(TI).filter( TI.dag_id == ti.dag_id, TI.task_id == ti.task_id, TI.execution_date == ti.execution_date).scalar() -if new_ti.state == State.RUNNING: +if new_ti is None: +logging.warning("Task instance does not exist in DB. Terminating") +raise AirflowException("Task instance does not exist in DB") +elif new_ti.state == State.RUNNING: self.was_running = True fqdn = socket.getfqdn() if not (fqdn == new_ti.hostname and @@ -2110,5 +2105,4 @@ class LocalTaskJob(BaseJob): "State of this instance has been externally set to " "{self.task_instance.state}. " "Taking the poison pill. So long.".format(**locals())) -self.task_runner.terminate() -self.terminating = True +raise AirflowException("Task instance state has been changed externally")
incubator-airflow git commit: [AIRFLOW-779] Task should fail with specific message when deleted
Repository: incubator-airflow Updated Branches: refs/heads/master 241fd2709 -> 922158751 [AIRFLOW-779] Task should fail with specific message when deleted Testing Done: - Killed a task while it was running using the task instances UI, verified behavior is the same as before, and logging worked Closes #2006 from saguziel/aguziel-terminate- nonexistent Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92215875 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92215875 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92215875 Branch: refs/heads/master Commit: 9221587514e2a0155cdced2d3ae50129b0793a10 Parents: 241fd27 Author: Alex GuzielAuthored: Fri Jan 20 14:32:29 2017 -0800 Committer: Dan Davydov Committed: Fri Jan 20 14:32:31 2017 -0800 -- airflow/jobs.py | 16 +--- 1 file changed, 5 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92215875/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index f1de333..350c6d4 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -2016,10 +2016,6 @@ class LocalTaskJob(BaseJob): self.pickle_id = pickle_id self.mark_success = mark_success -# terminating state is used so that a job don't try to -# terminate multiple times -self.terminating = False - # Keeps track of the fact that the task instance has been observed # as running at least once self.was_running = False @@ -2083,17 +2079,16 @@ class LocalTaskJob(BaseJob): def heartbeat_callback(self, session=None): """Self destruct task if state has been moved away from running externally""" -if self.terminating: -# task is already terminating, let it breathe -return - # Suicide pill TI = models.TaskInstance ti = self.task_instance new_ti = session.query(TI).filter( TI.dag_id == ti.dag_id, TI.task_id == ti.task_id, TI.execution_date == ti.execution_date).scalar() -if new_ti.state == State.RUNNING: +if new_ti is None: +logging.warning("Task instance does not exist in DB. Terminating") +raise AirflowException("Task instance does not exist in DB") +elif new_ti.state == State.RUNNING: self.was_running = True fqdn = socket.getfqdn() if not (fqdn == new_ti.hostname and @@ -2110,5 +2105,4 @@ class LocalTaskJob(BaseJob): "State of this instance has been externally set to " "{self.task_instance.state}. " "Taking the poison pill. So long.".format(**locals())) -self.task_runner.terminate() -self.terminating = True +raise AirflowException("Task instance state has been changed externally")