Repository: incubator-airflow
Updated Branches:
  refs/heads/master 392772326 -> e6ef06c53


[AIRFLOW-1385] Create abstraction for Airflow task logging

This PR adds the ability to provide customized
implementations of Airflow task logging. It
creates an abstraction for setting up, cleaning
up, and getting task instance logs.

This change is primarily a refactor of logging
logic. It is tested locally with custom logging
implementations.

Closes #2422 from AllisonWang/allison--log-handler
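
As an illustration of the abstraction (the handler below is
hypothetical, not part of this change): a custom handler implements
set_context to receive the task instance, the standard
emit/flush/close, and a read method used by the webserver. A minimal
in-memory sketch:

    # Hypothetical custom handler built on the interface this change
    # introduces (set_context / emit / read); names are made up.
    import logging

    class InMemoryTaskHandler(logging.Handler):
        def __init__(self):
            super(InMemoryTaskHandler, self).__init__()
            self.buffers = {}  # (dag_id, task_id, execution_date) -> lines
            self.key = None

        def set_context(self, task_instance):
            # Called before the task starts (see cli.run in this diff).
            self.key = (task_instance.dag_id,
                        task_instance.task_id,
                        task_instance.execution_date.isoformat())
            self.buffers.setdefault(self.key, [])

        def emit(self, record):
            if self.key is not None:
                self.buffers[self.key].append(self.format(record))

        def read(self, task_instance):
            # Called by the webserver's log view via task_log_reader.
            key = (task_instance.dag_id,
                   task_instance.task_id,
                   task_instance.execution_date.isoformat())
            return "\n".join(self.buffers.get(key, []))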


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e6ef06c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e6ef06c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e6ef06c5

Branch: refs/heads/master
Commit: e6ef06c53fd4449db6e665cce5cad9418dde232f
Parents: 3927723
Author: AllisonWang <allisonwang...@gmail.com>
Authored: Thu Jul 20 18:03:23 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Thu Jul 20 18:03:26 2017 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                              | 111 +++----------
 airflow/config_templates/__init__.py            |  13 ++
 airflow/config_templates/default_airflow.cfg    |   6 +
 .../config_templates/default_airflow_logging.py |  73 +++++++++
 airflow/settings.py                             |  13 +-
 airflow/task_runner/base_task_runner.py         |   1 +
 airflow/utils/log/__init__.py                   |  13 ++
 airflow/utils/log/file_task_handler.py          | 158 +++++++++++++++++++
 airflow/utils/log/s3_task_handler.py            |  90 +++++++++++
 airflow/utils/logging.py                        |  11 ++
 airflow/www/views.py                            | 104 +++---------
 11 files changed, 416 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index f568d5d..11f415a 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,7 +22,6 @@ import os
 import socket
 import subprocess
 import textwrap
-import warnings
 from importlib import import_module
 
 import argparse
@@ -52,8 +51,6 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
                             Connection)
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
-from airflow.utils import logging as logging_utils
-from airflow.utils.file import mkdirs
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -327,55 +324,6 @@ def run(args, dag=None):
         settings.configure_vars()
         settings.configure_orm()
 
-    logging.root.handlers = []
-    if args.raw:
-        # Output to STDOUT for the parent process to read and log
-        logging.basicConfig(
-            stream=sys.stdout,
-            level=settings.LOGGING_LEVEL,
-            format=settings.LOG_FORMAT)
-    else:
-        # Setting up logging to a file.
-
-        # To handle log writing when tasks are impersonated, the log files need to
-        # be writable by the user that runs the Airflow command and the user
-        # that is impersonated. This is mainly to handle corner cases with the
-        # SubDagOperator. When the SubDagOperator is run, all of the operators
-        # run under the impersonated user and create appropriate log files
-        # as the impersonated user. However, if the user manually runs tasks
-        # of the SubDagOperator through the UI, then the log files are created
-        # by the user that runs the Airflow command. For example, the Airflow
-        # run command may be run by the `airflow_sudoable` user, but the Airflow
-        # tasks may be run by the `airflow` user. If the log files are not
-        # writable by both users, then it's possible that re-running a task
-        # via the UI (or vice versa) results in a permission error as the task
-        # tries to write to a log file created by the other user.
-        log_base = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
-        directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
-        # Create the log file and give it group writable permissions
-        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
-        # operator is not compatible with impersonation (e.g. if a Celery executor is used
-        # for a SubDag operator and the SubDag operator has a different owner than the
-        # parent DAG)
-        if not os.path.exists(directory):
-            # Create the directory as globally writable using custom mkdirs
-            # as os.makedirs doesn't set mode properly.
-            mkdirs(directory, 0o775)
-        iso = args.execution_date.isoformat()
-        filename = "{directory}/{iso}".format(**locals())
-
-        if not os.path.exists(filename):
-            open(filename, "a").close()
-            os.chmod(filename, 0o666)
-
-        logging.basicConfig(
-            filename=filename,
-            level=settings.LOGGING_LEVEL,
-            format=settings.LOG_FORMAT)
-
-    hostname = socket.getfqdn()
-    logging.info("Running on host {}".format(hostname))
-
     if not args.pickle and not dag:
         dag = get_dag(args)
     elif not dag:
@@ -391,8 +339,21 @@ def run(args, dag=None):
     ti = TaskInstance(task, args.execution_date)
     ti.refresh_from_db()
 
+    logger = logging.getLogger('airflow.task')
+    if args.raw:
+        logger = logging.getLogger('airflow.task.raw')
+
+    for handler in logger.handlers:
+        try:
+            print("inside cli, setting up context")
+            handler.set_context(ti)
+        except AttributeError:
+            pass
+
+    hostname = socket.getfqdn()
+    logger.info("Running on host {}".format(hostname))
+
     if args.local:
-        print("Logging into: " + filename)
         run_job = jobs.LocalTaskJob(
             task_instance=ti,
             mark_success=args.mark_success,
@@ -450,43 +411,13 @@ def run(args, dag=None):
     if args.raw:
         return
 
-    # Force the log to flush, and set the handler to go back to normal so we
-    # don't continue logging to the task's log file. The flush is important
-    # because we subsequently read from the log to insert into S3 or Google
-    # cloud storage.
-    logging.root.handlers[0].flush()
-    logging.root.handlers = []
-
-    # store logs remotely
-    remote_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-
-    # deprecated as of March 2016
-    if not remote_base and conf.get('core', 'S3_LOG_FOLDER'):
-        warnings.warn(
-            'The S3_LOG_FOLDER conf key has been replaced by '
-            'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
-            'update airflow.cfg to ensure future compatibility.',
-            DeprecationWarning)
-        remote_base = conf.get('core', 'S3_LOG_FOLDER')
-
-    if os.path.exists(filename):
-        # read log and remove old logs to get just the latest additions
-
-        with open(filename, 'r') as logfile:
-            log = logfile.read()
-
-        remote_log_location = filename.replace(log_base, remote_base)
-        # S3
-        if remote_base.startswith('s3:/'):
-            logging_utils.S3Log().write(log, remote_log_location)
-        # GCS
-        elif remote_base.startswith('gs:/'):
-            logging_utils.GCSLog().write(log, remote_log_location)
-        # Other
-        elif remote_base and remote_base != 'None':
-            logging.error(
-                'Unsupported remote log location: {}'.format(remote_base))
-
+    # Force the log to flush. The flush is important because we
+    # subsequently read from the log to insert into S3 or Google
+    # cloud storage. Explicitly closing the handler is needed in order
+    # to upload to remote storage services.
+    for handler in logger.handlers:
+        handler.flush()
+        handler.close()
 
 def task_failed_deps(args):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/config_templates/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/__init__.py b/airflow/config_templates/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/config_templates/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ddd1ba8..77592d9 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -122,6 +122,12 @@ security =
 # values at runtime)
 unit_test_mode = False
 
+# Logging configuration path
+logging_config_path = airflow.logging.airflow_logging_config.AIRFLOW_LOGGING_CONFIG
+
+# Name of handler to read task instance logs
+task_log_reader = airflow.task
+
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the
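
Illustrative note (not part of this commit): logging_config_path is a
dotted path to a logging dict, so a deployment can supply its own
module and point the key at it, e.g.
logging_config_path = my_logging_config.LOGGING_CONFIG. A minimal
sketch of such a module, reusing the defaults added below, where the
module name my_logging_config is purely hypothetical:

    # my_logging_config.py -- hypothetical module for illustration only.
    # It starts from the default config shipped in this commit and routes
    # task logs through the S3 handler instead of the local file handler.
    from copy import deepcopy

    from airflow.config_templates.default_airflow_logging import (
        DEFAULT_LOGGING_CONFIG,
    )

    LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
    LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['s3.task']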

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/config_templates/default_airflow_logging.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py
new file mode 100644
index 0000000..241571f
--- /dev/null
+++ b/airflow/config_templates/default_airflow_logging.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import configuration as conf
+
+# TODO: Logging format and level should be configured
+# in this file instead of from airflow.cfg. Currently
+# there are other log format and level configurations in
+# settings.py and cli.py.
+
+LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
+LOG_FORMAT = conf.get('core', 'log_format')
+
+
+BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+# TODO: This should be specified as s3_remote and/or gcs_remote
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+
+DEFAULT_LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': LOG_FORMAT,
+        }
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        },
+        'file.task': {
+            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': BASE_LOG_FOLDER,
+        },
+        's3.task': {
+            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+            'base_log_folder': BASE_LOG_FOLDER,
+            'remote_base_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'formatter': 'airflow.task',
+        },
+    },
+    'loggers': {
+        'airflow.task': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.task_runner': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': True,
+        },
+        'airflow.task.raw': {
+            'handlers': ['console'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+    }
+}
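
A short usage sketch (illustrative only; ti below stands in for a real
airflow.models.TaskInstance) of how this config is consumed by the
cli.py and views.py changes in this diff:

    # Illustration of the task-logging flow introduced by this change.
    import logging
    import logging.config

    from airflow.config_templates.default_airflow_logging import (
        DEFAULT_LOGGING_CONFIG,
    )

    logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
    logger = logging.getLogger('airflow.task')

    def run_with_task_logging(ti):
        # Before the task runs: hand the task instance to every handler.
        for handler in logger.handlers:
            try:
                handler.set_context(ti)
            except AttributeError:
                pass

        logger.info("Running task %s.%s", ti.dag_id, ti.task_id)

        # After the task: flush and close so remote handlers can upload.
        for handler in logger.handlers:
            handler.flush()
            handler.close()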

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 3f7560d..7a61bce 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -18,6 +18,7 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import logging
+import logging.config
 import os
 import sys
 
@@ -162,13 +163,23 @@ def configure_orm(disable_connection_pool=False):
 try:
     from airflow_local_settings import *
     logging.info("Loaded airflow_local_settings.")
-except:
+except Exception:
     pass
 
 configure_logging()
 configure_vars()
 configure_orm()
 
+# TODO: Merge airflow logging configurations.
+logging_config_path = conf.get('core', 'logging_config_path')
+try:
+    # logging_config_path is a dotted path to the config dict, e.g.
+    # my_package.my_module.LOGGING_CONFIG, so resolve it dynamically.
+    from importlib import import_module
+    module_path, config_name = logging_config_path.rsplit('.', 1)
+    LOGGING_CONFIG = getattr(import_module(module_path), config_name)
+except Exception:
+    # Import default logging configuration
+    from airflow.config_templates.default_airflow_logging import \
+        DEFAULT_LOGGING_CONFIG as LOGGING_CONFIG
+logging.config.dictConfig(LOGGING_CONFIG)
+
 # Const stuff
 
 KILOBYTE = 1024

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index fa3fd6b..02404d1 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -36,6 +36,7 @@ class BaseTaskRunner(LoggingMixin):
         :type local_task_job: airflow.jobs.LocalTaskJob
         """
         self._task_instance = local_task_job.task_instance
+        self.set_logger_contexts(self._task_instance)
 
         popen_prepend = []
         cfg_path = None

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/utils/log/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/__init__.py b/airflow/utils/log/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/utils/log/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/utils/log/file_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
new file mode 100644
index 0000000..94588dd
--- /dev/null
+++ b/airflow/utils/log/file_task_handler.py
@@ -0,0 +1,158 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+from airflow import configuration as conf
+from airflow.configuration import AirflowConfigException
+from airflow.utils.file import mkdirs
+
+
+class FileTaskHandler(logging.Handler):
+    """
+    FileTaskHandler is a Python log handler that handles and reads
+    task instance logs. It creates and delegates log handling
+    to `logging.FileHandler` after receiving task instance context.
+    It reads logs from the task instance's host machine.
+    """
+
+    def __init__(self, base_log_folder):
+        super(FileTaskHandler, self).__init__()
+        self.handler = None
+        self.local_base = os.path.expanduser(base_log_folder)
+
+    def set_context(self, task_instance):
+        """
+        Provide task_instance context to airflow task handler.
+        :param task_instance: task instance object
+        """
+        self._init_file(task_instance)
+        local_loc = self.get_local_loc(task_instance)
+        self.handler = logging.FileHandler(local_loc)
+        self.handler.setFormatter(self.formatter)
+        self.handler.setLevel(self.level)
+
+    def emit(self, record):
+        if self.handler:
+            self.handler.emit(record)
+
+    def flush(self):
+        if self.handler:
+            self.handler.flush()
+
+    def close(self):
+        if self.handler:
+            self.handler.close()
+
+    def read(self, task_instance):
+        """
+        Read the log of the given task instance from its host machine.
+        :param task_instance: task instance database record
+        """
+        log = ""
+        # The task instance passed in here might differ from the one used
+        # to initialize the handler, so the log location must be computed
+        # explicitly to get the correct log path.
+        loc = self.get_local_loc(task_instance)
+
+        if os.path.exists(loc):
+            try:
+                with open(loc) as f:
+                    log += "*** Reading local log.\n" + "".join(f.readlines())
+            except Exception:
+                log = "*** Failed to load local log file: {0}.\n".format(loc)
+        else:
+            url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log",
+                               self.get_log_relative_path(task_instance)).format(
+                ti=task_instance,
+                worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
+            log += "*** Log file isn't local.\n"
+            log += "*** Fetching here: {url}\n".format(**locals())
+            try:
+                import requests
+                timeout = None  # No timeout
+                try:
+                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
+                except (AirflowConfigException, ValueError):
+                    pass
+
+                response = requests.get(url, timeout=timeout)
+                response.raise_for_status()
+                log += '\n' + response.text
+            except Exception:
+                log += "*** Failed to fetch log file from worker.\n".format(
+                    **locals())
+        return log
+
+    def get_log_relative_dir(self, ti):
+        """
+        Get relative log file directory.
+        :param ti: task instance object
+        """
+        return "{}/{}".format(ti.dag_id, ti.task_id)
+
+    def get_log_relative_path(self, ti):
+        """
+        Get relative log file path.
+        :param ti: task instance object
+        """
+        directory = self.get_log_relative_dir(ti)
+        filename = "{}.log".format(ti.execution_date.isoformat())
+        return os.path.join(directory, filename)
+
+    def get_local_loc(self, ti):
+        """
+        Get full local log path given task instance object.
+        :param ti: task instance object
+        """
+        log_relative_path = self.get_log_relative_path(ti)
+        return os.path.join(self.local_base, log_relative_path)
+
+    def _init_file(self, ti):
+        """
+        Create the log file and its directory and give them the correct permissions.
+        :param ti: task instance object
+        """
+        # To handle log writing when tasks are impersonated, the log files need to
+        # be writable by the user that runs the Airflow command and the user
+        # that is impersonated. This is mainly to handle corner cases with the
+        # SubDagOperator. When the SubDagOperator is run, all of the operators
+        # run under the impersonated user and create appropriate log files
+        # as the impersonated user. However, if the user manually runs tasks
+        # of the SubDagOperator through the UI, then the log files are created
+        # by the user that runs the Airflow command. For example, the Airflow
+        # run command may be run by the `airflow_sudoable` user, but the Airflow
+        # tasks may be run by the `airflow` user. If the log files are not
+        # writable by both users, then it's possible that re-running a task
+        # via the UI (or vice versa) results in a permission error as the task
+        # tries to write to a log file created by the other user.
+        relative_log_dir = self.get_log_relative_dir(ti)
+        directory = os.path.join(self.local_base, relative_log_dir)
+        # Create the log file and give it group writable permissions
+        # TODO(aoen): Make log dirs and logs globally readable for now since the SubDag
+        # operator is not compatible with impersonation (e.g. if a Celery executor is used
+        # for a SubDag operator and the SubDag operator has a different owner than the
+        # parent DAG)
+        if not os.path.exists(directory):
+            # Create the directory as globally writable using custom mkdirs
+            # as os.makedirs doesn't set mode properly.
+            mkdirs(directory, 0o775)
+
+        local_loc = self.get_local_loc(ti)
+
+        if not os.path.exists(local_loc):
+            open(local_loc, "a").close()
+            os.chmod(local_loc, 0o666)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
new file mode 100644
index 0000000..5c8ad7e
--- /dev/null
+++ b/airflow/utils/log/s3_task_handler.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import warnings
+
+from airflow import configuration as conf
+from airflow.utils import logging as logging_utils
+from airflow.utils.log.file_task_handler import FileTaskHandler
+
+
+class S3TaskHandler(FileTaskHandler):
+    """
+    S3TaskHandler is a Python log handler that handles and reads
+    task instance logs. It extends the Airflow FileTaskHandler and
+    uploads to and reads from S3 remote storage.
+    """
+
+    def __init__(self, base_log_folder, remote_base_log_folder):
+        super(S3TaskHandler, self).__init__(base_log_folder)
+        self.remote_base = remote_base_log_folder
+        # deprecated as of March 2016
+        if not self.remote_base and conf.get('core', 'S3_LOG_FOLDER'):
+            warnings.warn(
+                'The S3_LOG_FOLDER conf key has been replaced by '
+                'REMOTE_BASE_LOG_FOLDER. Your conf still works but please '
+                'update airflow.cfg to ensure future compatibility.',
+                DeprecationWarning)
+            self.remote_base = conf.get('core', 'S3_LOG_FOLDER')
+        self.closed = False
+
+    def set_context(self, task_instance):
+        super(S3TaskHandler, self).set_context(task_instance)
+        # The local and remote locations are needed to open and upload
+        # the local log file to S3 remote storage.
+        self.local_loc = self.get_local_loc(task_instance)
+        self.remote_loc = os.path.join(self.remote_base,
+                                       self.get_log_relative_path(task_instance))
+
+    def close(self):
+        """
+        Close and upload the local log file to S3 remote storage.
+        """
+        # When the application exits, the logging framework shuts down all
+        # handlers by calling their close method. Check whether this handler
+        # is already closed to prevent uploading the log to remote storage
+        # multiple times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super(S3TaskHandler, self).close()
+
+        if os.path.exists(self.local_loc):
+            # read log and remove old logs to get just the latest additions
+            with open(self.local_loc, 'r') as logfile:
+                log = logfile.read()
+            logging_utils.S3Log().write(log, self.remote_loc)
+
+        self.closed = True
+
+    def read(self, task_instance):
+        """
+        Read the log of the given task instance from S3 remote storage. If
+        that fails, fall back to reading the log from the local machine.
+        :param task_instance: task instance object
+        """
+        log = ""
+        remote_loc = os.path.join(self.remote_base, self.get_log_relative_path(task_instance))
+        # TODO: check if remote_loc exists first instead of checking
+        # if remote_log here. This logic is going to be modified once logs
+        # are split by try_number.
+        remote_log = logging_utils.S3Log().read(remote_loc, return_error=True)
+        if remote_log:
+            log += ('*** Reading remote log from {}.\n{}\n'.format(
+                remote_loc, remote_log))
+        else:
+            log = super(S3TaskHandler, self).read(task_instance)
+
+        return log

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/utils/logging.py
----------------------------------------------------------------------
diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py
index 96767cb..2df3086 100644
--- a/airflow/utils/logging.py
+++ b/airflow/utils/logging.py
@@ -35,9 +35,20 @@ class LoggingMixin(object):
         try:
             return self._logger
         except AttributeError:
             self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__)
             return self._logger
 
+    def set_logger_contexts(self, task_instance):
+        """
+        Set the context for all handlers of the logger.
+        """
+        for handler in self.logger.handlers:
+            try:
+                handler.set_context(task_instance)
+            except AttributeError:
+                pass
+
 
 class S3Log(object):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e6ef06c5/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 6c39462..c429765 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -16,6 +16,7 @@
 from past.builtins import basestring, unicode
 
 import ast
+import logging
 import os
 import pkg_resources
 import socket
@@ -72,12 +73,10 @@ from airflow.utils.json import json_ser
 from airflow.utils.state import State
 from airflow.utils.db import provide_session
 from airflow.utils.helpers import alchemy_to_dict
-from airflow.utils import logging as log_utils
 from airflow.utils.dates import infer_time_unit, scale_time_units
 from airflow.www import utils as wwwutils
 from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm
 from airflow.www.validators import GreaterEqualThan
-from airflow.configuration import AirflowConfigException
 
 QUERY_LIMIT = 100000
 CHART_LIMIT = 200000
@@ -698,99 +697,32 @@ class Airflow(BaseView):
     @login_required
     @wwwutils.action_logging
     def log(self):
-        BASE_LOG_FOLDER = os.path.expanduser(
-            conf.get('core', 'BASE_LOG_FOLDER'))
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         execution_date = request.args.get('execution_date')
-        dag = dagbag.get_dag(dag_id)
-        log_relative = "{dag_id}/{task_id}/{execution_date}".format(
-            **locals())
-        loc = os.path.join(BASE_LOG_FOLDER, log_relative)
-        loc = loc.format(**locals())
-        log = ""
-        TI = models.TaskInstance
         dttm = dateutil.parser.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
-        session = Session()
-        ti = session.query(TI).filter(
-            TI.dag_id == dag_id, TI.task_id == task_id,
-            TI.execution_date == dttm).first()
+        dag = dagbag.get_dag(dag_id)
 
+        session = Session()
+        ti = session.query(models.TaskInstance).filter(
+            models.TaskInstance.dag_id == dag_id, models.TaskInstance.task_id == task_id,
+            models.TaskInstance.execution_date == dttm).first()
         if ti is None:
             log = "*** Task instance did not exist in the DB\n"
         else:
-            # load remote logs
-            remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-            remote_log_loaded = False
-            if remote_log_base:
-                remote_log_path = os.path.join(remote_log_base, log_relative)
-                remote_log = ""
-
-                # Only display errors reading the log if the task completed or ran at least
-                # once before (otherwise there won't be any remote log stored).
-                ti_execution_completed = ti.state in {State.SUCCESS, State.FAILED}
-                ti_ran_more_than_once = ti.try_number > 1
-                surface_log_retrieval_errors = (
-                    ti_execution_completed or ti_ran_more_than_once)
-
-                # S3
-                if remote_log_path.startswith('s3:/'):
-                    remote_log += log_utils.S3Log().read(
-                        remote_log_path, return_error=surface_log_retrieval_errors)
-                    remote_log_loaded = True
-                # GCS
-                elif remote_log_path.startswith('gs:/'):
-                    remote_log += log_utils.GCSLog().read(
-                        remote_log_path, return_error=surface_log_retrieval_errors)
-                    remote_log_loaded = True
-                # unsupported
-                else:
-                    remote_log += '*** Unsupported remote log location.'
-
-                if remote_log:
-                    log += ('*** Reading remote log from {}.\n{}\n'.format(
-                        remote_log_path, remote_log))
-
-            # We only want to display the
-            # local logs while the task is running if a remote log configuration is set up
-            # since the logs will be transfered there after the run completes.
-            # TODO(aoen): One problem here is that if a task is running on a worker it
-            # already ran on, then duplicate logs will be printed for all of the previous
-            # runs of the task that already completed since they will have been printed as
-            # part of the remote log section above. This can be fixed either by streaming
-            # logs to the log servers as tasks are running, or by creating a proper
-            # abstraction for multiple task instance runs).
-            if not remote_log_loaded or ti.state == State.RUNNING:
-                if os.path.exists(loc):
-                    try:
-                        f = open(loc)
-                        log += "*** Reading local log.\n" + 
"".join(f.readlines())
-                        f.close()
-                    except:
-                        log = "*** Failed to load local log file: 
{0}.\n".format(loc)
-                else:
-                    WORKER_LOG_SERVER_PORT = \
-                        conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-                    url = os.path.join(
-                        "http://{ti.hostname}:{WORKER_LOG_SERVER_PORT}/log";, 
log_relative
-                    ).format(**locals())
-                    log += "*** Log file isn't local.\n"
-                    log += "*** Fetching here: {url}\n".format(**locals())
-                    try:
-                        import requests
-                        timeout = None  # No timeout
-                        try:
-                            timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
-                        except (AirflowConfigException, ValueError):
-                            pass
-
-                        response = requests.get(url, timeout=timeout)
-                        response.raise_for_status()
-                        log += '\n' + response.text
-                    except:
-                        log += "*** Failed to fetch log file from 
worker.\n".format(
-                            **locals())
+            logger = logging.getLogger('airflow.task')
+            task_log_reader = conf.get('core', 'task_log_reader')
+            handler = next((handler for handler in logger.handlers
+                           if handler.name == task_log_reader), None)
+            try:
+                log = handler.read(ti)
+            except AttributeError as e:
+                log = "Task log handler {} does not support read 
logs.\n".format(
+                    task_log_reader)
+                log += e.message
 
         if PY2 and not isinstance(log, unicode):
             log = log.decode('utf-8')
