Repository: incubator-airflow

Updated Branches:
  refs/heads/master e6ef06c53 -> b9576d57b
Revert "[AIRFLOW-1385] Create abstraction for Airflow task logging" This reverts commit e6ef06c53fd4449db6e665cce5cad9418dde232f which was committed accidentally. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b9576d57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b9576d57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b9576d57 Branch: refs/heads/master Commit: b9576d57b6063908e488654f0b21b338c10069fd Parents: e6ef06c Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Thu Jul 20 18:07:17 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Thu Jul 20 18:07:17 2017 -0700 ---------------------------------------------------------------------- airflow/bin/cli.py | 111 ++++++++++--- airflow/config_templates/__init__.py | 13 -- airflow/config_templates/default_airflow.cfg | 6 - .../config_templates/default_airflow_logging.py | 73 --------- airflow/settings.py | 13 +- airflow/task_runner/base_task_runner.py | 1 - airflow/utils/log/__init__.py | 13 -- airflow/utils/log/file_task_handler.py | 158 ------------------- airflow/utils/log/s3_task_handler.py | 90 ----------- airflow/utils/logging.py | 11 -- airflow/www/views.py | 104 +++++++++--- 11 files changed, 177 insertions(+), 416 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 11f415a..f568d5d 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -22,6 +22,7 @@ import os import socket import subprocess import textwrap +import warnings from importlib import import_module import argparse @@ -51,6 +52,8 @@ from airflow.models import (DagModel, DagBag, TaskInstance, Connection) from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) from airflow.utils import db as db_utils +from airflow.utils import logging as logging_utils +from airflow.utils.file import mkdirs from airflow.www.app import cached_app from sqlalchemy import func @@ -324,6 +327,55 @@ def run(args, dag=None): settings.configure_vars() settings.configure_orm() + logging.root.handlers = [] + if args.raw: + # Output to STDOUT for the parent process to read and log + logging.basicConfig( + stream=sys.stdout, + level=settings.LOGGING_LEVEL, + format=settings.LOG_FORMAT) + else: + # Setting up logging to a file. + + # To handle log writing when tasks are impersonated, the log files need to + # be writable by the user that runs the Airflow command and the user + # that is impersonated. This is mainly to handle corner cases with the + # SubDagOperator. When the SubDagOperator is run, all of the operators + # run under the impersonated user and create appropriate log files + # as the impersonated user. However, if the user manually runs tasks + # of the SubDagOperator through the UI, then the log files are created + # by the user that runs the Airflow command. For example, the Airflow + # run command may be run by the `airflow_sudoable` user, but the Airflow + # tasks may be run by the `airflow` user. If the log files are not + # writable by both users, then it's possible that re-running a task + # via the UI (or vice versa) results in a permission error as the task + # tries to write to a log file created by the other user. 
+        log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
+        directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
+        # Create the log file and give it group writable permissions
+        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
+        # operator is not compatible with impersonation (e.g. if a Celery executor is used
+        # for a SubDag operator and the SubDag operator has a different owner than the
+        # parent DAG)
+        if not os.path.exists(directory):
+            # Create the directory as globally writable using custom mkdirs
+            # as os.makedirs doesn't set mode properly.
+            mkdirs(directory, 0o775)
+        iso = args.execution_date.isoformat()
+        filename = "{directory}/{iso}".format(**locals())
+
+        if not os.path.exists(filename):
+            open(filename, "a").close()
+            os.chmod(filename, 0o666)
+
+        logging.basicConfig(
+            filename=filename,
+            level=settings.LOGGING_LEVEL,
+            format=settings.LOG_FORMAT)
+
+    hostname = socket.getfqdn()
+    logging.info("Running on host {}".format(hostname))
+
     if not args.pickle and not dag:
         dag = get_dag(args)
     elif not dag:
@@ -339,21 +391,8 @@ def run(args, dag=None):
     ti = TaskInstance(task, args.execution_date)
     ti.refresh_from_db()
 
-    logger = logging.getLogger('airflow.task')
-    if args.raw:
-        logger = logging.getLogger('airflow.task.raw')
-
-    for handler in logger.handlers:
-        try:
-            print("inside cli, setting up context")
-            handler.set_context(ti)
-        except AttributeError:
-            pass
-
-    hostname = socket.getfqdn()
-    logger.info("Running on host {}".format(hostname))
-
     if args.local:
+        print("Logging into: " + filename)
         run_job = jobs.LocalTaskJob(
             task_instance=ti,
             mark_success=args.mark_success,
@@ -411,13 +450,43 @@ def run(args, dag=None):
     if args.raw:
         return
 
-    # Force the log to flush. The flush is important because we
-    # subsequently read from the log to insert into S3 or Google
-    # cloud storage. Explicitly close the handler is needed in order
-    # to upload to remote storage services.
-    for handler in logger.handlers:
-        handler.flush()
-        handler.close()
+    # Force the log to flush, and set the handler to go back to normal so we
+    # don't continue logging to the task's log file. The flush is important
+    # because we subsequently read from the log to insert into S3 or Google
+    # cloud storage.
+    logging.root.handlers[0].flush()
+    logging.root.handlers = []
+
+    # store logs remotely
+    remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+
+    # deprecated as of March 2016
+    if not remote_base and conf.get('core', 'S3_LOG_FOLDER'):
+        warnings.warn(
+            'The S3_LOG_FOLDER conf key has been replaced by '
+            'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
+            'update airflow.cfg to ensure future compatibility.',
+            DeprecationWarning)
+        remote_base = conf.get('core', 'S3_LOG_FOLDER')
+
+    if os.path.exists(filename):
+        # read log and remove old logs to get just the latest additions
+
+        with open(filename, 'r') as logfile:
+            log = logfile.read()
+
+        remote_log_location = filename.replace(log_base, remote_base)
+        # S3
+        if remote_base.startswith('s3:/'):
+            logging_utils.S3Log().write(log, remote_log_location)
+        # GCS
+        elif remote_base.startswith('gs:/'):
+            logging_utils.GCSLog().write(log, remote_log_location)
+        # Other
+        elif remote_base and remote_base != 'None':
+            logging.error(
+                'Unsupported remote log location: {}'.format(remote_base))
+
 
 def task_failed_deps(args):
     """


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/config_templates/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/__init__.py b/airflow/config_templates/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/airflow/config_templates/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 77592d9..ddd1ba8 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -122,12 +122,6 @@ security =
 # values at runtime)
 unit_test_mode = False
 
-# Logging configuration path
-logging_config_path = airflow.logging.airflow_logging_config.AIRFLOW_LOGGING_CONFIG
-
-# Name of handler to read task instance logs
-task_log_reader = airflow.task
-
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/config_templates/default_airflow_logging.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py
deleted file mode 100644
index 241571f..0000000
--- a/airflow/config_templates/default_airflow_logging.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from airflow import configuration as conf
-
-# TODO: Logging format and level should be configured
-# in this file instead of from airflow.cfg. Currently
-# there are other log format and level configurations in
-# settings.py and cli.py.
-
-LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
-LOG_FORMAT = conf.get('core', 'log_format')
-
-
-BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
-# TODO: This should be specified as s3_remote and/or gcs_remote
-REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-
-DEFAULT_LOGGING_CONFIG = {
-    'version': 1,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'airflow.task': {
-            'format': LOG_FORMAT,
-        }
-    },
-    'handlers': {
-        'console': {
-            'class': 'logging.StreamHandler',
-            'formatter': 'airflow.task',
-            'stream': 'ext://sys.stdout'
-        },
-        'file.task': {
-            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
-            'formatter': 'airflow.task',
-            'base_log_folder': BASE_LOG_FOLDER,
-        },
-        's3.task': {
-            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
-            'base_log_folder': BASE_LOG_FOLDER,
-            'remote_base_log_folder': REMOTE_BASE_LOG_FOLDER,
-            'formatter': 'airflow.task',
-        },
-    },
-    'loggers': {
-        'airflow.task': {
-            'handlers': ['file.task'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
-        'airflow.task_runner': {
-            'handlers': ['file.task'],
-            'level': LOG_LEVEL,
-            'propagate': True,
-        },
-        'airflow.task.raw': {
-            'handlers': ['console'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
-    }
-}


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 7a61bce..3f7560d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -18,7 +18,6 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import logging
-import logging.config
 import os
 import sys
 
@@ -163,23 +162,13 @@ def configure_orm(disable_connection_pool=False):
 try:
     from airflow_local_settings import *
     logging.info("Loaded airflow_local_settings.")
-except Exception:
+except:
     pass
 
 configure_logging()
 configure_vars()
 configure_orm()
 
-# TODO: Merge airflow logging configurations.
-logging_config_path = conf.get('core', 'logging_config_path')
-try:
-    from logging_config_path import LOGGING_CONFIG
-except Exception:
-    # Import default logging configuration
-    from airflow.config_templates.default_airflow_logging import \
-        DEFAULT_LOGGING_CONFIG as LOGGING_CONFIG
-logging.config.dictConfig(LOGGING_CONFIG)
-
 # Const stuff
 
 KILOBYTE = 1024


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index 02404d1..fa3fd6b 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -36,7 +36,6 @@ class BaseTaskRunner(LoggingMixin):
         :type local_task_job: airflow.jobs.LocalTaskJob
         """
         self._task_instance = local_task_job.task_instance
-        self.set_logger_contexts(self._task_instance)
 
         popen_prepend = []
         cfg_path = None


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/log/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/__init__.py b/airflow/utils/log/__init__.py
deleted file mode 100644
index 9d7677a..0000000
--- a/airflow/utils/log/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
deleted file mode 100644
index 94588dd..0000000
--- a/airflow/utils/log/file_task_handler.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-
-from airflow import configuration as conf
-from airflow.configuration import AirflowConfigException
-from airflow.utils.file import mkdirs
-
-
-class FileTaskHandler(logging.Handler):
-    """
-    FileTaskHandler is a python log handler that handles and reads
-    task instance logs. It creates and delegates log handling
-    to `logging.FileHandler` after receiving task instance context.
-    It reads logs from task instance's host machine.
- """ - - def __init__(self, base_log_folder): - super(FileTaskHandler, self).__init__() - self.handler = None - self.local_base = os.path.expanduser(base_log_folder) - - def set_context(self, task_instance): - """ - Provide task_instance context to airflow task handler. - :param task_instance: task instance object - """ - self._init_file(task_instance) - local_loc = self.get_local_loc(task_instance) - self.handler = logging.FileHandler(local_loc) - self.handler.setFormatter(self.formatter) - self.handler.setLevel(self.level) - - def emit(self, record): - if self.handler: - self.handler.emit(record) - - def flush(self): - if self.handler: - self.handler.flush() - - def close(self): - if self.handler: - self.handler.close() - - def read(self, task_instance): - """ - Read log of given task instance from task instance host server. - :param task_instance: task instance database record - """ - log = "" - # Task instance here might be different from task instance when - # initializing the handler. Thus explicitly getting log location - # is needed to get correct log path. - loc = self.get_local_loc(task_instance) - - if os.path.exists(loc): - try: - with open(loc) as f: - log += "*** Reading local log.\n" + "".join(f.readlines()) - except Exception: - log = "*** Failed to load local log file: {0}.\n".format(loc) - else: - url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", - self.get_log_relative_path(task_instance)).format( - ti=task_instance, - worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')) - log += "*** Log file isn't local.\n" - log += "*** Fetching here: {url}\n".format(**locals()) - try: - import requests - timeout = None # No timeout - try: - timeout = conf.getint('webserver', 'log_fetch_timeout_sec') - except (AirflowConfigException, ValueError): - pass - - response = requests.get(url, timeout=timeout) - response.raise_for_status() - log += '\n' + response.text - except Exception: - log += "*** Failed to fetch log file from worker.\n".format( - **locals()) - return log - - def get_log_relative_dir(self, ti): - """ - Get relative log file directory. - :param ti: task instance object - """ - return "{}/{}".format(ti.dag_id, ti.task_id) - - def get_log_relative_path(self, ti): - """ - Get relative log file path. - :param ti: task instance object - """ - directory = self.get_log_relative_dir(ti) - filename = "{}.log".format(ti.execution_date.isoformat()) - return os.path.join(directory, filename) - - def get_local_loc(self, ti): - """ - Get full local log path given task instance object. - :param ti: task instance object - """ - log_relative_path = self.get_log_relative_path(ti) - return os.path.join(self.local_base, log_relative_path) - - def _init_file(self, ti): - """ - Create log directory and give it correct permissions. - :param ti: task instance object - :return relative log path of the given task instance - """ - # To handle log writing when tasks are impersonated, the log files need to - # be writable by the user that runs the Airflow command and the user - # that is impersonated. This is mainly to handle corner cases with the - # SubDagOperator. When the SubDagOperator is run, all of the operators - # run under the impersonated user and create appropriate log files - # as the impersonated user. However, if the user manually runs tasks - # of the SubDagOperator through the UI, then the log files are created - # by the user that runs the Airflow command. 
For example, the Airflow - # run command may be run by the `airflow_sudoable` user, but the Airflow - # tasks may be run by the `airflow` user. If the log files are not - # writable by both users, then it's possible that re-running a task - # via the UI (or vice versa) results in a permission error as the task - # tries to write to a log file created by the other user. - relative_log_dir = self.get_log_relative_dir(ti) - directory = os.path.join(self.local_base, relative_log_dir) - # Create the log file and give it group writable permissions - # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag - # operator is not compatible with impersonation (e.g. if a Celery executor is used - # for a SubDag operator and the SubDag operator has a different owner than the - # parent DAG) - if not os.path.exists(directory): - # Create the directory as globally writable using custom mkdirs - # as os.makedirs doesn't set mode properly. - mkdirs(directory, 0o775) - - local_loc = self.get_local_loc(ti) - - if not os.path.exists(local_loc): - open(local_loc, "a").close() - os.chmod(local_loc, 0o666) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/log/s3_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py deleted file mode 100644 index 5c8ad7e..0000000 --- a/airflow/utils/log/s3_task_handler.py +++ /dev/null @@ -1,90 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import warnings - -from airflow import configuration as conf -from airflow.utils import logging as logging_utils -from airflow.utils.log.file_task_handler import FileTaskHandler - - -class S3TaskHandler(FileTaskHandler): - """ - S3TaskHandler is a python log handler that handles and reads - task instance logs. It extends airflow FileTaskHandler and - uploads to and reads from S3 remote storage. - """ - - def __init__(self, base_log_folder, remote_base_log_folder): - super(S3TaskHandler, self).__init__(base_log_folder) - self.remote_base = remote_base_log_folder - # deprecated as of March 2016 - if not self.remote_base and conf.get('core', 'S3_LOG_FOLDER'): - warnings.warn( - 'The S3_LOG_FOLDER conf key has been replaced by ' - 'REMOTE_BASE_LOG_FOLDER. Your conf still works but please ' - 'update airflow.cfg to ensure future compatibility.', - DeprecationWarning) - self.remote_base = conf.get('core', 'S3_LOG_FOLDER') - self.closed = False - - def set_context(self, task_instance): - super(S3TaskHandler, self).set_context(task_instance) - # Local location and remote location is needed to open and - # upload local log file to S3 remote storage. - self.local_loc = self.get_local_loc(task_instance) - self.remote_loc = os.path.join(self.remote_base, - self.get_log_relative_path(task_instance)) - - def close(self): - """ - Close and upload local log file to remote storage S3. 
- """ - # When application exit, system shuts down all handlers by - # calling close method. Here we check if logger is already - # closed to prevent uploading the log to remote storage multiple - # times when `logging.shutdown` is called. - if self.closed: - return - - super(S3TaskHandler, self).close() - - if os.path.exists(self.local_loc): - # read log and remove old logs to get just the latest additions - with open(self.local_loc, 'r') as logfile: - log = logfile.read() - logging_utils.S3Log().write(log, self.remote_loc) - - self.closed = True - - def read(self, task_instance): - """ - Read logs of given task instance from S3 remote storage. If failed, - read the log from local machine. - :param task_instance: task instance object - """ - log = "" - remote_loc = os.path.join(self.remote_base, self.get_log_relative_path(task_instance)) - # TODO: check if the remote_loc exist first instead of checking - # if remote_log here. This logic is going to modified once logs are split - # by try_number - remote_log = logging_utils.S3Log().read(remote_loc, return_error=True) - if remote_log: - log += ('*** Reading remote log from {}.\n{}\n'.format( - remote_loc, remote_log)) - else: - log = super(S3TaskHandler, self).read(task_instance) - - return log http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/utils/logging.py ---------------------------------------------------------------------- diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py index 2df3086..96767cb 100644 --- a/airflow/utils/logging.py +++ b/airflow/utils/logging.py @@ -35,20 +35,9 @@ class LoggingMixin(object): try: return self._logger except AttributeError: - print(self.__class__.__module__ + '.' + self.__class__.__name__) self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__) return self._logger - def set_logger_contexts(self, task_instance): - """ - Set the context for all handlers of the logger. 
- """ - for handler in self.logger.handlers: - try: - handler.set_context(task_instance) - except AttributeError: - pass - class S3Log(object): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b9576d57/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index c429765..6c39462 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -16,7 +16,6 @@ from past.builtins import basestring, unicode import ast -import logging import os import pkg_resources import socket @@ -73,10 +72,12 @@ from airflow.utils.json import json_ser from airflow.utils.state import State from airflow.utils.db import provide_session from airflow.utils.helpers import alchemy_to_dict +from airflow.utils import logging as log_utils from airflow.utils.dates import infer_time_unit, scale_time_units from airflow.www import utils as wwwutils from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm from airflow.www.validators import GreaterEqualThan +from airflow.configuration import AirflowConfigException QUERY_LIMIT = 100000 CHART_LIMIT = 200000 @@ -697,32 +698,99 @@ class Airflow(BaseView): @login_required @wwwutils.action_logging def log(self): + BASE_LOG_FOLDER = os.path.expanduser( + conf.get('core', 'BASE_LOG_FOLDER')) dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') + dag = dagbag.get_dag(dag_id) + log_relative = "{dag_id}/{task_id}/{execution_date}".format( + **locals()) + loc = os.path.join(BASE_LOG_FOLDER, log_relative) + loc = loc.format(**locals()) + log = "" + TI = models.TaskInstance dttm = dateutil.parser.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) - dag = dagbag.get_dag(dag_id) - session = Session() - ti = session.query(models.TaskInstance).filter( - models.TaskInstance.dag_id == dag_id, models.TaskInstance.task_id == task_id, - models.TaskInstance.execution_date == dttm).first() + ti = session.query(TI).filter( + TI.dag_id == dag_id, TI.task_id == task_id, + TI.execution_date == dttm).first() + if ti is None: log = "*** Task instance did not exist in the DB\n" else: - logger = logging.getLogger('airflow.task') - task_log_reader = conf.get('core', 'task_log_reader') - for handler in logger.handlers: - print("handler name is {}".format(handler.name)) - handler = next((handler for handler in logger.handlers - if handler.name == task_log_reader), None) - try: - log = handler.read(ti) - except AttributeError as e: - log = "Task log handler {} does not support read logs.\n".format( - task_log_reader) - log += e.message + # load remote logs + remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') + remote_log_loaded = False + if remote_log_base: + remote_log_path = os.path.join(remote_log_base, log_relative) + remote_log = "" + + # Only display errors reading the log if the task completed or ran at least + # once before (otherwise there won't be any remote log stored). 
+                ti_execution_completed = ti.state in {State.SUCCESS, State.FAILED}
+                ti_ran_more_than_once = ti.try_number > 1
+                surface_log_retrieval_errors = (
+                    ti_execution_completed or ti_ran_more_than_once)
+
+                # S3
+                if remote_log_path.startswith('s3:/'):
+                    remote_log += log_utils.S3Log().read(
+                        remote_log_path, return_error=surface_log_retrieval_errors)
+                    remote_log_loaded = True
+                # GCS
+                elif remote_log_path.startswith('gs:/'):
+                    remote_log += log_utils.GCSLog().read(
+                        remote_log_path, return_error=surface_log_retrieval_errors)
+                    remote_log_loaded = True
+                # unsupported
+                else:
+                    remote_log += '*** Unsupported remote log location.'
+
+                if remote_log:
+                    log += ('*** Reading remote log from {}.\n{}\n'.format(
+                        remote_log_path, remote_log))
+
+            # We only want to display the
+            # local logs while the task is running if a remote log configuration is set up
+            # since the logs will be transfered there after the run completes.
+            # TODO(aoen): One problem here is that if a task is running on a worker it
+            # already ran on, then duplicate logs will be printed for all of the previous
+            # runs of the task that already completed since they will have been printed as
+            # part of the remote log section above. This can be fixed either by streaming
+            # logs to the log servers as tasks are running, or by creating a proper
+            # abstraction for multiple task instance runs).
+            if not remote_log_loaded or ti.state == State.RUNNING:
+                if os.path.exists(loc):
+                    try:
+                        f = open(loc)
+                        log += "*** Reading local log.\n" + "".join(f.readlines())
+                        f.close()
+                    except:
+                        log = "*** Failed to load local log file: {0}.\n".format(loc)
+                else:
+                    WORKER_LOG_SERVER_PORT = \
+                        conf.get('celery', 'WORKER_LOG_SERVER_PORT')
+                    url = os.path.join(
+                        "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_relative
+                    ).format(**locals())
+                    log += "*** Log file isn't local.\n"
+                    log += "*** Fetching here: {url}\n".format(**locals())
+                    try:
+                        import requests
+                        timeout = None  # No timeout
+                        try:
+                            timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
+                        except (AirflowConfigException, ValueError):
+                            pass
+
+                        response = requests.get(url, timeout=timeout)
+                        response.raise_for_status()
+                        log += '\n' + response.text
+                    except:
+                        log += "*** Failed to fetch log file from worker.\n".format(
+                            **locals())
 
         if PY2 and not isinstance(log, unicode):
             log = log.decode('utf-8')
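
For reference, the control flow this revert restores in cli.py's run() -- reset the
root handlers, log to a group-writable per-task file, flush, then ship the file to
remote storage -- can be sketched as a standalone script. This is a minimal,
hypothetical sketch, not the Airflow code itself: the base folder, the dag/task names,
and the upload_to_remote stub are placeholders for the conf values and for
airflow.utils.logging.S3Log/GCSLog used in the diff above.

    import logging
    import os
    import sys


    def upload_to_remote(log_text, remote_location):
        # Placeholder for logging_utils.S3Log().write(log, remote_log_location)
        # or logging_utils.GCSLog().write(log, remote_log_location).
        print("would upload {} bytes to {}".format(len(log_text), remote_location))


    def run_task(dag_id, task_id, execution_date_iso, raw=False):
        log_base = os.path.expanduser("~/airflow/logs")  # stands in for BASE_LOG_FOLDER
        logging.root.handlers = []

        if raw:
            # The raw (child) process logs to stdout; the parent reads it back.
            logging.basicConfig(stream=sys.stdout, level=logging.INFO)
            return

        directory = os.path.join(log_base, dag_id, task_id)
        if not os.path.exists(directory):
            # The real code uses a custom mkdirs() because os.makedirs does not
            # reliably apply the mode; chmod-after-create approximates that here.
            os.makedirs(directory)
            os.chmod(directory, 0o775)  # group writable, for impersonated tasks

        filename = os.path.join(directory, execution_date_iso)
        if not os.path.exists(filename):
            open(filename, "a").close()
            os.chmod(filename, 0o666)  # writable by both users involved

        logging.basicConfig(filename=filename, level=logging.INFO)
        logging.info("running task")  # actual task execution would happen here

        # Flush and drop the file handler before reading the file back for upload,
        # so nothing else is appended to the task's log file.
        logging.root.handlers[0].flush()
        logging.root.handlers = []
        remote_location = "s3://some-bucket" + filename[len(log_base):]
        with open(filename) as logfile:
            upload_to_remote(logfile.read(), remote_location)


    if __name__ == "__main__":
        run_task("example_dag", "example_task", "2017-07-20T00:00:00")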
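
By contrast, the abstraction this commit reverts routed task logs through a
logging.Handler subclass that receives the task instance as context. The sketch below
is condensed from the deleted file_task_handler.py into a self-contained form; the
SimpleTI namedtuple is a hypothetical stand-in for a TaskInstance, and the permission
handling and remote-read logic of the original are omitted.

    import datetime
    import logging
    import os
    from collections import namedtuple

    # Hypothetical stand-in for an Airflow TaskInstance.
    SimpleTI = namedtuple("SimpleTI", ["dag_id", "task_id", "execution_date"])


    class FileTaskHandler(logging.Handler):
        """Delegates to a logging.FileHandler once task instance context is set."""

        def __init__(self, base_log_folder):
            super(FileTaskHandler, self).__init__()
            self.handler = None
            self.local_base = os.path.expanduser(base_log_folder)

        def set_context(self, ti):
            # Mirrors get_local_loc()/get_log_relative_path() in the deleted file.
            path = os.path.join(self.local_base, ti.dag_id, ti.task_id,
                                "{}.log".format(ti.execution_date.isoformat()))
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            self.handler = logging.FileHandler(path)
            self.handler.setFormatter(self.formatter)
            self.handler.setLevel(self.level)

        def emit(self, record):
            if self.handler:
                self.handler.emit(record)

        def flush(self):
            if self.handler:
                self.handler.flush()

        def close(self):
            if self.handler:
                self.handler.close()


    logger = logging.getLogger("airflow.task")
    handler = FileTaskHandler("~/airflow/logs")
    logger.addHandler(handler)
    handler.set_context(SimpleTI("example_dag", "example_task",
                                 datetime.datetime(2017, 7, 20)))
    logger.warning("routed to the per-task log file")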