Hi Greg,

I managed to get this to work. There were a few issues I had to fix.

First, the Kubernetes Airflow Helm chart uses the puckel/docker-airflow container image, and that image does not have the elasticsearch Python modules installed.

Second, the Airflow documentation uses double curly braces in LOG_ID_TEMPLATE, and that does not work: dag_id and task_id are not picked up by the code below, and execution_date is rendered in a different format than the one written to the logs, so airflow-web cannot find them. I changed to the single curly brace (JSON) form of the log_id template and it works. For reference, this is the Jinja branch in airflow/utils/log/es_task_handler.py that the double-brace form goes through:

    if self.log_id_jinja_template:
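The single curly brace form is a plain format string, matching Airflow's default log_id_template, so no Jinja rendering is involved. As a rough illustration (the exact value has to match what actually ends up in your JSON log documents):

    AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE: "{dag_id}-{task_id}-{execution_date}-{try_number}"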
        jinja_context = ti.get_template_context()
        jinja_context['try_number'] = try_number
        return self.log_id_jinja_template.render(**jinja_context)

Third, the "end of log" marker does not include the aforementioned log_id, so airflow-web does not know when to stop tailing the logs. I made the fix shown below and validated that it works for me.

diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py
index 2dacfb49b..10fd7e240 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -70,7 +70,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         self.log_id_template, self.log_id_jinja_template = \
             parse_template_string(log_id_template)
-
+        self.index = conf.get('elasticsearch', 'INDEX')
         self.client = elasticsearch.Elasticsearch([host], **es_kwargs)
         self.mark_end_on_close = True
@@ -171,9 +171,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         :param metadata: log metadata, used for steaming log download.
         :type metadata: dict
         """
-        # Offset is the unique key for sorting logs given log_id.
-        search = Search(using=self.client) \
+        search = Search(using=self.client, index=self.index) \
             .query('match_phrase', log_id=log_id) \
             .sort('offset')
@@ -254,8 +253,10 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
             self.handler.stream = self.handler._open()  # pylint: disable=protected-access
         # Mark the end of file using end of log mark,
-        # so we know where to stop while auto-tailing.
-        self.handler.stream.write(self.end_of_log_mark)
+        # so we know where to stop while auto-tailing.\
+        if self.write_stdout:
+            print()
+        self.handler.emit(logging.LogRecord(None, logging.INFO, None, 0, self.end_of_log_mark, None, None))
         if self.write_stdout:
             self.handler.close()
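To sanity-check the end-of-log change, a query like the one below shows whether the last document (by offset) for a task's log_id is the end_of_log mark. This is only a sketch: the host, index pattern, and log_id value are placeholders for my setup.

    from elasticsearch import Elasticsearch

    # Placeholders: adjust the host, index pattern, and rendered log_id for your deployment.
    es = Elasticsearch(["http://elasticsearch:9200"])
    resp = es.search(
        index="filebeat-*",
        body={
            "query": {"match_phrase": {"log_id": "my_dag-my_task-2020-01-10T00:00:00-1"}},
            "sort": ["offset"],
            "size": 100,
        },
    )
    for hit in resp["hits"]["hits"]:
        print(hit["_source"].get("offset"), hit["_source"].get("message"))
    # The last message printed should be the end_of_log mark.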
I could not submit a PR for these fixes to GitHub; the push fails with the following error:

    remote: Permission to apache/airflow.git denied to larryzhu2018.
    fatal: unable to access 'https://github.com/apache/airflow.git/': The requested URL returned error: 403

Can you please point me to how to get the permission I need so that I can submit PRs to fix these issues in Airflow?

Thanks,
--Larry

From: Greg Neiheisel <g...@astronomer.io>
Reply-To: <users@airflow.apache.org>
Date: Friday, January 10, 2020 at 3:30 PM
To: <users@airflow.apache.org>
Subject: Re: Help to write airflow task instance logs to elastic search

Hey Larry, are you using the `KubernetesExecutor`? We support ES logging for our clients, and it works with the Local, Celery and Kubernetes executors.

I took a look through our helm charts to see if anything jumped out. Wondering if you may need to pass this extra configuration to the executor pods: https://github.com/astronomer/helm.astronomer.io/blob/master/charts/airflow/templates/configmap.yaml#L78

Possible that without that configuration set, it may skip logger configuration here: https://github.com/apache/airflow/blob/d5fa17f7b969eab6fd2af731bc63e5e6e90d56cb/airflow/config_templates/airflow_local_settings.py#L200

On Thu, Jan 9, 2020 at 2:25 AM Larry Zhu <larry....@oracle.com> wrote:

I am using 1.10.6 and here are my log configurations for running Airflow on Kubernetes. I set up Kubernetes to ship all console output to Elasticsearch, and I am trying to configure the Airflow worker to write its logs to the console, but it does not seem to work. I can see the local logs in the pod, but the task instance logs are not written to the console, so my filebeat daemonset cannot pick them up. Can you please help shed light on this?

airflow:
  config:
    AIRFLOW__CORE__REMOTE_LOGGING: "True"
    # HTTP_PROXY: "http://proxy.mycompany.com:123"
    AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE: "{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}"
    AIRFLOW__ELASTICSEARCH__END_OF_LOG_MARK: "end_of_log"
    AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "True"
    AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True"
    AIRFLOW__ELASTICSEARCH__JSON_FIELDS: "asctime, filename, lineno, levelname, message"

--
Greg Neiheisel / CTO Astronomer.io