Repository: incubator-airflow Updated Branches: refs/heads/master 702190f70 -> 53b8ddd2a
[AIRFLOW-2168] Remote logging for Azure Blob Storage add wasb_task_handler.py to enable remote logging on Azure Blob Storage. add file and read capabilities to wasb_hook to support wasb_task_handler Closes #3095 from marcusrehm/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/53b8ddd2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/53b8ddd2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/53b8ddd2 Branch: refs/heads/master Commit: 53b8ddd2a1cf9e4976175fb347f679513cbdeb2c Parents: 702190f Author: Marcus Rehm <marcus.r...@gmail.com> Authored: Tue Mar 6 10:26:39 2018 +0100 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Tue Mar 6 10:26:39 2018 +0100 ---------------------------------------------------------------------- .../config_templates/airflow_local_settings.py | 23 +++ airflow/contrib/hooks/wasb_hook.py | 45 ++++- airflow/utils/log/wasb_task_handler.py | 175 +++++++++++++++++++ docs/integration.rst | 33 ++++ tests/contrib/hooks/test_wasb_hook.py | 20 +++ 5 files changed, 290 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/airflow/config_templates/airflow_local_settings.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 899e815..861c7a9 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -35,6 +35,7 @@ PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log' # Storage bucket url for remote logging # s3 buckets should start with "s3://" # gcs buckets should start with "gs://" +# wasb buckets should start with "wasb" just to help Airflow select correct handler 
REMOTE_BASE_LOG_FOLDER = '' DEFAULT_LOGGING_CONFIG = { @@ -114,6 +115,26 @@ REMOTE_HANDLERS = { 'gcs_log_folder': REMOTE_BASE_LOG_FOLDER, 'filename_template': PROCESSOR_FILENAME_TEMPLATE, }, + }, + 'wasb': { + 'task': { + 'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), + 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, + 'wasb_container': 'airflow-logs', + 'filename_template': FILENAME_TEMPLATE, + 'delete_local_copy': False, + }, + 'processor': { + 'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler', + 'formatter': 'airflow', + 'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER), + 'wasb_log_folder': REMOTE_BASE_LOG_FOLDER, + 'wasb_container': 'airflow-logs', + 'filename_template': PROCESSOR_FILENAME_TEMPLATE, + 'delete_local_copy': False, + }, } } @@ -123,3 +144,5 @@ if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'): DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3']) elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'): DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs']) +elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'): + DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb']) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/airflow/contrib/hooks/wasb_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/wasb_hook.py b/airflow/contrib/hooks/wasb_hook.py index cc9922b..5a40e06 100644 --- a/airflow/contrib/hooks/wasb_hook.py +++ b/airflow/contrib/hooks/wasb_hook.py @@ -21,11 +21,11 @@ from azure.storage.blob import BlockBlobService class WasbHook(BaseHook): """ Interacts with Azure Blob Storage through the wasb:// protocol. - + Additional options passed in the 'extra' field of the connection will be passed to the `BlockBlockService()` constructor. 
For example, authenticate using a SAS token by adding {"sas_token": "YOUR_TOKEN"}. - + :param wasb_conn_id: Reference to the wasb connection. :type wasb_conn_id: str """ @@ -44,7 +44,7 @@ class WasbHook(BaseHook): def check_for_blob(self, container_name, blob_name, **kwargs): """ Check if a blob exists on Azure Blob Storage. - + :param container_name: Name of the container. :type container_name: str :param blob_name: Name of the blob. @@ -60,7 +60,7 @@ class WasbHook(BaseHook): def check_for_prefix(self, container_name, prefix, **kwargs): """ Check if a prefix exists on Azure Blob storage. - + :param container_name: Name of the container. :type container_name: str :param prefix: Prefix of the blob. @@ -78,7 +78,7 @@ class WasbHook(BaseHook): def load_file(self, file_path, container_name, blob_name, **kwargs): """ Upload a file to Azure Blob Storage. - + :param file_path: Path to the file to load. :type file_path: str :param container_name: Name of the container. @@ -96,7 +96,7 @@ class WasbHook(BaseHook): def load_string(self, string_data, container_name, blob_name, **kwargs): """ Upload a string to Azure Blob Storage. - + :param string_data: String to load. :type string_data: str :param container_name: Name of the container. @@ -110,3 +110,36 @@ class WasbHook(BaseHook): # Reorder the argument order from airflow.hooks.S3_hook.load_string. self.connection.create_blob_from_text(container_name, blob_name, string_data, **kwargs) + + def get_file(self, file_path, container_name, blob_name, **kwargs): + """ + Download a file from Azure Blob Storage. + + :param file_path: Path to the file to download. + :type file_path: str + :param container_name: Name of the container. + :type container_name: str + :param blob_name: Name of the blob. + :type blob_name: str + :param kwargs: Optional keyword arguments that + `BlockBlobService.get_blob_to_path()` takes. 
+ :type kwargs: object + """ + return self.connection.get_blob_to_path(container_name, blob_name, + file_path, **kwargs) + + def read_file(self, container_name, blob_name, **kwargs): + """ + Read a file from Azure Blob Storage and return as a string. + + :param container_name: Name of the container. + :type container_name: str + :param blob_name: Name of the blob. + :type blob_name: str + :param kwargs: Optional keyword arguments that + `BlockBlobService.get_blob_to_text()` takes. + :type kwargs: object + """ + return self.connection.get_blob_to_text(container_name, + blob_name, + **kwargs).content http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/airflow/utils/log/wasb_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py new file mode 100644 index 0000000..4784b10 --- /dev/null +++ b/airflow/utils/log/wasb_task_handler.py @@ -0,0 +1,175 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import os +import shutil + +from airflow import configuration +from airflow.contrib.hooks.wasb_hook import WasbHook +from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.log.file_task_handler import FileTaskHandler +from azure.common import AzureHttpError + + +class WasbTaskHandler(FileTaskHandler, LoggingMixin): + """ + WasbTaskHandler is a python log handler that handles and reads + task instance logs. It extends airflow FileTaskHandler and + uploads to and reads from Wasb remote storage. + """ + + def __init__(self, base_log_folder, wasb_log_folder, wasb_container, + filename_template, delete_local_copy): + super(WasbTaskHandler, self).__init__(base_log_folder, filename_template) + self.wasb_container = wasb_container + self.remote_base = wasb_log_folder + self.log_relative_path = '' + self._hook = None + self.closed = False + self.upload_on_close = True + self.delete_local_copy = delete_local_copy + + def _build_hook(self): + remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID') + try: + return WasbHook(remote_conn_id) + except AzureHttpError: + self.log.error( + 'Could not create an WasbHook with connection id "%s". ' + 'Please make sure that airflow[azure] is installed and ' + 'the Wasb connection exists.', remote_conn_id + ) + + @property + def hook(self): + if self._hook is None: + self._hook = self._build_hook() + return self._hook + + def set_context(self, ti): + super(WasbTaskHandler, self).set_context(ti) + # Local location and remote location is needed to open and + # upload local log file to Wasb remote storage. + self.log_relative_path = self._render_filename(ti, ti.try_number) + self.upload_on_close = not ti.is_raw + + def close(self): + """ + Close and upload local log file to remote storage Wasb. + """ + # When application exit, system shuts down all handlers by + # calling close method. 
Here we check if logger is already + # closed to prevent uploading the log to remote storage multiple + # times when `logging.shutdown` is called. + if self.closed: + return + + super(WasbTaskHandler, self).close() + + if not self.upload_on_close: + return + + local_loc = os.path.join(self.local_base, self.log_relative_path) + remote_loc = os.path.join(self.remote_base, self.log_relative_path) + if os.path.exists(local_loc): + # read log and remove old logs to get just the latest additions + with open(local_loc, 'r') as logfile: + log = logfile.read() + self.wasb_write(log, remote_loc, append=True) + + if self.delete_local_copy: + shutil.rmtree(os.path.dirname(local_loc)) + # Mark closed so we don't double write if close is called twice + self.closed = True + + def _read(self, ti, try_number): + """ + Read logs of given task instance and try_number from Wasb remote storage. + If failed, read the log from task instance host machine. + :param ti: task instance object + :param try_number: task instance try_number to read logs from + """ + # Explicitly getting log relative path is necessary as the given + # task instance might be different than task instance passed in + # in set_context method. + log_relative_path = self._render_filename(ti, try_number) + remote_loc = os.path.join(self.remote_base, log_relative_path) + + if self.wasb_log_exists(remote_loc): + # If Wasb remote file exists, we do not fetch logs from task instance + # local machine even if there are errors reading remote logs, as + # returned remote_log will contain error messages. 
+ remote_log = self.wasb_read(remote_loc, return_error=True) + log = '*** Reading remote log from {}.\n{}\n'.format( + remote_loc, remote_log) + else: + log = super(WasbTaskHandler, self)._read(ti, try_number) + + return log + + def wasb_log_exists(self, remote_log_location): + """ + Check if remote_log_location exists in remote storage + :param remote_log_location: log's location in remote storage + :return: True if location exists else False + """ + try: + return self.hook.check_for_blob(self.wasb_container, remote_log_location) + except Exception: + pass + return False + + def wasb_read(self, remote_log_location, return_error=False): + """ + Returns the log found at the remote_log_location. Returns '' if no + logs are found or there is an error. + :param remote_log_location: the log's location in remote storage + :type remote_log_location: string (path) + :param return_error: if True, returns a string error message if an + error occurs. Otherwise returns '' when an error occurs. + :type return_error: bool + """ + try: + return self.hook.read_file(self.wasb_container, remote_log_location) + except AzureHttpError: + msg = 'Could not read logs from {}'.format(remote_log_location) + self.log.exception(msg) + # return error if needed + if return_error: + return msg + + def wasb_write(self, log, remote_log_location, append=True): + """ + Writes the log to the remote_log_location. Fails silently if no hook + was created. + :param log: the log to write to the remote_log_location + :type log: string + :param remote_log_location: the log's location in remote storage + :type remote_log_location: string (path) + :param append: if False, any existing log file is overwritten. If True, + the new log is appended to any existing logs. 
+ :type append: bool + """ + if append and self.wasb_log_exists(remote_log_location): + old_log = self.wasb_read(remote_log_location) + log = '\n'.join([old_log, log]) if old_log else log + + try: + self.hook.load_string( + log, + self.wasb_container, + remote_log_location, + ) + except AzureHttpError: + self.log.exception('Could not write logs to %s', + remote_log_location) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/docs/integration.rst ---------------------------------------------------------------------- diff --git a/docs/integration.rst b/docs/integration.rst index 92f59b6..162b5d8 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -113,6 +113,39 @@ WasbHook .. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook +Logging +''''''' + +Airflow can be configured to read and write task logs in Azure Blob Storage. +Follow the steps below to enable Azure Blob Storage logging. + +#. Airflow's logging system requires a custom .py file to be located in the ``PYTHONPATH``, so that it's importable from Airflow. Start by creating a directory to store the config file. ``$AIRFLOW_HOME/config`` is recommended. +#. Create empty files called ``$AIRFLOW_HOME/config/log_config.py`` and ``$AIRFLOW_HOME/config/__init__.py``. +#. Copy the contents of ``airflow/config_templates/airflow_local_settings.py`` into the ``log_config.py`` file that was just created in the step above. +#. Customize the following portions of the template: + + .. code-block:: bash + + # wasb buckets should start with "wasb" just to help Airflow select correct handler + REMOTE_BASE_LOG_FOLDER = 'wasb-<whatever you want here>' + + # Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG + LOGGING_CONFIG = ... + + + +#. Make sure an Azure Blob Storage (Wasb) connection hook has been defined in Airflow. The hook should have read and write access to the Azure Blob Storage bucket defined above in ``REMOTE_BASE_LOG_FOLDER``. + +#. 
Update ``$AIRFLOW_HOME/airflow.cfg`` to contain: + + .. code-block:: bash + + remote_logging = True + logging_config_class = log_config.LOGGING_CONFIG + remote_log_conn_id = <name of the Azure Blob Storage connection> + +#. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution. +#. Verify that logs are showing up for newly executed tasks in the bucket you've defined. .. _AWS: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53b8ddd2/tests/contrib/hooks/test_wasb_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_wasb_hook.py b/tests/contrib/hooks/test_wasb_hook.py index df9cefe..96a25f9 100644 --- a/tests/contrib/hooks/test_wasb_hook.py +++ b/tests/contrib/hooks/test_wasb_hook.py @@ -118,6 +118,26 @@ class TestWasbHook(unittest.TestCase): 'container', 'blob', 'big string', max_connections=1 ) + @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService', + autospec=True) + def test_get_file(self, mock_service): + mock_instance = mock_service.return_value + hook = WasbHook(wasb_conn_id='wasb_test_sas_token') + hook.get_file('path', 'container', 'blob', max_connections=1) + mock_instance.get_blob_to_path.assert_called_once_with( + 'container', 'blob', 'path', max_connections=1 + ) + + @mock.patch('airflow.contrib.hooks.wasb_hook.BlockBlobService', + autospec=True) + def test_read_file(self, mock_service): + mock_instance = mock_service.return_value + hook = WasbHook(wasb_conn_id='wasb_test_sas_token') + hook.read_file('container', 'blob', max_connections=1) + mock_instance.get_blob_to_text.assert_called_once_with( + 'container', 'blob', max_connections=1 + ) + if __name__ == '__main__': unittest.main()