This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c866f49e5 recovery message (#34457)
6c866f49e5 is described below

commit 6c866f49e536051d603c5bc20bc27308cf3804c1
Author: Bowrna <mailbow...@gmail.com>
AuthorDate: Sat Apr 6 05:00:27 2024 +0530

    recovery message (#34457)
---
 airflow/jobs/job.py                   | 47 +++++++++++++++++++----------------
 airflow/serialization/pydantic/job.py |  9 +++----
 tests/jobs/test_base_job.py           | 10 +++++---
 3 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 99ca45a3e9..b1e41499dc 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from functools import cached_property
+from functools import cached_property, lru_cache
 from time import sleep
 from typing import TYPE_CHECKING, Callable, NoReturn
 
@@ -56,6 +56,19 @@ def _resolve_dagrun_model():
     return DagRun
 
 
+@lru_cache
+def health_check_threshold(job_type: str, heartrate: int) -> int | float:
+    grace_multiplier = 2.1
+    health_check_threshold_value: int | float
+    if job_type == "SchedulerJob":
+        health_check_threshold_value = conf.getint("scheduler", "scheduler_health_check_threshold")
+    elif job_type == "TriggererJob":
+        health_check_threshold_value = conf.getfloat("triggerer", "triggerer_health_check_threshold")
+    else:
+        health_check_threshold_value = heartrate * grace_multiplier
+    return health_check_threshold_value
+
+
 class Job(Base, LoggingMixin):
     """
     The ORM class representing Job stored in the database.
@@ -112,6 +125,7 @@ class Job(Base, LoggingMixin):
             self.executor = executor
         self.start_date = timezone.utcnow()
         self.latest_heartbeat = timezone.utcnow()
+        self.previous_heartbeat = None
         if heartrate is not None:
             self.heartrate = heartrate
         self.unixname = getuser()
@@ -131,22 +145,18 @@ class Job(Base, LoggingMixin):
     def heartrate(self) -> float:
         return Job._heartrate(self.job_type)
 
-    def is_alive(self, grace_multiplier=2.1) -> bool:
+    def is_alive(self) -> bool:
         """
         Is this job currently alive.
 
         We define alive as in a state of RUNNING, and having sent a heartbeat
         within a multiple of the heartrate (default of 2.1)
-
-        :param grace_multiplier: multiplier of heartrate to require heart beat
-            within
         """
+        threshold_value = health_check_threshold(self.job_type, self.heartrate)
         return Job._is_alive(
-            job_type=self.job_type,
-            heartrate=self.heartrate,
             state=self.state,
+            health_check_threshold_value=threshold_value,
             latest_heartbeat=self.latest_heartbeat,
-            grace_multiplier=grace_multiplier,
         )
 
     @provide_session
@@ -206,16 +216,20 @@ class Job(Base, LoggingMixin):
 
             job = Job._update_heartbeat(job=self, session=session)
             self._merge_from(job)
-
+            time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds()
+            health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
+            if time_since_last_heartbeat > health_check_threshold_value:
+                self.log.info("Heartbeat recovered after %.2f seconds", 
time_since_last_heartbeat)
             # At this point, the DB has updated.
             previous_heartbeat = self.latest_heartbeat
 
             heartbeat_callback(session)
             self.log.debug("[heartbeat]")
+            self.heartbeat_failed = False
         except OperationalError:
             Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
             if not self.heartbeat_failed:
-                self.log.exception("%s heartbeat got an exception", 
self.__class__.__name__)
+                self.log.exception("%s heartbeat failed with error", 
self.__class__.__name__)
                 self.heartbeat_failed = True
             if self.is_alive():
                 self.log.error(
@@ -278,22 +292,13 @@ class Job(Base, LoggingMixin):
 
     @staticmethod
     def _is_alive(
-        job_type: str | None,
-        heartrate: float,
         state: JobState | str | None,
+        health_check_threshold_value: float | int,
         latest_heartbeat: datetime.datetime,
-        grace_multiplier: float = 2.1,
     ) -> bool:
-        health_check_threshold: float
-        if job_type == "SchedulerJob":
-            health_check_threshold = conf.getint("scheduler", 
"scheduler_health_check_threshold")
-        elif job_type == "TriggererJob":
-            health_check_threshold = conf.getint("triggerer", 
"triggerer_health_check_threshold")
-        else:
-            health_check_threshold = heartrate * grace_multiplier
         return (
             state == JobState.RUNNING
-            and (timezone.utcnow() - latest_heartbeat).total_seconds() < health_check_threshold
+            and (timezone.utcnow() - latest_heartbeat).total_seconds() < health_check_threshold_value
         )
 
     @staticmethod
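
For readers skimming the hunks above: a minimal, self-contained sketch of the cached threshold helper this commit introduces. The numeric values are stand-ins for the conf.getint()/conf.getfloat() lookups, not Airflow defaults, and the snippet is illustrative rather than a copy of the patched code.

    from functools import lru_cache

    # Stand-in values; the real code reads these from airflow.cfg.
    _THRESHOLDS = {"SchedulerJob": 30, "TriggererJob": 30.0}

    @lru_cache
    def health_check_threshold(job_type: str, heartrate: float) -> float:
        # Scheduler and triggerer use a configured threshold; every other job
        # type falls back to the old grace-multiplier rule (heartrate * 2.1).
        if job_type in _THRESHOLDS:
            return _THRESHOLDS[job_type]
        return heartrate * 2.1

    print(health_check_threshold("LocalTaskJob", 5))  # 10.5
    print(health_check_threshold("SchedulerJob", 5))  # 30

Because the helper is wrapped in lru_cache, repeated is_alive()/heartbeat() calls for the same job type reuse the computed threshold instead of re-reading configuration.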
diff --git a/airflow/serialization/pydantic/job.py b/airflow/serialization/pydantic/job.py
index 7aec389ba9..bea8e9a3af 100644
--- a/airflow/serialization/pydantic/job.py
+++ b/airflow/serialization/pydantic/job.py
@@ -42,6 +42,7 @@ class JobPydantic(BaseModelPydantic):
     executor_class: Optional[str]
     hostname: Optional[str]
     unixname: Optional[str]
+    grace_multiplier: float = 2.1
 
     model_config = ConfigDict(from_attributes=True)
 
@@ -57,14 +58,12 @@ class JobPydantic(BaseModelPydantic):
             assert self.job_type is not None
         return Job._heartrate(self.job_type)
 
-    def is_alive(self, grace_multiplier=2.1) -> bool:
+    def is_alive(self) -> bool:
         """Is this job currently alive."""
-        from airflow.jobs.job import Job
+        from airflow.jobs.job import Job, health_check_threshold
 
         return Job._is_alive(
-            job_type=self.job_type,
-            heartrate=self.heartrate,
             state=self.state,
+            health_check_threshold_value=health_check_threshold(self.job_type, self.heartrate),
             latest_heartbeat=self.latest_heartbeat,
-            grace_multiplier=grace_multiplier,
         )
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index a7eb4932bf..fcbb84fef7 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import datetime
+import logging
 import sys
 from unittest.mock import ANY, Mock, patch
 
@@ -207,16 +208,17 @@ class TestJob:
         job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
         assert job.is_alive() is False, "Completed jobs even with recent heartbeat should not be alive"
 
-    def test_heartbeat_failed(self):
+    def test_heartbeat_failed(self, caplog):
         when = timezone.utcnow() - datetime.timedelta(seconds=60)
         mock_session = Mock(name="MockSession")
         mock_session.commit.side_effect = OperationalError("Force fail", {}, 
None)
         job = Job(heartrate=10, state=State.RUNNING)
         job.latest_heartbeat = when
-
-        job.heartbeat(heartbeat_callback=lambda: None, session=mock_session)
-
+        with caplog.at_level(logging.ERROR):
+            job.heartbeat(heartbeat_callback=lambda: None, session=mock_session)
+        assert "heartbeat failed with error" in caplog.text
         assert job.latest_heartbeat == when, "attribute not updated when heartbeat fails"
+        assert job.heartbeat_failed
 
     @conf_vars(
         {
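
The new log line is emitted only when the measured gap between two heartbeats exceeds the health-check threshold. A rough standalone sketch of that recovery check, using an assumed 30-second threshold and plain logging in place of Airflow's LoggingMixin (names here are illustrative, not the Airflow API):

    import datetime
    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("job")

    HEALTH_CHECK_THRESHOLD = 30.0  # assumed value; Airflow derives it per job type

    def heartbeat(previous_heartbeat: datetime.datetime) -> datetime.datetime:
        now = datetime.datetime.now(datetime.timezone.utc)
        gap = (now - previous_heartbeat).total_seconds()
        if gap > HEALTH_CHECK_THRESHOLD:
            # Mirrors the "Heartbeat recovered" message added to Job.heartbeat()
            log.info("Heartbeat recovered after %.2f seconds", gap)
        return now

    # A 45-second gap exceeds the assumed 30-second threshold, so the message is logged.
    stale = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=45)
    latest = heartbeat(stale)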
