KYLIN-2088 Refactor MeasureType to allow multiple UDAF defined on a measure type
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0018a212 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0018a212 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0018a212 Branch: refs/heads/master-hbase1.x Commit: 0018a212470bfe1e8edd3603d383da80f2ffd322 Parents: b4c970a Author: Yang Li <liy...@apache.org> Authored: Sun Oct 16 23:44:49 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Sun Oct 16 23:44:49 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 1 - .../org/apache/kylin/measure/MeasureType.java | 16 +++-- .../kylin/measure/MeasureTypeFactory.java | 46 +++++++++++--- .../kylin/measure/basic/BasicMeasureType.java | 6 -- .../kylin/measure/bitmap/BitmapMeasureType.java | 43 ++++++++----- .../dim/DimCountDistinctMeasureType.java | 10 ++- .../ExtendedColumnMeasureType.java | 6 +- .../kylin/measure/hllc/HLLCMeasureType.java | 10 ++- .../kylin/measure/raw/RawMeasureType.java | 6 +- .../kylin/measure/topn/TopNMeasureType.java | 5 -- .../kylin/metadata/realization/SQLDigest.java | 43 ++++++++----- .../kylin/engine/mr/steps/CubeReducerTest.java | 5 +- .../kylin/storage/hbase/ITStorageTest.java | 3 +- .../kylin/query/relnode/ColumnRowType.java | 7 +++ .../kylin/query/relnode/OLAPAggregateRel.java | 66 +++++++++++--------- .../apache/kylin/query/relnode/OLAPContext.java | 15 +++-- .../kylin/query/schema/OLAPSchemaFactory.java | 13 +++- .../cube/MeasureTypeOnlyAggrInBaseTest.java | 5 -- 18 files changed, 187 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 7dacd06..4942081 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -766,7 +766,6 @@ abstract public class KylinConfigBase implements Serializable { public Map<String, String> getUDFs() { Map<String, String> udfMap = getPropertiesByPrefix("kylin.query.udf."); - udfMap.put("intersect_count", "org.apache.kylin.measure.bitmap.BitmapIntersectDistinctCountAggFunc"); return udfMap; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index e7312f2..de1b442 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -102,22 +102,20 @@ abstract public class MeasureType<T> { * Query Rewrite * ---------------------------------------------------------------------------- */ - // TODO support user defined Calcite aggr function - /** Whether or not Calcite rel-tree needs rewrite to do last around of aggregation */ abstract public boolean needRewrite(); - /** Does the rewrite involves an extra field for the pre-calculated */ + /** Does the rewrite involves an extra field to hold the pre-calculated */ public boolean needRewriteField() { return true; } - /** Returns a Calcite aggregation function implementation class */ - abstract public Class<?> getRewriteCalciteAggrFunctionClass(); - - /** Some measure may return different class depends on call name, eg. 
BitmapMeasureType */ - public Class<?> getRewriteCalciteAggrFunctionClass(String callName) { - return getRewriteCalciteAggrFunctionClass(); + /** + * Returns a map from UDAF to Calcite aggregation function implementation class. + * There can be zero or more UDAF defined on a measure type. + */ + public Map<String, Class<?>> getRewriteCalciteAggrFunctions() { + return null; } /* ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java index 17d841a..c5bd482 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java @@ -32,6 +32,7 @@ import org.apache.kylin.measure.raw.RawMeasureType; import org.apache.kylin.measure.topn.TopNMeasureType; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.apache.kylin.metadata.model.FunctionDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,8 +87,10 @@ abstract public class MeasureTypeFactory<T> { // ============================================================================ - private static Map<String, List<MeasureTypeFactory<?>>> factories = Maps.newHashMap(); - private static List<MeasureTypeFactory<?>> defaultFactory = Lists.newArrayListWithCapacity(2); + final private static Map<String, List<MeasureTypeFactory<?>>> factories = Maps.newHashMap(); + final private static Map<String, Class<?>> udafMap = Maps.newHashMap(); // a map from UDAF to Calcite aggregation function implementation class + final private static Map<String, 
MeasureTypeFactory> udafFactories = Maps.newHashMap(); // a map from UDAF to its owner factory + final private static List<MeasureTypeFactory<?>> defaultFactory = Lists.newArrayListWithCapacity(2); static { init(); @@ -110,7 +113,8 @@ abstract public class MeasureTypeFactory<T> { logger.info("Checking custom measure types from kylin config"); try { - for (String customFactory : KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes().values()) { + Map<String, String> customMeasureTypes = KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes(); + for (String customFactory : customMeasureTypes.values()) { try { logger.info("Checking custom measure types from kylin config: " + customFactory); factoryInsts.add((MeasureTypeFactory<?>) Class.forName(customFactory).newInstance()); @@ -132,9 +136,10 @@ abstract public class MeasureTypeFactory<T> { throw new IllegalArgumentException("Aggregation data type name '" + dataTypeName + "' must be in lower case"); Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer(); - logger.info("registering " + dataTypeName); + logger.info("registering " + funcName + "(" + dataTypeName + "), " + factory.getClass()); DataType.register(dataTypeName); DataTypeSerializer.register(dataTypeName, serializer); + registerUDAF(factory); List<MeasureTypeFactory<?>> list = factories.get(funcName); if (list == null) factories.put(funcName, list = Lists.newArrayListWithCapacity(2)); @@ -144,13 +149,40 @@ abstract public class MeasureTypeFactory<T> { defaultFactory.add(new BasicMeasureType.Factory()); } + private static void registerUDAF(MeasureTypeFactory<?> factory) { + MeasureType<?> type = factory.createMeasureType(factory.getAggrFunctionName(), DataType.getType(factory.getAggrDataTypeName())); + Map<String, Class<?>> udafs = type.getRewriteCalciteAggrFunctions(); + if (type.needRewrite() == false || udafs == null) + return; + + for (String udaf : udafs.keySet()) { + udaf = udaf.toUpperCase(); + if 
(udaf.equals(FunctionDesc.FUNC_COUNT_DISTINCT)) + continue; // skip built-in function + + if (udafFactories.containsKey(udaf)) + throw new IllegalStateException("UDAF '" + udaf + "' was dup declared by " + udafFactories.get(udaf) + " and " + factory); + + udafFactories.put(udaf, factory); + udafMap.put(udaf, udafs.get(udaf)); + } + } + + public static Map<String, Class<?>> getUDAFs() { + return udafMap; + } + + public static Map<String, MeasureTypeFactory> getUDAFFactories() { + return udafFactories; + } + public static MeasureType<?> create(String funcName, String dataType) { return create(funcName, DataType.getType(dataType)); } public static MeasureType<?> createNoRewriteFieldsMeasureType(String funcName, DataType dataType) { // currently only has DimCountDistinctAgg - if (funcName.equalsIgnoreCase("COUNT_DISTINCT")) { + if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_COUNT_DISTINCT)) { return new DimCountDistinctMeasureType.DimCountDistinctMeasureTypeFactory().createMeasureType(funcName, dataType); } @@ -213,8 +245,8 @@ abstract public class MeasureTypeFactory<T> { } @Override - public Class getRewriteCalciteAggrFunctionClass() { - throw new UnsupportedOperationException(); + public Map<String, Class<?>> getRewriteCalciteAggrFunctions() { + return null; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java index 4ab4584..ed493a1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java @@ -143,10 +143,4 @@ public class BasicMeasureType extends MeasureType { public boolean needRewrite() { return !isSum(); } - 
- @Override - public Class<?> getRewriteCalciteAggrFunctionClass() { - return null; - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java index 2b88e21..8e2b2f7 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java @@ -33,12 +33,16 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.metadata.realization.SQLDigest.SQLCall; + +import com.google.common.collect.ImmutableMap; /** * Created by sunyerui on 15/12/10. 
*/ public class BitmapMeasureType extends MeasureType<BitmapCounter> { - public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT"; + public static final String FUNC_COUNT_DISTINCT = FunctionDesc.FUNC_COUNT_DISTINCT; public static final String FUNC_INTERSECT_COUNT_DISTINCT = "INTERSECT_COUNT"; public static final String DATATYPE_BITMAP = "bitmap"; @@ -151,30 +155,37 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { } } - @Override - public boolean needRewrite() { + // In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary + private boolean needDictionaryColumn(FunctionDesc functionDesc) { + DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType(); + if (dataType.isIntegerFamily() && !dataType.isBigInt()) { + return false; + } return true; } @Override - public Class<?> getRewriteCalciteAggrFunctionClass() { - return BitmapDistinctCountAggFunc.class; + public boolean needRewrite() { + return true; } + static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(// + FUNC_COUNT_DISTINCT, BitmapDistinctCountAggFunc.class, // + FUNC_INTERSECT_COUNT_DISTINCT, BitmapIntersectDistinctCountAggFunc.class); + @Override - public Class<?> getRewriteCalciteAggrFunctionClass(String callName) { - if (callName != null && callName.equalsIgnoreCase(FUNC_INTERSECT_COUNT_DISTINCT)) { - return BitmapIntersectDistinctCountAggFunc.class; - } - return BitmapDistinctCountAggFunc.class; + public Map<String, Class<?>> getRewriteCalciteAggrFunctions() { + return UDAF_MAP; } - // In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary - private boolean needDictionaryColumn(FunctionDesc functionDesc) { - DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType(); - if (dataType.isIntegerFamily() && !dataType.isBigInt()) { - return false; + @Override + public void 
adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) { + for (SQLCall call : sqlDigest.aggrSqlCalls) { + if (FUNC_INTERSECT_COUNT_DISTINCT.equals(call.function)) { + TblColRef col = (TblColRef) call.args.get(1); + if (!sqlDigest.groupbyColumns.contains(col)) + sqlDigest.groupbyColumns.add(col); + } } - return true; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java index 9fe1075..0b3fd94 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java @@ -19,15 +19,19 @@ package org.apache.kylin.measure.dim; import java.util.List; +import java.util.Map; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.realization.SQLDigest; +import com.google.common.collect.ImmutableMap; + /** * Created by dongli on 4/20/16. 
*/ @@ -76,9 +80,11 @@ public class DimCountDistinctMeasureType extends MeasureType<Object> { return false; } + static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FunctionDesc.FUNC_COUNT_DISTINCT, DimCountDistinctAggFunc.class); + @Override - public Class<?> getRewriteCalciteAggrFunctionClass() { - return DimCountDistinctAggFunc.class; + public Map<String, Class<?>> getRewriteCalciteAggrFunctions() { + return UDAF_MAP; } public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java index 796f1f7..c8f01ad 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java @@ -227,6 +227,7 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> { }; } + @SuppressWarnings("serial") @Override public MeasureAggregator<ByteArray> newAggregator() { return new MeasureAggregator<ByteArray>() { @@ -268,9 +269,4 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> { public boolean needRewrite() { return false; } - - @Override - public Class<?> getRewriteCalciteAggrFunctionClass() { - return null; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java index bd5013e..0e58dca 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java @@ -31,9 +31,11 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; +import com.google.common.collect.ImmutableMap; + public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> { - public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT"; + public static final String FUNC_COUNT_DISTINCT = FunctionDesc.FUNC_COUNT_DISTINCT; public static final String DATATYPE_HLLC = "hllc"; public static class Factory extends MeasureTypeFactory<HyperLogLogPlusCounter> { @@ -116,9 +118,11 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> { return true; } + static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FUNC_COUNT_DISTINCT, HLLDistinctCountAggFunc.class); + @Override - public Class<?> getRewriteCalciteAggrFunctionClass() { - return HLLDistinctCountAggFunc.class; + public Map<String, Class<?>> getRewriteCalciteAggrFunctions() { + return UDAF_MAP; } public static boolean isCountDistinct(FunctionDesc func) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java index 50715ec..3a49d31 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java @@ -75,6 +75,7 @@ public class RawMeasureType extends 
MeasureType<List<ByteArray>> { } } + @SuppressWarnings("unused") private final DataType dataType; public RawMeasureType(String funcName, DataType dataType) { @@ -191,11 +192,6 @@ public class RawMeasureType extends MeasureType<List<ByteArray>> { } @Override - public Class<?> getRewriteCalciteAggrFunctionClass() { - return null; - } - - @Override public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) { if (sqlDigest.isRawQuery) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java index 800ca88..33ab314 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java @@ -288,11 +288,6 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { } @Override - public Class<?> getRewriteCalciteAggrFunctionClass() { - return null; - } - - @Override public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) { for (MeasureDesc measureDesc : measureDescs) { FunctionDesc topnFunc = measureDesc.getFunction(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java index d2bba66..4887abb 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java +++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java @@ -18,7 +18,8 @@ package org.apache.kylin.metadata.realization; -import java.util.Collection; +import java.util.List; +import java.util.Set; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.model.FunctionDesc; @@ -26,6 +27,8 @@ import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; +import com.google.common.collect.ImmutableList; + /** */ public class SQLDigest { @@ -34,29 +37,41 @@ public class SQLDigest { ASCENDING, DESCENDING } + public static class SQLCall { + public final String function; + public final List<Object> args; + + public SQLCall(String function, Iterable<Object> args) { + this.function = function; + this.args = ImmutableList.copyOf(args); + } + } + public String factTable; public TupleFilter filter; - public Collection<JoinDesc> joinDescs; - public Collection<TblColRef> allColumns; - public Collection<TblColRef> groupbyColumns; - public Collection<TblColRef> filterColumns; - public Collection<TblColRef> metricColumns; - public Collection<FunctionDesc> aggregations; - public Collection<MeasureDesc> sortMeasures; - public Collection<OrderEnum> sortOrders; + public List<JoinDesc> joinDescs; + public Set<TblColRef> allColumns; + public List<TblColRef> groupbyColumns; + public Set<TblColRef> filterColumns; + public Set<TblColRef> metricColumns; + public List<FunctionDesc> aggregations; // storage level measure type, on top of which various sql aggr function may apply + public List<SQLCall> aggrSqlCalls; // sql level aggregation function call + public List<MeasureDesc> sortMeasures; + public List<OrderEnum> sortOrders; public boolean isRawQuery; - //initialized when org.apache.kylin.query.routing.QueryRouter.selectRealization() - public SQLDigest(String factTable, TupleFilter filter, Collection<JoinDesc> joinDescs, Collection<TblColRef> 
allColumns, // - Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc, Collection<MeasureDesc> sortMeasures, Collection<OrderEnum> sortOrders) { + public SQLDigest(String factTable, TupleFilter filter, List<JoinDesc> joinDescs, Set<TblColRef> allColumns, // + List<TblColRef> groupbyColumns, Set<TblColRef> filterColumns, Set<TblColRef> metricColumns, // + List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, List<MeasureDesc> sortMeasures, List<OrderEnum> sortOrders) { this.factTable = factTable; this.filter = filter; this.joinDescs = joinDescs; this.allColumns = allColumns; this.groupbyColumns = groupbyColumns; this.filterColumns = filterColumns; - this.metricColumns = aggregatedColumns; - this.aggregations = aggregateFunnc; + this.metricColumns = metricColumns; + this.aggregations = aggregations; + this.aggrSqlCalls = aggrSqlCalls; this.sortMeasures = sortMeasures; this.sortOrders = sortOrders; this.isRawQuery = isRawQuery(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java index 5e687a5..97dd750 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java @@ -27,6 +27,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; @@ -197,8 +198,8 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { } @Override - public Class<?> 
getRewriteCalciteAggrFunctionClass() { - return origMeasureType.getRewriteCalciteAggrFunctionClass(); + public Map<String, Class<?>> getRewriteCalciteAggrFunctions() { + return origMeasureType.getRewriteCalciteAggrFunctions(); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java index 136342d..4121c02 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java @@ -33,6 +33,7 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.metadata.realization.SQLDigest.SQLCall; import org.apache.kylin.metadata.tuple.ITuple; import org.apache.kylin.metadata.tuple.ITupleIterator; import org.apache.kylin.storage.IStorageQuery; @@ -144,7 +145,7 @@ public class ITStorageTest extends HBaseMetadataTestCase { int count = 0; ITupleIterator iterator = null; try { - SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>()); + SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, Collections.<SQLCall> emptyList(), new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>()); iterator = storageEngine.search(context, 
sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations)); while (iterator.hasNext()) { ITuple tuple = iterator.next(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java index 095b5e2..f2894e7 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java @@ -49,6 +49,13 @@ public class ColumnRowType { return columns.get(index); } + public TblColRef getColumnByIndexNullable(int index) { + if (index < 0 || index >= columns.size()) + return null; + else + return columns.get(index); + } + public int getIndexByName(String columnName) { for (int i = 0; i < columns.size(); i++) { if (columns.get(i).getName().equals(columnName)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index 8ecb808..9c4f287 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -55,14 +55,14 @@ import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Util; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.measure.bitmap.BitmapMeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; import 
org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.SQLDigest.SQLCall; import org.apache.kylin.query.schema.OLAPTable; import com.google.common.base.Preconditions; @@ -80,23 +80,28 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { AGGR_FUNC_MAP.put("$SUM0", "SUM"); AGGR_FUNC_MAP.put("COUNT", "COUNT"); AGGR_FUNC_MAP.put("COUNT_DISTINCT", "COUNT_DISTINCT"); - AGGR_FUNC_MAP.put(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT, "COUNT_DISTINCT"); AGGR_FUNC_MAP.put("MAX", "MAX"); AGGR_FUNC_MAP.put("MIN", "MIN"); - for (String customAggrFunc : KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes().keySet()) { - AGGR_FUNC_MAP.put(customAggrFunc.trim().toUpperCase(), customAggrFunc.trim().toUpperCase()); + Map<String, MeasureTypeFactory> udafFactories = MeasureTypeFactory.getUDAFFactories(); + for (String udaf : udafFactories.keySet()) { + AGGR_FUNC_MAP.put(udaf, udafFactories.get(udaf).getAggrFunctionName()); } } - private static String getFuncName(AggregateCall aggCall) { - String aggName = aggCall.getAggregation().getName(); + private static String getSqlFuncName(AggregateCall aggCall) { + String sqlName = aggCall.getAggregation().getName(); if (aggCall.isDistinct()) { - aggName = aggName + "_DISTINCT"; + sqlName = sqlName + "_DISTINCT"; } - String funcName = AGGR_FUNC_MAP.get(aggName); + return sqlName; + } + + private static String getAggrFuncName(AggregateCall aggCall) { + String sqlName = getSqlFuncName(aggCall); + String funcName = AGGR_FUNC_MAP.get(sqlName); if (funcName == null) { - throw new IllegalStateException("Don't suppoprt aggregation " + aggName); + throw new IllegalStateException("Don't suppoprt aggregation " + sqlName); } return 
funcName; } @@ -151,7 +156,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { // only translate the innermost aggregation if (!this.afterAggregate) { - translateGroupBy(); + this.context.groupByColumns.addAll(this.groups); this.context.aggregations.addAll(this.aggregations); this.context.afterAggregate = true; } else { @@ -226,15 +231,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { Set<TblColRef> columns = inputColumnRowType.getSourceColumnsByIndex(i); this.groups.addAll(columns); } - // Some UDAF may group data by itself, add group key into groups, prevents aggregate at cube storage server side - for (AggregateCall aggCall : this.rewriteAggCalls) { - String aggregateName = aggCall.getAggregation().getName(); - if (aggregateName.equalsIgnoreCase(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT)) { - int index = aggCall.getArgList().get(1); - TblColRef column = inputColumnRowType.getColumnByIndex(index); - groups.add(column); - } - } } private void buildAggregations() { @@ -254,17 +250,13 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { } } FunctionDesc aggFunc = new FunctionDesc(); - String funcName = getFuncName(aggCall); + String funcName = getAggrFuncName(aggCall); aggFunc.setExpression(funcName); aggFunc.setParameter(parameter); this.aggregations.add(aggFunc); } } - private void translateGroupBy() { - context.groupByColumns.addAll(this.groups); - } - @Override public void implementRewrite(RewriteImplementor implementor) { // only rewrite the innermost aggregation @@ -286,6 +278,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { aggCall = rewriteAggregateCall(aggCall, cubeFunc); } this.rewriteAggCalls.add(aggCall); + this.context.aggrSqlCalls.add(toSqlCall(aggCall)); } } @@ -295,6 +288,18 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { } + private SQLCall toSqlCall(AggregateCall aggCall) { + ColumnRowType inputColumnRowType = ((OLAPRel) 
getInput()).getColumnRowType(); + + String function = getSqlFuncName(aggCall); + List<Object> args = Lists.newArrayList(); + for (Integer index : aggCall.getArgList()) { + TblColRef col = inputColumnRowType.getColumnByIndexNullable(index); + args.add(col); + } + return new SQLCall(function, args); + } + private void translateAggregation() { // now the realization is known, replace aggregations with what's defined on MeasureDesc List<MeasureDesc> measures = this.context.realization.getMeasures(); @@ -371,8 +376,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { private AggregateCall rewriteAggregateCall(AggregateCall aggCall, FunctionDesc func) { - //if it's not a cube, then the "needRewriteField func" should not resort to any rewrite fields, - // which do not exist at all + // if it's not a cube, then the "needRewriteField func" should not resort to any rewrite fields, which do not exist at all if (noPrecaculatedFieldsAvailable() && func.needRewriteField()) { logger.info(func + "skip rewriteAggregateCall because no pre-aggregated field available"); return aggCall; @@ -391,16 +395,18 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { } // rebuild function - String callName = aggCall.getAggregation().getName(); + String callName = getSqlFuncName(aggCall); RelDataType fieldType = aggCall.getType(); SqlAggFunction newAgg = aggCall.getAggregation(); + Map<String, Class<?>> udafMap = func.getMeasureType().getRewriteCalciteAggrFunctions(); if (func.isCount()) { newAgg = SqlStdOperatorTable.SUM0; - } else if (func.getMeasureType().getRewriteCalciteAggrFunctionClass() != null) { - newAgg = createCustomAggFunction(callName, fieldType, func.getMeasureType().getRewriteCalciteAggrFunctionClass(callName)); + } else if (udafMap != null && udafMap.containsKey(callName)) { + newAgg = createCustomAggFunction(callName, fieldType, udafMap.get(callName)); } // rebuild aggregate call + @SuppressWarnings("deprecation") AggregateCall newAggCall 
= new AggregateCall(newAgg, false, newArgList, fieldType, callName); return newAggCall; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java index 41a3b4d..cdec665 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; @@ -36,6 +37,7 @@ import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.metadata.realization.SQLDigest.SQLCall; import org.apache.kylin.metadata.tuple.TupleInfo; import org.apache.kylin.query.schema.OLAPSchema; import org.apache.kylin.storage.StorageContext; @@ -116,11 +118,12 @@ public class OLAPContext { // cube metadata public IRealization realization; - public Collection<TblColRef> allColumns = new HashSet<TblColRef>(); - public Collection<TblColRef> groupByColumns = new ArrayList<TblColRef>(); - public Collection<TblColRef> metricsColumns = new HashSet<TblColRef>(); - public List<FunctionDesc> aggregations = new ArrayList<FunctionDesc>(); - public Collection<TblColRef> filterColumns = new HashSet<TblColRef>(); + public Set<TblColRef> allColumns = new HashSet<TblColRef>(); + public List<TblColRef> groupByColumns = new ArrayList<TblColRef>(); + public Set<TblColRef> metricsColumns = new HashSet<TblColRef>(); + public List<FunctionDesc> 
aggregations = new ArrayList<FunctionDesc>(); // storage level measure type, on top of which various sql aggr function may apply + public List<SQLCall> aggrSqlCalls = new ArrayList<SQLCall>(); // sql level aggregation function call + public Set<TblColRef> filterColumns = new HashSet<TblColRef>(); public TupleFilter filter; public List<JoinDesc> joins = new LinkedList<JoinDesc>(); private List<MeasureDesc> sortMeasures; @@ -144,7 +147,7 @@ public class OLAPContext { public SQLDigest getSQLDigest() { if (sqlDigest == null) - sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, sortMeasures, sortOrders); + sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, aggrSqlCalls, sortMeasures, sortOrders); return sqlDigest; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java index e42d9be..93f06dd 100644 --- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java +++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java @@ -25,6 +25,7 @@ import java.io.Writer; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.calcite.schema.Schema; @@ -34,6 +35,7 @@ import org.apache.calcite.util.ConversionUtil; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.model.DatabaseDesc; import 
org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.project.ProjectInstance; @@ -41,6 +43,8 @@ import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; + /** */ public class OLAPSchemaFactory implements SchemaFactory { @@ -138,9 +142,14 @@ public class OLAPSchemaFactory implements SchemaFactory { } private static void createOLAPSchemaFunctions(Writer out) throws IOException { - out.write(" \"functions\": [\n"); - Map<String, String> udfs = KylinConfig.getInstanceFromEnv().getUDFs(); + Map<String, String> udfs = Maps.newHashMap(); + udfs.putAll(KylinConfig.getInstanceFromEnv().getUDFs()); + for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) { + udfs.put(entry.getKey(), entry.getValue().getName()); + } + int index = 0; + out.write(" \"functions\": [\n"); for (Map.Entry<String, String> udf : udfs.entrySet()) { String udfName = udf.getKey().trim().toUpperCase(); String udfClassName = udf.getValue().trim(); http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java index 1353862..e07ea91 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java @@ -137,11 +137,6 @@ public class MeasureTypeOnlyAggrInBaseTest extends LocalFileMetadataTestCase { } @Override - public Class<?> getRewriteCalciteAggrFunctionClass() { - return null; - } - - @Override public boolean onlyAggrInBaseCuboid() { return 
true; }