mistercrunch closed pull request #4740: Add extraction function support for
Druid queries
URL: https://github.com/apache/incubator-superset/pull/4740
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/requirements.txt b/requirements.txt
index ea4f830cf9..da14026d6d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -22,7 +22,7 @@ pandas==0.22.0
parsedatetime==2.0.0
pathlib2==2.3.0
polyline==1.3.2
-pydruid==0.4.1
+pydruid==0.4.2
pyhive==0.5.0
python-dateutil==2.6.1
python-geohash==0.8.5
diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py
index 26e3c721a0..f67b03b953 100644
--- a/superset/connectors/druid/models.py
+++ b/superset/connectors/druid/models.py
@@ -20,7 +20,8 @@
from flask_babel import lazy_gettext as _
from pydruid.client import PyDruid
from pydruid.utils.aggregators import count
-from pydruid.utils.filters import Bound, Dimension, Filter
+from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
+from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.having import Aggregation
from pydruid.utils.postaggregator import (
Const, Field, HyperUniqueCardinality, Postaggregator, Quantile, Quantiles,
@@ -958,8 +959,25 @@ def _add_filter_from_pre_query_data(self, df, dimensions, dim_filter):
for unused, row in df.iterrows():
fields = []
for dim in dimensions:
- f = Dimension(dim) == row[dim]
- fields.append(f)
+ f = None
+ # Check if this dimension uses an extraction function
+ # If so, create the appropriate pydruid extraction object
+ if isinstance(dim, dict) and 'extractionFn' in dim:
+                    (col, extraction_fn) = DruidDatasource._create_extraction_fn(dim)
+ dim_val = dim['outputName']
+ f = Filter(
+ dimension=col,
+ value=row[dim_val],
+ extraction_function=extraction_fn,
+ )
+ elif isinstance(dim, dict):
+ dim_val = dim['outputName']
+ if dim_val:
+ f = Dimension(dim_val) == row[dim_val]
+ else:
+ f = Dimension(dim) == row[dim]
+ if f:
+ fields.append(f)
if len(fields) > 1:
term = Filter(type='and', fields=fields)
new_filters.append(term)
@@ -1063,7 +1081,9 @@ def _dimensions_to_values(dimensions):
values = []
for dimension in dimensions:
if isinstance(dimension, dict):
- if 'dimension' in dimension:
+ if 'extractionFn' in dimension:
+ values.append(dimension)
+ elif 'dimension' in dimension:
values.append(dimension['dimension'])
else:
values.append(dimension)
@@ -1130,7 +1150,7 @@ def run_query( # noqa / druid
intervals=self.intervals_from_dttms(from_dttm, to_dttm),
)
- filters = DruidDatasource.get_filters(filter, self.num_cols)
+        filters = DruidDatasource.get_filters(filter, self.num_cols, columns_dict)
if filters:
qry['filter'] = filters
@@ -1215,7 +1235,14 @@ def run_query( # noqa / druid
pre_qry = deepcopy(qry)
pre_qry_dims = self._dimensions_to_values(qry['dimensions'])
- pre_qry['dimensions'] = list(set(pre_qry_dims))
+
+ # Can't use set on an array with dicts
+ # Use set with non-dict items only
+ non_dict_dims = list(
+ set([x for x in pre_qry_dims if not isinstance(x, dict)]),
+ )
+ dict_dims = [x for x in pre_qry_dims if isinstance(x, dict)]
+ pre_qry['dimensions'] = non_dict_dims + dict_dims
order_by = metrics[0] if metrics else pre_qry_dims[0]
@@ -1339,8 +1366,31 @@ def increment_timestamp(ts):
query=query_str,
duration=datetime.now() - qry_start_dttm)
+ @staticmethod
+ def _create_extraction_fn(dim_spec):
+ extraction_fn = None
+ if dim_spec and 'extractionFn' in dim_spec:
+ col = dim_spec['dimension']
+ fn = dim_spec['extractionFn']
+ ext_type = fn.get('type')
+ if ext_type == 'lookup' and fn['lookup'].get('type') == 'map':
+ replace_missing_values = fn.get('replaceMissingValueWith')
+ retain_missing_values = fn.get('retainMissingValue', False)
+ injective = fn.get('isOneToOne', False)
+ extraction_fn = MapLookupExtraction(
+ fn['lookup']['map'],
+ replace_missing_values=replace_missing_values,
+ retain_missing_values=retain_missing_values,
+ injective=injective,
+ )
+ elif ext_type == 'regex':
+ extraction_fn = RegexExtraction(fn['expr'])
+ else:
+                raise Exception(_('Unsupported extraction function: ' + ext_type))
+ return (col, extraction_fn)
+
@classmethod
- def get_filters(cls, raw_filters, num_cols): # noqa
+ def get_filters(cls, raw_filters, num_cols, columns_dict): # noqa
"""Given Superset filter data structure, returns pydruid Filter(s)"""
filters = None
for flt in raw_filters:
@@ -1352,21 +1402,42 @@ def get_filters(cls, raw_filters, num_cols): # noqa
not op or
(eq is None and op not in ('IS NULL', 'IS NOT NULL'))):
continue
+
+ # Check if this dimension uses an extraction function
+ # If so, create the appropriate pydruid extraction object
+ column_def = columns_dict.get(col)
+ dim_spec = column_def.dimension_spec if column_def else None
+ extraction_fn = None
+ if dim_spec and 'extractionFn' in dim_spec:
+                (col, extraction_fn) = DruidDatasource._create_extraction_fn(dim_spec)
+
cond = None
is_numeric_col = col in num_cols
is_list_target = op in ('in', 'not in')
eq = cls.filter_values_handler(
eq, is_list_target=is_list_target,
target_column_is_numeric=is_numeric_col)
+
+ # For these two ops, could have used Dimension,
+ # but it doesn't support extraction functions
if op == '==':
- cond = Dimension(col) == eq
+                cond = Filter(dimension=col, value=eq, extraction_function=extraction_fn)
elif op == '!=':
- cond = Dimension(col) != eq
+                cond = ~Filter(dimension=col, value=eq, extraction_function=extraction_fn)
elif op in ('in', 'not in'):
fields = []
# ignore the filter if it has no value
if not len(eq):
continue
+ # if it uses an extraction fn, use the "in" operator
+ # as Dimension isn't supported
+ elif extraction_fn is not None:
+ cond = Filter(
+ dimension=col,
+ values=eq,
+ type='in',
+ extraction_function=extraction_fn,
+ )
elif len(eq) == 1:
cond = Dimension(col) == eq[0]
else:
@@ -1376,20 +1447,58 @@ def get_filters(cls, raw_filters, num_cols): # noqa
if op == 'not in':
cond = ~cond
elif op == 'regex':
- cond = Filter(type='regex', pattern=eq, dimension=col)
+ cond = Filter(
+ extraction_function=extraction_fn,
+ type='regex',
+ pattern=eq,
+ dimension=col,
+ )
+
+ # For the ops below, could have used pydruid's Bound,
+ # but it doesn't support extraction functions
elif op == '>=':
- cond = Bound(col, eq, None, alphaNumeric=is_numeric_col)
+ cond = Filter(
+ type='bound',
+ extraction_function=extraction_fn,
+ dimension=col,
+ lowerStrict=False,
+ upperStrict=False,
+ lower=eq,
+ upper=None,
+ alphaNumeric=is_numeric_col,
+ )
elif op == '<=':
- cond = Bound(col, None, eq, alphaNumeric=is_numeric_col)
+ cond = Filter(
+ type='bound',
+ extraction_function=extraction_fn,
+ dimension=col,
+ lowerStrict=False,
+ upperStrict=False,
+ lower=None,
+ upper=eq,
+ alphaNumeric=is_numeric_col,
+ )
elif op == '>':
- cond = Bound(
- col, eq, None,
- lowerStrict=True, alphaNumeric=is_numeric_col,
+ cond = Filter(
+ type='bound',
+ extraction_function=extraction_fn,
+ lowerStrict=True,
+ upperStrict=False,
+ dimension=col,
+ lower=eq,
+ upper=None,
+ alphaNumeric=is_numeric_col,
)
elif op == '<':
- cond = Bound(
- col, None, eq,
- upperStrict=True, alphaNumeric=is_numeric_col,
+ cond = Filter(
+ type='bound',
+ extraction_function=extraction_fn,
+ upperStrict=True,
+ lowerStrict=False,
+ dimension=col,
+ lower=None,
+ upper=eq,
+ alphaNumeric=is_numeric_col,
)
elif op == 'IS NULL':
cond = Dimension(col) == None # NOQA
diff --git a/tests/druid_func_tests.py b/tests/druid_func_tests.py
index c367bd7ad7..64543a8edb 100644
--- a/tests/druid_func_tests.py
+++ b/tests/druid_func_tests.py
@@ -8,8 +8,10 @@
import unittest
from mock import Mock
+from pydruid.utils.dimensions import MapLookupExtraction, RegexExtraction
import pydruid.utils.postaggregator as postaggs
+
import superset.connectors.druid.models as models
from superset.connectors.druid.models import (
DruidColumn, DruidDatasource, DruidMetric,
@@ -31,14 +33,84 @@ def emplace(metrics_dict, metric_name, is_postagg=False):
# Unit tests that can be run without initializing base tests
class DruidFuncTestCase(unittest.TestCase):
+ def test_get_filters_extraction_fn_map(self):
+ filters = [{'col': 'deviceName', 'val': ['iPhone X'], 'op': 'in'}]
+ dimension_spec = {
+ 'type': 'extraction',
+ 'dimension': 'device',
+ 'outputName': 'deviceName',
+ 'outputType': 'STRING',
+ 'extractionFn': {
+ 'type': 'lookup',
+ 'dimension': 'dimensionName',
+ 'outputName': 'dimensionOutputName',
+ 'replaceMissingValueWith': 'missing_value',
+ 'retainMissingValue': False,
+ 'lookup': {
+ 'type': 'map',
+ 'map': {
+ 'iPhone10,1': 'iPhone 8',
+ 'iPhone10,4': 'iPhone 8',
+ 'iPhone10,2': 'iPhone 8 Plus',
+ 'iPhone10,5': 'iPhone 8 Plus',
+ 'iPhone10,3': 'iPhone X',
+ 'iPhone10,6': 'iPhone X',
+ },
+ 'isOneToOne': False,
+ },
+ },
+ }
+ spec_json = json.dumps(dimension_spec)
+        col = DruidColumn(column_name='deviceName', dimension_spec_json=spec_json)
+ column_dict = {'deviceName': col}
+ f = DruidDatasource.get_filters(filters, [], column_dict)
+ assert isinstance(f.extraction_function, MapLookupExtraction)
+ dim_ext_fn = dimension_spec['extractionFn']
+ f_ext_fn = f.extraction_function
+ self.assertEqual(dim_ext_fn['lookup']['map'], f_ext_fn._mapping)
+        self.assertEqual(dim_ext_fn['lookup']['isOneToOne'], f_ext_fn._injective)
+ self.assertEqual(
+ dim_ext_fn['replaceMissingValueWith'],
+ f_ext_fn._replace_missing_values,
+ )
+ self.assertEqual(
+ dim_ext_fn['retainMissingValue'],
+ f_ext_fn._retain_missing_values,
+ )
+
+ def test_get_filters_extraction_fn_regex(self):
+ filters = [{'col': 'buildPrefix', 'val': ['22B'], 'op': 'in'}]
+ dimension_spec = {
+ 'type': 'extraction',
+ 'dimension': 'build',
+ 'outputName': 'buildPrefix',
+ 'outputType': 'STRING',
+ 'extractionFn': {
+ 'type': 'regex',
+ 'expr': '(^[0-9A-Za-z]{3})',
+ },
+ }
+ spec_json = json.dumps(dimension_spec)
+        col = DruidColumn(column_name='buildPrefix', dimension_spec_json=spec_json)
+ column_dict = {'buildPrefix': col}
+ f = DruidDatasource.get_filters(filters, [], column_dict)
+ assert isinstance(f.extraction_function, RegexExtraction)
+ dim_ext_fn = dimension_spec['extractionFn']
+ f_ext_fn = f.extraction_function
+ self.assertEqual(dim_ext_fn['expr'], f_ext_fn._expr)
+
def test_get_filters_ignores_invalid_filter_objects(self):
filtr = {'col': 'col1', 'op': '=='}
filters = [filtr]
- self.assertIsNone(DruidDatasource.get_filters(filters, []))
+ col = DruidColumn(column_name='col1')
+ column_dict = {'col1': col}
+        self.assertIsNone(DruidDatasource.get_filters(filters, [], column_dict))
def test_get_filters_constructs_filter_in(self):
filtr = {'col': 'A', 'op': 'in', 'val': ['a', 'b', 'c']}
- res = DruidDatasource.get_filters([filtr], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIn('filter', res.filter)
self.assertIn('fields', res.filter['filter'])
self.assertEqual('or', res.filter['filter']['type'])
@@ -46,7 +118,9 @@ def test_get_filters_constructs_filter_in(self):
def test_get_filters_constructs_filter_not_in(self):
filtr = {'col': 'A', 'op': 'not in', 'val': ['a', 'b', 'c']}
- res = DruidDatasource.get_filters([filtr], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIn('filter', res.filter)
self.assertIn('type', res.filter['filter'])
self.assertEqual('not', res.filter['filter']['type'])
@@ -58,14 +132,18 @@ def test_get_filters_constructs_filter_not_in(self):
def test_get_filters_constructs_filter_equals(self):
filtr = {'col': 'A', 'op': '==', 'val': 'h'}
- res = DruidDatasource.get_filters([filtr], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('selector', res.filter['filter']['type'])
self.assertEqual('A', res.filter['filter']['dimension'])
self.assertEqual('h', res.filter['filter']['value'])
def test_get_filters_constructs_filter_not_equals(self):
filtr = {'col': 'A', 'op': '!=', 'val': 'h'}
- res = DruidDatasource.get_filters([filtr], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('not', res.filter['filter']['type'])
self.assertEqual(
'h',
@@ -74,25 +152,29 @@ def test_get_filters_constructs_filter_not_equals(self):
def test_get_filters_constructs_bounds_filter(self):
filtr = {'col': 'A', 'op': '>=', 'val': 'h'}
- res = DruidDatasource.get_filters([filtr], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertFalse(res.filter['filter']['lowerStrict'])
self.assertEqual('A', res.filter['filter']['dimension'])
self.assertEqual('h', res.filter['filter']['lower'])
self.assertFalse(res.filter['filter']['alphaNumeric'])
filtr['op'] = '>'
- res = DruidDatasource.get_filters([filtr], [])
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertTrue(res.filter['filter']['lowerStrict'])
filtr['op'] = '<='
- res = DruidDatasource.get_filters([filtr], [])
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertFalse(res.filter['filter']['upperStrict'])
self.assertEqual('h', res.filter['filter']['upper'])
filtr['op'] = '<'
- res = DruidDatasource.get_filters([filtr], [])
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertTrue(res.filter['filter']['upperStrict'])
def test_get_filters_constructs_regex_filter(self):
filtr = {'col': 'A', 'op': 'regex', 'val': '[abc]'}
- res = DruidDatasource.get_filters([filtr], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('regex', res.filter['filter']['type'])
self.assertEqual('[abc]', res.filter['filter']['pattern'])
self.assertEqual('A', res.filter['filter']['dimension'])
@@ -100,46 +182,62 @@ def test_get_filters_constructs_regex_filter(self):
def test_get_filters_composes_multiple_filters(self):
filtr1 = {'col': 'A', 'op': '!=', 'val': 'y'}
filtr2 = {'col': 'B', 'op': 'in', 'val': ['a', 'b', 'c']}
- res = DruidDatasource.get_filters([filtr1, filtr2], [])
+ cola = DruidColumn(column_name='A')
+ colb = DruidColumn(column_name='B')
+ column_dict = {'A': cola, 'B': colb}
+ res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
self.assertEqual('and', res.filter['filter']['type'])
self.assertEqual(2, len(res.filter['filter']['fields']))
def test_get_filters_ignores_in_not_in_with_empty_value(self):
filtr1 = {'col': 'A', 'op': 'in', 'val': []}
filtr2 = {'col': 'A', 'op': 'not in', 'val': []}
- res = DruidDatasource.get_filters([filtr1, filtr2], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
self.assertIsNone(res)
def test_get_filters_constructs_equals_for_in_not_in_single_value(self):
filtr = {'col': 'A', 'op': 'in', 'val': ['a']}
- res = DruidDatasource.get_filters([filtr], [])
+ cola = DruidColumn(column_name='A')
+ colb = DruidColumn(column_name='B')
+ column_dict = {'A': cola, 'B': colb}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('selector', res.filter['filter']['type'])
def test_get_filters_handles_arrays_for_string_types(self):
filtr = {'col': 'A', 'op': '==', 'val': ['a', 'b']}
- res = DruidDatasource.get_filters([filtr], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('a', res.filter['filter']['value'])
filtr = {'col': 'A', 'op': '==', 'val': []}
- res = DruidDatasource.get_filters([filtr], [])
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIsNone(res.filter['filter']['value'])
def test_get_filters_handles_none_for_string_types(self):
filtr = {'col': 'A', 'op': '==', 'val': None}
- res = DruidDatasource.get_filters([filtr], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIsNone(res)
def test_get_filters_extracts_values_in_quotes(self):
filtr = {'col': 'A', 'op': 'in', 'val': [' "a" ']}
- res = DruidDatasource.get_filters([filtr], [])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual('a', res.filter['filter']['value'])
def test_get_filters_converts_strings_to_num(self):
filtr = {'col': 'A', 'op': 'in', 'val': ['6']}
- res = DruidDatasource.get_filters([filtr], ['A'])
+ col = DruidColumn(column_name='A')
+ column_dict = {'A': col}
+ res = DruidDatasource.get_filters([filtr], ['A'], column_dict)
self.assertEqual(6, res.filter['filter']['value'])
filtr = {'col': 'A', 'op': '==', 'val': '6'}
- res = DruidDatasource.get_filters([filtr], ['A'])
+ res = DruidDatasource.get_filters([filtr], ['A'], column_dict)
self.assertEqual(6, res.filter['filter']['value'])
def test_run_query_no_groupby(self):
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]