This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new 6c866f49e5 recovery message (#34457)
6c866f49e5 is described below

commit 6c866f49e536051d603c5bc20bc27308cf3804c1
Author: Bowrna <mailbow...@gmail.com>
AuthorDate: Sat Apr 6 05:00:27 2024 +0530

    recovery message (#34457)
---
 airflow/jobs/job.py                   | 47 +++++++++++++++++++----------------
 airflow/serialization/pydantic/job.py |  9 +++----
 tests/jobs/test_base_job.py           | 10 +++++---
 3 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 99ca45a3e9..b1e41499dc 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations

-from functools import cached_property
+from functools import cached_property, lru_cache
 from time import sleep
 from typing import TYPE_CHECKING, Callable, NoReturn

@@ -56,6 +56,19 @@ def _resolve_dagrun_model():
     return DagRun


+@lru_cache
+def health_check_threshold(job_type: str, heartrate: int) -> int | float:
+    grace_multiplier = 2.1
+    health_check_threshold_value: int | float
+    if job_type == "SchedulerJob":
+        health_check_threshold_value = conf.getint("scheduler", "scheduler_health_check_threshold")
+    elif job_type == "TriggererJob":
+        health_check_threshold_value = conf.getfloat("triggerer", "triggerer_health_check_threshold")
+    else:
+        health_check_threshold_value = heartrate * grace_multiplier
+    return health_check_threshold_value
+
+
 class Job(Base, LoggingMixin):
     """
     The ORM class representing Job stored in the database.
@@ -112,6 +125,7 @@ class Job(Base, LoggingMixin):
         self.executor = executor
         self.start_date = timezone.utcnow()
         self.latest_heartbeat = timezone.utcnow()
+        self.previous_heartbeat = None
         if heartrate is not None:
             self.heartrate = heartrate
         self.unixname = getuser()
@@ -131,22 +145,18 @@ class Job(Base, LoggingMixin):
     def heartrate(self) -> float:
         return Job._heartrate(self.job_type)

-    def is_alive(self, grace_multiplier=2.1) -> bool:
+    def is_alive(self) -> bool:
         """
         Is this job currently alive.

         We define alive as in a state of RUNNING, and having sent a heartbeat
         within a multiple of the heartrate (default of 2.1)
-
-        :param grace_multiplier: multiplier of heartrate to require heart beat
-            within
         """
+        threshold_value = health_check_threshold(self.job_type, self.heartrate)
         return Job._is_alive(
-            job_type=self.job_type,
-            heartrate=self.heartrate,
             state=self.state,
+            health_check_threshold_value=threshold_value,
             latest_heartbeat=self.latest_heartbeat,
-            grace_multiplier=grace_multiplier,
         )

     @provide_session
@@ -206,16 +216,20 @@ class Job(Base, LoggingMixin):

             job = Job._update_heartbeat(job=self, session=session)
             self._merge_from(job)
-
+            time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds()
+            health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
+            if time_since_last_heartbeat > health_check_threshold_value:
+                self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
             # At this point, the DB has updated.
             previous_heartbeat = self.latest_heartbeat

             heartbeat_callback(session)
             self.log.debug("[heartbeat]")
+            self.heartbeat_failed = False
         except OperationalError:
             Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
             if not self.heartbeat_failed:
-                self.log.exception("%s heartbeat got an exception", self.__class__.__name__)
+                self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
                 self.heartbeat_failed = True
             if self.is_alive():
                 self.log.error(
@@ -278,22 +292,13 @@ class Job(Base, LoggingMixin):

     @staticmethod
     def _is_alive(
-        job_type: str | None,
-        heartrate: float,
         state: JobState | str | None,
+        health_check_threshold_value: float | int,
         latest_heartbeat: datetime.datetime,
-        grace_multiplier: float = 2.1,
     ) -> bool:
-        health_check_threshold: float
-        if job_type == "SchedulerJob":
-            health_check_threshold = conf.getint("scheduler", "scheduler_health_check_threshold")
-        elif job_type == "TriggererJob":
-            health_check_threshold = conf.getint("triggerer", "triggerer_health_check_threshold")
-        else:
-            health_check_threshold = heartrate * grace_multiplier
         return (
             state == JobState.RUNNING
-            and (timezone.utcnow() - latest_heartbeat).total_seconds() < health_check_threshold
+            and (timezone.utcnow() - latest_heartbeat).total_seconds() < health_check_threshold_value
         )

     @staticmethod
diff --git a/airflow/serialization/pydantic/job.py b/airflow/serialization/pydantic/job.py
index 7aec389ba9..bea8e9a3af 100644
--- a/airflow/serialization/pydantic/job.py
+++ b/airflow/serialization/pydantic/job.py
@@ -42,6 +42,7 @@ class JobPydantic(BaseModelPydantic):
     executor_class: Optional[str]
     hostname: Optional[str]
     unixname: Optional[str]
+    grace_multiplier: float = 2.1

     model_config = ConfigDict(from_attributes=True)

@@ -57,14 +58,12 @@ class JobPydantic(BaseModelPydantic):
         assert self.job_type is not None
         return Job._heartrate(self.job_type)

-    def is_alive(self, grace_multiplier=2.1) -> bool:
+    def is_alive(self) -> bool:
         """Is this job currently alive."""
-        from airflow.jobs.job import Job
+        from airflow.jobs.job import Job, health_check_threshold

         return Job._is_alive(
-            job_type=self.job_type,
-            heartrate=self.heartrate,
             state=self.state,
+            health_check_threshold_value=health_check_threshold(self.job_type, self.heartrate),
             latest_heartbeat=self.latest_heartbeat,
-            grace_multiplier=grace_multiplier,
         )
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index a7eb4932bf..fcbb84fef7 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -18,6 +18,7 @@
 from __future__ import annotations

 import datetime
+import logging
 import sys
 from unittest.mock import ANY, Mock, patch

@@ -207,16 +208,17 @@ class TestJob:
         job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
         assert job.is_alive() is False, "Completed jobs even with recent heartbeat should not be alive"

-    def test_heartbeat_failed(self):
+    def test_heartbeat_failed(self, caplog):
         when = timezone.utcnow() - datetime.timedelta(seconds=60)
         mock_session = Mock(name="MockSession")
         mock_session.commit.side_effect = OperationalError("Force fail", {}, None)
         job = Job(heartrate=10, state=State.RUNNING)
         job.latest_heartbeat = when
-
-        job.heartbeat(heartbeat_callback=lambda: None, session=mock_session)
-
+        with caplog.at_level(logging.ERROR):
+            job.heartbeat(heartbeat_callback=lambda: None, session=mock_session)
+        assert "heartbeat failed with error" in caplog.text
         assert job.latest_heartbeat == when, "attribute not updated when heartbeat fails"
+        assert job.heartbeat_failed

     @conf_vars(
         {