[jira] [Commented] (AIRFLOW-2325) Task logging with AWS Cloud watch
[ https://issues.apache.org/jira/browse/AIRFLOW-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723670#comment-16723670 ]

ASF GitHub Bot commented on AIRFLOW-2325:

stale[bot] closed pull request #3229: [AIRFLOW-2325] Add cloudwatch task handler (IN PROGRESS)
URL: https://github.com/apache/incubator-airflow/pull/3229

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/utils/log/cloudwatch_task_handler.py b/airflow/utils/log/cloudwatch_task_handler.py
new file mode 100644
index 00..97e67b8c89
--- /dev/null
+++ b/airflow/utils/log/cloudwatch_task_handler.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+import os
+import logging
+
+import boto3
+import watchtower
+from jinja2 import Template
+from airflow import configuration
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class CloudwatchTaskHandler(logging.Handler, LoggingMixin):
+    def __init__(self, log_group, filename_template, region_name=None, **kwargs):
+        super(CloudwatchTaskHandler, self).__init__()
+        self.handler = None
+        self.log_group = log_group
+        self.region_name = region_name
+        self.filename_template = filename_template
+        self.filename_jinja_template = None
+        self.kwargs = kwargs
+        self.closed = False
+
+        if "{{" in self.filename_template: #jinja mode
+            self.filename_jinja_template = Template(self.filename_template)
+
+    def _render_filename(self, ti, try_number):
+        if self.filename_jinja_template:
+            jinja_context = ti.get_template_context()
+            jinja_context['try_number'] = try_number
+            return (
+                self.filename_jinja_template.render(**jinja_context)
+                .replace(':', '_')
+            )
+
+        return self.filename_template.format(
+            dag_id=ti.dag_id,
+            task_id=ti.task_id,
+            execution_date=ti.execution_date.isoformat(),
+            try_number=try_number,
+        ).replace(':', '_')
+
+    def set_context(self, ti):
+        kwargs = self.kwargs.copy()
+        stream_name = kwargs.pop('stream_name', None)
+        if stream_name is None:
+            stream_name = self._render_filename(ti, ti.try_number)
+        if 'boto3_session' not in kwargs and self.region_name is not None:
+            kwargs['boto3_session'] = boto3.session.Session(
+                region_name=self.region_name,
+            )
+        self.handler = watchtower.CloudWatchLogHandler(
+            log_group=self.log_group,
+            stream_name=stream_name,
+            **kwargs
+        )
+
+    def emit(self, record):
+        if self.handler is not None:
+            self.handler.emit(record)
+
+    def flush(self):
+        if self.handler is not None:
+            self.handler.flush()
+
+    def close(self):
+        """
+        Close and upload local log file to remote storage S3.
+        """
+        # When application exit, system shuts down all handlers by
+        # calling close method. Here we check if logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        if self.handler is not None:
+            self.handler.close()
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
+
+    def read(self, task_instance, try_number=None):
+        if try_number is None:
+            next_try = task_instance.next_try_number
+            try_numbers = list(range(1, next_try))
+        elif try_number < 1:
+            logs = [
+                'Error fetching the logs. Try number {try_number} is invalid.',
+            ]
+            return logs
+        else:
+            try_numbers = [try_number]
+
+        logs = [''] * len(try_numbers)
+        for i, try_number in enumerate(try_numbers):
+            logs[i] += self._read(task_instance, try_number)
+
+        return logs
+
+    def _read(self, task_instance, try_number):
+        stream_name = self._render_filename(task_instance, try_number)
+        if self.handler is not None:
+            client = self.handler.cwl_client
+        else:
+            client = boto3.client('logs', region_name=self.region_name)
+        events = []
+        try:
+            response = client.get_log_events(
+                logGroupName=self.log_group,
+                logStreamName=stream_name,
+            )
+            events.extend(response['events'])
+            next_token = response['nextForwardToken']
+            while True:
+                response = client.get_log_events(
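The handler names each CloudWatch log stream after the task instance: `_render_filename` fills in `filename_template` and then replaces every `:` with `_`, presumably because CloudWatch Logs does not allow colons in log stream names while the ISO-8601 `execution_date` always contains them. The stdlib-only sketch below mirrors the non-Jinja (`str.format`) branch. The `{dag_id}/{task_id}/{execution_date}/{try_number}.log` template is an assumption, and `FakeTaskInstance` and `render_stream_name` are hypothetical stand-ins, not Airflow APIs.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-in for Airflow's TaskInstance, carrying only the
# attributes the str.format branch of _render_filename reads.
FakeTaskInstance = namedtuple("FakeTaskInstance", "dag_id task_id execution_date")

# Assumed template in the str.format style the handler accepts.
TEMPLATE = "{dag_id}/{task_id}/{execution_date}/{try_number}.log"


def render_stream_name(ti, try_number, template=TEMPLATE):
    """Render a log stream name the way the diff's non-Jinja branch does."""
    name = template.format(
        dag_id=ti.dag_id,
        task_id=ti.task_id,
        execution_date=ti.execution_date.isoformat(),
        try_number=try_number,
    )
    # CloudWatch log stream names may not contain ':', and every
    # isoformat() timestamp does, so swap it for '_'.
    return name.replace(":", "_")


ti = FakeTaskInstance("example_dag", "extract", datetime(2018, 4, 1, 12, 30))
print(render_stream_name(ti, 1))  # example_dag/extract/2018-04-01T12_30_00/1.log
```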
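`read` decides which tries to fetch before calling `_read`: every try so far when no `try_number` is given, an error when it is below 1, and otherwise just the requested try. As written in the diff, the error string is never passed through `.format()`, so a reader would see the literal `{try_number}` placeholder. A minimal sketch of that selection logic with the value actually interpolated; `select_try_numbers` is a hypothetical helper, and its `next_try_number` argument stands for the `TaskInstance.next_try_number` attribute the diff reads.

```python
def select_try_numbers(try_number, next_try_number):
    """Return (try_numbers, error) following the selection logic in read()."""
    if try_number is None:
        # No specific try requested: fetch tries 1 .. next_try_number - 1.
        return list(range(1, next_try_number)), None
    if try_number < 1:
        # Unlike the diff, interpolate the offending value into the message.
        message = "Error fetching the logs. Try number {} is invalid.".format(try_number)
        return [], message
    return [try_number], None


print(select_try_numbers(None, 4))  # ([1, 2, 3], None)
print(select_try_numbers(0, 4))     # ([], 'Error fetching the logs. Try number 0 is invalid.')
```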
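The diff is cut off inside the pagination loop of `_read`, so the PR's exact loop is not shown here. As an independent sketch (not the PR's code), reading a whole stream with the CloudWatch Logs `get_log_events` call can rely on its documented behaviour of returning the same `nextForwardToken` it was given once the end of the stream is reached. `startFromHead=True` is passed so the forward token walks from the oldest event; without it the first page holds the most recent events. `read_all_events` and `FakeLogsClient` are hypothetical; against AWS you would pass `boto3.client('logs', region_name=...)` instead of the fake, which only exists so the sketch runs offline.

```python
def read_all_events(client, log_group, stream_name):
    """Collect every event in a CloudWatch Logs stream, oldest first."""
    kwargs = {
        "logGroupName": log_group,
        "logStreamName": stream_name,
        "startFromHead": True,  # page forward from the oldest event
    }
    events = []
    while True:
        response = client.get_log_events(**kwargs)
        events.extend(response["events"])
        token = response["nextForwardToken"]
        # At the end of the stream the service echoes back the token it got.
        if token == kwargs.get("nextToken"):
            return events
        kwargs["nextToken"] = token


class FakeLogsClient:
    """Offline stand-in that serves pre-split pages like GetLogEvents."""

    def __init__(self, pages):
        self.pages = pages

    def get_log_events(self, logGroupName, logStreamName,
                       startFromHead=False, nextToken=None):
        index = 0 if nextToken is None else int(nextToken)
        if index >= len(self.pages):
            # End of stream: empty page and the same token echoed back.
            return {"events": [], "nextForwardToken": nextToken}
        return {"events": self.pages[index], "nextForwardToken": str(index + 1)}


pages = [[{"message": "start"}, {"message": "working"}], [{"message": "done"}]]
client = FakeLogsClient(pages)
messages = [e["message"] for e in read_all_events(client, "airflow-logs", "dag/task/1.log")]
print(messages)  # ['start', 'working', 'done']
```

Stopping on a repeated token, rather than on an empty page, matters because a page can come back empty before the end of a sparse stream.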
[ https://issues.apache.org/jira/browse/AIRFLOW-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639410#comment-16639410 ]

Iuliia Volkova commented on AIRFLOW-2325:

[~fangpenlin], what is the status of this task and the PR https://github.com/apache/incubator-airflow/pull/3229? Is it still needed?

> Task logging with AWS Cloud watch
>
> Key: AIRFLOW-2325
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2325
> Project: Apache Airflow
> Issue Type: New Feature
> Components: logging
> Reporter: Fang-Pen Lin
> Priority: Minor
>
> In many cases, it's ideal to use remote logging when running Airflow in production, since workers can easily be scaled up or down, or may run in containers whose local storage is not meant to persist. In that case, the S3 task logging handler could be used:
> [https://github.com/apache/incubator-airflow/blob/master/airflow/utils/log/s3_task_handler.py]
> However, it comes with a drawback: the S3 logging handler only uploads the log when the task completes or fails. For long-running tasks, it's hard to know what's going on with the process until it finishes.
> To make logging closer to real time, I built a logging handler based on AWS CloudWatch. It uses the third-party Python package `watchtower`:
> [https://github.com/kislyuk/watchtower/tree/master/watchtower]
> I created a PR here [https://github.com/apache/incubator-airflow/pull/3229]. Basically, I copy-pasted the code I wrote for my own project; it works fine with the 1.9 release but has never been tested with the master branch. Also, there is a bug in watchtower that causes the task runner to hang forever when it completes. I created an issue in their repo
> [https://github.com/kislyuk/watchtower/issues/57]
> and a PR addressing that issue
> [https://github.com/kislyuk/watchtower/pull/58]
>
> The PR is still far from ready for review, but I want to get some feedback before I spend more time on it. I would like to know whether you want this CloudWatch handler to go into the main repo, or whether you prefer it to be a standalone third-party module. In the latter case, I can close this ticket and create a standalone repo on my own. If the PR is welcome, I can spend more time polishing it based on your feedback and adding tests, documentation and so on.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
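The contrast drawn in the description is about when records leave the worker: the S3 handler uploads the log once, when the task completes or fails, while a CloudWatch handler ships records as the task runs. The sketch below illustrates the second approach with a plain `logging.Handler` that buffers formatted records and hands them to a `send_batch` callable whenever `batch_size` records have accumulated, plus once more on `flush()` and `close()`. `BatchingHandler`, `send_batch` and `batch_size` are hypothetical names, and watchtower's own implementation differs (it ships from background threads), so this shows the idea rather than that library.

```python
import logging


class BatchingHandler(logging.Handler):
    """Buffer formatted records and ship them in batches while the task runs."""

    def __init__(self, send_batch, batch_size=10):
        super().__init__()
        self.send_batch = send_batch  # hypothetical callable taking a list of str
        self.batch_size = batch_size
        self.buffer = []

    def emit(self, record):
        try:
            self.buffer.append(self.format(record))
            if len(self.buffer) >= self.batch_size:
                self.flush()  # ship as soon as a batch is full
        except Exception:
            self.handleError(record)

    def flush(self):
        # The handler lock is an RLock, so re-acquiring it from emit() is safe.
        self.acquire()
        try:
            if self.buffer:
                batch, self.buffer = self.buffer, []
                self.send_batch(batch)
        finally:
            self.release()

    def close(self):
        # Ship whatever is still buffered before the handler goes away.
        self.flush()
        super().close()


sent = []
logger = logging.getLogger("task")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = BatchingHandler(sent.append, batch_size=2)
logger.addHandler(handler)

for step in range(3):
    logger.info("step %d", step)
# The first two records have already been shipped; the third is buffered.
logger.removeHandler(handler)
handler.close()
print(sent)  # [['step 0', 'step 1'], ['step 2']]
```

In a real handler `send_batch` would call the remote API; here `sent.append` simply records each batch so the timing is visible.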
[ https://issues.apache.org/jira/browse/AIRFLOW-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16601577#comment-16601577 ]

Apache Spark commented on AIRFLOW-2325:

User 'fangpenlin' has created a pull request for this issue:
https://github.com/apache/incubator-airflow/pull/3229