[ 
https://issues.apache.org/jira/browse/AIRFLOW-3506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724537#comment-16724537
 ] 

ASF GitHub Bot commented on AIRFLOW-3506:
-----------------------------------------

feng-tao closed pull request #4342: [AIRFLOW-3506] use match_phrase to query 
log_id in elasticsearch
URL: https://github.com/apache/incubator-airflow/pull/4342
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/utils/log/es_task_handler.py 
b/airflow/utils/log/es_task_handler.py
index 16372c0600..2dbee94171 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -135,7 +135,7 @@ def es_read(self, log_id, offset):
 
         # Offset is the unique key for sorting logs given log_id.
         s = Search(using=self.client) \
-            .query('match', log_id=log_id) \
+            .query('match_phrase', log_id=log_id) \
             .sort('offset')
 
         s = s.filter('range', offset={'gt': offset})
diff --git a/tests/utils/log/elasticmock/fake_elasticsearch.py 
b/tests/utils/log/elasticmock/fake_elasticsearch.py
index 0e29e91bb7..f068ede0e5 100644
--- a/tests/utils/log/elasticmock/fake_elasticsearch.py
+++ b/tests/utils/log/elasticmock/fake_elasticsearch.py
@@ -172,15 +172,8 @@ def count(self, index=None, doc_type=None, body=None, 
params=None):
                   'track_scores', 'version')
     def search(self, index=None, doc_type=None, body=None, params=None):
         searchable_indexes = self._normalize_index_to_list(index)
-        searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
 
-        matches = []
-        for searchable_index in searchable_indexes:
-            for document in self.__documents_dict[searchable_index]:
-                if searchable_doc_types\
-                   and document.get('_type') not in searchable_doc_types:
-                    continue
-                matches.append(document)
+        matches = self._find_match(index, doc_type, body, params)
 
         result = {
             'hits': {
@@ -258,6 +251,31 @@ def suggest(self, body, index=None, params=None):
             ]
         return result_dict
 
+    def _find_match(self, index, doc_type, body, params=None):
+        searchable_indexes = self._normalize_index_to_list(index)
+        searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
+
+        must = body['query']['bool']['must'][0]  # only support one must
+
+        matches = []
+        for searchable_index in searchable_indexes:
+            for document in self.__documents_dict[searchable_index]:
+                if searchable_doc_types\
+                   and document.get('_type') not in searchable_doc_types:
+                    continue
+
+                if 'match_phrase' in must:
+                    for query_id in must['match_phrase']:
+                        query_val = must['match_phrase'][query_id]
+                        if query_id in document['_source']:
+                            if query_val in document['_source'][query_id]:
+                                # use in as a proxy for match_phrase
+                                matches.append(document)
+                else:
+                    matches.append(document)
+
+        return matches
+
     def _normalize_index_to_list(self, index):
         # Ensure to have a list of index
         if index is None:
diff --git a/tests/utils/log/test_es_task_handler.py 
b/tests/utils/log/test_es_task_handler.py
index 94184fc826..c5164b1e19 100644
--- a/tests/utils/log/test_es_task_handler.py
+++ b/tests/utils/log/test_es_task_handler.py
@@ -39,8 +39,7 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
     DAG_ID = 'dag_for_testing_file_task_handler'
     TASK_ID = 'task_for_testing_file_log_handler'
     EXECUTION_DATE = datetime(2016, 1, 1)
-    LOG_ID = 'dag_for_testing_file_task_handler-task_for_testing' \
-             '_file_log_handler-2016-01-01T00:00:00+00:00-1'
+    LOG_ID = 
'{dag_id}-{task_id}-2016-01-01T00:00:00+00:00-1'.format(dag_id=DAG_ID, 
task_id=TASK_ID)
 
     @elasticmock
     def setUp(self):
@@ -94,6 +93,31 @@ def test_read(self):
         self.assertEqual(1, metadatas[0]['offset'])
         self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > 
ts)
 
+    def test_read_with_match_phrase_query(self):
+        simiar_log_id = 
'{task_id}-{dag_id}-2016-01-01T00:00:00+00:00-1'.format(
+            dag_id=TestElasticsearchTaskHandler.DAG_ID,
+            task_id=TestElasticsearchTaskHandler.TASK_ID)
+        another_test_message = 'another message'
+
+        another_body = {'message': another_test_message, 'log_id': 
simiar_log_id, 'offset': 1}
+        self.es.index(index=self.index_name, doc_type=self.doc_type,
+                      body=another_body, id=1)
+
+        ts = pendulum.now()
+        logs, metadatas = self.es_task_handler.read(self.ti,
+                                                    1,
+                                                    {'offset': 0,
+                                                     'last_log_timestamp': 
str(ts),
+                                                     'end_of_log': False})
+        self.assertEqual(1, len(logs))
+        self.assertEqual(len(logs), len(metadatas))
+        self.assertEqual(self.test_message, logs[0])
+        self.assertNotEqual(another_test_message, logs[0])
+
+        self.assertFalse(metadatas[0]['end_of_log'])
+        self.assertEqual(1, metadatas[0]['offset'])
+        self.assertTrue(timezone.parse(metadatas[0]['last_log_timestamp']) > 
ts)
+
     def test_read_with_none_meatadata(self):
         logs, metadatas = self.es_task_handler.read(self.ti, 1)
         self.assertEqual(1, len(logs))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fail to query log from elasticsearch
> ------------------------------------
>
>                 Key: AIRFLOW-3506
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3506
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: logging
>            Reporter: Ping Zhang
>            Assignee: Ping Zhang
>            Priority: Major
>              Labels: easyfix, newbie
>
> We have noticed that the elasticsearch_logging_backend uses match to query the 
> log_id from elasticsearch, which ends up returning more results than intended. 
> It should use match_phrase to query the log_id as a phrase.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to