This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new eca077b3b9 Get served logs when remote or executor logs not available for non-running task try (#39177) eca077b3b9 is described below commit eca077b3b994813a09942497704ec61e35efd7d5 Author: Kalle Ahlström <71292737+kahls...@users.noreply.github.com> AuthorDate: Thu Apr 25 16:19:37 2024 +0300 Get served logs when remote or executor logs not available for non-running task try (#39177) --- airflow/utils/log/file_task_handler.py | 12 ++++++------ tests/utils/test_log_handlers.py | 14 ++++++++++++-- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 72b8deedbb..2a1dfd25f6 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -366,10 +366,7 @@ class FileTaskHandler(logging.Handler): executor_messages: list[str] = [] executor_logs: list[str] = [] served_logs: list[str] = [] - is_running = ti.try_number == try_number and ti.state in ( - TaskInstanceState.RUNNING, - TaskInstanceState.DEFERRED, - ) + is_in_running_or_deferred = ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) with suppress(NotImplementedError): remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata) messages_list.extend(remote_messages) @@ -384,7 +381,9 @@ class FileTaskHandler(logging.Handler): worker_log_full_path = Path(self.local_base, worker_log_rel_path) local_messages, local_logs = self._read_from_local(worker_log_full_path) messages_list.extend(local_messages) - if is_running and not executor_messages: + if is_in_running_or_deferred and not executor_messages and not remote_logs: + # While task instance is still running and we don't have either executor nor remote logs, look for served logs + # This is for cases when users have not setup remote logging nor shared drive for logs served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) messages_list.extend(served_messages) elif ti.state not in State.unfinished and not (local_logs or remote_logs): @@ -404,11 +403,12 @@ class FileTaskHandler(logging.Handler): ) log_pos = len(logs) messages = "".join([f"*** {x}\n" for x in messages_list]) + end_of_log = ti.try_number != try_number or not is_in_running_or_deferred if metadata and "log_pos" in metadata: previous_chars = metadata["log_pos"] logs = logs[previous_chars:] # Cut off previously passed log test as new tail out_message = logs if "log_pos" in (metadata or {}) else messages + logs - return out_message, {"end_of_log": not is_running, "log_pos": log_pos} + return out_message, {"end_of_log": end_of_log, "log_pos": log_pos} @staticmethod def _get_pod_namespace(ti: TaskInstance): diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index ba849825d5..2bfc574b64 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -315,7 +315,7 @@ class TestFileTaskLogHandler: def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance): """Test for executors which do not have `get_task_log` method, it fallbacks to reading - log from worker. But it happens only for the latest try_number.""" + log from worker if and only if remote logs aren't found""" executor_name = "CeleryExecutor" ti = create_task_instance( @@ -336,7 +336,17 @@ class TestFileTaskLogHandler: fth._read_from_logs_server.assert_called_once() assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": False, "log_pos": 16}) - # Previous try_number is from remote logs without reaching worker server + # Previous try_number should return served logs when remote logs aren't implemented + fth._read_from_logs_server = mock.Mock() + fth._read_from_logs_server.return_value = ["served logs try_number=1"], ["this\nlog\ncontent"] + actual = fth._read(ti=ti, try_number=1) + fth._read_from_logs_server.assert_called_once() + assert actual == ( + "*** served logs try_number=1\nthis\nlog\ncontent", + {"end_of_log": True, "log_pos": 16}, + ) + + # When remote_logs is implemented, previous try_number is from remote logs without reaching worker server fth._read_from_logs_server.reset_mock() fth._read_remote_logs = mock.Mock() fth._read_remote_logs.return_value = ["remote logs"], ["remote\nlog\ncontent"]