[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-06-03 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r289957508
 
 

 ##
 File path: UPDATING.md
 ##
 @@ -24,6 +24,16 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Changes in writing Logs to Elasticsearch
+
+* renamed `elasticsearch_host` parameter to `host` under `[elasticsearch]` section in `airflow.cfg`
 
 Review comment:
   @schnie please review ^


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-06-03 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r289957144
 
 

 ##
 File path: UPDATING.md
 ##
 @@ -594,8 +594,8 @@ log_filename_template =  ti.dag_id / ti.task_id / ts /{{
 log_processor_filename_template =  filename .log
 
 [elasticsearch]
-elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
 
 Review comment:
   @ashb done.




[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-05-31 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r289309846
 
 

 ##
 File path: airflow/config_templates/default_airflow.cfg
 ##
 @@ -561,13 +561,22 @@ api_rev = v3
 hide_sensitive_variable_fields = True
 
 [elasticsearch]
+# Elasticsearch host
 elasticsearch_host =
+# Format of the log_id, which is used to query for a given tasks logs
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
+# Used to mark the end of a log stream for a task
 elasticsearch_end_of_log_mark = end_of_log
 # Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
 # Code will construct log_id using the log_id template from the argument above.
 # NOTE: The code will prefix the https:// automatically, don't include that here.
 elasticsearch_frontend =
+# Write the task logs to the stdout of the worker, rather than the default files
+elasticsearch_write_stdout = False
 
 Review comment:
   @ashb deprecation is a good point; let me dig into it.
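
To make the deprecation question concrete, here is a minimal sketch of one way a renamed option could stay readable under its old name. It uses only the standard library `configparser`; `get_es_host` is a hypothetical helper written for illustration, not Airflow's configuration API, and the host value is a placeholder. It assumes the rename recorded in UPDATING.md (`elasticsearch_host` to `host`).

```python
import configparser
import warnings


def get_es_host(parser):
    """Hypothetical helper: prefer the new `host` key, fall back to the old one."""
    section = "elasticsearch"
    if parser.has_option(section, "host"):
        return parser.get(section, "host")
    if parser.has_option(section, "elasticsearch_host"):
        # Old name still works, but the caller is told to migrate.
        warnings.warn(
            "`elasticsearch_host` is deprecated; use `host` under [elasticsearch]",
            DeprecationWarning,
            stacklevel=2,
        )
        return parser.get(section, "elasticsearch_host")
    return ""


# Placeholder configs: one written the old way, one the new way.
old_style = configparser.ConfigParser()
old_style.read_string("[elasticsearch]\nelasticsearch_host = es.example.com:9200\n")

new_style = configparser.ConfigParser()
new_style.read_string("[elasticsearch]\nhost = es.example.com:9200\n")

print(get_es_host(new_style))
```

Both configs resolve to the same host; only the old-style one triggers a `DeprecationWarning`.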




[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-05-28 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r288039427
 
 

 ##
 File path: airflow/config_templates/default_airflow.cfg
 ##
 @@ -561,13 +561,22 @@ api_rev = v3
 hide_sensitive_variable_fields = True
 
 [elasticsearch]
+# Elasticsearch host
 elasticsearch_host =
+# Format of the log_id, which is used to query for a given tasks logs
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
+# Used to mark the end of a log stream for a task
 elasticsearch_end_of_log_mark = end_of_log
 # Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
 # Code will construct log_id using the log_id template from the argument above.
 # NOTE: The code will prefix the https:// automatically, don't include that here.
 elasticsearch_frontend =
+# Write the task logs to the stdout of the worker, rather than the default files
+elasticsearch_write_stdout = False
 
 Review comment:
   > Is this option specific to Elasticsearch remote logging or could it be useful for other loggers too?
   
   I think it is Elasticsearch-only for now, though it could probably be useful for all loggers in the future.




[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-05-28 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r288038294
 
 

 ##
 File path: tests/utils/log/test_json_formatter.py
 ##
 @@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from logging import makeLogRecord
+
+from airflow.utils.log.json_formatter import JSONFormatter
+
+
+class TestJSONFormatter(unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+
+    def test_json_formatter_is_not_none(self):
+        json_fmt = JSONFormatter()
+        self.assertIsNotNone(json_fmt)
+
+    def test_format(self):
+        log_record = makeLogRecord({"label": "value"})
+        json_fmt = JSONFormatter(json_fields=["label"])
+        self.assertEqual(json_fmt.format(log_record), '{"label": "value"}')
 
 Review comment:
   fixed.
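
For readers without the PR checked out, the behavior the quoted test pins down can be sketched with the standard library alone. `SketchJSONFormatter` is a hypothetical stand-in, not Airflow's `JSONFormatter`; it assumes the formatter emits only the record attributes named in `json_fields`.

```python
import json
import logging


class SketchJSONFormatter(logging.Formatter):
    """Hypothetical stand-in: serialize selected record attributes as JSON."""

    def __init__(self, json_fields=None):
        super().__init__()
        self.json_fields = list(json_fields or [])

    def format(self, record):
        # Keep only the requested attributes; missing ones become None.
        payload = {name: getattr(record, name, None) for name in self.json_fields}
        return json.dumps(payload)


# Same record and field list as in the quoted test.
record = logging.makeLogRecord({"label": "value"})
formatted = SketchJSONFormatter(json_fields=["label"]).format(record)
print(formatted)
```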




[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-05-28 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r288034244
 
 

 ##
 File path: airflow/config_templates/default_airflow.cfg
 ##
 @@ -561,13 +561,22 @@ api_rev = v3
 hide_sensitive_variable_fields = True
 
 [elasticsearch]
+# Elasticsearch host
 elasticsearch_host =
+# Format of the log_id, which is used to query for a given tasks logs
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
+# Used to mark the end of a log stream for a task
 elasticsearch_end_of_log_mark = end_of_log
 # Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
 # Code will construct log_id using the log_id template from the argument above.
 # NOTE: The code will prefix the https:// automatically, don't include that here.
 elasticsearch_frontend =
+# Write the task logs to the stdout of the worker, rather than the default files
+elasticsearch_write_stdout = False
 
 Review comment:
   @ashb what about `elasticsearch_end_of_log_mark` and the other options?




[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-04-28 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r279211175
 
 

 ##
 File path: airflow/utils/log/es_task_handler.py
 ##
 @@ -119,7 +146,10 @@ def _read(self, ti, try_number, metadata=None):
         if offset != next_offset or 'last_log_timestamp' not in metadata:
             metadata['last_log_timestamp'] = str(cur_ts)
 
-        message = '\n'.join([log.message for log in logs])
+        # If we hit the end of the log, remove the actual end_of_log message
 
 Review comment:
   @KevinYang21 can you please show an example?
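
As a starting point for that example, the idea in the added comment (drop the end-of-log marker before joining messages) might look like the sketch below. The quoted hunk does not show the PR's actual logic, so `join_messages` is a hypothetical helper; the marker value is the `end_of_log` default from `default_airflow.cfg`.

```python
END_OF_LOG_MARK = "end_of_log"  # default value shown in default_airflow.cfg


def join_messages(messages, end_of_log_mark=END_OF_LOG_MARK):
    """Join log messages, dropping a trailing end-of-log marker if present.

    Returns the joined text and a flag saying whether the marker was seen.
    """
    end_of_log = bool(messages) and messages[-1].strip() == end_of_log_mark
    if end_of_log:
        messages = messages[:-1]
    return "\n".join(messages), end_of_log


text, done = join_messages(["starting", "finished", "end_of_log"])
print(repr(text), done)
```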




[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-04-26 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r278982447
 
 

 ##
 File path: airflow/utils/log/es_task_handler.py
 ##
 @@ -67,19 +72,37 @@ def __init__(self, base_log_folder, filename_template,
 
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
+        self.write_stdout = write_stdout
+        self.json_format = json_format
+        self.json_fields = [label.strip() for label in json_fields.split(",")]
+
+        self.handler = None
 
     def _render_log_id(self, ti, try_number):
         if self.log_id_jinja_template:
             jinja_context = ti.get_template_context()
             jinja_context['try_number'] = try_number
             return self.log_id_jinja_template.render(**jinja_context)
 
+        if self.json_format:
+            execution_date = self._clean_execution_date(ti.execution_date)
+        else:
+            execution_date = ti.execution_date.isoformat()
         return self.log_id_template.format(dag_id=ti.dag_id,
                                            task_id=ti.task_id,
-                                           execution_date=ti
-                                           .execution_date.isoformat(),
+                                           execution_date=execution_date,
                                            try_number=try_number)
 
+    @staticmethod
+    def _clean_execution_date(execution_date):
+        """
+        Clean up an execution date so that it is safe to query in elasticsearch
+        by removing reserved characters.
+        # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
+        :param execution_date: execution date of the dag run.
+        """
+        return re.sub(r"[\+\-\:\.]", "", execution_date.isoformat())
 
 Review comment:
   @bolkedebruin can you suggest something similar to `dt.today().strftime("%Y_%m_%dT%H_%I_%S_%f")` from the [pendulum docs](https://pendulum.eustace.io/docs/#string-formatting)?




[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-04-18 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r276652641
 
 

 ##
 File path: airflow/utils/log/es_task_handler.py
 ##
 @@ -67,19 +72,37 @@ def __init__(self, base_log_folder, filename_template,
 
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
+        self.write_stdout = write_stdout
+        self.json_format = json_format
+        self.json_fields = [label.strip() for label in json_fields.split(",")]
+
+        self.handler = None
 
     def _render_log_id(self, ti, try_number):
         if self.log_id_jinja_template:
             jinja_context = ti.get_template_context()
             jinja_context['try_number'] = try_number
             return self.log_id_jinja_template.render(**jinja_context)
 
+        if self.json_format:
+            execution_date = self._clean_execution_date(ti.execution_date)
+        else:
+            execution_date = ti.execution_date.isoformat()
         return self.log_id_template.format(dag_id=ti.dag_id,
                                            task_id=ti.task_id,
-                                           execution_date=ti
-                                           .execution_date.isoformat(),
+                                           execution_date=execution_date,
                                            try_number=try_number)
 
+    @staticmethod
+    def _clean_execution_date(execution_date):
+        """
+        Clean up an execution date so that it is safe to query in elasticsearch
+        by removing reserved characters.
+        # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
+        :param execution_date: execution date of the dag run.
+        """
+        return re.sub(r"[\+\-\:\.]", "", execution_date.isoformat())
 
 Review comment:
   or
   ```
   In [13]: re.sub(r"[\+\-\:\.]", "", dt_str)
   Out[13]: '20190418T155643400365'
   
   In [14]: re.sub(r"[\+\-\:\.]", "_", dt_str)
   Out[14]: '2019_04_18T15_56_43_400365'
   ```
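
The two variants in this comment can be reproduced as a standalone sketch. `dt_str` is not shown in the comment, so the sample timestamp below is an assumption inferred from the quoted outputs, and `clean_execution_date` is a hypothetical free-function version of the method under review.

```python
import re
from datetime import datetime, timezone

# Same character set as the pattern in the diff: +, -, : and .
RESERVED = re.compile(r"[+\-:.]")


def clean_execution_date(execution_date, replacement=""):
    """Replace Elasticsearch-reserved characters in an ISO 8601 timestamp."""
    return RESERVED.sub(replacement, execution_date.isoformat())


# Assumed sample, chosen to match the outputs quoted in the comment.
sample = datetime(2019, 4, 18, 15, 56, 43, 400365)

stripped = clean_execution_date(sample)          # characters removed
underscored = clean_execution_date(sample, "_")  # characters replaced by "_"

# A timezone-aware value also carries a UTC offset, which is stripped too.
aware = clean_execution_date(sample.replace(tzinfo=timezone.utc))

print(stripped, underscored, aware)
```

Replacing with `_` keeps the date and time components visually separable, while removing the characters yields a shorter identifier; either way the result contains none of the four characters the pattern targets.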




[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler

2019-04-18 Thread GitBox
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] 
Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r276648762
 
 

 ##
 File path: airflow/utils/log/es_task_handler.py
 ##
 @@ -67,19 +72,37 @@ def __init__(self, base_log_folder, filename_template,
 
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
+        self.write_stdout = write_stdout
+        self.json_format = json_format
+        self.json_fields = [label.strip() for label in json_fields.split(",")]
+
+        self.handler = None
 
     def _render_log_id(self, ti, try_number):
         if self.log_id_jinja_template:
             jinja_context = ti.get_template_context()
             jinja_context['try_number'] = try_number
             return self.log_id_jinja_template.render(**jinja_context)
 
+        if self.json_format:
+            execution_date = self._clean_execution_date(ti.execution_date)
+        else:
+            execution_date = ti.execution_date.isoformat()
         return self.log_id_template.format(dag_id=ti.dag_id,
                                            task_id=ti.task_id,
-                                           execution_date=ti
-                                           .execution_date.isoformat(),
+                                           execution_date=execution_date,
                                            try_number=try_number)
 
+    @staticmethod
+    def _clean_execution_date(execution_date):
+        """
+        Clean up an execution date so that it is safe to query in elasticsearch
+        by removing reserved characters.
+        # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
+        :param execution_date: execution date of the dag run.
+        """
+        return re.sub(r"[\+\-\:\.]", "", execution_date.isoformat())
 
 Review comment:
   @ashb what do you think, should it be:
   ```
   In [5]: dt.today().strftime("%Y_%m_%dT%H_%I_%S_%f")
   Out[5]: '2019_04_18T15_03_06_899836'
   ```
   cc @schnie 
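
One detail worth checking in the proposed format string: in Python's `strftime`, `%I` is the hour on a 12-hour clock, while `%M` is the minute. The sketch below compares the two on a hypothetical timestamp (the minute value is made up for illustration); it is a suggestion to verify, not a statement of what the PR finally adopted.

```python
from datetime import datetime

# Hypothetical timestamp; the minute (42) is chosen so the difference is visible.
sample = datetime(2019, 4, 18, 15, 42, 6, 899836)

# As proposed: %I repeats the hour (12-hour clock), so minutes are lost.
as_proposed = sample.strftime("%Y_%m_%dT%H_%I_%S_%f")

# With %M the minute is preserved.
with_minutes = sample.strftime("%Y_%m_%dT%H_%M_%S_%f")

print(as_proposed)
print(with_minutes)
```

With `%I`, two runs in the same hour that differ only in their minute (and happen to share seconds and microseconds) would render identically, which matters if the string is used in a log id.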

