[jira] [Commented] (AIRFLOW-2325) Task logging with AWS Cloud watch

2018-12-17 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/AIRFLOW-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723670#comment-16723670 ]

ASF GitHub Bot commented on AIRFLOW-2325:
-

stale[bot] closed pull request #3229: [AIRFLOW-2325] Add cloudwatch task 
handler (IN PROGRESS)
URL: https://github.com/apache/incubator-airflow/pull/3229

As this is a foreign pull request (from a fork) and GitHub hides the original
diff once the pull request is closed, the diff is supplied below for the sake
of provenance:

diff --git a/airflow/utils/log/cloudwatch_task_handler.py b/airflow/utils/log/cloudwatch_task_handler.py
new file mode 100644
index 0000000000..97e67b8c89
--- /dev/null
+++ b/airflow/utils/log/cloudwatch_task_handler.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+import os
+import logging
+
+import boto3
+import watchtower
+from jinja2 import Template
+from airflow import configuration
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class CloudwatchTaskHandler(logging.Handler, LoggingMixin):
+    def __init__(self, log_group, filename_template, region_name=None,
+                 **kwargs):
+        super(CloudwatchTaskHandler, self).__init__()
+        self.handler = None
+        self.log_group = log_group
+        self.region_name = region_name
+        self.filename_template = filename_template
+        self.filename_jinja_template = None
+        self.kwargs = kwargs
+        self.closed = False
+
+        if "{{" in self.filename_template:  # Jinja mode
+            self.filename_jinja_template = Template(self.filename_template)
+
+    def _render_filename(self, ti, try_number):
+        if self.filename_jinja_template:
+            jinja_context = ti.get_template_context()
+            jinja_context['try_number'] = try_number
+            return (
+                self.filename_jinja_template.render(**jinja_context)
+                .replace(':', '_')
+            )
+
+        return self.filename_template.format(
+            dag_id=ti.dag_id,
+            task_id=ti.task_id,
+            execution_date=ti.execution_date.isoformat(),
+            try_number=try_number,
+        ).replace(':', '_')
+
+    def set_context(self, ti):
+        kwargs = self.kwargs.copy()
+        stream_name = kwargs.pop('stream_name', None)
+        if stream_name is None:
+            stream_name = self._render_filename(ti, ti.try_number)
+        if 'boto3_session' not in kwargs and self.region_name is not None:
+            kwargs['boto3_session'] = boto3.session.Session(
+                region_name=self.region_name,
+            )
+        self.handler = watchtower.CloudWatchLogHandler(
+            log_group=self.log_group,
+            stream_name=stream_name,
+            **kwargs
+        )
+
+    def emit(self, record):
+        if self.handler is not None:
+            self.handler.emit(record)
+
+    def flush(self):
+        if self.handler is not None:
+            self.handler.flush()
+
+    def close(self):
+        """
+        Close the underlying watchtower handler, flushing any remaining
+        log records to CloudWatch.
+        """
+        # When the application exits, the logging system shuts down all
+        # handlers by calling their close method. Check whether this
+        # handler is already closed to prevent flushing the log to
+        # remote storage multiple times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        if self.handler is not None:
+            self.handler.close()
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
+
+    def read(self, task_instance, try_number=None):
+        if try_number is None:
+            next_try = task_instance.next_try_number
+            try_numbers = list(range(1, next_try))
+        elif try_number < 1:
+            logs = [
+                'Error fetching the logs. Try number {} is invalid.'.format(
+                    try_number),
+            ]
+            return logs
+        else:
+            try_numbers = [try_number]
+
+        logs = [''] * len(try_numbers)
+        for i, try_number in enumerate(try_numbers):
+            logs[i] += self._read(task_instance, try_number)
+
+        return logs
+
+    def _read(self, task_instance, try_number):
+        stream_name = self._render_filename(task_instance, try_number)
+        if self.handler is not None:
+            client = self.handler.cwl_client
+        else:
+            client = boto3.client('logs', region_name=self.region_name)
+        events = []
+        try:
+            response = client.get_log_events(
+                logGroupName=self.log_group,
+                logStreamName=stream_name,
+            )
+            events.extend(response['events'])
+            next_token = response['nextForwardToken']
+            while True:
+                response = client.get_log_events(
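The archived diff is truncated at this point. For reference, the usual boto3
pattern for draining a CloudWatch Logs stream with get_log_events and its
nextForwardToken is sketched below. This is only an illustration of the
standard API usage, not the remainder of the PR, and the helper name
fetch_all_events is made up for the example.

# A minimal sketch of nextForwardToken pagination for CloudWatch Logs,
# assuming the standard boto3 'logs' client; not taken from the PR.
import boto3


def fetch_all_events(log_group, stream_name, region_name=None):
    client = boto3.client('logs', region_name=region_name)
    events = []
    kwargs = {
        'logGroupName': log_group,
        'logStreamName': stream_name,
        'startFromHead': True,
    }
    while True:
        response = client.get_log_events(**kwargs)
        events.extend(response['events'])
        next_token = response['nextForwardToken']
        # get_log_events keeps returning the same forward token once the
        # end of the stream is reached; that is the stop condition.
        if kwargs.get('nextToken') == next_token:
            break
        kwargs['nextToken'] = next_token
    return '\n'.join(event['message'] for event in events)

A handler's _read would presumably drain the stream in this way and return the
joined messages as the log text for the given try number.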

[jira] [Commented] (AIRFLOW-2325) Task logging with AWS Cloud watch

2018-10-05 Thread Iuliia Volkova (JIRA)


[ https://issues.apache.org/jira/browse/AIRFLOW-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639410#comment-16639410 ]

Iuliia Volkova commented on AIRFLOW-2325:
-

[~fangpenlin], what is the status of this task and of PR 
https://github.com/apache/incubator-airflow/pull/3229? Is it still needed?

> Task logging with AWS Cloud watch
> -
>
> Key: AIRFLOW-2325
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2325
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: logging
>Reporter: Fang-Pen Lin
>Priority: Minor
>
> In many cases it's ideal to use remote logging while running Airflow in 
> production, as workers can easily be scaled down or up, or they run in 
> containers where local storage is not meant to persist. In that case the S3 
> task logging handler can be used:
> [https://github.com/apache/incubator-airflow/blob/master/airflow/utils/log/s3_task_handler.py]
> However, it comes with a drawback: the S3 logging handler only uploads the 
> log once the task has completed or failed. For long-running tasks, it's hard 
> to know what's going on with the process until it finishes.
> To make logging more real-time, I built a logging handler based on AWS 
> CloudWatch. It uses the third-party Python package `watchtower`:
>  
> [https://github.com/kislyuk/watchtower/tree/master/watchtower]
>  
> I created a PR here [https://github.com/apache/incubator-airflow/pull/3229]; 
> basically I just copy-pasted the code I wrote for my own project. It works 
> fine with the 1.9 release but has never been tested against the master 
> branch. Also, there is a bug in watchtower that causes the task runner to 
> hang forever when it completes. I created an issue in their repo
> [https://github.com/kislyuk/watchtower/issues/57]
> and a PR addressing that issue:
> [https://github.com/kislyuk/watchtower/pull/58]
>  
> The PR is still far from ready to be reviewed, but I just want to get some 
> feedback before I spend more time on it. I would like to know whether you 
> want this CloudWatch handler to go into the main repo, or whether you would 
> prefer it to be a standalone third-party module. If that's the case, I can 
> close this ticket and create a standalone repo of my own. If the PR is 
> welcome, then I can spend more time polishing it based on your feedback, 
> adding tests, documentation, and so on.
>  
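For anyone who wants to experiment with a handler like this, remote task
handlers in Airflow are wired in through the logging configuration pointed to
by the logging_config_class option in airflow.cfg. Below is a minimal,
hypothetical sketch of such a configuration: the class path matches the PR's
diff, the handler and logger key names follow the 1.10-style default config
and may differ across versions, and the log group, region, and filename
template values are placeholders chosen for illustration.

# Hypothetical logging config wiring the CloudWatch handler from the PR
# into the 'airflow.task' logger; group/region values are placeholders.
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import (
    DEFAULT_LOGGING_CONFIG,
)

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG['handlers']['task'] = {
    'class': 'airflow.utils.log.cloudwatch_task_handler.CloudwatchTaskHandler',
    # Assumes a formatter with this name exists in the base config.
    'formatter': 'airflow.task',
    # Extra keys are passed to CloudwatchTaskHandler.__init__ by dictConfig.
    'log_group': 'airflow-task-logs',       # placeholder log group name
    'region_name': 'us-east-1',             # placeholder AWS region
    'filename_template':
        '{dag_id}/{task_id}/{execution_date}/{try_number}.log',
}
LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['task']

With a configuration along these lines in place, task logs would stream to
CloudWatch as records are emitted, instead of being uploaded only after the
task completes as with the S3 handler.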



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2325) Task logging with AWS Cloud watch

2018-09-02 Thread Apache Spark (JIRA)


[ https://issues.apache.org/jira/browse/AIRFLOW-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601577#comment-16601577 ]

Apache Spark commented on AIRFLOW-2325:
---

User 'fangpenlin' has created a pull request for this issue:
https://github.com/apache/incubator-airflow/pull/3229
