Repository: incubator-airflow
Updated Branches:
  refs/heads/master 702190f70 -> 53b8ddd2a


[AIRFLOW-2168] Remote logging for Azure Blob Storage

add wasb_task_handler.py to enable remote logging
on Azure Blob Storage.
add file and read capabilities to wasb_hook to
support wasb_task_handler

Closes #3095 from marcusrehm/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/53b8ddd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/53b8ddd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/53b8ddd2

Branch: refs/heads/master
Commit: 53b8ddd2a1cf9e4976175fb347f679513cbdeb2c
Parents: 702190f
Author: Marcus Rehm <marcus.r...@gmail.com>
Authored: Tue Mar 6 10:26:39 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Tue Mar 6 10:26:39 2018 +0100

----------------------------------------------------------------------
 .../config_templates/airflow_local_settings.py  |  23 +++
 airflow/contrib/hooks/wasb_hook.py              |  45 ++++-
 airflow/utils/log/wasb_task_handler.py          | 175 +++++++++++++++++++
 docs/integration.rst                            |  33 ++++
 tests/contrib/hooks/test_wasb_hook.py           |  20 +++
 5 files changed, 290 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/airflow/config_templates/airflow_local_settings.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/airflow_local_settings.py 
b/airflow/config_templates/airflow_local_settings.py
index 899e815..861c7a9 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -35,6 +35,7 @@ PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
 # Storage bucket url for remote logging
 # s3 buckets should start with "s3://"
 # gcs buckets should start with "gs://"
+# wasb buckets should start with "wasb" just to help Airflow select the correct handler
 REMOTE_BASE_LOG_FOLDER = ''
 
 DEFAULT_LOGGING_CONFIG = {
@@ -114,6 +115,26 @@ REMOTE_HANDLERS = {
             'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
             'filename_template': PROCESSOR_FILENAME_TEMPLATE,
         },
+    },
+    'wasb': {
+        'task': {
+            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'wasb_container': 'airflow-logs',
+            'filename_template': FILENAME_TEMPLATE,
+            'delete_local_copy': False,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'wasb_container': 'airflow-logs',
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+            'delete_local_copy': False,
+        },
     }
 }
 
@@ -123,3 +144,5 @@ if REMOTE_LOGGING and 
REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
         DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
 elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
         DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
+elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
+        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/airflow/contrib/hooks/wasb_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/wasb_hook.py 
b/airflow/contrib/hooks/wasb_hook.py
index cc9922b..5a40e06 100644
--- a/airflow/contrib/hooks/wasb_hook.py
+++ b/airflow/contrib/hooks/wasb_hook.py
@@ -21,11 +21,11 @@ from azure.storage.blob import BlockBlobService
 class WasbHook(BaseHook):
     """
     Interacts with Azure Blob Storage through the wasb:// protocol.
-    
+
     Additional options passed in the 'extra' field of the connection will be
     passed to the `BlockBlockService()` constructor. For example, authenticate
     using a SAS token by adding {"sas_token": "YOUR_TOKEN"}.
-    
+
     :param wasb_conn_id: Reference to the wasb connection.
     :type wasb_conn_id: str
     """
@@ -44,7 +44,7 @@ class WasbHook(BaseHook):
     def check_for_blob(self, container_name, blob_name, **kwargs):
         """
         Check if a blob exists on Azure Blob Storage.
-        
+
         :param container_name: Name of the container.
         :type container_name: str
         :param blob_name: Name of the blob.
@@ -60,7 +60,7 @@ class WasbHook(BaseHook):
     def check_for_prefix(self, container_name, prefix, **kwargs):
         """
         Check if a prefix exists on Azure Blob storage.
-        
+
         :param container_name: Name of the container.
         :type container_name: str
         :param prefix: Prefix of the blob.
@@ -78,7 +78,7 @@ class WasbHook(BaseHook):
     def load_file(self, file_path, container_name, blob_name, **kwargs):
         """
         Upload a file to Azure Blob Storage.
-        
+
         :param file_path: Path to the file to load.
         :type file_path: str
         :param container_name: Name of the container.
@@ -96,7 +96,7 @@ class WasbHook(BaseHook):
     def load_string(self, string_data, container_name, blob_name, **kwargs):
         """
         Upload a string to Azure Blob Storage.
-        
+
         :param string_data: String to load.
         :type string_data: str
         :param container_name: Name of the container.
@@ -110,3 +110,36 @@ class WasbHook(BaseHook):
         # Reorder the argument order from airflow.hooks.S3_hook.load_string.
         self.connection.create_blob_from_text(container_name, blob_name,
                                               string_data, **kwargs)
+
+    def get_file(self, file_path, container_name, blob_name, **kwargs):
+        """
+        Download a file from Azure Blob Storage.
+
+        :param file_path: Path to the file to download.
+        :type file_path: str
+        :param container_name: Name of the container.
+        :type container_name: str
+        :param blob_name: Name of the blob.
+        :type blob_name: str
+        :param kwargs: Optional keyword arguments that
+            `BlockBlobService.get_blob_to_path()` takes.
+        :type kwargs: object
+        """
+        return self.connection.get_blob_to_path(container_name, blob_name,
+                                                file_path, **kwargs)
+
+    def read_file(self, container_name, blob_name, **kwargs):
+        """
+        Read a file from Azure Blob Storage and return as a string.
+
+        :param container_name: Name of the container.
+        :type container_name: str
+        :param blob_name: Name of the blob.
+        :type blob_name: str
+        :param kwargs: Optional keyword arguments that
+            `BlockBlobService.get_blob_to_text()` takes.
+        :type kwargs: object
+        """
+        return self.connection.get_blob_to_text(container_name,
+                                                blob_name,
+                                                **kwargs).content

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/airflow/utils/log/wasb_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/wasb_task_handler.py 
b/airflow/utils/log/wasb_task_handler.py
new file mode 100644
index 0000000..4784b10
--- /dev/null
+++ b/airflow/utils/log/wasb_task_handler.py
@@ -0,0 +1,175 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import shutil
+
+from airflow import configuration
+from airflow.contrib.hooks.wasb_hook import WasbHook
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from azure.common import AzureHttpError
+
+
+class WasbTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    WasbTaskHandler is a python log handler that handles and reads
+    task instance logs. It extends airflow FileTaskHandler and
+    uploads to and reads from Wasb remote storage.
+    """
+
+    def __init__(self, base_log_folder, wasb_log_folder, wasb_container,
+                 filename_template, delete_local_copy):
+        super(WasbTaskHandler, self).__init__(base_log_folder, 
filename_template)
+        self.wasb_container = wasb_container
+        self.remote_base = wasb_log_folder
+        self.log_relative_path = ''
+        self._hook = None
+        self.closed = False
+        self.upload_on_close = True
+        self.delete_local_copy = delete_local_copy
+
+    def _build_hook(self):
+        remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
+        try:
+            return WasbHook(remote_conn_id)
+        except AzureHttpError:
+            self.log.error(
+                'Could not create an WasbHook with connection id "%s". '
+                'Please make sure that airflow[azure] is installed and '
+                'the Wasb connection exists.', remote_conn_id
+            )
+
+    @property
+    def hook(self):
+        if self._hook is None:
+            self._hook = self._build_hook()
+        return self._hook
+
+    def set_context(self, ti):
+        super(WasbTaskHandler, self).set_context(ti)
+        # Local location and remote location is needed to open and
+        # upload local log file to Wasb remote storage.
+        self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.is_raw
+
+    def close(self):
+        """
+        Close and upload local log file to remote storage Wasb.
+        """
+        # When application exit, system shuts down all handlers by
+        # calling close method. Here we check if logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super(WasbTaskHandler, self).close()
+
+        if not self.upload_on_close:
+            return
+
+        local_loc = os.path.join(self.local_base, self.log_relative_path)
+        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
+        if os.path.exists(local_loc):
+            # read log and remove old logs to get just the latest additions
+            with open(local_loc, 'r') as logfile:
+                log = logfile.read()
+            self.wasb_write(log, remote_loc, append=True)
+
+            if self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
+
+    def _read(self, ti, try_number):
+        """
+        Read logs of given task instance and try_number from Wasb remote 
storage.
+        If failed, read the log from task instance host machine.
+        :param ti: task instance object
+        :param try_number: task instance try_number to read logs from
+        """
+        # Explicitly getting log relative path is necessary as the given
+        # task instance might be different than task instance passed in
+        # in set_context method.
+        log_relative_path = self._render_filename(ti, try_number)
+        remote_loc = os.path.join(self.remote_base, log_relative_path)
+
+        if self.wasb_log_exists(remote_loc):
+            # If Wasb remote file exists, we do not fetch logs from task 
instance
+            # local machine even if there are errors reading remote logs, as
+            # returned remote_log will contain error messages.
+            remote_log = self.wasb_read(remote_loc, return_error=True)
+            log = '*** Reading remote log from {}.\n{}\n'.format(
+                remote_loc, remote_log)
+        else:
+            log = super(WasbTaskHandler, self)._read(ti, try_number)
+
+        return log
+
+    def wasb_log_exists(self, remote_log_location):
+        """
+        Check if remote_log_location exists in remote storage
+        :param remote_log_location: log's location in remote storage
+        :return: True if location exists else False
+        """
+        try:
+            return self.hook.check_for_blob(self.wasb_container, 
remote_log_location)
+        except Exception:
+            pass
+        return False
+
+    def wasb_read(self, remote_log_location, return_error=False):
+        """
+        Returns the log found at the remote_log_location. Returns '' if no
+        logs are found or there is an error.
+        :param remote_log_location: the log's location in remote storage
+        :type remote_log_location: string (path)
+        :param return_error: if True, returns a string error message if an
+            error occurs. Otherwise returns '' when an error occurs.
+        :type return_error: bool
+        """
+        try:
+            return self.hook.read_file(self.wasb_container, 
remote_log_location)
+        except AzureHttpError:
+            msg = 'Could not read logs from {}'.format(remote_log_location)
+            self.log.exception(msg)
+            # return error if needed
+            if return_error:
+                return msg
+
+    def wasb_write(self, log, remote_log_location, append=True):
+        """
+        Writes the log to the remote_log_location. Fails silently if no hook
+        was created.
+        :param log: the log to write to the remote_log_location
+        :type log: string
+        :param remote_log_location: the log's location in remote storage
+        :type remote_log_location: string (path)
+        :param append: if False, any existing log file is overwritten. If True,
+            the new log is appended to any existing logs.
+        :type append: bool
+        """
+        if append and self.wasb_log_exists(remote_log_location):
+            old_log = self.wasb_read(remote_log_location)
+            log = '\n'.join([old_log, log]) if old_log else log
+
+        try:
+            self.hook.load_string(
+                log,
+                self.wasb_container,
+                remote_log_location,
+            )
+        except AzureHttpError:
+            self.log.exception('Could not write logs to %s',
+                               remote_log_location)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 92f59b6..162b5d8 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -113,6 +113,39 @@ WasbHook
 
 .. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook
 
+Logging
+'''''''
+
+Airflow can be configured to read and write task logs in Azure Blob Storage.
+Follow the steps below to enable Azure Blob Storage logging.
+
+#. Airflow's logging system requires a custom .py file to be located in the 
``PYTHONPATH``, so that it's importable from Airflow. Start by creating a 
directory to store the config file. ``$AIRFLOW_HOME/config`` is recommended.
+#. Create empty files called ``$AIRFLOW_HOME/config/log_config.py`` and 
``$AIRFLOW_HOME/config/__init__.py``.
+#. Copy the contents of ``airflow/config_templates/airflow_local_settings.py`` 
into the ``log_config.py`` file that was just created in the step above.
+#. Customize the following portions of the template:
+
+    .. code-block:: bash
+
+        # wasb buckets should start with "wasb" just to help Airflow select the correct handler
+        REMOTE_BASE_LOG_FOLDER = 'wasb-<whatever you want here>'
+
+        # Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
+        LOGGING_CONFIG = ...
+
+        
+
+#. Make sure an Azure Blob Storage (Wasb) connection hook has been defined in 
Airflow. The hook should have read and write access to the Azure Blob Storage 
bucket defined above in ``REMOTE_BASE_LOG_FOLDER``.
+
+#. Update ``$AIRFLOW_HOME/airflow.cfg`` to contain:
+
+    .. code-block:: bash
+
+        remote_logging = True
+        logging_config_class = log_config.LOGGING_CONFIG
+        remote_log_conn_id = <name of the Azure Blob Storage connection>
+
+#. Restart the Airflow webserver and scheduler, and trigger (or wait for) a 
new task execution.
+#. Verify that logs are showing up for newly executed tasks in the bucket 
you've defined.
 
 
 .. _AWS:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/tests/contrib/hooks/test_wasb_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_wasb_hook.py 
b/tests/contrib/hooks/test_wasb_hook.py
index df9cefe..96a25f9 100644
--- a/tests/contrib/hooks/test_wasb_hook.py
+++ b/tests/contrib/hooks/test_wasb_hook.py
@@ -118,6 +118,26 @@ class TestWasbHook(unittest.TestCase):
             'container', 'blob', 'big string', max_connections=1
         )
 
+    @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+                autospec=True)
+    def test_get_file(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+        hook.get_file('path', 'container', 'blob', max_connections=1)
+        mock_instance.get_blob_to_path.assert_called_once_with(
+            'container', 'blob', 'path', max_connections=1
+        )
+
+    @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService',
+                autospec=True)
+    def test_read_file(self, mock_service):
+        mock_instance = mock_service.return_value
+        hook = WasbHook(wasb_conn_id='wasb_test_sas_token')
+        hook.read_file('container', 'blob', max_connections=1)
+        mock_instance.get_blob_to_text.assert_called_once_with(
+            'container', 'blob', max_connections=1
+        )
+
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to