This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 414bb20 Serve logs with Scheduler when using Local or Sequential Executor (#15557) 414bb20 is described below commit 414bb20fad6c6a50c5a209f6d81f5ca3d679b083 Author: Kaxil Naik <kaxiln...@gmail.com> AuthorDate: Thu Apr 29 16:06:06 2021 +0100 Serve logs with Scheduler when using Local or Sequential Executor (#15557) Currently, the `serve_logs` endpoint only exists on Celery workers. This means if someone launches Airflow with the `LocalExecutor` and wants to grab the logs from the scheduler, there is no way to move that to the webserver if it is on a different pod/machine. This commit makes the scheduler automatically serve logs when using `LocalExecutor` or `SequentialExecutor`. However, it means for Airflow <= 2.0.2, the Helm Chart won't serve logs. closes https://github.com/apache/airflow/pull/15070 closes https://github.com/apache/airflow/issues/13331 closes https://github.com/apache/airflow/issues/15071 closes https://github.com/apache/airflow/issues/14222 --- airflow/cli/cli_parser.py | 1 + airflow/cli/commands/scheduler_command.py | 22 +++++++ .../templates/scheduler/scheduler-deployment.yaml | 29 ++------- tests/cli/commands/test_scheduler_command.py | 72 ++++++++++++++++++++++ 4 files changed, 101 insertions(+), 23 deletions(-) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index f3f0344..e1a5826 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -1517,6 +1517,7 @@ airflow_commands: List[CLICommand] = [ ARG_STDOUT, ARG_STDERR, ARG_LOG_FILE, + ARG_SKIP_SERVE_LOGS, ), epilog=( 'Signals:\n' diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py index b66dafc..368db6f 100644 --- a/airflow/cli/commands/scheduler_command.py +++ b/airflow/cli/commands/scheduler_command.py @@ -17,6 +17,8 @@ """Scheduler command""" import signal +from multiprocessing import Process +from typing import Optional import daemon 
from daemon.pidfile import TimeoutPIDLockFile @@ -30,6 +32,8 @@ from airflow.utils.cli import process_subdir, setup_locations, setup_logging, si @cli_utils.action_logging def scheduler(args): """Starts Airflow Scheduler""" + skip_serve_logs = args.skip_serve_logs + print(settings.HEADER) job = SchedulerJob( subdir=process_subdir(args.subdir), @@ -50,9 +54,27 @@ def scheduler(args): stderr=stderr_handle, ) with ctx: + sub_proc = _serve_logs(skip_serve_logs) job.run() else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) signal.signal(signal.SIGQUIT, sigquit_handler) + sub_proc = _serve_logs(skip_serve_logs) job.run() + + if sub_proc: + sub_proc.terminate() + + +def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]: + """Starts serve_logs sub-process""" + from airflow.configuration import conf + from airflow.utils.serve_logs import serve_logs + + if conf.get("core", "executor") in ["LocalExecutor", "SequentialExecutor"]: + if skip_serve_logs is False: + sub_proc = Process(target=serve_logs) + sub_proc.start() + return sub_proc + return None diff --git a/chart/templates/scheduler/scheduler-deployment.yaml b/chart/templates/scheduler/scheduler-deployment.yaml index 7e90442..9e452b3 100644 --- a/chart/templates/scheduler/scheduler-deployment.yaml +++ b/chart/templates/scheduler/scheduler-deployment.yaml @@ -141,6 +141,12 @@ spec: SchedulerJob.latest_heartbeat.desc()).limit(1).first() sys.exit(0 if job.is_alive() else 1) + {{- if and $local (not $elasticsearch) }} + # Serve logs if we're in local mode and we don't have elasticsearch enabled. + ports: + - name: worker-logs + containerPort: {{ .Values.ports.workerLogs }} + {{- end }} resources: {{ toYaml .Values.scheduler.resources | indent 12 }} volumeMounts: @@ -178,29 +184,6 @@ spec: volumeMounts: - name: logs mountPath: {{ template "airflow_logs" . 
}} -{{- if and $local (not $elasticsearch) }} - # Start the sidecar log server if we're in local mode and - # we don't have elasticsearch enabled. - - name: scheduler-logs - image: {{ template "airflow_image" . }} - imagePullPolicy: {{ .Values.images.airflow.pullPolicy }} - args: ["serve_logs"] - ports: - - name: worker-logs - containerPort: {{ .Values.ports.workerLogs }} - volumeMounts: - - name: logs - mountPath: {{ template "airflow_logs" . }} - - name: config - mountPath: {{ template "airflow_config_path" . }} - subPath: airflow.cfg - readOnly: true - envFrom: - {{- include "custom_airflow_environment_from" . | default "\n []" | indent 10 }} - env: - {{- include "custom_airflow_environment" . | indent 10 }} - {{- include "standard_airflow_environment" . | indent 10 }} -{{- end }} {{- if .Values.scheduler.extraContainers }} {{- toYaml .Values.scheduler.extraContainers | nindent 8 }} {{- end }} diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py new file mode 100644 index 0000000..59bde70 --- /dev/null +++ b/tests/cli/commands/test_scheduler_command.py @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import unittest +from unittest import mock + +from parameterized import parameterized + +from airflow.cli import cli_parser +from airflow.cli.commands import scheduler_command +from airflow.utils.serve_logs import serve_logs +from tests.test_utils.config import conf_vars + + +class TestSchedulerCommand(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.parser = cli_parser.get_parser() + + @parameterized.expand( + [ + ("CeleryExecutor", False), + ("LocalExecutor", True), + ("SequentialExecutor", True), + ("KubernetesExecutor", False), + ] + ) + @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob") + @mock.patch("airflow.cli.commands.scheduler_command.Process") + def test_serve_logs_on_scheduler( + self, + executor, + expect_serve_logs, + mock_process, + mock_scheduler_job, + ): + args = self.parser.parse_args(['scheduler']) + + with conf_vars({("core", "executor"): executor}): + scheduler_command.scheduler(args) + if expect_serve_logs: + mock_process.assert_called_once_with(target=serve_logs) + else: + mock_process.assert_not_called() + + @parameterized.expand( + [ + ("LocalExecutor",), + ("SequentialExecutor",), + ] + ) + @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob") + @mock.patch("airflow.cli.commands.scheduler_command.Process") + def test_skip_serve_logs(self, executor, mock_process, mock_scheduler_job): + args = self.parser.parse_args(['scheduler', '--skip-serve-logs']) + with conf_vars({("core", "executor"): executor}): + scheduler_command.scheduler(args) + mock_process.assert_not_called()