KYLIN-2088 Support intersect count for calculation of retention or conversion rates
Signed-off-by: Yang Li <liy...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b4c970ad Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b4c970ad Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b4c970ad Branch: refs/heads/master-cdh5.7 Commit: b4c970adf18362daade77e936693dac08c0639e1 Parents: 61a08d4 Author: sunyerui <sunye...@gmail.com> Authored: Wed Oct 12 20:59:54 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Sun Oct 16 08:10:05 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 +- .../org/apache/kylin/measure/MeasureType.java | 5 ++ .../kylin/measure/bitmap/BitmapCounter.java | 32 +++++++ .../BitmapIntersectDistinctCountAggFunc.java | 94 ++++++++++++++++++++ .../kylin/measure/bitmap/BitmapMeasureType.java | 9 ++ .../apache/kylin/query/ITKylinQueryTest.java | 6 ++ .../query/sql_intersect_count/query00.sql | 32 +++++++ .../kylin/query/relnode/OLAPAggregateRel.java | 16 +++- 8 files changed, 195 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 99c3c5a..7dacd06 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -765,7 +765,9 @@ abstract public class KylinConfigBase implements Serializable { } public Map<String, String> getUDFs() { - return getPropertiesByPrefix("kylin.query.udf."); + Map<String, String> udfMap = getPropertiesByPrefix("kylin.query.udf."); + 
udfMap.put("intersect_count", "org.apache.kylin.measure.bitmap.BitmapIntersectDistinctCountAggFunc"); + return udfMap; } public int getHBaseMaxConnectionThreads() { http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index 82618e9..e7312f2 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -115,6 +115,11 @@ abstract public class MeasureType<T> { /** Returns a Calcite aggregation function implementation class */ abstract public Class<?> getRewriteCalciteAggrFunctionClass(); + /** Some measure may return different class depends on call name, eg. BitmapMeasureType */ + public Class<?> getRewriteCalciteAggrFunctionClass(String callName) { + return getRewriteCalciteAggrFunctionClass(); + } + /* ============================================================================ * Storage * ---------------------------------------------------------------------------- */ http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java index d3b57a7..827390d 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java @@ -47,6 +47,12 @@ public class BitmapCounter implements Comparable<BitmapCounter> { bitmap.clear(); } + public BitmapCounter clone() { 
+ BitmapCounter newCounter = new BitmapCounter(); + newCounter.bitmap = bitmap.clone(); + return newCounter; + } + public void add(int value) { bitmap.add(value); } @@ -74,6 +80,10 @@ public class BitmapCounter implements Comparable<BitmapCounter> { this.bitmap.or(another.bitmap); } + public void intersect(BitmapCounter another) { + this.bitmap.and(another.bitmap); + } + public long getCount() { return this.bitmap.getCardinality(); } @@ -107,6 +117,28 @@ public class BitmapCounter implements Comparable<BitmapCounter> { } @Override + public String toString() { + long count = getCount(); + if (count <= 10) { + return "(" + count + ")" + bitmap.toString(); + } else { + StringBuilder sb = new StringBuilder(); + sb.append("(").append(count).append("){"); + int values = 0; + for (Integer v : bitmap) { + if (values++ < 10) { + sb.append(v).append(","); + } else { + sb.append("..."); + break; + } + } + sb.append("}"); + return sb.toString(); + } + } + + @Override public int hashCode() { final int prime = 31; int result = 1; http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java new file mode 100644 index 0000000..cf42d1b --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.measure.bitmap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * BitmapIntersectDistinctCountAggFunc is a UDAF that counts the distinct values in the intersection of two or more bitmaps. + * Usage: intersect_count(columnToCount, columnToFilter, filterList) + * Example: intersect_count(uuid, event, array['A', 'B', 'C']) counts the distinct uuids that appear in all three of the A, B and C bitmaps. + * It requires a bitmap count-distinct measure on uuid and a dimension on event. + */ +public class BitmapIntersectDistinctCountAggFunc { + private static final Logger logger = LoggerFactory.getLogger(BitmapIntersectDistinctCountAggFunc.class); + + public static class RetentionPartialResult { + Map<Object, BitmapCounter> map; + List keyList; + + public RetentionPartialResult() { + map = new LinkedHashMap<>(); + } + + public void add(Object key, List keyList, Object value) { + if (this.keyList == null) { + this.keyList = keyList; + } + BitmapCounter counter = map.get(key); + if (counter == null) { + counter = new BitmapCounter(); + map.put(key, counter); + } + counter.merge((BitmapCounter)value); + } + + public long result() { + if (keyList == null || keyList.isEmpty()) { + return 0; + } + BitmapCounter counter = null; + for (Object key : keyList) { + BitmapCounter c = map.get(key); + if (c == null) { 
// We have a key in filter list but not in map, meaning there's no intersect data + return 0; + } else { + if (counter == null) { + counter = c.clone(); + } + counter.intersect(c); + } + } + return counter.getCount(); + } + } + + public static RetentionPartialResult init() { + return new RetentionPartialResult(); + } + + public static RetentionPartialResult add(RetentionPartialResult result, Object value, Object key, List keyList) { + result.add(key, keyList, value); + return result; + } + + public static RetentionPartialResult merge(RetentionPartialResult result, Object value, Object key, List keyList) { + return add(result, value, key, keyList); + } + + public static long result(RetentionPartialResult result) { + return result.result(); + } +} + http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java index be96eb5..2b88e21 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java @@ -39,6 +39,7 @@ import org.apache.kylin.metadata.model.TblColRef; */ public class BitmapMeasureType extends MeasureType<BitmapCounter> { public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT"; + public static final String FUNC_INTERSECT_COUNT_DISTINCT = "INTERSECT_COUNT"; public static final String DATATYPE_BITMAP = "bitmap"; public static class Factory extends MeasureTypeFactory<BitmapCounter> { @@ -160,6 +161,14 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { return BitmapDistinctCountAggFunc.class; } + @Override + public Class<?> getRewriteCalciteAggrFunctionClass(String callName) { + if 
(callName != null && callName.equalsIgnoreCase(FUNC_INTERSECT_COUNT_DISTINCT)) { + return BitmapIntersectDistinctCountAggFunc.class; + } + return BitmapDistinctCountAggFunc.class; + } + // In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary private boolean needDictionaryColumn(FunctionDesc functionDesc) { DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType(); http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 59a3a04..a0706ca 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -355,4 +355,10 @@ public class ITKylinQueryTest extends KylinTestBase { this.batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_window"); } + @Test + public void testIntersectCountQuery() throws Exception { + // cannot compare coz H2 does not support intersect count yet.. + this.batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_intersect_count"); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql b/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql new file mode 100644 index 0000000..15e274a --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_intersect_count/query00.sql @@ -0,0 +1,32 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. 
See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +select +week_beg_dt as week, +intersect_count( seller_id, lstg_format_name, array['FP-GTC']) as a, +intersect_count( seller_id, lstg_format_name, array['Auction']) as b, +intersect_count( seller_id, lstg_format_name, array['Others']) as c, +intersect_count( seller_id, lstg_format_name, array['FP-GTC', 'Auction']) as ab, +intersect_count( seller_id, lstg_format_name, array['FP-GTC', 'Others']) as ac, +intersect_count( seller_id, lstg_format_name, array['FP-GTC', 'Auction', 'Others']) as abc, +count(distinct seller_id) as sellers, +count(*) as cnt +from test_kylin_fact left join edw.test_cal_dt on test_kylin_fact.cal_dt = edw.test_cal_dt.CAL_DT +where week_beg_dt in (DATE '2013-12-22', DATE '2012-06-23') +group by week_beg_dt + http://git-wip-us.apache.org/repos/asf/kylin/blob/b4c970ad/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index 97efb27..8ecb808 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ 
-56,6 +56,7 @@ import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Util; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.measure.bitmap.BitmapMeasureType; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -79,6 +80,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { AGGR_FUNC_MAP.put("$SUM0", "SUM"); AGGR_FUNC_MAP.put("COUNT", "COUNT"); AGGR_FUNC_MAP.put("COUNT_DISTINCT", "COUNT_DISTINCT"); + AGGR_FUNC_MAP.put(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT, "COUNT_DISTINCT"); AGGR_FUNC_MAP.put("MAX", "MAX"); AGGR_FUNC_MAP.put("MIN", "MIN"); @@ -224,6 +226,15 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { Set<TblColRef> columns = inputColumnRowType.getSourceColumnsByIndex(i); this.groups.addAll(columns); } + // Some UDAFs (e.g. intersect_count) group data by a key column themselves, so add that column to groups to keep the cube storage server from aggregating it away + for (AggregateCall aggCall : this.rewriteAggCalls) { + String aggregateName = aggCall.getAggregation().getName(); + if (aggregateName.equalsIgnoreCase(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT)) { + int index = aggCall.getArgList().get(1); + TblColRef column = inputColumnRowType.getColumnByIndex(index); + groups.add(column); + } + } } private void buildAggregations() { @@ -380,16 +391,17 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { } // rebuild function + String callName = aggCall.getAggregation().getName(); RelDataType fieldType = aggCall.getType(); SqlAggFunction newAgg = aggCall.getAggregation(); if (func.isCount()) { newAgg = SqlStdOperatorTable.SUM0; } else if (func.getMeasureType().getRewriteCalciteAggrFunctionClass() != null) { - newAgg = createCustomAggFunction(func.getExpression(), fieldType, 
func.getMeasureType().getRewriteCalciteAggrFunctionClass()); + newAgg = createCustomAggFunction(callName, fieldType, func.getMeasureType().getRewriteCalciteAggrFunctionClass(callName)); } // rebuild aggregate call - AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, newAgg.getName()); + AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, callName); return newAggCall; }