Repository: incubator-airflow Updated Branches: refs/heads/master 392772326 -> e6ef06c53
[AIRFLOW-1385] Create abstraction for Airflow task logging This PR adds the ability to provide customized implementations of airflow task logging. It creates an abstraction for setting up, cleaning up, and getting task instance logs. This change is primarily a refactor of logging logic. It is tested locally with custom logging implementations. Closes #2422 from AllisonWang/allison--log-handler Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e6ef06c5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e6ef06c5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e6ef06c5 Branch: refs/heads/master Commit: e6ef06c53fd4449db6e665cce5cad9418dde232f Parents: 3927723 Author: AllisonWang <allisonwang...@gmail.com> Authored: Thu Jul 20 18:03:23 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Thu Jul 20 18:03:26 2017 -0700 ---------------------------------------------------------------------- airflow/bin/cli.py | 111 +++---------- airflow/config_templates/__init__.py | 13 ++ airflow/config_templates/default_airflow.cfg | 6 + .../config_templates/default_airflow_logging.py | 73 +++++++++ airflow/settings.py | 13 +- airflow/task_runner/base_task_runner.py | 1 + airflow/utils/log/__init__.py | 13 ++ airflow/utils/log/file_task_handler.py | 158 +++++++++++++++++++ airflow/utils/log/s3_task_handler.py | 90 +++++++++++ airflow/utils/logging.py | 11 ++ airflow/www/views.py | 104 +++--------- 11 files changed, 416 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index f568d5d..11f415a 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -22,7 +22,6 @@ import os import socket 
import subprocess import textwrap -import warnings from importlib import import_module import argparse @@ -52,8 +51,6 @@ from airflow.models import (DagModel, DagBag, TaskInstance, Connection) from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) from airflow.utils import db as db_utils -from airflow.utils import logging as logging_utils -from airflow.utils.file import mkdirs from airflow.www.app import cached_app from sqlalchemy import func @@ -327,55 +324,6 @@ def run(args, dag=None): settings.configure_vars() settings.configure_orm() - logging.root.handlers = [] - if args.raw: - # Output to STDOUT for the parent process to read and log - logging.basicConfig( - stream=sys.stdout, - level=settings.LOGGING_LEVEL, - format=settings.LOG_FORMAT) - else: - # Setting up logging to a file. - - # To handle log writing when tasks are impersonated, the log files need to - # be writable by the user that runs the Airflow command and the user - # that is impersonated. This is mainly to handle corner cases with the - # SubDagOperator. When the SubDagOperator is run, all of the operators - # run under the impersonated user and create appropriate log files - # as the impersonated user. However, if the user manually runs tasks - # of the SubDagOperator through the UI, then the log files are created - # by the user that runs the Airflow command. For example, the Airflow - # run command may be run by the `airflow_sudoable` user, but the Airflow - # tasks may be run by the `airflow` user. If the log files are not - # writable by both users, then it's possible that re-running a task - # via the UI (or vice versa) results in a permission error as the task - # tries to write to a log file created by the other user. 
- log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) - directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args) - # Create the log file and give it group writable permissions - # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag - # operator is not compatible with impersonation (e.g. if a Celery executor is used - # for a SubDag operator and the SubDag operator has a different owner than the - # parent DAG) - if not os.path.exists(directory): - # Create the directory as globally writable using custom mkdirs - # as os.makedirs doesn't set mode properly. - mkdirs(directory, 0o775) - iso = args.execution_date.isoformat() - filename = "{directory}/{iso}".format(**locals()) - - if not os.path.exists(filename): - open(filename, "a").close() - os.chmod(filename, 0o666) - - logging.basicConfig( - filename=filename, - level=settings.LOGGING_LEVEL, - format=settings.LOG_FORMAT) - - hostname = socket.getfqdn() - logging.info("Running on host {}".format(hostname)) - if not args.pickle and not dag: dag = get_dag(args) elif not dag: @@ -391,8 +339,21 @@ def run(args, dag=None): ti = TaskInstance(task, args.execution_date) ti.refresh_from_db() + logger = logging.getLogger('airflow.task') + if args.raw: + logger = logging.getLogger('airflow.task.raw') + + for handler in logger.handlers: + try: + print("inside cli, setting up context") + handler.set_context(ti) + except AttributeError: + pass + + hostname = socket.getfqdn() + logger.info("Running on host {}".format(hostname)) + if args.local: - print("Logging into: " + filename) run_job = jobs.LocalTaskJob( task_instance=ti, mark_success=args.mark_success, @@ -450,43 +411,13 @@ def run(args, dag=None): if args.raw: return - # Force the log to flush, and set the handler to go back to normal so we - # don't continue logging to the task's log file. The flush is important - # because we subsequently read from the log to insert into S3 or Google - # cloud storage. 
- logging.root.handlers[0].flush() - logging.root.handlers = [] - - # store logs remotely - remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') - - # deprecated as of March 2016 - if not remote_base and conf.get('core', 'S3_LOG_FOLDER'): - warnings.warn( - 'The S3_LOG_FOLDER conf key has been replaced by ' - 'REMOTE_BASE_LOG_FOLDER. Your conf still works but please ' - 'update airflow.cfg to ensure future compatibility.', - DeprecationWarning) - remote_base = conf.get('core', 'S3_LOG_FOLDER') - - if os.path.exists(filename): - # read log and remove old logs to get just the latest additions - - with open(filename, 'r') as logfile: - log = logfile.read() - - remote_log_location = filename.replace(log_base, remote_base) - # S3 - if remote_base.startswith('s3:/'): - logging_utils.S3Log().write(log, remote_log_location) - # GCS - elif remote_base.startswith('gs:/'): - logging_utils.GCSLog().write(log, remote_log_location) - # Other - elif remote_base and remote_base != 'None': - logging.error( - 'Unsupported remote log location: {}'.format(remote_base)) - + # Force the log to flush. The flush is important because we + # subsequently read from the log to insert into S3 or Google + # cloud storage. Explicitly close the handler is needed in order + # to upload to remote storage services. + for handler in logger.handlers: + handler.flush() + handler.close() def task_failed_deps(args): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/config_templates/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/__init__.py b/airflow/config_templates/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/airflow/config_templates/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/config_templates/default_airflow.cfg ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index ddd1ba8..77592d9 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -122,6 +122,12 @@ security = # values at runtime) unit_test_mode = False +# Logging configuration path +logging_config_path = airflow.logging.airflow_logging_config.AIRFLOW_LOGGING_CONFIG + +# Name of handler to read task instance logs +task_log_reader = airflow.task + [cli] # In what way should the cli access the API. The LocalClient will use the # database directly, while the json_client will use the api running on the http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/config_templates/default_airflow_logging.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py new file mode 100644 index 0000000..241571f --- /dev/null +++ b/airflow/config_templates/default_airflow_logging.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow import configuration as conf + +# TODO: Logging format and level should be configured +# in this file instead of from airflow.cfg. Currently +# there are other log format and level configurations in +# settings.py and cli.py. + +LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper() +LOG_FORMAT = conf.get('core', 'log_format') + + +BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER') +# TODO: This should be specified as s3_remote and/or gcs_remote +REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') + +DEFAULT_LOGGING_CONFIG = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'airflow.task': { + 'format': LOG_FORMAT, + } + }, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'formatter': 'airflow.task', + 'stream': 'ext://sys.stdout' + }, + 'file.task': { + 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', + 'formatter': 'airflow.task', + 'base_log_folder': BASE_LOG_FOLDER, + }, + 's3.task': { + 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', + 'base_log_folder': BASE_LOG_FOLDER, + 'remote_base_log_folder': REMOTE_BASE_LOG_FOLDER, + 'formatter': 'airflow.task', + }, + }, + 'loggers': { + 'airflow.task': { + 'handlers': ['file.task'], + 'level': LOG_LEVEL, + 'propagate': False, + }, + 'airflow.task_runner': { + 'handlers': ['file.task'], + 'level': LOG_LEVEL, + 'propagate': True, + }, + 'airflow.task.raw': { + 'handlers': ['console'], + 'level': LOG_LEVEL, + 'propagate': False, + }, + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/settings.py ---------------------------------------------------------------------- diff --git a/airflow/settings.py b/airflow/settings.py index 3f7560d..7a61bce 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -18,6 +18,7 @@ from __future__ import print_function from __future__ import unicode_literals import logging +import logging.config import os import sys @@ -162,13 +163,23 @@ def configure_orm(disable_connection_pool=False): try: from airflow_local_settings import * logging.info("Loaded airflow_local_settings.") -except: +except Exception: pass configure_logging() configure_vars() configure_orm() +# TODO: Merge airflow logging configurations. +logging_config_path = conf.get('core', 'logging_config_path') +try: + from logging_config_path import LOGGING_CONFIG +except Exception: + # Import default logging configuration + from airflow.config_templates.default_airflow_logging import \ + DEFAULT_LOGGING_CONFIG as LOGGING_CONFIG +logging.config.dictConfig(LOGGING_CONFIG) + # Const stuff KILOBYTE = 1024 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/task_runner/base_task_runner.py ---------------------------------------------------------------------- diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py index fa3fd6b..02404d1 100644 --- a/airflow/task_runner/base_task_runner.py +++ b/airflow/task_runner/base_task_runner.py @@ -36,6 +36,7 @@ class BaseTaskRunner(LoggingMixin): :type local_task_job: airflow.jobs.LocalTaskJob """ self._task_instance = local_task_job.task_instance + self.set_logger_contexts(self._task_instance) popen_prepend = [] cfg_path = None http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/utils/log/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/__init__.py b/airflow/utils/log/__init__.py new 
file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/airflow/utils/log/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/utils/log/file_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py new file mode 100644 index 0000000..94588dd --- /dev/null +++ b/airflow/utils/log/file_task_handler.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os + +from airflow import configuration as conf +from airflow.configuration import AirflowConfigException +from airflow.utils.file import mkdirs + + +class FileTaskHandler(logging.Handler): + """ + FileTaskHandler is a python log handler that handles and reads + task instance logs. 
It creates and delegates log handling + to `logging.FileHandler` after receiving task instance context. + It reads logs from task instance's host machine. + """ + + def __init__(self, base_log_folder): + super(FileTaskHandler, self).__init__() + self.handler = None + self.local_base = os.path.expanduser(base_log_folder) + + def set_context(self, task_instance): + """ + Provide task_instance context to airflow task handler. + :param task_instance: task instance object + """ + self._init_file(task_instance) + local_loc = self.get_local_loc(task_instance) + self.handler = logging.FileHandler(local_loc) + self.handler.setFormatter(self.formatter) + self.handler.setLevel(self.level) + + def emit(self, record): + if self.handler: + self.handler.emit(record) + + def flush(self): + if self.handler: + self.handler.flush() + + def close(self): + if self.handler: + self.handler.close() + + def read(self, task_instance): + """ + Read log of given task instance from task instance host server. + :param task_instance: task instance database record + """ + log = "" + # Task instance here might be different from task instance when + # initializing the handler. Thus explicitly getting log location + # is needed to get correct log path. 
+ loc = self.get_local_loc(task_instance) + + if os.path.exists(loc): + try: + with open(loc) as f: + log += "*** Reading local log.\n" + "".join(f.readlines()) + except Exception: + log = "*** Failed to load local log file: {0}.\n".format(loc) + else: + url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", + self.get_log_relative_path(task_instance)).format( + ti=task_instance, + worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')) + log += "*** Log file isn't local.\n" + log += "*** Fetching here: {url}\n".format(**locals()) + try: + import requests + timeout = None # No timeout + try: + timeout = conf.getint('webserver', 'log_fetch_timeout_sec') + except (AirflowConfigException, ValueError): + pass + + response = requests.get(url, timeout=timeout) + response.raise_for_status() + log += '\n' + response.text + except Exception: + log += "*** Failed to fetch log file from worker.\n".format( + **locals()) + return log + + def get_log_relative_dir(self, ti): + """ + Get relative log file directory. + :param ti: task instance object + """ + return "{}/{}".format(ti.dag_id, ti.task_id) + + def get_log_relative_path(self, ti): + """ + Get relative log file path. + :param ti: task instance object + """ + directory = self.get_log_relative_dir(ti) + filename = "{}.log".format(ti.execution_date.isoformat()) + return os.path.join(directory, filename) + + def get_local_loc(self, ti): + """ + Get full local log path given task instance object. + :param ti: task instance object + """ + log_relative_path = self.get_log_relative_path(ti) + return os.path.join(self.local_base, log_relative_path) + + def _init_file(self, ti): + """ + Create log directory and give it correct permissions. 
+ :param ti: task instance object + :return relative log path of the given task instance + """ + # To handle log writing when tasks are impersonated, the log files need to + # be writable by the user that runs the Airflow command and the user + # that is impersonated. This is mainly to handle corner cases with the + # SubDagOperator. When the SubDagOperator is run, all of the operators + # run under the impersonated user and create appropriate log files + # as the impersonated user. However, if the user manually runs tasks + # of the SubDagOperator through the UI, then the log files are created + # by the user that runs the Airflow command. For example, the Airflow + # run command may be run by the `airflow_sudoable` user, but the Airflow + # tasks may be run by the `airflow` user. If the log files are not + # writable by both users, then it's possible that re-running a task + # via the UI (or vice versa) results in a permission error as the task + # tries to write to a log file created by the other user. + relative_log_dir = self.get_log_relative_dir(ti) + directory = os.path.join(self.local_base, relative_log_dir) + # Create the log file and give it group writable permissions + # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag + # operator is not compatible with impersonation (e.g. if a Celery executor is used + # for a SubDag operator and the SubDag operator has a different owner than the + # parent DAG) + if not os.path.exists(directory): + # Create the directory as globally writable using custom mkdirs + # as os.makedirs doesn't set mode properly. 
+ mkdirs(directory, 0o775) + + local_loc = self.get_local_loc(ti) + + if not os.path.exists(local_loc): + open(local_loc, "a").close() + os.chmod(local_loc, 0o666) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/utils/log/s3_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py new file mode 100644 index 0000000..5c8ad7e --- /dev/null +++ b/airflow/utils/log/s3_task_handler.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import warnings + +from airflow import configuration as conf +from airflow.utils import logging as logging_utils +from airflow.utils.log.file_task_handler import FileTaskHandler + + +class S3TaskHandler(FileTaskHandler): + """ + S3TaskHandler is a python log handler that handles and reads + task instance logs. It extends airflow FileTaskHandler and + uploads to and reads from S3 remote storage. + """ + + def __init__(self, base_log_folder, remote_base_log_folder): + super(S3TaskHandler, self).__init__(base_log_folder) + self.remote_base = remote_base_log_folder + # deprecated as of March 2016 + if not self.remote_base and conf.get('core', 'S3_LOG_FOLDER'): + warnings.warn( + 'The S3_LOG_FOLDER conf key has been replaced by ' + 'REMOTE_BASE_LOG_FOLDER. 
Your conf still works but please ' + 'update airflow.cfg to ensure future compatibility.', + DeprecationWarning) + self.remote_base = conf.get('core', 'S3_LOG_FOLDER') + self.closed = False + + def set_context(self, task_instance): + super(S3TaskHandler, self).set_context(task_instance) + # Local location and remote location is needed to open and + # upload local log file to S3 remote storage. + self.local_loc = self.get_local_loc(task_instance) + self.remote_loc = os.path.join(self.remote_base, + self.get_log_relative_path(task_instance)) + + def close(self): + """ + Close and upload local log file to remote storage S3. + """ + # When application exit, system shuts down all handlers by + # calling close method. Here we check if logger is already + # closed to prevent uploading the log to remote storage multiple + # times when `logging.shutdown` is called. + if self.closed: + return + + super(S3TaskHandler, self).close() + + if os.path.exists(self.local_loc): + # read log and remove old logs to get just the latest additions + with open(self.local_loc, 'r') as logfile: + log = logfile.read() + logging_utils.S3Log().write(log, self.remote_loc) + + self.closed = True + + def read(self, task_instance): + """ + Read logs of given task instance from S3 remote storage. If failed, + read the log from local machine. + :param task_instance: task instance object + """ + log = "" + remote_loc = os.path.join(self.remote_base, self.get_log_relative_path(task_instance)) + # TODO: check if the remote_loc exist first instead of checking + # if remote_log here. 
This logic is going to modified once logs are split + # by try_number + remote_log = logging_utils.S3Log().read(remote_loc, return_error=True) + if remote_log: + log += ('*** Reading remote log from {}.\n{}\n'.format( + remote_loc, remote_log)) + else: + log = super(S3TaskHandler, self).read(task_instance) + + return log http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/utils/logging.py ---------------------------------------------------------------------- diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py index 96767cb..2df3086 100644 --- a/airflow/utils/logging.py +++ b/airflow/utils/logging.py @@ -35,9 +35,20 @@ class LoggingMixin(object): try: return self._logger except AttributeError: + print(self.__class__.__module__ + '.' + self.__class__.__name__) self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__) return self._logger + def set_logger_contexts(self, task_instance): + """ + Set the context for all handlers of the logger. 
+ """ + for handler in self.logger.handlers: + try: + handler.set_context(task_instance) + except AttributeError: + pass + class S3Log(object): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 6c39462..c429765 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -16,6 +16,7 @@ from past.builtins import basestring, unicode import ast +import logging import os import pkg_resources import socket @@ -72,12 +73,10 @@ from airflow.utils.json import json_ser from airflow.utils.state import State from airflow.utils.db import provide_session from airflow.utils.helpers import alchemy_to_dict -from airflow.utils import logging as log_utils from airflow.utils.dates import infer_time_unit, scale_time_units from airflow.www import utils as wwwutils from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm from airflow.www.validators import GreaterEqualThan -from airflow.configuration import AirflowConfigException QUERY_LIMIT = 100000 CHART_LIMIT = 200000 @@ -698,99 +697,32 @@ class Airflow(BaseView): @login_required @wwwutils.action_logging def log(self): - BASE_LOG_FOLDER = os.path.expanduser( - conf.get('core', 'BASE_LOG_FOLDER')) dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') - dag = dagbag.get_dag(dag_id) - log_relative = "{dag_id}/{task_id}/{execution_date}".format( - **locals()) - loc = os.path.join(BASE_LOG_FOLDER, log_relative) - loc = loc.format(**locals()) - log = "" - TI = models.TaskInstance dttm = dateutil.parser.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) - session = Session() - ti = session.query(TI).filter( - TI.dag_id == dag_id, TI.task_id == task_id, - TI.execution_date == dttm).first() + dag = dagbag.get_dag(dag_id) + session = Session() + ti = 
session.query(models.TaskInstance).filter( + models.TaskInstance.dag_id == dag_id, models.TaskInstance.task_id == task_id, + models.TaskInstance.execution_date == dttm).first() if ti is None: log = "*** Task instance did not exist in the DB\n" else: - # load remote logs - remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') - remote_log_loaded = False - if remote_log_base: - remote_log_path = os.path.join(remote_log_base, log_relative) - remote_log = "" - - # Only display errors reading the log if the task completed or ran at least - # once before (otherwise there won't be any remote log stored). - ti_execution_completed = ti.state in {State.SUCCESS, State.FAILED} - ti_ran_more_than_once = ti.try_number > 1 - surface_log_retrieval_errors = ( - ti_execution_completed or ti_ran_more_than_once) - - # S3 - if remote_log_path.startswith('s3:/'): - remote_log += log_utils.S3Log().read( - remote_log_path, return_error=surface_log_retrieval_errors) - remote_log_loaded = True - # GCS - elif remote_log_path.startswith('gs:/'): - remote_log += log_utils.GCSLog().read( - remote_log_path, return_error=surface_log_retrieval_errors) - remote_log_loaded = True - # unsupported - else: - remote_log += '*** Unsupported remote log location.' - - if remote_log: - log += ('*** Reading remote log from {}.\n{}\n'.format( - remote_log_path, remote_log)) - - # We only want to display the - # local logs while the task is running if a remote log configuration is set up - # since the logs will be transfered there after the run completes. - # TODO(aoen): One problem here is that if a task is running on a worker it - # already ran on, then duplicate logs will be printed for all of the previous - # runs of the task that already completed since they will have been printed as - # part of the remote log section above. This can be fixed either by streaming - # logs to the log servers as tasks are running, or by creating a proper - # abstraction for multiple task instance runs). 
- if not remote_log_loaded or ti.state == State.RUNNING: - if os.path.exists(loc): - try: - f = open(loc) - log += "*** Reading local log.\n" + "".join(f.readlines()) - f.close() - except: - log = "*** Failed to load local log file: {0}.\n".format(loc) - else: - WORKER_LOG_SERVER_PORT = \ - conf.get('celery', 'WORKER_LOG_SERVER_PORT') - url = os.path.join( - "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_relative - ).format(**locals()) - log += "*** Log file isn't local.\n" - log += "*** Fetching here: {url}\n".format(**locals()) - try: - import requests - timeout = None # No timeout - try: - timeout = conf.getint('webserver', 'log_fetch_timeout_sec') - except (AirflowConfigException, ValueError): - pass - - response = requests.get(url, timeout=timeout) - response.raise_for_status() - log += '\n' + response.text - except: - log += "*** Failed to fetch log file from worker.\n".format( - **locals()) + logger = logging.getLogger('airflow.task') + task_log_reader = conf.get('core', 'task_log_reader') + for handler in logger.handlers: + print("handler name is {}".format(handler.name)) + handler = next((handler for handler in logger.handlers + if handler.name == task_log_reader), None) + try: + log = handler.read(ti) + except AttributeError as e: + log = "Task log handler {} does not support read logs.\n".format( + task_log_reader) + log += e.message if PY2 and not isinstance(log, unicode): log = log.decode('utf-8')