mistercrunch closed pull request #4740: Add extraction function support for Druid queries
URL: https://github.com/apache/incubator-superset/pull/4740
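
For context: a Druid "extraction function" is part of a dimension spec that
transforms raw dimension values at query time (e.g. a map lookup or a regex).
A minimal sketch of the kind of spec this PR teaches Superset to handle,
mirroring the map-lookup spec used in the tests in the diff below (the column
and device names are illustrative only, not part of any real datasource):

    import json

    # Dimension spec with a map-lookup extraction function, as it would be
    # stored on a DruidColumn via dimension_spec_json.
    dimension_spec = {
        'type': 'extraction',
        'dimension': 'device',        # raw Druid column
        'outputName': 'deviceName',   # name exposed to Superset
        'outputType': 'STRING',
        'extractionFn': {
            'type': 'lookup',
            'retainMissingValue': False,
            'replaceMissingValueWith': 'missing_value',
            'lookup': {
                'type': 'map',
                'map': {'iPhone10,3': 'iPhone X', 'iPhone10,6': 'iPhone X'},
                'isOneToOne': False,
            },
        },
    }
    print(json.dumps(dimension_spec, indent=2))

With such a spec attached to a column, get_filters() builds pydruid Filter
objects that carry a MapLookupExtraction (or RegexExtraction) rather than
plain Dimension filters, so filtering on the derived value works end to end.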
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/requirements.txt b/requirements.txt
index ea4f830cf9..da14026d6d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -22,7 +22,7 @@ pandas==0.22.0
 parsedatetime==2.0.0
 pathlib2==2.3.0
 polyline==1.3.2
-pydruid==0.4.1
+pydruid==0.4.2
 pyhive==0.5.0
 python-dateutil==2.6.1
 python-geohash==0.8.5
diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py
index 26e3c721a0..f67b03b953 100644
--- a/superset/connectors/druid/models.py
+++ b/superset/connectors/druid/models.py
@@ -20,7 +20,8 @@
 from flask_babel import lazy_gettext as _
 from pydruid.client import PyDruid
 from pydruid.utils.aggregators import count
-from pydruid.utils.filters import Bound, Dimension, Filter
+from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
+from pydruid.utils.filters import Dimension, Filter
 from pydruid.utils.having import Aggregation
 from pydruid.utils.postaggregator import (
     Const, Field, HyperUniqueCardinality, Postaggregator, Quantile, Quantiles,
@@ -958,8 +959,25 @@ def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
             for unused, row in df.iterrows():
                 fields = []
                 for dim in dimensions:
-                    f = Dimension(dim) == row[dim]
-                    fields.append(f)
+                    f = None
+                    # Check if this dimension uses an extraction function
+                    # If so, create the appropriate pydruid extraction object
+                    if isinstance(dim, dict) and 'extractionFn' in dim:
+                        (col, extraction_fn) = DruidDatasource._create_extraction_fn(dim)
+                        dim_val = dim['outputName']
+                        f = Filter(
+                            dimension=col,
+                            value=row[dim_val],
+                            extraction_function=extraction_fn,
+                        )
+                    elif isinstance(dim, dict):
+                        dim_val = dim['outputName']
+                        if dim_val:
+                            f = Dimension(dim_val) == row[dim_val]
+                    else:
+                        f = Dimension(dim) == row[dim]
+                    if f:
+                        fields.append(f)
                 if len(fields) > 1:
                     term = Filter(type='and', fields=fields)
                     new_filters.append(term)
@@ -1063,7 +1081,9 @@ def _dimensions_to_values(dimensions):
         values = []
         for dimension in dimensions:
             if isinstance(dimension, dict):
-                if 'dimension' in dimension:
+                if 'extractionFn' in dimension:
+                    values.append(dimension)
+                elif 'dimension' in dimension:
                     values.append(dimension['dimension'])
             else:
                 values.append(dimension)
@@ -1130,7 +1150,7 @@ def run_query(  # noqa / druid
             intervals=self.intervals_from_dttms(from_dttm, to_dttm),
         )
 
-        filters = DruidDatasource.get_filters(filter, self.num_cols)
+        filters = DruidDatasource.get_filters(filter, self.num_cols, columns_dict)
         if filters:
             qry['filter'] = filters
 
@@ -1215,7 +1235,14 @@ def run_query(  # noqa / druid
 
                 pre_qry = deepcopy(qry)
                 pre_qry_dims = self._dimensions_to_values(qry['dimensions'])
-                pre_qry['dimensions'] = list(set(pre_qry_dims))
+
+                # Can't use set on an array with dicts
+                # Use set with non-dict items only
+                non_dict_dims = list(
+                    set([x for x in pre_qry_dims if not isinstance(x, dict)]),
+                )
+                dict_dims = [x for x in pre_qry_dims if isinstance(x, dict)]
+                pre_qry['dimensions'] = non_dict_dims + dict_dims
 
                 order_by = metrics[0] if metrics else pre_qry_dims[0]
 
@@ -1339,8 +1366,31 @@ def increment_timestamp(ts):
             query=query_str,
             duration=datetime.now() - qry_start_dttm)
 
+    @staticmethod
+    def _create_extraction_fn(dim_spec):
+        extraction_fn = None
+        if dim_spec and 'extractionFn' in dim_spec:
+            col = dim_spec['dimension']
+            fn = dim_spec['extractionFn']
+            ext_type = fn.get('type')
+            if ext_type == 'lookup' and fn['lookup'].get('type') == 'map':
+                replace_missing_values = fn.get('replaceMissingValueWith')
+                retain_missing_values = fn.get('retainMissingValue', False)
+                injective = fn.get('isOneToOne', False)
+                extraction_fn = MapLookupExtraction(
+                    fn['lookup']['map'],
+                    replace_missing_values=replace_missing_values,
+                    retain_missing_values=retain_missing_values,
+                    injective=injective,
+                )
+            elif ext_type == 'regex':
+                extraction_fn = RegexExtraction(fn['expr'])
+            else:
+                raise Exception(_('Unsupported extraction function: ' + ext_type))
+        return (col, extraction_fn)
+
     @classmethod
-    def get_filters(cls, raw_filters, num_cols):  # noqa
+    def get_filters(cls, raw_filters, num_cols, columns_dict):  # noqa
         """Given Superset filter data structure, returns pydruid Filter(s)"""
         filters = None
         for flt in raw_filters:
@@ -1352,21 +1402,42 @@ def get_filters(cls, raw_filters, num_cols):  # noqa
                     not op or
                     (eq is None and op not in ('IS NULL', 'IS NOT NULL'))):
                 continue
+
+            # Check if this dimension uses an extraction function
+            # If so, create the appropriate pydruid extraction object
+            column_def = columns_dict.get(col)
+            dim_spec = column_def.dimension_spec if column_def else None
+            extraction_fn = None
+            if dim_spec and 'extractionFn' in dim_spec:
+                (col, extraction_fn) = DruidDatasource._create_extraction_fn(dim_spec)
+
             cond = None
             is_numeric_col = col in num_cols
             is_list_target = op in ('in', 'not in')
             eq = cls.filter_values_handler(
                 eq, is_list_target=is_list_target,
                 target_column_is_numeric=is_numeric_col)
+
+            # For these two ops, could have used Dimension,
+            # but it doesn't support extraction functions
             if op == '==':
-                cond = Dimension(col) == eq
+                cond = Filter(dimension=col, value=eq, extraction_function=extraction_fn)
             elif op == '!=':
-                cond = Dimension(col) != eq
+                cond = ~Filter(dimension=col, value=eq, extraction_function=extraction_fn)
             elif op in ('in', 'not in'):
                 fields = []
                 # ignore the filter if it has no value
                 if not len(eq):
                     continue
+                # if it uses an extraction fn, use the "in" operator
+                # as Dimension isn't supported
+                elif extraction_fn is not None:
+                    cond = Filter(
+                        dimension=col,
+                        values=eq,
+                        type='in',
+                        extraction_function=extraction_fn,
+                    )
                 elif len(eq) == 1:
                     cond = Dimension(col) == eq[0]
                 else:
@@ -1376,20 +1447,58 @@ def get_filters(cls, raw_filters, num_cols):  # noqa
                 if op == 'not in':
                     cond = ~cond
             elif op == 'regex':
-                cond = Filter(type='regex', pattern=eq, dimension=col)
+                cond = Filter(
+                    extraction_function=extraction_fn,
+                    type='regex',
+                    pattern=eq,
+                    dimension=col,
+                )
+
+            # For the ops below, could have used pydruid's Bound,
+            # but it doesn't support extraction functions
             elif op == '>=':
-                cond = Bound(col, eq, None, alphaNumeric=is_numeric_col)
+                cond = Filter(
+                    type='bound',
+                    extraction_function=extraction_fn,
+                    dimension=col,
+                    lowerStrict=False,
+                    upperStrict=False,
+                    lower=eq,
+                    upper=None,
+                    alphaNumeric=is_numeric_col,
+                )
             elif op == '<=':
-                cond = Bound(col, None, eq, alphaNumeric=is_numeric_col)
+                cond = Filter(
+                    type='bound',
+                    extraction_function=extraction_fn,
+                    dimension=col,
+                    lowerStrict=False,
+                    upperStrict=False,
+                    lower=None,
+                    upper=eq,
+                    alphaNumeric=is_numeric_col,
+                )
             elif op == '>':
-                cond = Bound(
-                    col, eq, None,
-                    lowerStrict=True, alphaNumeric=is_numeric_col,
+                cond = Filter(
+                    type='bound',
+                    extraction_function=extraction_fn,
+                    lowerStrict=True,
+                    upperStrict=False,
+                    dimension=col,
+                    lower=eq,
+                    upper=None,
+                    alphaNumeric=is_numeric_col,
                 )
             elif op == '<':
-                cond = Bound(
-                    col, None, eq,
-                    upperStrict=True, alphaNumeric=is_numeric_col,
+                cond = Filter(
+                    type='bound',
+                    extraction_function=extraction_fn,
+                    upperStrict=True,
+                    lowerStrict=False,
+                    dimension=col,
+                    lower=None,
+                    upper=eq,
+                    alphaNumeric=is_numeric_col,
                 )
             elif op == 'IS NULL':
                 cond = Dimension(col) == None  # NOQA
diff --git a/tests/druid_func_tests.py b/tests/druid_func_tests.py
index c367bd7ad7..64543a8edb 100644
--- a/tests/druid_func_tests.py
+++ b/tests/druid_func_tests.py
@@ -8,8 +8,10 @@
 import unittest
 
 from mock import Mock
+from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
 import pydruid.utils.postaggregator as postaggs
 
+
 import superset.connectors.druid.models as models
 from superset.connectors.druid.models import (
     DruidColumn, DruidDatasource, DruidMetric,
@@ -31,14 +33,84 @@ def emplace(metrics_dict, metric_name, is_postagg=False):
 # Unit tests that can be run without initializing base tests
 class DruidFuncTestCase(unittest.TestCase):
 
+    def test_get_filters_extraction_fn_map(self):
+        filters = [{'col': 'deviceName', 'val': ['iPhone X'], 'op': 'in'}]
+        dimension_spec = {
+            'type': 'extraction',
+            'dimension': 'device',
+            'outputName': 'deviceName',
+            'outputType': 'STRING',
+            'extractionFn': {
+                'type': 'lookup',
+                'dimension': 'dimensionName',
+                'outputName': 'dimensionOutputName',
+                'replaceMissingValueWith': 'missing_value',
+                'retainMissingValue': False,
+                'lookup': {
+                    'type': 'map',
+                    'map': {
+                        'iPhone10,1': 'iPhone 8',
+                        'iPhone10,4': 'iPhone 8',
+                        'iPhone10,2': 'iPhone 8 Plus',
+                        'iPhone10,5': 'iPhone 8 Plus',
+                        'iPhone10,3': 'iPhone X',
+                        'iPhone10,6': 'iPhone X',
+                    },
+                    'isOneToOne': False,
+                },
+            },
+        }
+        spec_json = json.dumps(dimension_spec)
+        col = DruidColumn(column_name='deviceName', dimension_spec_json=spec_json)
+        column_dict = {'deviceName': col}
+        f = DruidDatasource.get_filters(filters, [], column_dict)
+        assert isinstance(f.extraction_function, MapLookupExtraction)
+        dim_ext_fn = dimension_spec['extractionFn']
+        f_ext_fn = f.extraction_function
+        self.assertEqual(dim_ext_fn['lookup']['map'], f_ext_fn._mapping)
+        self.assertEqual(dim_ext_fn['lookup']['isOneToOne'], f_ext_fn._injective)
+        self.assertEqual(
+            dim_ext_fn['replaceMissingValueWith'],
+            f_ext_fn._replace_missing_values,
+        )
+        self.assertEqual(
+            dim_ext_fn['retainMissingValue'],
+            f_ext_fn._retain_missing_values,
+        )
+
+    def test_get_filters_extraction_fn_regex(self):
+        filters = [{'col': 'buildPrefix', 'val': ['22B'], 'op': 'in'}]
+        dimension_spec = {
+            'type': 'extraction',
+            'dimension': 'build',
+            'outputName': 'buildPrefix',
+            'outputType': 'STRING',
+            'extractionFn': {
+                'type': 'regex',
+                'expr': '(^[0-9A-Za-z]{3})',
+            },
+        }
+        spec_json = json.dumps(dimension_spec)
+        col = DruidColumn(column_name='buildPrefix', dimension_spec_json=spec_json)
+        column_dict = {'buildPrefix': col}
+        f = DruidDatasource.get_filters(filters, [], column_dict)
+        assert isinstance(f.extraction_function, RegexExtraction)
+        dim_ext_fn = dimension_spec['extractionFn']
+        f_ext_fn = f.extraction_function
+        self.assertEqual(dim_ext_fn['expr'], f_ext_fn._expr)
+
     def test_get_filters_ignores_invalid_filter_objects(self):
         filtr = {'col': 'col1', 'op': '=='}
         filters = [filtr]
-        self.assertIsNone(DruidDatasource.get_filters(filters, []))
+        col = DruidColumn(column_name='col1')
+        column_dict = {'col1': col}
+        self.assertIsNone(DruidDatasource.get_filters(filters, [], column_dict))
 
     def test_get_filters_constructs_filter_in(self):
         filtr = {'col': 'A', 'op': 'in', 'val': ['a', 'b', 'c']}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertIn('filter', res.filter)
         self.assertIn('fields', res.filter['filter'])
         self.assertEqual('or', res.filter['filter']['type'])
@@ -46,7 +118,9 @@ def test_get_filters_constructs_filter_in(self):
 
     def test_get_filters_constructs_filter_not_in(self):
         filtr = {'col': 'A', 'op': 'not in', 'val': ['a', 'b', 'c']}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertIn('filter', res.filter)
         self.assertIn('type', res.filter['filter'])
         self.assertEqual('not', res.filter['filter']['type'])
@@ -58,14 +132,18 @@ def test_get_filters_constructs_filter_not_in(self):
 
     def test_get_filters_constructs_filter_equals(self):
         filtr = {'col': 'A', 'op': '==', 'val': 'h'}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('selector', res.filter['filter']['type'])
         self.assertEqual('A', res.filter['filter']['dimension'])
         self.assertEqual('h', res.filter['filter']['value'])
 
     def test_get_filters_constructs_filter_not_equals(self):
         filtr = {'col': 'A', 'op': '!=', 'val': 'h'}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('not', res.filter['filter']['type'])
         self.assertEqual(
             'h',
@@ -74,25 +152,29 @@ def test_get_filters_constructs_filter_not_equals(self):
 
     def test_get_filters_constructs_bounds_filter(self):
         filtr = {'col': 'A', 'op': '>=', 'val': 'h'}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertFalse(res.filter['filter']['lowerStrict'])
         self.assertEqual('A', res.filter['filter']['dimension'])
         self.assertEqual('h', res.filter['filter']['lower'])
         self.assertFalse(res.filter['filter']['alphaNumeric'])
         filtr['op'] = '>'
-        res = DruidDatasource.get_filters([filtr], [])
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertTrue(res.filter['filter']['lowerStrict'])
         filtr['op'] = '<='
-        res = DruidDatasource.get_filters([filtr], [])
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertFalse(res.filter['filter']['upperStrict'])
         self.assertEqual('h', res.filter['filter']['upper'])
         filtr['op'] = '<'
-        res = DruidDatasource.get_filters([filtr], [])
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertTrue(res.filter['filter']['upperStrict'])
 
     def test_get_filters_constructs_regex_filter(self):
         filtr = {'col': 'A', 'op': 'regex', 'val': '[abc]'}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('regex', res.filter['filter']['type'])
         self.assertEqual('[abc]', res.filter['filter']['pattern'])
         self.assertEqual('A', res.filter['filter']['dimension'])
@@ -100,46 +182,62 @@ def test_get_filters_constructs_regex_filter(self):
     def test_get_filters_composes_multiple_filters(self):
         filtr1 = {'col': 'A', 'op': '!=', 'val': 'y'}
         filtr2 = {'col': 'B', 'op': 'in', 'val': ['a', 'b', 'c']}
-        res = DruidDatasource.get_filters([filtr1, filtr2], [])
+        cola = DruidColumn(column_name='A')
+        colb = DruidColumn(column_name='B')
+        column_dict = {'A': cola, 'B': colb}
+        res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
         self.assertEqual('and', res.filter['filter']['type'])
         self.assertEqual(2, len(res.filter['filter']['fields']))
 
     def test_get_filters_ignores_in_not_in_with_empty_value(self):
         filtr1 = {'col': 'A', 'op': 'in', 'val': []}
         filtr2 = {'col': 'A', 'op': 'not in', 'val': []}
-        res = DruidDatasource.get_filters([filtr1, filtr2], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
         self.assertIsNone(res)
 
     def test_get_filters_constructs_equals_for_in_not_in_single_value(self):
         filtr = {'col': 'A', 'op': 'in', 'val': ['a']}
-        res = DruidDatasource.get_filters([filtr], [])
+        cola = DruidColumn(column_name='A')
+        colb = DruidColumn(column_name='B')
+        column_dict = {'A': cola, 'B': colb}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('selector', res.filter['filter']['type'])
 
     def test_get_filters_handles_arrays_for_string_types(self):
         filtr = {'col': 'A', 'op': '==', 'val': ['a', 'b']}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('a', res.filter['filter']['value'])
 
         filtr = {'col': 'A', 'op': '==', 'val': []}
-        res = DruidDatasource.get_filters([filtr], [])
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertIsNone(res.filter['filter']['value'])
 
     def test_get_filters_handles_none_for_string_types(self):
         filtr = {'col': 'A', 'op': '==', 'val': None}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertIsNone(res)
 
     def test_get_filters_extracts_values_in_quotes(self):
         filtr = {'col': 'A', 'op': 'in', 'val': ['  "a" ']}
-        res = DruidDatasource.get_filters([filtr], [])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], [], column_dict)
         self.assertEqual('a', res.filter['filter']['value'])
 
     def test_get_filters_converts_strings_to_num(self):
         filtr = {'col': 'A', 'op': 'in', 'val': ['6']}
-        res = DruidDatasource.get_filters([filtr], ['A'])
+        col = DruidColumn(column_name='A')
+        column_dict = {'A': col}
+        res = DruidDatasource.get_filters([filtr], ['A'], column_dict)
         self.assertEqual(6, res.filter['filter']['value'])
         filtr = {'col': 'A', 'op': '==', 'val': '6'}
-        res = DruidDatasource.get_filters([filtr], ['A'])
+        res = DruidDatasource.get_filters([filtr], ['A'], column_dict)
         self.assertEqual(6, res.filter['filter']['value'])
 
     def test_run_query_no_groupby(self):
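
For reference, a minimal usage sketch of the changed get_filters() signature,
mirroring test_get_filters_extraction_fn_regex above (the column name and
regex are taken from that test and are illustrative only):

    import json

    from superset.connectors.druid.models import DruidColumn, DruidDatasource

    # Column whose dimension spec carries a regex extraction function.
    dimension_spec = {
        'type': 'extraction',
        'dimension': 'build',
        'outputName': 'buildPrefix',
        'outputType': 'STRING',
        'extractionFn': {'type': 'regex', 'expr': '(^[0-9A-Za-z]{3})'},
    }
    col = DruidColumn(
        column_name='buildPrefix',
        dimension_spec_json=json.dumps(dimension_spec),
    )

    # get_filters() now takes the datasource's column lookup as a third
    # argument so it can attach the extraction function to the generated
    # pydruid Filter.
    filters = [{'col': 'buildPrefix', 'val': ['22B'], 'op': 'in'}]
    flt = DruidDatasource.get_filters(filters, [], {'buildPrefix': col})
    # flt is a pydruid Filter of type 'in' carrying a RegexExtraction.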


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
