[GitHub] [airflow] andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r289957508

File path: UPDATING.md

@@ -24,6 +24,16 @@ assists users migrating to a new version.
 ## Airflow Master
+### Changes in writing Logs to Elasticsearch
+
+* renamed `elasticsearch_host` parameter to `host` under `[elasticsearch]` section in `airflow.cfg`

Review comment:
@schnie pls review ^

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
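For users upgrading, the rename discussed above changes the `[elasticsearch]` section of `airflow.cfg` roughly as follows (a hypothetical before/after sketch; the host value is an example, and the released `default_airflow.cfg` is authoritative for the final option names):

```ini
# Before the rename
[elasticsearch]
elasticsearch_host = localhost:9200

# After the rename
[elasticsearch]
host = localhost:9200
```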
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r289957144

File path: UPDATING.md

@@ -594,8 +594,8 @@
 log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
 log_processor_filename_template = {{ filename }}.log
 [elasticsearch]
-elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}

Review comment:
@ashb done.
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r289309846

File path: airflow/config_templates/default_airflow.cfg

@@ -561,13 +561,22 @@ api_rev = v3
 hide_sensitive_variable_fields = True

 [elasticsearch]
+# Elasticsearch host
 elasticsearch_host =
+# Format of the log_id, which is used to query for a given tasks logs
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
+# Used to mark the end of a log stream for a task
 elasticsearch_end_of_log_mark = end_of_log
 # Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
 # Code will construct log_id using the log_id template from the argument above.
 # NOTE: The code will prefix the https:// automatically, don't include that here.
 elasticsearch_frontend =
+# Write the task logs to the stdout of the worker, rather than the default files
+elasticsearch_write_stdout = False

Review comment:
@ashb deprecation is a good point, let me dig into it.
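The deprecation point above is about keeping an old option name working while a new one is introduced. A minimal sketch of that fallback pattern using plain `configparser` (hypothetical helper, not Airflow's actual `conf` API; option names mirror the rename discussed in this PR):

```python
import configparser
import warnings


def get_es_host(cfg: configparser.ConfigParser) -> str:
    """Read [elasticsearch] host, falling back to the deprecated name."""
    section = cfg["elasticsearch"]
    if "host" in section:
        return section["host"]
    if "elasticsearch_host" in section:
        # Old name still works, but users are nudged toward the new one.
        warnings.warn(
            "elasticsearch_host is deprecated; use host instead",
            DeprecationWarning,
        )
        return section["elasticsearch_host"]
    return ""


cfg = configparser.ConfigParser()
cfg.read_string("[elasticsearch]\nelasticsearch_host = es:9200\n")
print(get_es_host(cfg))  # es:9200 (via the deprecated fallback)
```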
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r288039427

File path: airflow/config_templates/default_airflow.cfg

@@ -561,13 +561,22 @@ api_rev = v3
 hide_sensitive_variable_fields = True

 [elasticsearch]
+# Elasticsearch host
 elasticsearch_host =
+# Format of the log_id, which is used to query for a given tasks logs
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
+# Used to mark the end of a log stream for a task
 elasticsearch_end_of_log_mark = end_of_log
 # Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
 # Code will construct log_id using the log_id template from the argument above.
 # NOTE: The code will prefix the https:// automatically, don't include that here.
 elasticsearch_frontend =
+# Write the task logs to the stdout of the worker, rather than the default files
+elasticsearch_write_stdout = False

Review comment:
> Is this option specific to Elasticsearch remote logging or could it be useful for other loggers too?

I think it's Elasticsearch-only for now, but it could probably be useful for all loggers in the future.
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r288038294

File path: tests/utils/log/test_json_formatter.py

@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from logging import makeLogRecord
+
+from airflow.utils.log.json_formatter import JSONFormatter
+
+
+class TestJSONFormatter(unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+
+    def test_json_formatter_is_not_none(self):
+        json_fmt = JSONFormatter()
+        self.assertIsNotNone(json_fmt)
+
+    def test_format(self):
+        log_record = makeLogRecord({"label": "value"})
+        json_fmt = JSONFormatter(json_fields=["label"])
+        self.assertEqual(json_fmt.format(log_record), '{"label": "value"}')

Review comment:
fixed.
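The test above exercises `JSONFormatter`, which serializes only a configured subset of a `LogRecord`'s attributes to JSON. A minimal sketch of a formatter with those semantics (an illustration matching the test's expectations, not the PR's actual implementation):

```python
import json
import logging


class SimpleJSONFormatter(logging.Formatter):
    """Format a LogRecord as JSON, keeping only the named fields."""

    def __init__(self, json_fields=None):
        super().__init__()
        self.json_fields = json_fields or []

    def format(self, record):
        # Pull only the requested attributes off the record.
        fields = {f: getattr(record, f, None) for f in self.json_fields}
        return json.dumps(fields)


record = logging.makeLogRecord({"label": "value"})
print(SimpleJSONFormatter(json_fields=["label"]).format(record))  # {"label": "value"}
```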
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r288034244

File path: airflow/config_templates/default_airflow.cfg

@@ -561,13 +561,22 @@ api_rev = v3
 hide_sensitive_variable_fields = True

 [elasticsearch]
+# Elasticsearch host
 elasticsearch_host =
+# Format of the log_id, which is used to query for a given tasks logs
 elasticsearch_log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
+# Used to mark the end of a log stream for a task
 elasticsearch_end_of_log_mark = end_of_log
 # Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
 # Code will construct log_id using the log_id template from the argument above.
 # NOTE: The code will prefix the https:// automatically, don't include that here.
 elasticsearch_frontend =
+# Write the task logs to the stdout of the worker, rather than the default files
+elasticsearch_write_stdout = False

Review comment:
@ashb what about `elasticsearch_end_of_log_mark` and the other options?
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r279211175

File path: airflow/utils/log/es_task_handler.py

@@ -119,7 +146,10 @@ def _read(self, ti, try_number, metadata=None):
         if offset != next_offset or 'last_log_timestamp' not in metadata:
             metadata['last_log_timestamp'] = str(cur_ts)

-        message = '\n'.join([log.message for log in logs])
+        # If we hit the end of the log, remove the actual end_of_log message

Review comment:
@KevinYang21 can you please demonstrate with an example?
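The diff above joins the fetched log messages and, once the end-of-log marker is seen, drops the marker itself so it never appears in the rendered log. A minimal sketch of that join-and-strip step (hypothetical function and variable names; `end_of_log_mark` mirrors the `elasticsearch_end_of_log_mark` option):

```python
END_OF_LOG_MARK = "end_of_log"


def join_messages(messages, end_of_log_mark=END_OF_LOG_MARK):
    """Join log messages, stripping the marker line if the stream has ended."""
    # Detect whether the marker terminated this batch of messages.
    end_of_log = bool(messages) and messages[-1] == end_of_log_mark
    if end_of_log:
        # Remove the actual end_of_log message so readers never see it.
        messages = messages[:-1]
    return "\n".join(messages), end_of_log


print(join_messages(["line 1", "line 2", "end_of_log"]))  # ('line 1\nline 2', True)
```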
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r278982447

File path: airflow/utils/log/es_task_handler.py

@@ -67,19 +72,37 @@ def __init__(self, base_log_folder, filename_template,
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
+        self.write_stdout = write_stdout
+        self.json_format = json_format
+        self.json_fields = [label.strip() for label in json_fields.split(",")]
+
+        self.handler = None

     def _render_log_id(self, ti, try_number):
         if self.log_id_jinja_template:
             jinja_context = ti.get_template_context()
             jinja_context['try_number'] = try_number
             return self.log_id_jinja_template.render(**jinja_context)
+        if self.json_format:
+            execution_date = self._clean_execution_date(ti.execution_date)
+        else:
+            execution_date = ti.execution_date.isoformat()
         return self.log_id_template.format(dag_id=ti.dag_id,
                                            task_id=ti.task_id,
-                                           execution_date=ti
-                                           .execution_date.isoformat(),
+                                           execution_date=execution_date,
                                            try_number=try_number)

+    @staticmethod
+    def _clean_execution_date(execution_date):
+        """
+        Clean up an execution date so that it is safe to query in elasticsearch
+        by removing reserved characters.
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
+        :param execution_date: execution date of the dag run.
+        """
+        return re.sub(r"[\+\-\:\.]", "", execution_date.isoformat())

Review comment:
@bolkedebruin can you suggest something similar to `dt.today().strftime("%Y_%m_%dT%H_%I_%S_%f")` from the [pendulum docs](https://pendulum.eustace.io/docs/#string-formatting)?
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r276652641

File path: airflow/utils/log/es_task_handler.py

@@ -67,19 +72,37 @@ def __init__(self, base_log_folder, filename_template,
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
+        self.write_stdout = write_stdout
+        self.json_format = json_format
+        self.json_fields = [label.strip() for label in json_fields.split(",")]
+
+        self.handler = None

     def _render_log_id(self, ti, try_number):
         if self.log_id_jinja_template:
             jinja_context = ti.get_template_context()
             jinja_context['try_number'] = try_number
             return self.log_id_jinja_template.render(**jinja_context)
+        if self.json_format:
+            execution_date = self._clean_execution_date(ti.execution_date)
+        else:
+            execution_date = ti.execution_date.isoformat()
         return self.log_id_template.format(dag_id=ti.dag_id,
                                            task_id=ti.task_id,
-                                           execution_date=ti
-                                           .execution_date.isoformat(),
+                                           execution_date=execution_date,
                                            try_number=try_number)

+    @staticmethod
+    def _clean_execution_date(execution_date):
+        """
+        Clean up an execution date so that it is safe to query in elasticsearch
+        by removing reserved characters.
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
+        :param execution_date: execution date of the dag run.
+        """
+        return re.sub(r"[\+\-\:\.]", "", execution_date.isoformat())

Review comment:
or

```
In [13]: re.sub(r"[\+\-\:\.]", "", dt_str)
Out[13]: '20190418T155643400365'

In [14]: re.sub(r"[\+\-\:\.]", "_", dt_str)
Out[14]: '2019_04_18T15_56_43_400365'
```
andriisoldatenko commented on a change in pull request #5048: [AIRFLOW-3370] Add stdout output options to Elasticsearch task log handler
URL: https://github.com/apache/airflow/pull/5048#discussion_r276648762

File path: airflow/utils/log/es_task_handler.py

@@ -67,19 +72,37 @@ def __init__(self, base_log_folder, filename_template,
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
+        self.write_stdout = write_stdout
+        self.json_format = json_format
+        self.json_fields = [label.strip() for label in json_fields.split(",")]
+
+        self.handler = None

     def _render_log_id(self, ti, try_number):
         if self.log_id_jinja_template:
             jinja_context = ti.get_template_context()
             jinja_context['try_number'] = try_number
             return self.log_id_jinja_template.render(**jinja_context)
+        if self.json_format:
+            execution_date = self._clean_execution_date(ti.execution_date)
+        else:
+            execution_date = ti.execution_date.isoformat()
         return self.log_id_template.format(dag_id=ti.dag_id,
                                            task_id=ti.task_id,
-                                           execution_date=ti
-                                           .execution_date.isoformat(),
+                                           execution_date=execution_date,
                                            try_number=try_number)

+    @staticmethod
+    def _clean_execution_date(execution_date):
+        """
+        Clean up an execution date so that it is safe to query in elasticsearch
+        by removing reserved characters.
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
+        :param execution_date: execution date of the dag run.
+        """
+        return re.sub(r"[\+\-\:\.]", "", execution_date.isoformat())

Review comment:
@ashb what do you think, should it be:

```
In [5]: dt.today().strftime("%Y_%m_%dT%H_%I_%S_%f")
Out[5]: '2019_04_18T15_03_06_899836'
```

cc @schnie
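The two approaches being compared in this thread can be reproduced directly: the regex strips characters that are reserved in Elasticsearch query-string syntax, while `strftime` replaces them up front. A self-contained comparison on a fixed naive timestamp (the date value is arbitrary; note the format string uses `%M` for minutes, where the review comment's `%H_%I_%S` appears to have used 12-hour `%I` by mistake):

```python
import re
from datetime import datetime

execution_date = datetime(2019, 4, 18, 15, 56, 43, 400365)
dt_str = execution_date.isoformat()  # '2019-04-18T15:56:43.400365'

# Remove Elasticsearch reserved characters entirely, as _clean_execution_date does.
print(re.sub(r"[\+\-\:\.]", "", dt_str))   # 20190418T155643400365

# Or replace them with underscores to keep the date human-readable.
print(re.sub(r"[\+\-\:\.]", "_", dt_str))  # 2019_04_18T15_56_43_400365

# The strftime alternative produces the same underscore-separated form.
print(execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"))  # 2019_04_18T15_56_43_400365
```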