Hi Greg,

I managed to get this to work. There were a couple of issues I had to fix:
The Kubernetes Airflow Helm chart uses the container image puckel/docker-airflow, 
and that image does not install the Elasticsearch Python modules.
The Airflow documentation uses double curly braces in LOG_ID_TEMPLATE, and that 
does not work: the dag_id and task_id are not picked up by the code below, and 
the execution_date ends up in a different format than the one written to the 
logs, so airflow-web cannot find the logs. I changed to the single-curly-brace 
(JSON) form of the log_id template and it works.
    if self.log_id_jinja_template:
        jinja_context = ti.get_template_context()
        jinja_context['try_number'] = try_number
        return self.log_id_jinja_template.render(**jinja_context)
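For what it's worth, here is a minimal sketch (not Airflow's code; the names and 
date below are made up) of why the single-brace form works: without double braces 
the template is treated as a plain Python str.format() string, so the handler 
fills in dag_id, task_id, execution_date and try_number itself, in the same 
format that ends up in the JSON log lines.

    # Minimal sketch, not Airflow's code; the values are made up for illustration.
    # The real execution_date format has to match what the worker writes to the logs.
    log_id_template = "{dag_id}-{task_id}-{execution_date}-{try_number}"
    log_id = log_id_template.format(
        dag_id="example_dag",
        task_id="example_task",
        execution_date="2020-01-10T00:00:00+00:00",
        try_number=1,
    )
    print(log_id)  # example_dag-example_task-2020-01-10T00:00:00+00:00-1
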
The “end of log” marker does not include the aforementioned log_id, so 
airflow-web does not know when to stop tailing the logs. I made the fix shown 
below and validated that it works for me.
diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py
index 2dacfb49b..10fd7e240 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -70,7 +70,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
 
         self.log_id_template, self.log_id_jinja_template = \
             parse_template_string(log_id_template)
-
+        self.index = conf.get('elasticsearch', 'INDEX')
         self.client = elasticsearch.Elasticsearch([host], **es_kwargs)
 
         self.mark_end_on_close = True
@@ -171,9 +171,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         :param metadata: log metadata, used for steaming log download.
         :type metadata: dict
         """
-
         # Offset is the unique key for sorting logs given log_id.
-        search = Search(using=self.client) \
+        search = Search(using=self.client, index=self.index) \
             .query('match_phrase', log_id=log_id) \
             .sort('offset')
 
@@ -254,8 +253,10 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
             self.handler.stream = self.handler._open()  # pylint: disable=protected-access
 
         # Mark the end of file using end of log mark,
-        # so we know where to stop while auto-tailing.
-        self.handler.stream.write(self.end_of_log_mark)
+        # so we know where to stop while auto-tailing.\
+        if self.write_stdout:
+            print()
+        self.handler.emit(logging.LogRecord(None, logging.INFO, None, 0, self.end_of_log_mark, None, None))
 
         if self.write_stdout:
             self.handler.close()
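
In case it is useful, here is a quick sketch of how I checked the result (the 
host, index pattern and log_id below are placeholders, not values from the 
patch): after the change the end-of-log mark is emitted through the JSON 
handler, so it shows up in Elasticsearch as a document carrying the same log_id 
and an offset, and the same match_phrase query shown in the diff can find it.

    # Quick verification sketch; host, index pattern and log_id are placeholders.
    from elasticsearch import Elasticsearch
    from elasticsearch_dsl import Search

    client = Elasticsearch(["http://localhost:9200"])
    log_id = "example_dag-example_task-2020-01-10T00:00:00+00:00-1"

    search = (Search(using=client, index="filebeat-*")
              .query("match_phrase", log_id=log_id)
              .sort("offset"))

    for hit in search:
        # The last hit should carry the end_of_log mark in its message field.
        print(hit.offset, hit.message)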

 

I could not submit a PR for these fixes to GitHub; the push failed with the 
following error:

 

remote: Permission to apache/airflow.git denied to larryzhu2018.

fatal: unable to access 'https://github.com/apache/airflow.git/': The requested URL returned error: 403

 

Can you please point me to how I can get the permissions needed to submit PRs to 
fix these issues in Airflow?

Thanks,

--Larry

 

 

From: Greg Neiheisel <g...@astronomer.io>
Reply-To: <users@airflow.apache.org>
Date: Friday, January 10, 2020 at 3:30 PM
To: <users@airflow.apache.org>
Subject: Re: Help to write airflow task instance logs to elastic search

 

Hey Larry, are you using the `KubernetesExecutor`? We support ES logging for 
our clients and work with Local, Celery and Kubernetes executors. I took a look 
through our helm charts to see if anything jumped out. Wondering if you may 
need to pass this extra configuration to the executor pods 
https://github.com/astronomer/helm.astronomer.io/blob/master/charts/airflow/templates/configmap.yaml#L78

 

Possible that without that configuration set, it may skip logger configuration 
here: 
https://github.com/apache/airflow/blob/d5fa17f7b969eab6fd2af731bc63e5e6e90d56cb/airflow/config_templates/airflow_local_settings.py#L200

 

On Thu, Jan 9, 2020 at 2:25 AM Larry Zhu <larry....@oracle.com> wrote:

I am using 1.10.6 and here are my log configurations for running Airflow on 
Kubernetes. I set up Kubernetes to send all console output logs to 
Elasticsearch, and I am trying to configure the Airflow worker to write its logs 
to the console, but that does not seem to work. I can see the local logs in the 
pod, but the task instance logs are not written to the console, so my Filebeat 
daemonset cannot pick them up. Can you please help shed light on this?

 
airflow:
  config:
    AIRFLOW__CORE__REMOTE_LOGGING: "True"
    # HTTP_PROXY: "http://proxy.mycompany.com:123";
    AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE: "{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}"
    AIRFLOW__ELASTICSEARCH__END_OF_LOG_MARK: "end_of_log"
    AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "True"
    AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True"
    AIRFLOW__ELASTICSEARCH__JSON_FIELDS: "asctime, filename, lineno, levelname, message"
 


 

-- 

Greg Neiheisel / CTO Astronomer.io

 
