Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0bc248fc7 -> b0669b532


[AIRFLOW-1385] Make Airflow task logging configurable

This PR adds configurable task logging to Airflow.
Please refer to #2422 for previous discussions.
This is the first step of making entire Airflow
logging configurable
([AIRFLOW-1454](https://issues.apache.org/jira/browse/AIRFLOW-1454)).

Closes #2464 from AllisonWang/allison--log-abstraction


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0669b53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0669b53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0669b53

Branch: refs/heads/master
Commit: b0669b532a7be9aa34a4390951deaa25897c62e6
Parents: 0bc248f
Author: AllisonWang <allisonwang...@gmail.com>
Authored: Fri Aug 11 11:38:37 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Aug 11 11:38:39 2017 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                              | 106 ++-------
 airflow/config_templates/__init__.py            |  13 ++
 airflow/config_templates/default_airflow.cfg    |   7 +
 .../config_templates/default_airflow_logging.py |  94 ++++++++
 airflow/dag/__init__.py                         |   1 -
 airflow/settings.py                             |  14 ++
 airflow/task_runner/base_task_runner.py         |   2 +
 airflow/utils/log/__init__.py                   |  13 ++
 airflow/utils/log/file_task_handler.py          | 176 +++++++++++++++
 airflow/utils/log/gcs_task_handler.py           |  95 ++++++++
 airflow/utils/log/s3_task_handler.py            |  91 ++++++++
 airflow/utils/logging.py                        |  47 +---
 airflow/www/views.py                            | 225 +++++++------------
 tests/utils/test_log_handlers.py                |  73 ++++++
 tests/utils/test_logging.py                     |  17 +-
 15 files changed, 687 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 077cb90..e9e60cb 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,7 +22,6 @@ import os
 import socket
 import subprocess
 import textwrap
-import warnings
 from importlib import import_module
 
 import argparse
@@ -54,8 +53,6 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
 
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
-from airflow.utils import logging as logging_utils
-from airflow.utils.file import mkdirs
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -357,61 +354,22 @@ def run(args, dag=None):
     ti = TaskInstance(task, args.execution_date)
     ti.refresh_from_db()
 
-    logging.root.handlers = []
+    logger = logging.getLogger('airflow.task')
     if args.raw:
-        # Output to STDOUT for the parent process to read and log
-        logging.basicConfig(
-            stream=sys.stdout,
-            level=settings.LOGGING_LEVEL,
-            format=settings.LOG_FORMAT)
-    else:
-        # Setting up logging to a file.
-
-        # To handle log writing when tasks are impersonated, the log files need to
-        # be writable by the user that runs the Airflow command and the user
-        # that is impersonated. This is mainly to handle corner cases with the
-        # SubDagOperator. When the SubDagOperator is run, all of the operators
-        # run under the impersonated user and create appropriate log files
-        # as the impersonated user. However, if the user manually runs tasks
-        # of the SubDagOperator through the UI, then the log files are created
-        # by the user that runs the Airflow command. For example, the Airflow
-        # run command may be run by the `airflow_sudoable` user, but the Airflow
-        # tasks may be run by the `airflow` user. If the log files are not
-        # writable by both users, then it's possible that re-running a task
-        # via the UI (or vice versa) results in a permission error as the task
-        # tries to write to a log file created by the other user.
-        try_number = ti.try_number
-        log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
-        log_relative_dir = logging_utils.get_log_directory(args.dag_id, args.task_id,
-                                                           args.execution_date)
-        directory = os.path.join(log_base, log_relative_dir)
-        # Create the log file and give it group writable permissions
-        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
-        # operator is not compatible with impersonation (e.g. if a Celery executor is used
-        # for a SubDag operator and the SubDag operator has a different owner than the
-        # parent DAG)
-        if not os.path.isdir(directory):
-            # Create the directory as globally writable using custom mkdirs
-            # as os.makedirs doesn't set mode properly.
-            mkdirs(directory, 0o775)
-        log_relative = logging_utils.get_log_filename(
-            args.dag_id, args.task_id, args.execution_date, try_number)
-        filename = os.path.join(log_base, log_relative)
-
-        if not os.path.exists(filename):
-            open(filename, "a").close()
-            os.chmod(filename, 0o666)
-
-        logging.basicConfig(
-            filename=filename,
-            level=settings.LOGGING_LEVEL,
-            format=settings.LOG_FORMAT)
+        logger = logging.getLogger('airflow.task.raw')
+
+    for handler in logger.handlers:
+        try:
+            handler.set_context(ti)
+        except AttributeError:
+            # Not all handlers need to have context passed in so we ignore
+            # the error when handlers do not have set_context defined.
+            pass
 
     hostname = socket.getfqdn()
     logging.info("Running on host {}".format(hostname))
 
     if args.local:
-        print("Logging into: " + filename)
         run_job = jobs.LocalTaskJob(
             task_instance=ti,
             mark_success=args.mark_success,
@@ -469,43 +427,13 @@ def run(args, dag=None):
     if args.raw:
         return
 
-    # Force the log to flush, and set the handler to go back to normal so we
-    # don't continue logging to the task's log file. The flush is important
-    # because we subsequently read from the log to insert into S3 or Google
-    # cloud storage.
-    logging.root.handlers[0].flush()
-    logging.root.handlers = []
-
-    # store logs remotely
-    remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-
-    # deprecated as of March 2016
-    if not remote_base and conf.get('core', 'S3_LOG_FOLDER'):
-        warnings.warn(
-            'The S3_LOG_FOLDER conf key has been replaced by '
-            'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
-            'update airflow.cfg to ensure future compatibility.',
-            DeprecationWarning)
-        remote_base = conf.get('core', 'S3_LOG_FOLDER')
-
-    if os.path.exists(filename):
-        # read log and remove old logs to get just the latest additions
-
-        with open(filename, 'r') as logfile:
-            log = logfile.read()
-
-        remote_log_location = os.path.join(remote_base, log_relative)
-        logging.debug("Uploading to remote log location {}".format(remote_log_location))
-        # S3
-        if remote_base.startswith('s3:/'):
-            logging_utils.S3Log().write(log, remote_log_location)
-        # GCS
-        elif remote_base.startswith('gs:/'):
-            logging_utils.GCSLog().write(log, remote_log_location)
-        # Other
-        elif remote_base and remote_base != 'None':
-            logging.error(
-                'Unsupported remote log location: {}'.format(remote_base))
+    # Force the log to flush. The flush is important because we
+    # might subsequently read from the log to insert into S3 or
+    # Google cloud storage. Explicitly closing the handler is
+    # needed in order to upload to remote storage services.
+    for handler in logger.handlers:
+        handler.flush()
+        handler.close()
 
 
 def task_failed_deps(args):
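
For reference, the contract that run() now relies on is small: any handler attached to
the 'airflow.task' logger may optionally expose a set_context(task_instance) hook, and
must support the standard flush()/close() calls. A minimal sketch of a conforming custom
handler follows; the class name and prefix logic are hypothetical, not part of this commit.

    import logging

    class PrefixTaskHandler(logging.Handler):
        """Toy handler: prefixes records with dag/task ids once context is set."""

        def __init__(self):
            super(PrefixTaskHandler, self).__init__()
            self.prefix = ""

        def set_context(self, ti):
            # Called by `airflow run` (and BaseTaskRunner) before the task starts.
            self.prefix = "[{}.{}] ".format(ti.dag_id, ti.task_id)

        def emit(self, record):
            print(self.prefix + self.format(record))

    logging.getLogger('airflow.task').addHandler(PrefixTaskHandler())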

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/config_templates/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/__init__.py b/airflow/config_templates/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/config_templates/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 33cee39..dcb99ed 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -122,6 +122,13 @@ security =
 # values at runtime)
 unit_test_mode = False
 
+# User defined logging configuration file path.
+logging_config_path =
+
+# Name of handler to read task instance logs.
+# Defaults to the file task handler.
+task_log_reader = file.task
+
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the
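
For illustration only, this is how the two new [core] keys are consumed elsewhere in this
commit: settings.py resolves logging_config_path and www/views.py looks up the reader
handler by task_log_reader. Values shown in comments are the defaults.

    from airflow import configuration as conf

    logging_config_path = conf.get('core', 'logging_config_path')  # empty unless overridden
    task_log_reader = conf.get('core', 'task_log_reader')          # 'file.task'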

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/config_templates/default_airflow_logging.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py
new file mode 100644
index 0000000..d6ae036
--- /dev/null
+++ b/airflow/config_templates/default_airflow_logging.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from airflow import configuration as conf
+
+# TODO: Logging format and level should be configured
+# in this file instead of from airflow.cfg. Currently
+# there are other log format and level configurations in
+# settings.py and cli.py. Please see AIRFLOW-1455.
+
+LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
+LOG_FORMAT = conf.get('core', 'log_format')
+
+BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+
+# TODO: REMOTE_BASE_LOG_FOLDER should be deprecated and
+# specified directly in the handler definitions. This is to
+# provide compatibility with older remote log folder settings.
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+S3_LOG_FOLDER = ''
+GCS_LOG_FOLDER = ''
+if REMOTE_BASE_LOG_FOLDER.startswith('s3:/'):
+    S3_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER
+elif REMOTE_BASE_LOG_FOLDER.startswith('gs:/'):
+    GCS_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER
+
+FILENAME_TEMPLATE = '{dag_id}/{task_id}/{execution_date}/{try_number}.log'
+
+DEFAULT_LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': LOG_FORMAT,
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        },
+        'file.task': {
+            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        's3.task': {
+            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            's3_log_folder': S3_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'gcs.task': {
+            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'gcs_log_folder': GCS_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+    },
+    'loggers': {
+        'airflow.task': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.task_runner': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': True,
+        },
+        'airflow.task.raw': {
+            'handlers': ['console'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+    }
+}
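
A minimal usage sketch of this module, mirroring what settings.py and the new tests do:
apply the dict with logging.config.dictConfig and log through the named loggers. It
assumes a configured Airflow environment.

    import logging
    import logging.config

    from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG

    logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
    task_logger = logging.getLogger('airflow.task')      # routed to the 'file.task' handler
    raw_logger = logging.getLogger('airflow.task.raw')   # routed to 'console' (stdout)
    raw_logger.info("message from a raw task process")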

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/dag/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/dag/__init__.py b/airflow/dag/__init__.py
index 759b563..9d7677a 100644
--- a/airflow/dag/__init__.py
+++ b/airflow/dag/__init__.py
@@ -11,4 +11,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 3f7560d..9567020 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -18,6 +18,7 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import logging
+import logging.config
 import os
 import sys
 
@@ -169,6 +170,19 @@ configure_logging()
 configure_vars()
 configure_orm()
 
+# TODO: Unify airflow logging setups. Please see AIRFLOW-1457.
+logging_config_path = conf.get('core', 'logging_config_path')
+try:
+    from logging_config_path import LOGGING_CONFIG
+    logging.debug("Successfully imported user-defined logging config.")
+except Exception as e:
+    # Import default logging configurations.
+    logging.debug("Unable to load custom logging config file: {}."
+                  " Using default airflow logging config instead".format(str(e)))
+    from airflow.config_templates.default_airflow_logging import \
+        DEFAULT_LOGGING_CONFIG as LOGGING_CONFIG
+logging.config.dictConfig(LOGGING_CONFIG)
+
 # Const stuff
 
 KILOBYTE = 1024
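
A sketch of what a user-supplied logging configuration is expected to provide when the
default above is not used: a module exposing a LOGGING_CONFIG dict in
logging.config.dictConfig format. The values below are hypothetical and only mirror the
shape of DEFAULT_LOGGING_CONFIG.

    # Hypothetical contents of a user-defined logging config module.
    LOGGING_CONFIG = {
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'airflow.task': {'format': '%(asctime)s %(levelname)s - %(message)s'},
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'airflow.task',
                'stream': 'ext://sys.stdout',
            },
        },
        'loggers': {
            'airflow.task': {
                'handlers': ['console'],
                'level': 'INFO',
                'propagate': False,
            },
        },
    }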

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index fa3fd6b..7229be5 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -35,7 +35,9 @@ class BaseTaskRunner(LoggingMixin):
         associated task instance.
         :type local_task_job: airflow.jobs.LocalTaskJob
         """
+        # Pass task instance context into log handlers to setup the logger.
         self._task_instance = local_task_job.task_instance
+        self.set_logger_contexts(self._task_instance)
 
         popen_prepend = []
         cfg_path = None

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/log/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/__init__.py b/airflow/utils/log/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/utils/log/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
new file mode 100644
index 0000000..bce974c
--- /dev/null
+++ b/airflow/utils/log/file_task_handler.py
@@ -0,0 +1,176 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+from airflow import configuration as conf
+from airflow.configuration import AirflowConfigException
+from airflow.utils.file import mkdirs
+
+
+class FileTaskHandler(logging.Handler):
+    """
+    FileTaskHandler is a python log handler that handles and reads
+    task instance logs. It creates and delegates log handling
+    to `logging.FileHandler` after receiving task instance context.
+    It reads logs from task instance's host machine.
+    """
+
+    def __init__(self, base_log_folder, filename_template):
+        """
+        :param base_log_folder: Base log folder to place logs.
+        :param filename_template: template filename string
+        """
+        super(FileTaskHandler, self).__init__()
+        self.handler = None
+        self.local_base = base_log_folder
+        self.filename_template = filename_template
+
+    def set_context(self, ti):
+        """
+        Provide task_instance context to airflow task handler.
+        :param ti: task instance object
+        """
+        local_loc = self._init_file(ti)
+        self.handler = logging.FileHandler(local_loc)
+        self.handler.setFormatter(self.formatter)
+        self.handler.setLevel(self.level)
+
+    def emit(self, record):
+        if self.handler is not None:
+            self.handler.emit(record)
+
+    def flush(self):
+        if self.handler is not None:
+            self.handler.flush()
+
+    def close(self):
+        if self.handler is not None:
+            self.handler.close()
+
+    def _read(self, ti, try_number):
+        """
+        Template method that contains custom logic of reading
+        logs given the try_number.
+        :param ti: task instance record
+        :param try_number: current try_number to read log from
+        :return: log message as a string
+        """
+        # Task instance here might be different from task instance when
+        # initializing the handler. Thus explicitly getting log location
+        # is needed to get correct log path.
+        log_relative_path = self.filename_template.format(
+            dag_id=ti.dag_id, task_id=ti.task_id,
+            execution_date=ti.execution_date.isoformat(), try_number=try_number + 1)
+        loc = os.path.join(self.local_base, log_relative_path)
+        log = ""
+
+        if os.path.exists(loc):
+            try:
+                with open(loc) as f:
+                    log += "*** Reading local log.\n" + "".join(f.readlines())
+            except Exception as e:
+                log = "*** Failed to load local log file: {}. {}\n".format(loc, str(e))
+        else:
+            url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log",
+                               log_relative_path).format(
+                ti=ti,
+                worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
+            log += "*** Log file isn't local.\n"
+            log += "*** Fetching here: {url}\n".format(**locals())
+            try:
+                import requests
+                timeout = None  # No timeout
+                try:
+                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
+                except (AirflowConfigException, ValueError):
+                    pass
+
+                response = requests.get(url, timeout=timeout)
+                response.raise_for_status()
+                log += '\n' + response.text
+            except Exception as e:
+                log += "*** Failed to fetch log file from worker. {}\n".format(str(e))
+
+        return log
+
+    def read(self, task_instance, try_number=None):
+        """
+        Read logs of given task instance from local machine.
+        :param task_instance: task instance object
+        :param try_number: task instance try_number to read logs from. If None
+                           it returns all logs separated by try_number
+        :return: a list of logs
+        """
+        # Task instance increments its try number when it starts to run.
+        # So the log for a particular task try will only show up when
+        # try number gets incremented in DB, i.e logs produced the time
+        # after cli run and before try_number + 1 in DB will not be displayed.
+        next_try = task_instance.try_number
+
+        if try_number is None:
+            try_numbers = list(range(next_try))
+        elif try_number < 0:
+            logs = ['Error fetching the logs. Try number {} is invalid.'.format(try_number)]
+            return logs
+        else:
+            try_numbers = [try_number]
+
+        logs = [''] * len(try_numbers)
+        for i, try_number in enumerate(try_numbers):
+            logs[i] += self._read(task_instance, try_number)
+
+        return logs
+
+    def _init_file(self, ti):
+        """
+        Create log directory and give it correct permissions.
+        :param ti: task instance object
+        :return: full path to the log file for the given task instance
+        """
+        # To handle log writing when tasks are impersonated, the log files need to
+        # be writable by the user that runs the Airflow command and the user
+        # that is impersonated. This is mainly to handle corner cases with the
+        # SubDagOperator. When the SubDagOperator is run, all of the operators
+        # run under the impersonated user and create appropriate log files
+        # as the impersonated user. However, if the user manually runs tasks
+        # of the SubDagOperator through the UI, then the log files are created
+        # by the user that runs the Airflow command. For example, the Airflow
+        # run command may be run by the `airflow_sudoable` user, but the Airflow
+        # tasks may be run by the `airflow` user. If the log files are not
+        # writable by both users, then it's possible that re-running a task
+        # via the UI (or vice versa) results in a permission error as the task
+        # tries to write to a log file created by the other user.
+        relative_path = self.filename_template.format(
+            dag_id=ti.dag_id, task_id=ti.task_id,
+            execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1)
+        full_path = os.path.join(self.local_base, relative_path)
+        directory = os.path.dirname(full_path)
+        # Create the log file and give it group writable permissions
+        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
+        # operator is not compatible with impersonation (e.g. if a Celery executor is used
+        # for a SubDag operator and the SubDag operator has a different owner than the
+        # parent DAG)
+        if not os.path.exists(directory):
+            # Create the directory as globally writable using custom mkdirs
+            # as os.makedirs doesn't set mode properly.
+            mkdirs(directory, 0o775)
+
+        if not os.path.exists(full_path):
+            open(full_path, "a").close()
+            # TODO: Investigate using 444 instead of 666.
+            os.chmod(full_path, 0o666)
+
+        return full_path
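
A short sketch of the handler lifecycle this class introduces and that the new tests
exercise: construct the handler, hand it a task instance via set_context (which creates
the log file and the inner logging.FileHandler), emit through it, then read the per-try
logs back as a list. The scratch path and the `ti` variable are assumptions for
illustration.

    import logging

    from airflow.utils.log.file_task_handler import FileTaskHandler

    handler = FileTaskHandler(
        base_log_folder='/tmp/airflow-task-logs',  # assumed scratch location
        filename_template='{dag_id}/{task_id}/{execution_date}/{try_number}.log')
    handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
    logging.getLogger('airflow.task').addHandler(handler)

    # `ti` is assumed to be a real airflow.models.TaskInstance:
    # handler.set_context(ti)   # creates the file and the inner FileHandler
    # logging.getLogger('airflow.task').info("running")
    # logs = handler.read(ti)   # list of strings, one entry per try number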

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
new file mode 100644
index 0000000..5b35907
--- /dev/null
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -0,0 +1,95 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import warnings
+
+from airflow import configuration as conf
+from airflow.utils import logging as logging_utils
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+
+class GCSTaskHandler(FileTaskHandler):
+    """
+    GCSTaskHandler is a python log handler that handles and reads
+    task instance logs. It extends airflow FileTaskHandler and
+    uploads to and reads from GCS remote storage. Upon log reading
+    failure, it reads from host machine's local disk.
+    """
+
+    def __init__(self, base_log_folder, gcs_log_folder, filename_template):
+        super(GCSTaskHandler, self).__init__(base_log_folder, filename_template)
+        self.remote_base = gcs_log_folder
+        self.log_relative_path = ''
+        self.closed = False
+
+    def set_context(self, ti):
+        super(GCSTaskHandler, self).set_context(ti)
+        # Log relative path is used to construct local and remote
+        # log path to upload log files into GCS and read from the
+        # remote location.
+        self.log_relative_path = self.filename_template.format(
+            dag_id=ti.dag_id, task_id=ti.task_id,
+            execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1)
+
+    def close(self):
+        """
+        Close and upload local log file to GCS remote storage.
+        """
+        # When the application exits, the system shuts down all handlers by
+        # calling their close method. Here we check if the logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super(GCSTaskHandler, self).close()
+
+        local_loc = os.path.join(self.local_base, self.log_relative_path)
+        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
+        if os.path.exists(local_loc):
+            # read log and remove old logs to get just the latest additions
+            with open(local_loc, 'r') as logfile:
+                log = logfile.read()
+            logging_utils.GCSLog().write(log, remote_loc)
+
+        self.closed = True
+
+    def _read(self, ti, try_number):
+        """
+        Read logs of given task instance and try_number from GCS.
+        If failed, read the log from task instance host machine.
+        :param ti: task instance object
+        :param try_number: task instance try_number to read logs from
+        """
+        # Explicitly getting log relative path is necessary as the given
+        # task instance might be different from the task instance passed in
+        # to the set_context method.
+        log_relative_path = self.filename_template.format(
+            dag_id=ti.dag_id, task_id=ti.task_id,
+            execution_date=ti.execution_date.isoformat(), try_number=try_number + 1)
+        remote_loc = os.path.join(self.remote_base, log_relative_path)
+
+        gcs_log = logging_utils.GCSLog()
+        if gcs_log.log_exists(remote_loc):
+            # If GCS remote file exists, we do not fetch logs from task instance
+            # local machine even if there are errors reading remote logs, as
+            # remote_log will contain error message.
+            remote_log = gcs_log.read(remote_loc, return_error=True)
+            log = '*** Reading remote log from {}.\n{}\n'.format(
+                remote_loc, remote_log)
+        else:
+            log = super(GCSTaskHandler, self)._read(ti, try_number)
+
+        return log

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
new file mode 100644
index 0000000..7268d22
--- /dev/null
+++ b/airflow/utils/log/s3_task_handler.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from airflow.utils import logging as logging_utils
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+
+class S3TaskHandler(FileTaskHandler):
+    """
+    S3TaskHandler is a python log handler that handles and reads
+    task instance logs. It extends airflow FileTaskHandler and
+    uploads to and reads from S3 remote storage.
+    """
+
+    def __init__(self, base_log_folder, s3_log_folder, filename_template):
+        super(S3TaskHandler, self).__init__(base_log_folder, filename_template)
+        self.remote_base = s3_log_folder
+        self.log_relative_path = ''
+        self.closed = False
+
+    def set_context(self, ti):
+        super(S3TaskHandler, self).set_context(ti)
+        # Local location and remote location is needed to open and
+        # upload local log file to S3 remote storage.
+        self.log_relative_path = self.filename_template.format(
+            dag_id=ti.dag_id, task_id=ti.task_id,
+            execution_date=ti.execution_date.isoformat(), try_number=ti.try_number + 1)
+
+    def close(self):
+        """
+        Close and upload local log file to remote storage S3.
+        """
+        # When the application exits, the system shuts down all handlers by
+        # calling their close method. Here we check if the logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super(S3TaskHandler, self).close()
+
+        local_loc = os.path.join(self.local_base, self.log_relative_path)
+        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
+        if os.path.exists(local_loc):
+            # read log and remove old logs to get just the latest additions
+            with open(local_loc, 'r') as logfile:
+                log = logfile.read()
+            logging_utils.S3Log().write(log, remote_loc)
+
+        self.closed = True
+
+    def _read(self, ti, try_number):
+        """
+        Read logs of given task instance and try_number from S3 remote storage.
+        If failed, read the log from task instance host machine.
+        :param ti: task instance object
+        :param try_number: task instance try_number to read logs from
+        """
+        # Explicitly getting log relative path is necessary as the given
+        # task instance might be different from the task instance passed in
+        # to the set_context method.
+        log_relative_path = self.filename_template.format(
+            dag_id=ti.dag_id, task_id=ti.task_id,
+            execution_date=ti.execution_date.isoformat(), try_number=try_number + 1)
+        remote_loc = os.path.join(self.remote_base, log_relative_path)
+
+        s3_log = logging_utils.S3Log()
+        if s3_log.log_exists(remote_loc):
+            # If S3 remote file exists, we do not fetch logs from task instance
+            # local machine even if there are errors reading remote logs, as
+            # returned remote_log will contain error messages.
+            remote_log = s3_log.read(remote_loc, return_error=True)
+            log = '*** Reading remote log from {}.\n{}\n'.format(
+                remote_loc, remote_log)
+        else:
+            log = super(S3TaskHandler, self)._read(ti, try_number)
+
+        return log
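
To route task logs through this handler instead of plain local files, a custom logging
config (loaded via logging_config_path, see settings.py above) would point the
'airflow.task' logger at the 's3.task' handler already declared in
default_airflow_logging.py, and set task_log_reader = s3.task so the web UI reads back
through the same handler. A hedged sketch of only the logger entry that would change:

    # Hypothetical override: only the 'loggers' entry differs from the default config.
    TASK_LOGGER_OVERRIDE = {
        'airflow.task': {
            'handlers': ['s3.task'],   # instead of 'file.task'
            'level': 'INFO',
            'propagate': False,
        },
    }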

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/utils/logging.py
----------------------------------------------------------------------
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index 6e18e52..c550c88 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -40,6 +40,16 @@ class LoggingMixin(object):
             self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__)
             return self._logger
 
+    def set_logger_contexts(self, task_instance):
+        """
+        Set the context for all handlers of current logger.
+        """
+        for handler in self.logger.handlers:
+            try:
+                handler.set_context(task_instance)
+            except AttributeError:
+                pass
+
 
 class S3Log(object):
     """
@@ -240,40 +250,3 @@ class GCSLog(object):
             bucket = parsed_url.netloc
             blob = parsed_url.path.strip('/')
             return (bucket, blob)
-
-
-# TODO: get_log_filename and get_log_directory are temporary helper
-# functions to get airflow log filename. Logic of using FileHandler
-# will be extract out and those two functions will be moved.
-# For more details, please check issue AIRFLOW-1385.
-def get_log_filename(dag_id, task_id, execution_date, try_number):
-    """
-    Return relative log path.
-    :arg dag_id: id of the dag
-    :arg task_id: id of the task
-    :arg execution_date: execution date of the task instance
-    :arg try_number: try_number of current task instance
-    """
-    relative_dir = get_log_directory(dag_id, task_id, execution_date)
-    # For reporting purposes and keeping logs consistent with web UI
-    # display, we report based on 1-indexed, not 0-indexed lists
-    filename = "{}/{}.log".format(relative_dir, try_number+1)
-
-    return filename
-
-
-def get_log_directory(dag_id, task_id, execution_date):
-    """
-    Return log directory path: dag_id/task_id/execution_date
-    :arg dag_id: id of the dag
-    :arg task_id: id of the task
-    :arg execution_date: execution date of the task instance
-    """
-    # execution_date could be parsed in as unicode character
-    # instead of datetime object.
-    if isinstance(execution_date, six.string_types):
-        execution_date = dateutil.parser.parse(execution_date)
-    iso = execution_date.isoformat()
-    relative_dir = '{}/{}/{}'.format(dag_id, task_id, iso)
-
-    return relative_dir
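
A minimal sketch of how set_logger_contexts is meant to be used by LoggingMixin
subclasses, mirroring the BaseTaskRunner change above; the runner class below is
hypothetical.

    from airflow.utils.logging import LoggingMixin

    class MyRunner(LoggingMixin):
        def start(self, task_instance):
            # Handlers without a set_context method are silently skipped.
            self.set_logger_contexts(task_instance)
            self.logger.info("runner started for %s", task_instance.task_id)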

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index f8dffb9..3bfad5d 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -16,6 +16,7 @@
 from past.builtins import basestring, unicode
 
 import ast
+import logging
 import os
 import pkg_resources
 import socket
@@ -67,7 +68,7 @@ from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 from airflow.models import BaseOperator
 from airflow.operators.subdag_operator import SubDagOperator
 
-from airflow.utils.logging import LoggingMixin, get_log_filename
+from airflow.utils.logging import LoggingMixin
 from airflow.utils.json import json_ser
 from airflow.utils.state import State
 from airflow.utils.db import provide_session
@@ -225,16 +226,18 @@ def data_profiling_required(f):
     '''
     Decorator for views requiring data profiling access
     '''
+
     @wraps(f)
     def decorated_function(*args, **kwargs):
         if (
-            current_app.config['LOGIN_DISABLED'] or
-            (not current_user.is_anonymous() and current_user.data_profiling())
+                current_app.config['LOGIN_DISABLED'] or
+                (not current_user.is_anonymous() and current_user.data_profiling())
         ):
             return f(*args, **kwargs)
         else:
             flash("This page requires data profiling privileges", "error")
             return redirect(url_for('admin.index'))
+
     return decorated_function
 
 
@@ -280,8 +283,8 @@ def get_chart_height(dag):
     """
     return 600 + len(dag.tasks) * 10
 
-class Airflow(BaseView):
 
+class Airflow(BaseView):
     def is_visible(self):
         return False
 
@@ -358,15 +361,15 @@ class Airflow(BaseView):
         if not payload['error'] and len(df) == 0:
             payload['error'] += "Empty result set. "
         elif (
-                not payload['error'] and
-                chart.sql_layout == 'series' and
-                chart.chart_type != "datatable" and
-                len(df.columns) < 3):
+                        not payload['error'] and
+                            chart.sql_layout == 'series' and
+                        chart.chart_type != "datatable" and
+                    len(df.columns) < 3):
             payload['error'] += "SQL needs to return at least 3 columns. "
         elif (
-                not payload['error'] and
-                chart.sql_layout == 'columns'and
-                len(df.columns) < 2):
+                    not payload['error'] and
+                        chart.sql_layout == 'columns' and
+                    len(df.columns) < 2):
             payload['error'] += "SQL needs to return at least 2 columns. "
         elif not payload['error']:
             import numpy as np
@@ -508,31 +511,31 @@ class Airflow(BaseView):
 
         LastDagRun = (
             session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
-            .join(Dag, Dag.dag_id == DagRun.dag_id)
-            .filter(DagRun.state != State.RUNNING)
-            .filter(Dag.is_active == True)
-            .group_by(DagRun.dag_id)
-            .subquery('last_dag_run')
+                .join(Dag, Dag.dag_id == DagRun.dag_id)
+                .filter(DagRun.state != State.RUNNING)
+                .filter(Dag.is_active == True)
+                .group_by(DagRun.dag_id)
+                .subquery('last_dag_run')
         )
         RunningDagRun = (
             session.query(DagRun.dag_id, DagRun.execution_date)
-            .join(Dag, Dag.dag_id == DagRun.dag_id)
-            .filter(DagRun.state == State.RUNNING)
-            .filter(Dag.is_active == True)
-            .subquery('running_dag_run')
+                .join(Dag, Dag.dag_id == DagRun.dag_id)
+                .filter(DagRun.state == State.RUNNING)
+                .filter(Dag.is_active == True)
+                .subquery('running_dag_run')
         )
 
         # Select all task_instances from active dag_runs.
         # If no dag_run is active, return task instances from most recent dag_run.
         LastTI = (
             session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
-            .join(LastDagRun, and_(
+                .join(LastDagRun, and_(
                 LastDagRun.c.dag_id == TI.dag_id,
                 LastDagRun.c.execution_date == TI.execution_date))
         )
         RunningTI = (
             session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
-            .join(RunningDagRun, and_(
+                .join(RunningDagRun, and_(
                 RunningDagRun.c.dag_id == TI.dag_id,
                 RunningDagRun.c.execution_date == TI.execution_date))
         )
@@ -540,7 +543,7 @@ class Airflow(BaseView):
         UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
         qry = (
             session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count())
-            .group_by(UnionTI.c.dag_id, UnionTI.c.state)
+                .group_by(UnionTI.c.dag_id, UnionTI.c.state)
         )
 
         data = {}
@@ -598,9 +601,9 @@ class Airflow(BaseView):
         TI = models.TaskInstance
         states = (
             session.query(TI.state, sqla.func.count(TI.dag_id))
-            .filter(TI.dag_id == dag_id)
-            .group_by(TI.state)
-            .all()
+                .filter(TI.dag_id == dag_id)
+                .group_by(TI.state)
+                .all()
         )
         return self.render(
             'airflow/dag_details.html',
@@ -692,84 +695,7 @@ class Airflow(BaseView):
             task_id=task_id,
             execution_date=execution_date,
             form=form,
-            title=title,)
-
-    def _get_log(self, ti, log_filename):
-        """
-        Get log for a specific try number.
-        :param ti: current task instance
-        :param log_filename: relative filename to fetch the log
-        """
-        # TODO: This is not the best practice. Log handler and
-        # reader should be configurable and separated from the
-        # frontend. The new airflow logging design is in progress.
-        # Please refer to #2422(https://github.com/apache/incubator-airflow/pull/2422).
-        log = ''
-        # Load remote log
-        remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-        remote_log_loaded = False
-        if remote_log_base:
-            remote_log_path = os.path.join(remote_log_base, log_filename)
-            remote_log = ""
-
-            # S3
-            if remote_log_path.startswith('s3:/'):
-                s3_log = log_utils.S3Log()
-                if s3_log.log_exists(remote_log_path):
-                    remote_log += s3_log.read(remote_log_path, return_error=True)
-                    remote_log_loaded = True
-            # GCS
-            elif remote_log_path.startswith('gs:/'):
-                gcs_log = log_utils.GCSLog()
-                if gcs_log.log_exists(remote_log_path):
-                    remote_log += gcs_log.read(remote_log_path, return_error=True)
-                    remote_log_loaded = True
-            # unsupported
-            else:
-                remote_log += '*** Unsupported remote log location.'
-
-            if remote_log:
-                log += ('*** Reading remote log from {}.\n{}\n'.format(
-                    remote_log_path, remote_log))
-
-        # We only want to display local log if the remote log is not loaded.
-        if not remote_log_loaded:
-            # Load local log
-            local_log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
-            local_log_path = os.path.join(local_log_base, log_filename)
-            if os.path.exists(local_log_path):
-                try:
-                    f = open(local_log_path)
-                    log += "*** Reading local log.\n" + "".join(f.readlines())
-                    f.close()
-                except:
-                    log = "*** Failed to load local log file: {0}.\n".format(local_log_path)
-            else:
-                WORKER_LOG_SERVER_PORT = conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-                url = os.path.join(
-                    "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log";, 
log_filename
-                ).format(**locals())
-                log += "*** Log file isn't local.\n"
-                log += "*** Fetching here: {url}\n".format(**locals())
-                try:
-                    import requests
-                    timeout = None  # No timeout
-                    try:
-                        timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
-                    except (AirflowConfigException, ValueError):
-                        pass
-
-                    response = requests.get(url, timeout=timeout)
-                    response.raise_for_status()
-                    log += '\n' + response.text
-                except:
-                    log += "*** Failed to fetch log file from worker.\n".format(
-                        **locals())
-
-        if PY2 and not isinstance(log, unicode):
-            log = log.decode('utf-8')
-
-        return log
+            title=title, )
 
     @expose('/log')
     @login_required
@@ -781,21 +707,27 @@ class Airflow(BaseView):
         dttm = dateutil.parser.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
-        TI = models.TaskInstance
         session = Session()
-        ti = session.query(TI).filter(
-            TI.dag_id == dag_id,
-            TI.task_id == task_id,
-            TI.execution_date == dttm).first()
-        logs = []
+        ti = session.query(models.TaskInstance).filter(
+            models.TaskInstance.dag_id == dag_id,
+            models.TaskInstance.task_id == task_id,
+            models.TaskInstance.execution_date == dttm).first()
         if ti is None:
             logs = ["*** Task instance did not exist in the DB\n"]
         else:
-            logs = [''] * ti.try_number
-            for try_number in range(ti.try_number):
-                log_filename = get_log_filename(
-                    dag_id, task_id, execution_date, try_number)
-                logs[try_number] += self._get_log(ti, log_filename)
+            logger = logging.getLogger('airflow.task')
+            task_log_reader = conf.get('core', 'task_log_reader')
+            handler = next((handler for handler in logger.handlers
+                            if handler.name == task_log_reader), None)
+            try:
+                logs = handler.read(ti)
+            except AttributeError as e:
+                logs = ["Task log handler {} does not support read logs.\n{}\n" \
+                            .format(task_log_reader, e.message)]
+
+        for i, log in enumerate(logs):
+            if PY2 and not isinstance(log, unicode):
+                logs[i] = log.decode('utf-8')
 
         return self.render(
             'airflow/ti_log.html',
@@ -858,9 +790,9 @@ class Airflow(BaseView):
             {}
             <br/>
             If this task instance does not start soon please contact your Airflow administrator for assistance."""
-                   .format(
-                       "- This task instance already ran and had it's state changed manually (e.g. cleared in the UI)<br/>"
-                       if ti.state == State.NONE else "")))]
+                .format(
+                "- This task instance already ran and had it's state changed manually (e.g. cleared in the UI)<br/>"
+                if ti.state == State.NONE else "")))]
 
         # Use the scheduler's context to figure out which dependencies are not met
         dep_context = DepContext(SCHEDULER_DEPS)
@@ -1099,9 +1031,9 @@ class Airflow(BaseView):
         DR = models.DagRun
         dags = (
             session.query(DR.dag_id, sqla.func.count(DR.id))
-            .filter(DR.state == State.RUNNING)
-            .group_by(DR.dag_id)
-            .all()
+                .filter(DR.state == State.RUNNING)
+                .group_by(DR.dag_id)
+                .all()
         )
         payload = []
         for dag_id, active_dag_runs in dags:
@@ -1238,11 +1170,11 @@ class Airflow(BaseView):
         DR = models.DagRun
         dag_runs = (
             session.query(DR)
-            .filter(
-                DR.dag_id==dag.dag_id,
-                DR.execution_date<=base_date,
-                DR.execution_date>=min_date)
-            .all()
+                .filter(
+                DR.dag_id == dag.dag_id,
+                DR.execution_date <= base_date,
+                DR.execution_date >= min_date)
+                .all()
         )
         dag_runs = {
             dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
@@ -1285,7 +1217,7 @@ class Airflow(BaseView):
 
             def set_duration(tid):
                 if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and
-                        tid["start_date"] is not None):
+                            tid["start_date"] is not None):
                     d = datetime.now() - dateutil.parser.parse(tid["start_date"])
                     tid["duration"] = d.total_seconds()
                 return tid
@@ -1308,6 +1240,7 @@ class Airflow(BaseView):
                 'depends_on_past': task.depends_on_past,
                 'ui_color': task.ui_color,
             }
+
         data = {
             'name': '[DAG]',
             'children': [recurse_nodes(t, set()) for t in dag.roots],
@@ -1388,8 +1321,8 @@ class Airflow(BaseView):
         DR = models.DagRun
         drs = (
             session.query(DR)
-            .filter_by(dag_id=dag_id)
-            .order_by(desc(DR.execution_date)).all()
+                .filter_by(dag_id=dag_id)
+                .order_by(desc(DR.execution_date)).all()
         )
         dr_choices = []
         dr_state = None
@@ -1406,6 +1339,7 @@ class Airflow(BaseView):
                 ('TB', "Top->Bottom"),
                 ('BT', "Bottom->Top"),
             ))
+
         form = GraphForm(
             data={'execution_date': dttm.isoformat(), 'arrange': arrange})
 
@@ -1443,7 +1377,7 @@ class Airflow(BaseView):
             task_instances=json.dumps(task_instances, indent=2),
             tasks=json.dumps(tasks, indent=2),
             nodes=json.dumps(nodes, indent=2),
-            edges=json.dumps(edges, indent=2),)
+            edges=json.dumps(edges, indent=2), )
 
     @expose('/duration')
     @login_required
@@ -1471,7 +1405,6 @@ class Airflow(BaseView):
                 include_upstream=True,
                 include_downstream=False)
 
-
         chart_height = get_chart_height(dag)
         chart = nvd3.lineChart(
             name="lineChart", x_is_date=True, height=chart_height, 
width="1200")
@@ -1489,10 +1422,10 @@ class Airflow(BaseView):
             session
                 .query(TF)
                 .filter(
-                    TF.dag_id == dag.dag_id,
-                    TF.execution_date >= min_date,
-                    TF.execution_date <= base_date,
-                    TF.task_id.in_([t.task_id for t in dag.tasks]))
+                TF.dag_id == dag.dag_id,
+                TF.execution_date >= min_date,
+                TF.execution_date <= base_date,
+                TF.task_id.in_([t.task_id for t in dag.tasks]))
                 .all()
         )
 
@@ -1525,7 +1458,7 @@ class Airflow(BaseView):
                                 y=scale_time_units(y[task.task_id], y_unit))
                 cum_chart.add_serie(name=task.task_id, x=x[task.task_id],
                                     y=scale_time_units(cum_y[task.task_id],
-                                    cum_y_unit))
+                                                       cum_y_unit))
 
         dates = sorted(list({ti.execution_date for ti in tis}))
         max_date = max([ti.execution_date for ti in tis]) if dates else None
@@ -1978,6 +1911,7 @@ class QueryView(wwwutils.DataProfilingMixin, BaseView):
         class QueryForm(Form):
             conn_id = SelectField("Layout", choices=db_choices)
             sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget())
+
         data = {
             'conn_id': conn_id_str,
             'sql': sql,
@@ -2175,7 +2109,7 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
             (c.conn_id, c.conn_id)
             for c in (
                 Session().query(models.Connection.conn_id)
-                .group_by(models.Connection.conn_id)
+                    .group_by(models.Connection.conn_id)
             )
         ]
     }
@@ -2189,6 +2123,7 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
             model.user_id = current_user.id
         model.last_modified = datetime.now()
 
+
 chart_mapping = (
     ('line', 'lineChart'),
     ('spline', 'lineChart'),
@@ -2415,8 +2350,8 @@ class DagRunModelView(ModelViewOnly):
         deleted = set(session.query(models.DagRun)
                       .filter(models.DagRun.id.in_(ids))
                       .all())
-        session.query(models.DagRun)\
-            .filter(models.DagRun.id.in_(ids))\
+        session.query(models.DagRun) \
+            .filter(models.DagRun.id.in_(ids)) \
             .delete(synchronize_session='fetch')
         session.commit()
         dirty_ids = []
@@ -2779,9 +2714,9 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView):
         """
         return (
             super(DagModelView, self)
-            .get_query()
-            .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
-            .filter(~models.DagModel.is_subdag)
+                .get_query()
+                .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
+                .filter(~models.DagModel.is_subdag)
         )
 
     def get_count_query(self):
@@ -2790,7 +2725,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView):
         """
         return (
             super(DagModelView, self)
-            .get_count_query()
-            .filter(models.DagModel.is_active)
-            .filter(~models.DagModel.is_subdag)
+                .get_count_query()
+                .filter(models.DagModel.is_active)
+                .filter(~models.DagModel.is_subdag)
         )
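
For reference, the lookup the reworked /log view performs can be reduced to a few lines:
pick the handler on the 'airflow.task' logger whose name matches the task_log_reader
key, then call its read() to get one log string per try number. The `ti` variable below
is assumed to be the TaskInstance being viewed.

    import logging

    from airflow import configuration as conf

    task_log_reader = conf.get('core', 'task_log_reader')    # 'file.task' by default
    logger = logging.getLogger('airflow.task')
    handler = next((h for h in logger.handlers if h.name == task_log_reader), None)
    # logs = handler.read(ti)   # list with one entry per try number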

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/tests/utils/test_log_handlers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
new file mode 100644
index 0000000..5b0d8a6
--- /dev/null
+++ b/tests/utils/test_log_handlers.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+import logging
+import logging.config
+import mock
+import os
+import unittest
+
+from datetime import datetime
+from airflow.models import TaskInstance, DAG
+from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG
+from airflow.operators.dummy_operator import DummyOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)
+TASK_LOGGER = 'airflow.task'
+FILE_TASK_HANDLER = 'file.task'
+
+
+class TestFileTaskLogHandler(unittest.TestCase):
+
+    def setUp(self):
+        super(TestFileTaskLogHandler, self).setUp()
+        # We use file task handler by default.
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+
+    def test_default_task_logging_setup(self):
+        # file task handler is used by default.
+        logger = logging.getLogger(TASK_LOGGER)
+        handlers = logger.handlers
+        self.assertEqual(len(handlers), 1)
+        handler = handlers[0]
+        self.assertEqual(handler.name, FILE_TASK_HANDLER)
+
+    def test_file_task_handler(self):
+        dag = DAG('dag_for_testing_file_task_handler', start_date=DEFAULT_DATE)
+        task = DummyOperator(task_id='task_for_testing_file_log_handler', dag=dag)
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+
+        logger = logging.getLogger(TASK_LOGGER)
+        file_handler = next((handler for handler in logger.handlers
+                             if handler.name == FILE_TASK_HANDLER), None)
+        self.assertIsNotNone(file_handler)
+
+        file_handler.set_context(ti)
+        self.assertIsNotNone(file_handler.handler)
+        # We expect set_context generates a file locally.
+        log_filename = file_handler.handler.baseFilename
+        self.assertTrue(os.path.isfile(log_filename))
+
+        logger.info("test")
+        ti.run()
+
+        self.assertTrue(hasattr(file_handler, 'read'))
+        # Return value of read must be a list.
+        logs = file_handler.read(ti)
+        self.assertTrue(isinstance(logs, list))
+        self.assertEqual(len(logs), 1)
+
+        # Remove the generated tmp log file.
+        os.remove(log_filename)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0669b53/tests/utils/test_logging.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_logging.py b/tests/utils/test_logging.py
index 3f9f5d6..72c5d49 100644
--- a/tests/utils/test_logging.py
+++ b/tests/utils/test_logging.py
@@ -12,26 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import unittest
-
 import mock
+import unittest
 
 from airflow.utils import logging
 from datetime import datetime
 
-
-class TestLogging(unittest.TestCase):
-
-    def test_get_log_filename(self):
-        self.assertEqual(
-            logging.get_log_filename(
-                dag_id='dag_id',
-                task_id='task_id',
-                execution_date=datetime(2017, 1, 1, 0, 0, 0),
-                try_number=0,
-            ),
-            'dag_id/task_id/2017-01-01T00:00:00/1.log',
-        )
+DEFAULT_DATE = datetime(2016, 1, 1)
 
 
 class TestS3Log(unittest.TestCase):
