Repository: incubator-airflow Updated Branches: refs/heads/master 0bc248fc7 -> b0669b532
[AIRFLOW-1385] Make Airflow task logging configurable This PR adds configurable task logging to Airflow. Please refer to #2422 for previous discussions. This is the first step of making entire Airflow logging configurable ([AIRFLOW-1454](https://issue s.apache.org/jira/browse/AIRFLOW-1454)). Closes #2464 from AllisonWang/allison--log- abstraction Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0669b53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0669b53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0669b53 Branch: refs/heads/master Commit: b0669b532a7be9aa34a4390951deaa25897c62e6 Parents: 0bc248f Author: AllisonWang <allisonwang...@gmail.com> Authored: Fri Aug 11 11:38:37 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Fri Aug 11 11:38:39 2017 -0700 ---------------------------------------------------------------------- airflow/bin/cli.py | 106 ++------- airflow/config_templates/__init__.py | 13 ++ airflow/config_templates/default_airflow.cfg | 7 + .../config_templates/default_airflow_logging.py | 94 ++++++++ airflow/dag/__init__.py | 1 - airflow/settings.py | 14 ++ airflow/task_runner/base_task_runner.py | 2 + airflow/utils/log/__init__.py | 13 ++ airflow/utils/log/file_task_handler.py | 176 +++++++++++++++ airflow/utils/log/gcs_task_handler.py | 95 ++++++++ airflow/utils/log/s3_task_handler.py | 91 ++++++++ airflow/utils/logging.py | 47 +--- airflow/www/views.py | 225 +++++++------------ tests/utils/test_log_handlers.py | 73 ++++++ tests/utils/test_logging.py | 17 +- 15 files changed, 687 insertions(+), 287 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 077cb90..e9e60cb 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -22,7 +22,6 @@ import os import socket import subprocess import textwrap -import warnings from importlib import import_module import argparse @@ -54,8 +53,6 @@ from airflow.models import (DagModel, DagBag, TaskInstance, from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS) from airflow.utils import db as db_utils -from airflow.utils import logging as logging_utils -from airflow.utils.file import mkdirs from airflow.www.app import cached_app from sqlalchemy import func @@ -357,61 +354,22 @@ def run(args, dag=None): ti = TaskInstance(task, args.execution_date) ti.refresh_from_db() - logging.root.handlers = [] + logger = logging.getLogger('airflow.task') if args.raw: - # Output to STDOUT for the parent process to read and log - logging.basicConfig( - stream=sys.stdout, - level=settings.LOGGING_LEVEL, - format=settings.LOG_FORMAT) - else: - # Setting up logging to a file. - - # To handle log writing when tasks are impersonated, the log files need to - # be writable by the user that runs the Airflow command and the user - # that is impersonated. This is mainly to handle corner cases with the - # SubDagOperator. When the SubDagOperator is run, all of the operators - # run under the impersonated user and create appropriate log files - # as the impersonated user. However, if the user manually runs tasks - # of the SubDagOperator through the UI, then the log files are created - # by the user that runs the Airflow command. 
For example, the Airflow - # run command may be run by the `airflow_sudoable` user, but the Airflow - # tasks may be run by the `airflow` user. If the log files are not - # writable by both users, then it's possible that re-running a task - # via the UI (or vice versa) results in a permission error as the task - # tries to write to a log file created by the other user. - try_number = ti.try_number - log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) - log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id, - args.execution_date) - directory = os.path.join(log_base, log_relative_dir) - # Create the log file and give it group writable permissions - # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag - # operator is not compatible with impersonation (e.g. if a Celery executor is used - # for a SubDag operator and the SubDag operator has a different owner than the - # parent DAG) - if not os.path.isdir(directory): - # Create the directory as globally writable using custom mkdirs - # as os.makedirs doesn't set mode properly. - mkdirs(directory, 0o775) - log_relative = logging_utils.get_log_filename( - args.dag_id, args.task_id, args.execution_date, try_number) - filename = os.path.join(log_base, log_relative) - - if not os.path.exists(filename): - open(filename, "a").close() - os.chmod(filename, 0o666) - - logging.basicConfig( - filename=filename, - level=settings.LOGGING_LEVEL, - format=settings.LOG_FORMAT) + logger = logging.getLogger('airflow.task.raw') + + for handler in logger.handlers: + try: + handler.set_context(ti) + except AttributeError: + # Not all handlers need to have context passed in so we ignore + # the error when handlers do not have set_context defined. + pass hostname = socket.getfqdn() logging.info("Running on host {}".format(hostname)) if args.local: - print("Logging into: " + filename) run_job = jobs.LocalTaskJob( task_instance=ti, mark_success=args.mark_success, @@ -469,43 +427,13 @@ def run(args, dag=None): if args.raw: return - # Force the log to flush, and set the handler to go back to normal so we - # don't continue logging to the task's log file. The flush is important - # because we subsequently read from the log to insert into S3 or Google - # cloud storage. - logging.root.handlers[0].flush() - logging.root.handlers = [] - - # store logs remotely - remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') - - # deprecated as of March 2016 - if not remote_base and conf.get('core', 'S3_LOG_FOLDER'): - warnings.warn( - 'The S3_LOG_FOLDER conf key has been replaced by ' - 'REMOTE_BASE_LOG_FOLDER. Your conf still works but please ' - 'update airflow.cfg to ensure future compatibility.', - DeprecationWarning) - remote_base = conf.get('core', 'S3_LOG_FOLDER') - - if os.path.exists(filename): - # read log and remove old logs to get just the latest additions - - with open(filename, 'r') as logfile: - log = logfile.read() - - remote_log_location = os.path.join(remote_base, log_relative) - logging.debug("Uploading to remote log location {}".format(remote_log_location)) - # S3 - if remote_base.startswith('s3:/'): - logging_utils.S3Log().write(log, remote_log_location) - # GCS - elif remote_base.startswith('gs:/'): - logging_utils.GCSLog().write(log, remote_log_location) - # Other - elif remote_base and remote_base != 'None': - logging.error( - 'Unsupported remote log location: {}'.format(remote_base)) + # Force the log to flush. 
The flush is important because we + # might subsequently read from the log to insert into S3 or + # Google cloud storage. Explicitly close the handler is + # needed in order to upload to remote storage services. + for handler in logger.handlers: + handler.flush() + handler.close() def task_failed_deps(args): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/config_templates/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/__init__.py b/airflow/config_templates/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/airflow/config_templates/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/config_templates/default_airflow.cfg ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 33cee39..dcb99ed 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -122,6 +122,13 @@ security = # values at runtime) unit_test_mode = False +# User defined logging configuration file path. +logging_config_path = + +# Name of handler to read task instance logs. +# Default to use file task handler. +task_log_reader = file.task + [cli] # In what way should the cli access the API. The LocalClient will use the # database directly, while the json_client will use the api running on the http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/config_templates/default_airflow_logging.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py new file mode 100644 index 0000000..d6ae036 --- /dev/null +++ b/airflow/config_templates/default_airflow_logging.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from airflow import configuration as conf + +# TODO: Logging format and level should be configured +# in this file instead of from airflow.cfg. Currently +# there are other log format and level configurations in +# settings.py and cli.py. Please see AIRFLOW-1455. 
+ +LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper() +LOG_FORMAT = conf.get('core', 'log_format') + +BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER') + +# TODO: REMOTE_BASE_LOG_FOLDER should be deprecated and +# directly specify in the handler definitions. This is to +# provide compatibility to older remote log folder settings. +REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') +S3_LOG_FOLDER = '' +GCS_LOG_FOLDER = '' +if REMOTE_BASE_LOG_FOLDER.startswith('s3:/'): + S3_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER +elif REMOTE_BASE_LOG_FOLDER.startswith('gs:/'): + GCS_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER + +FILENAME_TEMPLATE = '{dag_id}/{task_id}/{execution_date}/{try_number}.log' + +DEFAULT_LOGGING_CONFIG = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'airflow.task': { + 'format': LOG_FORMAT, + }, + }, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'formatter': 'airflow.task', + 'stream': 'ext://sys.stdout' + }, + 'file.task': { + 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', + 'formatter': 'airflow.task', + 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), + 'filename_template': FILENAME_TEMPLATE, + }, + 's3.task': { + 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', + 'formatter': 'airflow.task', + 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), + 's3_log_folder': S3_LOG_FOLDER, + 'filename_template': FILENAME_TEMPLATE, + }, + 'gcs.task': { + 'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler', + 'formatter': 'airflow.task', + 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), + 'gcs_log_folder': GCS_LOG_FOLDER, + 'filename_template': FILENAME_TEMPLATE, + }, + }, + 'loggers': { + 'airflow.task': { + 'handlers': ['file.task'], + 'level': LOG_LEVEL, + 'propagate': False, + }, + 'airflow.task_runner': { + 'handlers': ['file.task'], + 'level': LOG_LEVEL, + 'propagate': True, + }, + 'airflow.task.raw': { + 'handlers': ['console'], + 'level': LOG_LEVEL, + 'propagate': False, + }, + } +} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/dag/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/dag/__init__.py b/airflow/dag/__init__.py index 759b563..9d7677a 100644 --- a/airflow/dag/__init__.py +++ b/airflow/dag/__init__.py @@ -11,4 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/settings.py ---------------------------------------------------------------------- diff --git a/airflow/settings.py b/airflow/settings.py index 3f7560d..9567020 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -18,6 +18,7 @@ from __future__ import print_function from __future__ import unicode_literals import logging +import logging.config import os import sys @@ -169,6 +170,19 @@ configure_logging() configure_vars() configure_orm() +# TODO: Unify airflow logging setups. Please see AIRFLOW-1457. +logging_config_path = conf.get('core', 'logging_config_path') +try: + from logging_config_path import LOGGING_CONFIG + logging.debug("Successfully imported user-defined logging config.") +except Exception as e: + # Import default logging configurations. + logging.debug("Unable to load custom logging config file: {}." 
+ " Using default airflow logging config instead".format(str(e))) + from airflow.config_templates.default_airflow_logging import \ + DEFAULT_LOGGING_CONFIG as LOGGING_CONFIG +logging.config.dictConfig(LOGGING_CONFIG) + # Const stuff KILOBYTE = 1024 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/task_runner/base_task_runner.py ---------------------------------------------------------------------- diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py index fa3fd6b..7229be5 100644 --- a/airflow/task_runner/base_task_runner.py +++ b/airflow/task_runner/base_task_runner.py @@ -35,7 +35,9 @@ class BaseTaskRunner(LoggingMixin): associated task instance. :type local_task_job: airflow.jobs.LocalTaskJob """ + # Pass task instance context into log handlers to setup the logger. self._task_instance = local_task_job.task_instance + self.set_logger_contexts(self._task_instance) popen_prepend = [] cfg_path = None http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/log/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/__init__.py b/airflow/utils/log/__init__.py new file mode 100644 index 0000000..9d7677a --- /dev/null +++ b/airflow/utils/log/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/log/file_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py new file mode 100644 index 0000000..bce974c --- /dev/null +++ b/airflow/utils/log/file_task_handler.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os + +from airflow import configuration as conf +from airflow.configuration import AirflowConfigException +from airflow.utils.file import mkdirs + + +class FileTaskHandler(logging.Handler): + """ + FileTaskHandler is a python log handler that handles and reads + task instance logs. It creates and delegates log handling + to `logging.FileHandler` after receiving task instance context. + It reads logs from task instance's host machine. + """ + + def __init__(self, base_log_folder, filename_template): + """ + :param base_log_folder: Base log folder to place logs. 
+ :param filename_template: template filename string + """ + super(FileTaskHandler, self).__init__() + self.handler = None + self.local_base = base_log_folder + self.filename_template = filename_template + + def set_context(self, ti): + """ + Provide task_instance context to airflow task handler. + :param ti: task instance object + """ + local_loc = self._init_file(ti) + self.handler = logging.FileHandler(local_loc) + self.handler.setFormatter(self.formatter) + self.handler.setLevel(self.level) + + def emit(self, record): + if self.handler is not None: + self.handler.emit(record) + + def flush(self): + if self.handler is not None: + self.handler.flush() + + def close(self): + if self.handler is not None: + self.handler.close() + + def _read(self, ti, try_number): + """ + Template method that contains custom logic of reading + logs given the try_number. + :param ti: task instance record + :param try_number: current try_number to read log from + :return: log message as a string + """ + # Task instance here might be different from task instance when + # initializing the handler. Thus explicitly getting log location + # is needed to get correct log path. + log_relative_path = self.filename_template.format( + dag_id=ti.dag_id, task_id=ti.task_id, + execution_date=ti.execution_date.isoformat(), try_number=try_number + 1) + loc = os.path.join(self.local_base, log_relative_path) + log = "" + + if os.path.exists(loc): + try: + with open(loc) as f: + log += "*** Reading local log.\n" + "".join(f.readlines()) + except Exception as e: + log = "*** Failed to load local log file: {}. {}\n".format(loc, str(e)) + else: + url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", + log_relative_path).format( + ti=ti, + worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')) + log += "*** Log file isn't local.\n" + log += "*** Fetching here: {url}\n".format(**locals()) + try: + import requests + timeout = None # No timeout + try: + timeout = conf.getint('webserver', 'log_fetch_timeout_sec') + except (AirflowConfigException, ValueError): + pass + + response = requests.get(url, timeout=timeout) + response.raise_for_status() + log += '\n' + response.text + except Exception as e: + log += "*** Failed to fetch log file from worker. {}\n".format(str(e)) + + return log + + def read(self, task_instance, try_number=None): + """ + Read logs of given task instance from local machine. + :param task_instance: task instance object + :param try_number: task instance try_number to read logs from. If None + it returns all logs separated by try_number + :return: a list of logs + """ + # Task instance increments its try number when it starts to run. + # So the log for a particular task try will only show up when + # try number gets incremented in DB, i.e logs produced the time + # after cli run and before try_number + 1 in DB will not be displayed. + next_try = task_instance.try_number + + if try_number is None: + try_numbers = list(range(next_try)) + elif try_number < 0: + logs = ['Error fetching the logs. Try number {} is invalid.'.format(try_number)] + return logs + else: + try_numbers = [try_number] + + logs = [''] * len(try_numbers) + for i, try_number in enumerate(try_numbers): + logs[i] += self._read(task_instance, try_number) + + return logs + + def _init_file(self, ti): + """ + Create log directory and give it correct permissions. 
+ :param ti: task instance object + :return relative log path of the given task instance + """ + # To handle log writing when tasks are impersonated, the log files need to + # be writable by the user that runs the Airflow command and the user + # that is impersonated. This is mainly to handle corner cases with the + # SubDagOperator. When the SubDagOperator is run, all of the operators + # run under the impersonated user and create appropriate log files + # as the impersonated user. However, if the user manually runs tasks + # of the SubDagOperator through the UI, then the log files are created + # by the user that runs the Airflow command. For example, the Airflow + # run command may be run by the `airflow_sudoable` user, but the Airflow + # tasks may be run by the `airflow` user. If the log files are not + # writable by both users, then it's possible that re-running a task + # via the UI (or vice versa) results in a permission error as the task + # tries to write to a log file created by the other user. + relative_path = self.filename_template.format( + dag_id=ti.dag_id, task_id=ti.task_id, + execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1) + full_path = os.path.join(self.local_base, relative_path) + directory = os.path.dirname(full_path) + # Create the log file and give it group writable permissions + # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag + # operator is not compatible with impersonation (e.g. if a Celery executor is used + # for a SubDag operator and the SubDag operator has a different owner than the + # parent DAG) + if not os.path.exists(directory): + # Create the directory as globally writable using custom mkdirs + # as os.makedirs doesn't set mode properly. + mkdirs(directory, 0o775) + + if not os.path.exists(full_path): + open(full_path, "a").close() + # TODO: Investigate using 444 instead of 666. + os.chmod(full_path, 0o666) + + return full_path http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/log/gcs_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py new file mode 100644 index 0000000..5b35907 --- /dev/null +++ b/airflow/utils/log/gcs_task_handler.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import warnings + +from airflow import configuration as conf +from airflow.utils import logging as logging_utils +from airflow.utils.log.file_task_handler import FileTaskHandler + + +class GCSTaskHandler(FileTaskHandler): + """ + GCSTaskHandler is a python log handler that handles and reads + task instance logs. It extends airflow FileTaskHandler and + uploads to and reads from GCS remote storage. Upon log reading + failure, it reads from host machine's local disk. 
+ """ + + def __init__(self, base_log_folder, gcs_log_folder, filename_template): + super(GCSTaskHandler, self).__init__(base_log_folder, filename_template) + self.remote_base = gcs_log_folder + self.log_relative_path = '' + self.closed = False + + def set_context(self, ti): + super(GCSTaskHandler, self).set_context(ti) + # Log relative path is used to construct local and remote + # log path to upload log files into GCS and read from the + # remote location. + self.log_relative_path = self.filename_template( + dag_id=ti.dag_id, task_id=ti.task_id, + execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1) + + def close(self): + """ + Close and upload local log file to remote storage S3. + """ + # When application exit, system shuts down all handlers by + # calling close method. Here we check if logger is already + # closed to prevent uploading the log to remote storage multiple + # times when `logging.shutdown` is called. + if self.closed: + return + + super(GCSTaskHandler, self).close() + + local_loc = os.path.join(self.local_base, self.log_relative_path) + remote_loc = os.path.join(self.remote_base, self.log_relative_path) + if os.path.exists(local_loc): + # read log and remove old logs to get just the latest additions + with open(local_loc, 'r') as logfile: + log = logfile.read() + logging_utils.GCSLog().write(log, remote_loc) + + self.closed = True + + def _read(self, ti, try_number): + """ + Read logs of given task instance and try_number from GCS. + If failed, read the log from task instance host machine. + :param ti: task instance object + :param try_number: task instance try_number to read logs from + """ + # Explicitly getting log relative path is necessary as the given + # task instance might be different than task instance passed in + # in set_context method. + log_relative_path = self.filename_template.format( + dag_id=ti.dag_id, task_id=ti.task_id, + execution_date=ti.execution_date.isoformat(), try_number=try_number + 1) + remote_loc = os.path.join(self.remote_base, log_relative_path) + + gcs_log = logging_utils.GCSLog() + if gcs_log.log_exists(remote_loc): + # If GCS remote file exists, we do not fetch logs from task instance + # local machine even if there are errors reading remote logs, as + # remote_log will contain error message. + remote_log = gcs_log.read(remote_loc, return_error=True) + log = '*** Reading remote log from {}.\n{}\n'.format( + remote_loc, remote_log) + else: + log = super(GCSTaskHandler, self)._read(ti, try_number) + + return log http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/log/s3_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py new file mode 100644 index 0000000..7268d22 --- /dev/null +++ b/airflow/utils/log/s3_task_handler.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os + +from airflow.utils import logging as logging_utils +from airflow.utils.log.file_task_handler import FileTaskHandler + + +class S3TaskHandler(FileTaskHandler): + """ + S3TaskHandler is a python log handler that handles and reads + task instance logs. It extends airflow FileTaskHandler and + uploads to and reads from S3 remote storage. + """ + + def __init__(self, base_log_folder, s3_log_folder, filename_template): + super(S3TaskHandler, self).__init__(base_log_folder, filename_template) + self.remote_base = s3_log_folder + self.log_relative_path = '' + self.closed = False + + def set_context(self, ti): + super(S3TaskHandler, self).set_context(ti) + # Local location and remote location is needed to open and + # upload local log file to S3 remote storage. + self.log_relative_path = self.filename_template.format( + dag_id=ti.dag_id, task_id=ti.task_id, + execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1) + + def close(self): + """ + Close and upload local log file to remote storage S3. + """ + # When application exit, system shuts down all handlers by + # calling close method. Here we check if logger is already + # closed to prevent uploading the log to remote storage multiple + # times when `logging.shutdown` is called. + if self.closed: + return + + super(S3TaskHandler, self).close() + + local_loc = os.path.join(self.local_base, self.log_relative_path) + remote_loc = os.path.join(self.remote_base, self.log_relative_path) + if os.path.exists(local_loc): + # read log and remove old logs to get just the latest additions + with open(local_loc, 'r') as logfile: + log = logfile.read() + logging_utils.S3Log().write(log, remote_loc) + + self.closed = True + + def _read(self, ti, try_number): + """ + Read logs of given task instance and try_number from S3 remote storage. + If failed, read the log from task instance host machine. + :param ti: task instance object + :param try_number: task instance try_number to read logs from + """ + # Explicitly getting log relative path is necessary as the given + # task instance might be different than task instance passed in + # in set_context method. + log_relative_path = self.filename_template.format( + dag_id=ti.dag_id, task_id=ti.task_id, + execution_date=ti.execution_date.isoformat(), try_number=try_number + 1) + remote_loc = os.path.join(self.remote_base, log_relative_path) + + s3_log = logging_utils.S3Log() + if s3_log.log_exists(remote_loc): + # If S3 remote file exists, we do not fetch logs from task instance + # local machine even if there are errors reading remote logs, as + # returned remote_log will contain error messages. + remote_log = s3_log.read(remote_loc, return_error=True) + log = '*** Reading remote log from {}.\n{}\n'.format( + remote_loc, remote_log) + else: + log = super(S3TaskHandler, self)._read(ti, try_number) + + return log http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/logging.py ---------------------------------------------------------------------- diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py index 6e18e52..c550c88 100644 --- a/airflow/utils/logging.py +++ b/airflow/utils/logging.py @@ -40,6 +40,16 @@ class LoggingMixin(object): self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__) return self._logger + def set_logger_contexts(self, task_instance): + """ + Set the context for all handlers of current logger. 
+ """ + for handler in self.logger.handlers: + try: + handler.set_context(task_instance) + except AttributeError: + pass + class S3Log(object): """ @@ -240,40 +250,3 @@ class GCSLog(object): bucket = parsed_url.netloc blob = parsed_url.path.strip('/') return (bucket, blob) - - -# TODO: get_log_filename and get_log_directory are temporary helper -# functions to get airflow log filename. Logic of using FileHandler -# will be extract out and those two functions will be moved. -# For more details, please check issue AIRFLOW-1385. -def get_log_filename(dag_id, task_id, execution_date, try_number): - """ - Return relative log path. - :arg dag_id: id of the dag - :arg task_id: id of the task - :arg execution_date: execution date of the task instance - :arg try_number: try_number of current task instance - """ - relative_dir = get_log_directory(dag_id, task_id, execution_date) - # For reporting purposes and keeping logs consistent with web UI - # display, we report based on 1-indexed, not 0-indexed lists - filename = "{}/{}.log".format(relative_dir, try_number+1) - - return filename - - -def get_log_directory(dag_id, task_id, execution_date): - """ - Return log directory path: dag_id/task_id/execution_date - :arg dag_id: id of the dag - :arg task_id: id of the task - :arg execution_date: execution date of the task instance - """ - # execution_date could be parsed in as unicode character - # instead of datetime object. - if isinstance(execution_date, six.string_types): - execution_date = dateutil.parser.parse(execution_date) - iso = execution_date.isoformat() - relative_dir = '{}/{}/{}'.format(dag_id, task_id, iso) - - return relative_dir http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index f8dffb9..3bfad5d 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -16,6 +16,7 @@ from past.builtins import basestring, unicode import ast +import logging import os import pkg_resources import socket @@ -67,7 +68,7 @@ from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS from airflow.models import BaseOperator from airflow.operators.subdag_operator import SubDagOperator -from airflow.utils.logging import LoggingMixin, get_log_filename +from airflow.utils.logging import LoggingMixin from airflow.utils.json import json_ser from airflow.utils.state import State from airflow.utils.db import provide_session @@ -225,16 +226,18 @@ def data_profiling_required(f): ''' Decorator for views requiring data profiling access ''' + @wraps(f) def decorated_function(*args, **kwargs): if ( - current_app.config['LOGIN_DISABLED'] or - (not current_user.is_anonymous() and current_user.data_profiling()) + current_app.config['LOGIN_DISABLED'] or + (not current_user.is_anonymous() and current_user.data_profiling()) ): return f(*args, **kwargs) else: flash("This page requires data profiling privileges", "error") return redirect(url_for('admin.index')) + return decorated_function @@ -280,8 +283,8 @@ def get_chart_height(dag): """ return 600 + len(dag.tasks) * 10 -class Airflow(BaseView): +class Airflow(BaseView): def is_visible(self): return False @@ -358,15 +361,15 @@ class Airflow(BaseView): if not payload['error'] and len(df) == 0: payload['error'] += "Empty result set. 
" elif ( - not payload['error'] and - chart.sql_layout == 'series' and - chart.chart_type != "datatable" and - len(df.columns) < 3): + not payload['error'] and + chart.sql_layout == 'series' and + chart.chart_type != "datatable" and + len(df.columns) < 3): payload['error'] += "SQL needs to return at least 3 columns. " elif ( - not payload['error'] and - chart.sql_layout == 'columns'and - len(df.columns) < 2): + not payload['error'] and + chart.sql_layout == 'columns' and + len(df.columns) < 2): payload['error'] += "SQL needs to return at least 2 columns. " elif not payload['error']: import numpy as np @@ -508,31 +511,31 @@ class Airflow(BaseView): LastDagRun = ( session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date')) - .join(Dag, Dag.dag_id == DagRun.dag_id) - .filter(DagRun.state != State.RUNNING) - .filter(Dag.is_active == True) - .group_by(DagRun.dag_id) - .subquery('last_dag_run') + .join(Dag, Dag.dag_id == DagRun.dag_id) + .filter(DagRun.state != State.RUNNING) + .filter(Dag.is_active == True) + .group_by(DagRun.dag_id) + .subquery('last_dag_run') ) RunningDagRun = ( session.query(DagRun.dag_id, DagRun.execution_date) - .join(Dag, Dag.dag_id == DagRun.dag_id) - .filter(DagRun.state == State.RUNNING) - .filter(Dag.is_active == True) - .subquery('running_dag_run') + .join(Dag, Dag.dag_id == DagRun.dag_id) + .filter(DagRun.state == State.RUNNING) + .filter(Dag.is_active == True) + .subquery('running_dag_run') ) # Select all task_instances from active dag_runs. # If no dag_run is active, return task instances from most recent dag_run. LastTI = ( session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) - .join(LastDagRun, and_( + .join(LastDagRun, and_( LastDagRun.c.dag_id == TI.dag_id, LastDagRun.c.execution_date == TI.execution_date)) ) RunningTI = ( session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) - .join(RunningDagRun, and_( + .join(RunningDagRun, and_( RunningDagRun.c.dag_id == TI.dag_id, RunningDagRun.c.execution_date == TI.execution_date)) ) @@ -540,7 +543,7 @@ class Airflow(BaseView): UnionTI = union_all(LastTI, RunningTI).alias('union_ti') qry = ( session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count()) - .group_by(UnionTI.c.dag_id, UnionTI.c.state) + .group_by(UnionTI.c.dag_id, UnionTI.c.state) ) data = {} @@ -598,9 +601,9 @@ class Airflow(BaseView): TI = models.TaskInstance states = ( session.query(TI.state, sqla.func.count(TI.dag_id)) - .filter(TI.dag_id == dag_id) - .group_by(TI.state) - .all() + .filter(TI.dag_id == dag_id) + .group_by(TI.state) + .all() ) return self.render( 'airflow/dag_details.html', @@ -692,84 +695,7 @@ class Airflow(BaseView): task_id=task_id, execution_date=execution_date, form=form, - title=title,) - - def _get_log(self, ti, log_filename): - """ - Get log for a specific try number. - :param ti: current task instance - :param log_filename: relative filename to fetch the log - """ - # TODO: This is not the best practice. Log handler and - # reader should be configurable and separated from the - # frontend. The new airflow logging design is in progress. - # Please refer to #2422(https://github.com/apache/incubator-airflow/pull/2422). 
- log = '' - # Load remote log - remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') - remote_log_loaded = False - if remote_log_base: - remote_log_path = os.path.join(remote_log_base, log_filename) - remote_log = "" - - # S3 - if remote_log_path.startswith('s3:/'): - s3_log = log_utils.S3Log() - if s3_log.log_exists(remote_log_path): - remote_log += s3_log.read(remote_log_path, return_error=True) - remote_log_loaded = True - # GCS - elif remote_log_path.startswith('gs:/'): - gcs_log = log_utils.GCSLog() - if gcs_log.log_exists(remote_log_path): - remote_log += gcs_log.read(remote_log_path, return_error=True) - remote_log_loaded = True - # unsupported - else: - remote_log += '*** Unsupported remote log location.' - - if remote_log: - log += ('*** Reading remote log from {}.\n{}\n'.format( - remote_log_path, remote_log)) - - # We only want to display local log if the remote log is not loaded. - if not remote_log_loaded: - # Load local log - local_log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) - local_log_path = os.path.join(local_log_base, log_filename) - if os.path.exists(local_log_path): - try: - f = open(local_log_path) - log += "*** Reading local log.\n" + "".join(f.readlines()) - f.close() - except: - log = "*** Failed to load local log file: {0}.\n".format(local_log_path) - else: - WORKER_LOG_SERVER_PORT = conf.get('celery', 'WORKER_LOG_SERVER_PORT') - url = os.path.join( - "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log", log_filename - ).format(**locals()) - log += "*** Log file isn't local.\n" - log += "*** Fetching here: {url}\n".format(**locals()) - try: - import requests - timeout = None # No timeout - try: - timeout = conf.getint('webserver', 'log_fetch_timeout_sec') - except (AirflowConfigException, ValueError): - pass - - response = requests.get(url, timeout=timeout) - response.raise_for_status() - log += '\n' + response.text - except: - log += "*** Failed to fetch log file from work r.\n".format( - **locals()) - - if PY2 and not isinstance(log, unicode): - log = log.decode('utf-8') - - return log + title=title, ) @expose('/log') @login_required @@ -781,21 +707,27 @@ class Airflow(BaseView): dttm = dateutil.parser.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) - TI = models.TaskInstance session = Session() - ti = session.query(TI).filter( - TI.dag_id == dag_id, - TI.task_id == task_id, - TI.execution_date == dttm).first() - logs = [] + ti = session.query(models.TaskInstance).filter( + models.TaskInstance.dag_id == dag_id, + models.TaskInstance.task_id == task_id, + models.TaskInstance.execution_date == dttm).first() if ti is None: logs = ["*** Task instance did not exist in the DB\n"] else: - logs = [''] * ti.try_number - for try_number in range(ti.try_number): - log_filename = get_log_filename( - dag_id, task_id, execution_date, try_number) - logs[try_number] += self._get_log(ti, log_filename) + logger = logging.getLogger('airflow.task') + task_log_reader = conf.get('core', 'task_log_reader') + handler = next((handler for handler in logger.handlers + if handler.name == task_log_reader), None) + try: + logs = handler.read(ti) + except AttributeError as e: + logs = ["Task log handler {} does not support read logs.\n{}\n" \ + .format(task_log_reader, e.message)] + + for i, log in enumerate(logs): + if PY2 and not isinstance(log, unicode): + logs[i] = log.decode('utf-8') return self.render( 'airflow/ti_log.html', @@ -858,9 +790,9 @@ class Airflow(BaseView): {} <br/> If this task instance does 
not start soon please contact your Airflow administrator for assistance.""" - .format( - "- This task instance already ran and had it's state changed manually (e.g. cleared in the UI)<br/>" - if ti.state == State.NONE else "")))] + .format( + "- This task instance already ran and had it's state changed manually (e.g. cleared in the UI)<br/>" + if ti.state == State.NONE else "")))] # Use the scheduler's context to figure out which dependencies are not met dep_context = DepContext(SCHEDULER_DEPS) @@ -1099,9 +1031,9 @@ class Airflow(BaseView): DR = models.DagRun dags = ( session.query(DR.dag_id, sqla.func.count(DR.id)) - .filter(DR.state == State.RUNNING) - .group_by(DR.dag_id) - .all() + .filter(DR.state == State.RUNNING) + .group_by(DR.dag_id) + .all() ) payload = [] for dag_id, active_dag_runs in dags: @@ -1238,11 +1170,11 @@ class Airflow(BaseView): DR = models.DagRun dag_runs = ( session.query(DR) - .filter( - DR.dag_id==dag.dag_id, - DR.execution_date<=base_date, - DR.execution_date>=min_date) - .all() + .filter( + DR.dag_id == dag.dag_id, + DR.execution_date <= base_date, + DR.execution_date >= min_date) + .all() ) dag_runs = { dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs} @@ -1285,7 +1217,7 @@ class Airflow(BaseView): def set_duration(tid): if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and - tid["start_date"] is not None): + tid["start_date"] is not None): d = datetime.now() - dateutil.parser.parse(tid["start_date"]) tid["duration"] = d.total_seconds() return tid @@ -1308,6 +1240,7 @@ class Airflow(BaseView): 'depends_on_past': task.depends_on_past, 'ui_color': task.ui_color, } + data = { 'name': '[DAG]', 'children': [recurse_nodes(t, set()) for t in dag.roots], @@ -1388,8 +1321,8 @@ class Airflow(BaseView): DR = models.DagRun drs = ( session.query(DR) - .filter_by(dag_id=dag_id) - .order_by(desc(DR.execution_date)).all() + .filter_by(dag_id=dag_id) + .order_by(desc(DR.execution_date)).all() ) dr_choices = [] dr_state = None @@ -1406,6 +1339,7 @@ class Airflow(BaseView): ('TB', "Top->Bottom"), ('BT', "Bottom->Top"), )) + form = GraphForm( data={'execution_date': dttm.isoformat(), 'arrange': arrange}) @@ -1443,7 +1377,7 @@ class Airflow(BaseView): task_instances=json.dumps(task_instances, indent=2), tasks=json.dumps(tasks, indent=2), nodes=json.dumps(nodes, indent=2), - edges=json.dumps(edges, indent=2),) + edges=json.dumps(edges, indent=2), ) @expose('/duration') @login_required @@ -1471,7 +1405,6 @@ class Airflow(BaseView): include_upstream=True, include_downstream=False) - chart_height = get_chart_height(dag) chart = nvd3.lineChart( name="lineChart", x_is_date=True, height=chart_height, width="1200") @@ -1489,10 +1422,10 @@ class Airflow(BaseView): session .query(TF) .filter( - TF.dag_id == dag.dag_id, - TF.execution_date >= min_date, - TF.execution_date <= base_date, - TF.task_id.in_([t.task_id for t in dag.tasks])) + TF.dag_id == dag.dag_id, + TF.execution_date >= min_date, + TF.execution_date <= base_date, + TF.task_id.in_([t.task_id for t in dag.tasks])) .all() ) @@ -1525,7 +1458,7 @@ class Airflow(BaseView): y=scale_time_units(y[task.task_id], y_unit)) cum_chart.add_serie(name=task.task_id, x=x[task.task_id], y=scale_time_units(cum_y[task.task_id], - cum_y_unit)) + cum_y_unit)) dates = sorted(list({ti.execution_date for ti in tis})) max_date = max([ti.execution_date for ti in tis]) if dates else None @@ -1978,6 +1911,7 @@ class QueryView(wwwutils.DataProfilingMixin, BaseView): class QueryForm(Form): conn_id = SelectField("Layout", 
choices=db_choices) sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget()) + data = { 'conn_id': conn_id_str, 'sql': sql, @@ -2175,7 +2109,7 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView): (c.conn_id, c.conn_id) for c in ( Session().query(models.Connection.conn_id) - .group_by(models.Connection.conn_id) + .group_by(models.Connection.conn_id) ) ] } @@ -2189,6 +2123,7 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView): model.user_id = current_user.id model.last_modified = datetime.now() + chart_mapping = ( ('line', 'lineChart'), ('spline', 'lineChart'), @@ -2415,8 +2350,8 @@ class DagRunModelView(ModelViewOnly): deleted = set(session.query(models.DagRun) .filter(models.DagRun.id.in_(ids)) .all()) - session.query(models.DagRun)\ - .filter(models.DagRun.id.in_(ids))\ + session.query(models.DagRun) \ + .filter(models.DagRun.id.in_(ids)) \ .delete(synchronize_session='fetch') session.commit() dirty_ids = [] @@ -2779,9 +2714,9 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView): """ return ( super(DagModelView, self) - .get_query() - .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) - .filter(~models.DagModel.is_subdag) + .get_query() + .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) + .filter(~models.DagModel.is_subdag) ) def get_count_query(self): @@ -2790,7 +2725,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView): """ return ( super(DagModelView, self) - .get_count_query() - .filter(models.DagModel.is_active) - .filter(~models.DagModel.is_subdag) + .get_count_query() + .filter(models.DagModel.is_active) + .filter(~models.DagModel.is_subdag) ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/tests/utils/test_log_handlers.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py new file mode 100644 index 0000000..5b0d8a6 --- /dev/null +++ b/tests/utils/test_log_handlers.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import logging +import logging.config +import mock +import os +import unittest + +from datetime import datetime +from airflow.models import TaskInstance, DAG +from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG +from airflow.operators.dummy_operator import DummyOperator + +DEFAULT_DATE = datetime(2016, 1, 1) +TASK_LOGGER = 'airflow.task' +FILE_TASK_HANDLER = 'file.task' + + +class TestFileTaskLogHandler(unittest.TestCase): + + def setUp(self): + super(TestFileTaskLogHandler, self).setUp() + # We use file task handler by default. + logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) + + def test_default_task_logging_setup(self): + # file task handler is used by default. 
+ logger = logging.getLogger(TASK_LOGGER) + handlers = logger.handlers + self.assertEqual(len(handlers), 1) + handler = handlers[0] + self.assertEqual(handler.name, FILE_TASK_HANDLER) + + def test_file_task_handler(self): + dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE) + task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=dag) + ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + + logger = logging.getLogger(TASK_LOGGER) + file_handler = next((handler for handler in logger.handlers + if handler.name == FILE_TASK_HANDLER), None) + self.assertIsNotNone(file_handler) + + file_handler.set_context(ti) + self.assertIsNotNone(file_handler.handler) + # We expect set_context generates a file locally. + log_filename = file_handler.handler.baseFilename + self.assertTrue(os.path.isfile(log_filename)) + + logger.info("test") + ti.run() + + self.assertTrue(hasattr(file_handler, 'read')) + # Return value of read must be a list. + logs = file_handler.read(ti) + self.assertTrue(isinstance(logs, list)) + self.assertEqual(len(logs), 1) + + # Remove the generated tmp log file. + os.remove(log_filename) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/tests/utils/test_logging.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py index 3f9f5d6..72c5d49 100644 --- a/tests/utils/test_logging.py +++ b/tests/utils/test_logging.py @@ -12,26 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest - import mock +import unittest from airflow.utils import logging from datetime import datetime - -class TestLogging(unittest.TestCase): - - def test_get_log_filename(self): - self.assertEqual( - logging.get_log_filename( - dag_id='dag_id', - task_id='task_id', - execution_date=datetime(2017, 1, 1, 0, 0, 0), - try_number=0, - ), - 'dag_id/task_id/2017-01-01T00:00:00/1.log', - ) +DEFAULT_DATE = datetime(2016, 1, 1) class TestS3Log(unittest.TestCase):
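
For readers evaluating the new configuration hooks, here is a minimal sketch (not part of this commit; the module, bucket name and cfg values are illustrative assumptions) of how the new logging_config_path and task_log_reader options could be exercised. It reuses the DEFAULT_LOGGING_CONFIG added above and routes the 'airflow.task' logger through the S3 handler instead of the local FileTaskHandler. Note that settings.py as committed loads the custom config with a literal `from logging_config_path import LOGGING_CONFIG`, so the sketch assumes the module is saved as logging_config_path.py somewhere on the Python path.

# logging_config_path.py -- hypothetical user module, not part of this commit.
# Start from the default config shipped in this change and swap the task
# handler for the S3 one.
import copy

from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = copy.deepcopy(DEFAULT_LOGGING_CONFIG)

# Illustrative bucket; any value is passed straight to S3TaskHandler as
# its s3_log_folder, mirroring how REMOTE_BASE_LOG_FOLDER is wired up in
# default_airflow_logging.py.
LOGGING_CONFIG['handlers']['s3.task']['s3_log_folder'] = 's3://my-bucket/airflow/logs'

# Make the S3 handler the one attached to the task logger.
LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['s3.task']

The matching airflow.cfg entries under [core] would then be `logging_config_path = <path to the module above>` and `task_log_reader = s3.task`, so that the web UI's log view reads task logs through the same handler that wrote them.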