[2/4] incubator-airflow git commit: [AIRFLOW-779] Task should fail with specific message when deleted

2017-01-20 Thread bolke
[AIRFLOW-779] Task should fail with specific message when deleted

Testing Done:
- Killed a task while it was running using the
task instances UI, verified behavior is the same
as before, and logging worked

Closes #2006 from saguziel/aguziel-terminate-
nonexistent


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92215875
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92215875
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92215875

Branch: refs/heads/v1-8-test
Commit: 9221587514e2a0155cdced2d3ae50129b0793a10
Parents: 241fd27
Author: Alex Guziel 
Authored: Fri Jan 20 14:32:29 2017 -0800
Committer: Dan Davydov 
Committed: Fri Jan 20 14:32:31 2017 -0800

--
 airflow/jobs.py | 16 +---
 1 file changed, 5 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92215875/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f1de333..350c6d4 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2016,10 +2016,6 @@ class LocalTaskJob(BaseJob):
 self.pickle_id = pickle_id
 self.mark_success = mark_success
 
-# terminating state is used so that a job don't try to
-# terminate multiple times
-self.terminating = False
-
 # Keeps track of the fact that the task instance has been observed
 # as running at least once
 self.was_running = False
@@ -2083,17 +2079,16 @@ class LocalTaskJob(BaseJob):
 def heartbeat_callback(self, session=None):
 """Self destruct task if state has been moved away from running 
externally"""
 
-if self.terminating:
-# task is already terminating, let it breathe
-return
-
 # Suicide pill
 TI = models.TaskInstance
 ti = self.task_instance
 new_ti = session.query(TI).filter(
 TI.dag_id == ti.dag_id, TI.task_id == ti.task_id,
 TI.execution_date == ti.execution_date).scalar()
-if new_ti.state == State.RUNNING:
+if new_ti is None:
+logging.warning("Task instance does not exist in DB. Terminating")
+raise AirflowException("Task instance does not exist in DB")
+elif new_ti.state == State.RUNNING:
 self.was_running = True
 fqdn = socket.getfqdn()
 if not (fqdn == new_ti.hostname and
@@ -2110,5 +2105,4 @@ class LocalTaskJob(BaseJob):
 "State of this instance has been externally set to "
 "{self.task_instance.state}. "
 "Taking the poison pill. So long.".format(**locals()))
-self.task_runner.terminate()
-self.terminating = True
+raise AirflowException("Task instance state has been changed 
externally")



incubator-airflow git commit: [AIRFLOW-779] Task should fail with specific message when deleted

2017-01-20 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 241fd2709 -> 922158751


[AIRFLOW-779] Task should fail with specific message when deleted

Testing Done:
- Killed a task while it was running using the
task instances UI, verified behavior is the same
as before, and logging worked

Closes #2006 from saguziel/aguziel-terminate-
nonexistent


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92215875
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92215875
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92215875

Branch: refs/heads/master
Commit: 9221587514e2a0155cdced2d3ae50129b0793a10
Parents: 241fd27
Author: Alex Guziel 
Authored: Fri Jan 20 14:32:29 2017 -0800
Committer: Dan Davydov 
Committed: Fri Jan 20 14:32:31 2017 -0800

--
 airflow/jobs.py | 16 +---
 1 file changed, 5 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92215875/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f1de333..350c6d4 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2016,10 +2016,6 @@ class LocalTaskJob(BaseJob):
 self.pickle_id = pickle_id
 self.mark_success = mark_success
 
-# terminating state is used so that a job don't try to
-# terminate multiple times
-self.terminating = False
-
 # Keeps track of the fact that the task instance has been observed
 # as running at least once
 self.was_running = False
@@ -2083,17 +2079,16 @@ class LocalTaskJob(BaseJob):
 def heartbeat_callback(self, session=None):
 """Self destruct task if state has been moved away from running 
externally"""
 
-if self.terminating:
-# task is already terminating, let it breathe
-return
-
 # Suicide pill
 TI = models.TaskInstance
 ti = self.task_instance
 new_ti = session.query(TI).filter(
 TI.dag_id == ti.dag_id, TI.task_id == ti.task_id,
 TI.execution_date == ti.execution_date).scalar()
-if new_ti.state == State.RUNNING:
+if new_ti is None:
+logging.warning("Task instance does not exist in DB. Terminating")
+raise AirflowException("Task instance does not exist in DB")
+elif new_ti.state == State.RUNNING:
 self.was_running = True
 fqdn = socket.getfqdn()
 if not (fqdn == new_ti.hostname and
@@ -2110,5 +2105,4 @@ class LocalTaskJob(BaseJob):
 "State of this instance has been externally set to "
 "{self.task_instance.state}. "
 "Taking the poison pill. So long.".format(**locals()))
-self.task_runner.terminate()
-self.terminating = True
+raise AirflowException("Task instance state has been changed 
externally")