This is an automated email from the ASF dual-hosted git repository.

johnbodley pushed a commit to branch feature--embeddable-charts-pilot
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git

commit c0034e916bbd9a684c095db8d5d9225ee44ba33a
Author: Conglei Shi <conglei....@airbnb.com>
AuthorDate: Tue Nov 13 01:31:33 2018 -0800

    added cache logic
---
 superset/common/query_context.py | 71 +++++++++++++++++++++++++---------------
 1 file changed, 44 insertions(+), 27 deletions(-)

diff --git a/superset/common/query_context.py b/superset/common/query_context.py
index f17f39c..7fe67db 100644
--- a/superset/common/query_context.py
+++ b/superset/common/query_context.py
@@ -30,6 +30,7 @@ class QueryContext:
             datasource: Dict,
             queries: List[Dict],
             force: bool = False,
+            custom_cache_timeout: int = None,
     ):
         self.datasource = ConnectorRegistry.get_datasource(datasource.get('type'),
                                                            int(datasource.get('id')),
@@ -38,9 +39,9 @@ class QueryContext:
 
         self.force = force
         
-        self.query_details = []
+        self.custom_cache_timeout = custom_cache_timeout
 
-    def get_df(self, query_object):
+    def get_query_result(self, query_object):
         """Returns a pandas dataframe based on the query object"""
 
         # Here, we assume that all the queries will use the same datasource, which is
@@ -55,12 +56,6 @@ class QueryContext:
 
         # The datasource here can be a different backend, but the interface is common
         result = self.datasource.query(query_object.to_dict())
-        query_detail = {
-            'raw_query': result.query,
-            'status': result.status,
-            'error_message': result.error_message,
-        }
-        self.query_details.append(query_detail)
 
         df = result.df
         # Transform the timestamp we received from database to pandas supported
@@ -85,7 +80,12 @@ class QueryContext:
 
             df = df.replace([np.inf, -np.inf], np.nan)
             df = self.handle_nulls(df)
-        return df
+        return {
+            'query': result.query,
+            'status': result.status,
+            'error_message': result.error_message,
+            'df': df
+        }
 
     def df_metrics_to_num(self, df, query_object):
         """Converting metrics to numeric when pandas.read_sql cannot"""
@@ -127,7 +127,7 @@ class QueryContext:
             if df is not None and df.empty:
                 payload['error'] = 'No data'
             else:
-                payload['data'] = self.get_data(d)
+                payload['data'] = self.get_data(df)
         if 'df' in payload:
             del payload['df']
         return payload
@@ -136,6 +136,18 @@ class QueryContext:
         """Get all the paylaods from the arrays"""
         return [self.get_payload(query_ojbect) for query_ojbect in 
self.queries]
 
+    @property
+    def cache_timeout(self):
+        if self.custom_cache_timeout is not None:
+            return self.custom_cache_timeout
+        if self.datasource.cache_timeout is not None:
+            return self.datasource.cache_timeout
+        if (
+                hasattr(self.datasource, 'database') and
+                self.datasource.database.cache_timeout is not None):
+            return self.datasource.database.cache_timeout
+        return config.get('CACHE_DEFAULT_TIMEOUT')
+
     def get_df_payload(self, query_obj):
         """Handles caching around the df paylod retrieval"""
         cache_key = query_obj.cache_key() if query_obj else None
@@ -144,6 +156,10 @@ class QueryContext:
         stacktrace = None
         df = None
         cached_dttm = datetime.utcnow().isoformat().split('.')[0]
+        cache_value = None
+        status = None
+        query = ''
+        error_message = None
         if cache_key and cache and not self.force:
             cache_value = cache.get(cache_key)
             if cache_value:
@@ -151,10 +167,8 @@ class QueryContext:
                 try:
                     cache_value = pkl.loads(cache_value)
                     df = cache_value['df']
-                    self.query = cache_value['query']
-                    self._any_cached_dttm = cache_value['dttm']
-                    self._any_cache_key = cache_key
-                    self.status = utils.QueryStatus.SUCCESS
+                    query = cache_value['query']
+                    status = utils.QueryStatus.SUCCESS
                     is_loaded = True
                 except Exception as e:
                     logging.exception(e)
@@ -164,27 +178,31 @@ class QueryContext:
 
         if query_obj and not is_loaded:
             try:
-                df = self.get_df(query_obj)
-                if self.status != utils.QueryStatus.FAILED:
+                query_result = self.get_query_result(query_obj)
+                status = query_result['status']
+                query = query_result['query']
+                error_message = query_result['error_message']
+                df = query_result['df']
+                if status != utils.QueryStatus.FAILED:
                     stats_logger.incr('loaded_from_source')
                     is_loaded = True
             except Exception as e:
                 logging.exception(e)
-                if not self.error_message:
-                    self.error_message = '{}'.format(e)
-                self.status = utils.QueryStatus.FAILED
+                if not error_message:
+                    error_message = '{}'.format(e)
+                status = utils.QueryStatus.FAILED
                 stacktrace = traceback.format_exc()
 
             if (
                     is_loaded and
                     cache_key and
                     cache and
-                    self.status != utils.QueryStatus.FAILED):
+                    status != utils.QueryStatus.FAILED):
                 try:
                     cache_value = dict(
                         dttm=cached_dttm,
                         df=df if df is not None else None,
-                        query=self.query,
+                        query=query,
                     )
                     cache_value = pkl.dumps(
                         cache_value, protocol=pkl.HIGHEST_PROTOCOL)
@@ -204,15 +222,14 @@ class QueryContext:
                     logging.exception(e)
                     cache.delete(cache_key)
         return {
-            'cache_key': self._any_cache_key,
+            'cache_key': cache_key,
-            'cached_dttm': self._any_cached_dttm,
+            'cached_dttm': cache_value['dttm'] if cache_value is not None else None,
             'cache_timeout': self.cache_timeout,
             'df': df,
-            'error': self.error_message,
-            'form_data': self.form_data,
-            'is_cached': self._any_cache_key is not None,
-            'query': self.query,
-            'status': self.status,
+            'error': error_message,
+            'is_cached': cache_key is not None,
+            'query': query,
+            'status': status,
             'stacktrace': stacktrace,
             'rowcount': len(df.index) if df is not None else 0,
         }
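
A few self-contained sketches of the behavior above follow; everything in them
that is not in query_context.py is an illustrative stand-in, not Superset code.

get_query_result (formerly get_df) no longer stashes results on instance
attributes; it returns a single dict that get_df_payload unpacks into locals.
Assuming that shape:

    # Illustrative result shape; the values are stand-ins, not real output.
    query_result = {
        'query': 'SELECT ...',   # raw SQL sent to the backend
        'status': 'success',     # a utils.QueryStatus value in practice
        'error_message': None,
        'df': None,              # a pandas DataFrame in practice
    }
    # get_df_payload now unpacks into locals instead of instance attributes:
    status = query_result['status']
    query = query_result['query']
    error_message = query_result['error_message']
    df = query_result['df']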
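
The new cache_timeout property resolves the timeout in priority order: the
per-context custom_cache_timeout, then the datasource's timeout, then the
database's, then config.get('CACHE_DEFAULT_TIMEOUT'). A minimal sketch of that
cascade, with made-up stand-in classes and 600 standing in for the config
default:

    # Stand-in objects; Superset's real datasource/database models differ.
    class _Database:
        cache_timeout = None

    class _Datasource:
        cache_timeout = None
        database = _Database()

    def resolve_cache_timeout(custom_cache_timeout, datasource, default=600):
        # Mirrors QueryContext.cache_timeout's fallback chain.
        if custom_cache_timeout is not None:
            return custom_cache_timeout
        if datasource.cache_timeout is not None:
            return datasource.cache_timeout
        if (hasattr(datasource, 'database') and
                datasource.database.cache_timeout is not None):
            return datasource.database.cache_timeout
        return default

    assert resolve_cache_timeout(60, _Datasource()) == 60     # override wins
    assert resolve_cache_timeout(None, _Datasource()) == 600  # falls to default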
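
On a cache miss, get_df_payload pickles a dttm/df/query dict and stores it
under the query object's cache key; on a hit, it unpickles that dict and
restores df and query. A minimal round-trip under those assumptions, with a
plain dict standing in for the configured cache backend:

    import pickle as pkl
    from datetime import datetime

    fake_cache = {}                      # stand-in for the cache backend
    cache_key = 'some-query-cache-key'   # query_obj.cache_key() in practice

    # Write path: pickle the payload at HIGHEST_PROTOCOL, as the diff does.
    cache_value = dict(
        dttm=datetime.utcnow().isoformat().split('.')[0],
        df=None,  # a pandas DataFrame in practice
        query='SELECT ...',
    )
    fake_cache[cache_key] = pkl.dumps(cache_value, protocol=pkl.HIGHEST_PROTOCOL)

    # Read path: unpickle and recover df/query, as the cache-hit branch does.
    restored = pkl.loads(fake_cache[cache_key])
    assert restored['query'] == 'SELECT ...'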
