http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureType.java new file mode 100644 index 0000000..66ca209 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence; +import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.metadata.tuple.Tuple; +import org.apache.kylin.metadata.tuple.TupleInfo; + +/** + * MeasureType captures how a kind of aggregation is defined, how it is calculated + * during cube build, and how it is involved in query and storage scan. + * + * @param <T> the Java type of aggregation data object, e.g. HyperLogLogPlusCounter + */ +abstract public class MeasureType<T> { + + /* ============================================================================ + * Define + * ---------------------------------------------------------------------------- */ + + /** Validates a user defined FunctionDesc has expected parameter etc. Throw IllegalArgumentException if anything wrong. */ + public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { + return; + } + + /** Although most aggregated object takes only 8 bytes like long or double, + * some advanced aggregation like HyperLogLog or TopN can consume more than 10 KB for + * each object, which requires special care on memory allocation. */ + public boolean isMemoryHungry() { + return false; + } + + /* ============================================================================ + * Build + * ---------------------------------------------------------------------------- */ + + /** Return a MeasureIngester which knows how to init aggregation object from raw records. */ + abstract public MeasureIngester<T> newIngester(); + + /** Return a MeasureAggregator which does aggregation. */ + abstract public MeasureAggregator<T> newAggregator(); + + /** Some special measures need dictionary to encode column values for optimal storage. TopN is an example. */ + public List<TblColRef> getColumnsNeedDictionary(FunctionDesc functionDesc) { + return Collections.emptyList(); + } + + /* ============================================================================ + * Cube Selection + * ---------------------------------------------------------------------------- */ + + /** + * Some special measures hold columns which are usually treated as dimensions (or vice-versa). + * This is where they override to influence cube capability check. + * + * A SQLDigest contains dimensions and measures extracted from a query. After comparing to + * cube definition, the matched dimensions and measures are crossed out, and what's left is + * the <code>unmatchedDimensions</code> and <code>unmatchedAggregations</code>. + * + * Each measure type on the cube is then called on this method to check if any of the unmatched + * can be fulfilled. If a measure type cannot fulfill any of the unmatched, it simply return null. + * Or otherwise, <code>unmatchedDimensions</code> and <code>unmatchedAggregations</code> must + * be modified to drop the satisfied dimension or measure, and a CapabilityInfluence object + * must be returned to mark the contribution of this measure type. + */ + public CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) { + return null; + } + + /* ============================================================================ + * Query Rewrite + * ---------------------------------------------------------------------------- */ + + // TODO support user defined Calcite aggr function + + /** Whether or not Calcite rel-tree needs rewrite to do last around of aggregation */ + abstract public boolean needRewrite(); + + /** Returns a Calcite aggregation function implementation class */ + abstract public Class<?> getRewriteCalciteAggrFunctionClass(); + + /* ============================================================================ + * Storage + * ---------------------------------------------------------------------------- */ + + /** + * Some special measures hold columns which are usually treated as dimensions (or vice-versa). + * They need to adjust dimensions and measures in <code>sqlDigest</code> before scanning, + * such that correct cuboid and measures can be selected by storage. + */ + public void adjustSqlDigest(MeasureDesc measureDesc, SQLDigest sqlDigest) { + } + + /** Return true if one storage record maps to multiple tuples, or false otherwise. */ + public boolean needAdvancedTupleFilling() { + return false; + } + + /** The simply filling mode, one tuple per storage record. */ + public void fillTupleSimply(Tuple tuple, int indexInTuple, Object measureValue) { + tuple.setMeasureValue(indexInTuple, measureValue); + } + + /** The advanced filling mode, multiple tuples per storage record. */ + public IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, TupleInfo returnTupleInfo, Map<TblColRef, Dictionary<String>> dictionaryMap) { + throw new UnsupportedOperationException(); + } + + public static interface IAdvMeasureFiller { + + /** Reload a value from storage and get ready to fill multiple tuples with it. */ + void reload(Object measureValue); + + /** Returns how many rows contained in last loaded value. */ + int getNumOfRows(); + + /** Fill in specified row into tuple. */ + void fillTuplle(Tuple tuple, int row); + } +}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java new file mode 100644 index 0000000..0784f91 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure; + +import java.util.List; +import java.util.Map; + +import org.apache.kylin.measure.basic.BasicMeasureType; +import org.apache.kylin.measure.hllc.HLLCMeasureType; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +abstract public class MeasureTypeFactory<T> { + + abstract public MeasureType<T> createMeasureType(String funcName, DataType dataType); + + abstract public String getAggrFunctionName(); + + abstract public String getAggrDataTypeName(); + + abstract public Class<? extends DataTypeSerializer<T>> getAggrDataTypeSerializer(); + + // ============================================================================ + + + private static Map<String, MeasureTypeFactory<?>> factories = Maps.newHashMap(); + private static MeasureTypeFactory<?> defaultFactory = new BasicMeasureType.Factory(); + + static { + init(); + } + + public static synchronized void init() { + if (factories.isEmpty() == false) + return; + + List<MeasureTypeFactory<?>> factoryInsts = Lists.newArrayList(); + + // two built-in advanced measure types + factoryInsts.add(new HLLCMeasureType.Factory()); + + /* + * Maybe do classpath search for more custom measure types? + * More MeasureType cannot be configured via kylin.properties alone, + * because used in coprocessor, the new classes must be on classpath + * and be packaged into coprocessor jar. + */ + + // register factories & data type serializers + for (MeasureTypeFactory<?> factory : factoryInsts) { + String funcName = factory.getAggrFunctionName().toUpperCase(); + String dataTypeName = factory.getAggrDataTypeName().toLowerCase(); + Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer(); + + DataType.register(dataTypeName); + DataTypeSerializer.register(dataTypeName, serializer); + factories.put(funcName, factory); + } + } + + public static MeasureType<?> create(String funcName, String dataType) { + return create(funcName, DataType.getInstance(dataType)); + } + + public static MeasureType<?> create(String funcName, DataType dataType) { + funcName = funcName.toUpperCase(); + + MeasureTypeFactory<?> factory = factories.get(funcName); + if (factory == null) + factory = defaultFactory; + + return factory.createMeasureType(funcName, dataType); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java new file mode 100644 index 0000000..4ab4584 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.basic; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.FunctionDesc; + +@SuppressWarnings("rawtypes") +public class BasicMeasureType extends MeasureType { + + public static class Factory extends MeasureTypeFactory { + + @Override + public MeasureType createMeasureType(String funcName, DataType dataType) { + return new BasicMeasureType(funcName, dataType); + } + + @Override + public String getAggrFunctionName() { + return null; + } + + @Override + public String getAggrDataTypeName() { + return null; + } + + @Override + public Class getAggrDataTypeSerializer() { + return null; + } + } + + private final String funcName; + private final DataType dataType; + + public BasicMeasureType(String funcName, DataType dataType) { + // note at query parsing phase, the data type may be null, because only function and parameters are known + this.funcName = funcName; + this.dataType = dataType; + } + + @Override + public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { + DataType rtype = dataType; + + if (funcName.equals(FunctionDesc.FUNC_SUM)) { + if (rtype.isNumberFamily() == false) { + throw new IllegalArgumentException("Return type for function " + funcName + " must be one of " + DataType.NUMBER_FAMILY); + } + } else if (funcName.equals(FunctionDesc.FUNC_COUNT)) { + if (rtype.isIntegerFamily() == false) { + throw new IllegalArgumentException("Return type for function " + funcName + " must be one of " + DataType.INTEGER_FAMILY); + } + } else if (funcName.equals(FunctionDesc.FUNC_MAX) || funcName.equals(FunctionDesc.FUNC_MIN)) { + if (rtype.isNumberFamily() == false) { + throw new IllegalArgumentException("Return type for function " + funcName + " must be one of " + DataType.NUMBER_FAMILY); + } + } else { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + if (config.isQueryIgnoreUnknownFunction() == false) + throw new IllegalArgumentException("Unrecognized function: [" + funcName + "]"); + } + } + + @Override + public MeasureIngester<?> newIngester() { + if (dataType.isIntegerFamily()) + return new LongIngester(); + else if (dataType.isDecimal()) + return new BigDecimalIngester(); + else if (dataType.isNumberFamily()) + return new DoubleIngester(); + else + throw new IllegalArgumentException("No ingester for aggregation type " + dataType); + } + + @Override + public MeasureAggregator<?> newAggregator() { + if (isSum() || isCount()) { + if (dataType.isDecimal()) + return new BigDecimalSumAggregator(); + else if (dataType.isIntegerFamily()) + return new LongSumAggregator(); + else if (dataType.isNumberFamily()) + return new DoubleSumAggregator(); + } else if (isMax()) { + if (dataType.isDecimal()) + return new BigDecimalMaxAggregator(); + else if (dataType.isIntegerFamily()) + return new LongMaxAggregator(); + else if (dataType.isNumberFamily()) + return new DoubleMaxAggregator(); + } else if (isMin()) { + if (dataType.isDecimal()) + return new BigDecimalMinAggregator(); + else if (dataType.isIntegerFamily()) + return new LongMinAggregator(); + else if (dataType.isNumberFamily()) + return new DoubleMinAggregator(); + } + throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'"); + } + + private boolean isSum() { + return FunctionDesc.FUNC_SUM.equals(funcName); + } + + private boolean isCount() { + return FunctionDesc.FUNC_COUNT.equals(funcName); + } + + private boolean isMax() { + return FunctionDesc.FUNC_MAX.equals(funcName); + } + + private boolean isMin() { + return FunctionDesc.FUNC_MIN.equals(funcName); + } + + @Override + public boolean needRewrite() { + return !isSum(); + } + + @Override + public Class<?> getRewriteCalciteAggrFunctionClass() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java new file mode 100644 index 0000000..b51917c --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.basic; + +import java.math.BigDecimal; +import java.util.Map; + +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; + +public class BigDecimalIngester extends MeasureIngester<BigDecimal> { + + @Override + public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + if (values.length > 1) + throw new IllegalArgumentException(); + + if (values[0] == null) + return new BigDecimal(0); + else + return new BigDecimal(values[0]); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java new file mode 100644 index 0000000..44c8765 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.basic; + + +import org.apache.kylin.measure.MeasureAggregator; + +import java.math.BigDecimal; + +/** + * @author yangli9 + * + */ +public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> { + + BigDecimal max = null; + + @Override + public void reset() { + max = null; + } + + @Override + public void aggregate(BigDecimal value) { + if (max == null) + max = value; + else if (max.compareTo(value) < 0) + max = value; + } + + @Override + public BigDecimal getState() { + return max; + } + + @Override + public int getMemBytesEstimate() { + return guessBigDecimalMemBytes(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java new file mode 100644 index 0000000..87f9acf --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.basic; + + +import org.apache.kylin.measure.MeasureAggregator; + +import java.math.BigDecimal; + +/** + * @author yangli9 + * + */ +public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> { + + BigDecimal max = null; + + @Override + public void reset() { + max = null; + } + + @Override + public void aggregate(BigDecimal value) { + if (max == null) + max = value; + else if (max.compareTo(value) > 0) + max = value; + } + + @Override + public BigDecimal getState() { + return max; + } + + @Override + public int getMemBytesEstimate() { + return guessBigDecimalMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java new file mode 100644 index 0000000..de19828 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.basic; + + +import org.apache.kylin.measure.MeasureAggregator; + +import java.math.BigDecimal; + +/** + * @author yangli9 + * + */ +public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> { + + BigDecimal sum = new BigDecimal(0); + + @Override + public void reset() { + sum = new BigDecimal(0); + } + + @Override + public void aggregate(BigDecimal value) { + sum = sum.add(value); + } + + @Override + public BigDecimal getState() { + return sum; + } + + @Override + public int getMemBytesEstimate() { + return guessBigDecimalMemBytes(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java new file mode 100644 index 0000000..9118c28 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.basic; + +import java.util.Map; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; + +public class DoubleIngester extends MeasureIngester<DoubleWritable> { + + // avoid repeated object creation + private DoubleWritable current = new DoubleWritable(); + + @Override + public DoubleWritable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + if (values.length > 1) + throw new IllegalArgumentException(); + + DoubleWritable l = current; + if (values[0] == null) + l.set(0L); + else + l.set(Double.parseDouble(values[0])); + return l; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java new file mode 100644 index 0000000..dd1c727 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.basic; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.kylin.measure.MeasureAggregator; + +/** + * @author yangli9 + * + */ +public class DoubleMaxAggregator extends MeasureAggregator<DoubleWritable> { + + DoubleWritable max = null; + + @Override + public void reset() { + max = null; + } + + @Override + public void aggregate(DoubleWritable value) { + if (max == null) + max = new DoubleWritable(value.get()); + else if (max.get() < value.get()) + max.set(value.get()); + } + + @Override + public DoubleWritable getState() { + return max; + } + + @Override + public int getMemBytesEstimate() { + return guessDoubleMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java new file mode 100644 index 0000000..5c19d7a --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.basic; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.kylin.measure.MeasureAggregator; + +/** + * @author yangli9 + * + */ +public class DoubleMinAggregator extends MeasureAggregator<DoubleWritable> { + + DoubleWritable min = null; + + @Override + public void reset() { + min = null; + } + + @Override + public void aggregate(DoubleWritable value) { + if (min == null) + min = new DoubleWritable(value.get()); + else if (min.get() > value.get()) + min.set(value.get()); + } + + @Override + public DoubleWritable getState() { + return min; + } + + @Override + public int getMemBytesEstimate() { + return guessDoubleMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java new file mode 100644 index 0000000..a5ec9ff --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.basic; + +import org.apache.hadoop.io.DoubleWritable; +import org.apache.kylin.measure.MeasureAggregator; + +/** + * @author yangli9 + * + */ +public class DoubleSumAggregator extends MeasureAggregator<DoubleWritable> { + + DoubleWritable sum = new DoubleWritable(); + + @Override + public void reset() { + sum.set(0.0); + } + + @Override + public void aggregate(DoubleWritable value) { + sum.set(sum.get() + value.get()); + } + + @Override + public DoubleWritable getState() { + return sum; + } + + @Override + public int getMemBytesEstimate() { + return guessDoubleMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java new file mode 100644 index 0000000..6c951d1 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.basic; + +import java.util.Map; + +import org.apache.hadoop.io.LongWritable; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; + +public class LongIngester extends MeasureIngester<LongWritable> { + + // avoid repeated object creation + private LongWritable current = new LongWritable(); + + @Override + public LongWritable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + if (values.length > 1) + throw new IllegalArgumentException(); + + LongWritable l = current; + if (values[0] == null) + l.set(0L); + else + l.set(Long.parseLong(values[0])); + return l; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java new file mode 100644 index 0000000..38db6f1 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.basic; + +import org.apache.hadoop.io.LongWritable; +import org.apache.kylin.measure.MeasureAggregator; + +/** + * @author yangli9 + * + */ +public class LongMaxAggregator extends MeasureAggregator<LongWritable> { + + LongWritable max = null; + + @Override + public void reset() { + max = null; + } + + @Override + public void aggregate(LongWritable value) { + if (max == null) + max = new LongWritable(value.get()); + else if (max.get() < value.get()) + max.set(value.get()); + } + + @Override + public LongWritable getState() { + return max; + } + + @Override + public int getMemBytesEstimate() { + return guessLongMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java new file mode 100644 index 0000000..2774169 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.basic; + +import org.apache.hadoop.io.LongWritable; +import org.apache.kylin.measure.MeasureAggregator; + +/** + * @author yangli9 + * + */ +public class LongMinAggregator extends MeasureAggregator<LongWritable> { + + LongWritable min = null; + + @Override + public void reset() { + min = null; + } + + @Override + public void aggregate(LongWritable value) { + if (min == null) + min = new LongWritable(value.get()); + else if (min.get() > value.get()) + min.set(value.get()); + } + + @Override + public LongWritable getState() { + return min; + } + + @Override + public int getMemBytesEstimate() { + return guessLongMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java new file mode 100644 index 0000000..7b4e0b3 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.basic; + +import org.apache.hadoop.io.LongWritable; +import org.apache.kylin.measure.MeasureAggregator; + +/** + * @author yangli9 + * + */ +public class LongSumAggregator extends MeasureAggregator<LongWritable> { + + LongWritable sum = new LongWritable(); + + @Override + public void reset() { + sum.set(0); + } + + @Override + public void aggregate(LongWritable value) { + sum.set(sum.get() + value.get()); + } + + @Override + public LongWritable getState() { + return sum; + } + + @Override + public int getMemBytesEstimate() { + return guessLongMemBytes(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java new file mode 100644 index 0000000..90be370 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.hllc; + +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.measure.MeasureAggregator; + +/** + */ +@SuppressWarnings("serial") +public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> { + + final int precision; + HyperLogLogPlusCounter sum = null; + + public HLLCAggregator(int precision) { + this.precision = precision; + } + + @Override + public void reset() { + sum = null; + } + + @Override + public void aggregate(HyperLogLogPlusCounter value) { + if (sum == null) + sum = new HyperLogLogPlusCounter(value); + else + sum.merge(value); + } + + @Override + public HyperLogLogPlusCounter getState() { + return sum; + } + + @Override + public int getMemBytesEstimate() { + // 1024 + 60 returned by AggregationCacheMemSizeTest + return 8 // aggregator obj shell + + 4 // precision + + 8 // ref to HLLC + + 8 // HLLC obj shell + + 32 + (1 << precision); // HLLC internal + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java new file mode 100644 index 0000000..fffb0d1 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.hllc; + +import java.util.Map; + +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.measure.MeasureAggregator; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; + +public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> { + + public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT"; + public static final String DATATYPE_HLLC = "hllc"; + + public static class Factory extends MeasureTypeFactory<HyperLogLogPlusCounter> { + + @Override + public MeasureType<HyperLogLogPlusCounter> createMeasureType(String funcName, DataType dataType) { + return new HLLCMeasureType(funcName, dataType); + } + + @Override + public String getAggrFunctionName() { + return FUNC_COUNT_DISTINCT; + } + + @Override + public String getAggrDataTypeName() { + return DATATYPE_HLLC; + } + + @Override + public Class<? extends DataTypeSerializer<HyperLogLogPlusCounter>> getAggrDataTypeSerializer() { + return HLLCSerializer.class; + } + } + + // ============================================================================ + + private final DataType dataType; + + public HLLCMeasureType(String funcName, DataType dataType) { + // note at query parsing phase, the data type may be null, because only function and parameters are known + this.dataType = dataType; + } + + public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { + validate(functionDesc.getExpression(), functionDesc.getReturnDataType(), true); + } + + private void validate(String funcName, DataType dataType, boolean checkDataType) { + if (FUNC_COUNT_DISTINCT.equals(funcName) == false) + throw new IllegalArgumentException(); + + if (DATATYPE_HLLC.equals(dataType.getName()) == false) + throw new IllegalArgumentException(); + + if (dataType.getPrecision() < 1 || dataType.getPrecision() > 5000) + throw new IllegalArgumentException(); + } + + @Override + public boolean isMemoryHungry() { + return true; + } + + @Override + public MeasureIngester<HyperLogLogPlusCounter> newIngester() { + return new MeasureIngester<HyperLogLogPlusCounter>() { + HyperLogLogPlusCounter current = new HyperLogLogPlusCounter(dataType.getPrecision()); + + @Override + public HyperLogLogPlusCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + HyperLogLogPlusCounter hllc = current; + hllc.clear(); + for (String v : values) + hllc.add(v == null ? "__nUlL__" : v); + return hllc; + } + }; + } + + @Override + public MeasureAggregator<HyperLogLogPlusCounter> newAggregator() { + return new HLLCAggregator(dataType.getPrecision()); + } + + @Override + public boolean needRewrite() { + return true; + } + + @Override + public Class<?> getRewriteCalciteAggrFunctionClass() { + return HLLDistinctCountAggFunc.class; + } + + public static boolean isCountDistinct(FunctionDesc func) { + return FUNC_COUNT_DISTINCT.equalsIgnoreCase(func.getExpression()); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java new file mode 100644 index 0000000..e5a379a --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure.hllc; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; + +/** + * @author yangli9 + * + */ +public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> { + + // be thread-safe and avoid repeated obj creation + private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>(); + + private int precision; + + public HLLCSerializer(DataType type) { + this.precision = type.getPrecision(); + } + + @Override + public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) { + try { + value.writeRegisters(out); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private HyperLogLogPlusCounter current() { + HyperLogLogPlusCounter hllc = current.get(); + if (hllc == null) { + hllc = new HyperLogLogPlusCounter(precision); + current.set(hllc); + } + return hllc; + } + + @Override + public HyperLogLogPlusCounter deserialize(ByteBuffer in) { + HyperLogLogPlusCounter hllc = current(); + try { + hllc.readRegisters(in); + } catch (IOException e) { + throw new RuntimeException(e); + } + return hllc; + } + + @Override + public int peekLength(ByteBuffer in) { + return current().peekLength(in); + } + + @Override + public int maxLength() { + return current().maxLength(); + } + + @Override + public int getStorageBytesEstimate() { + // for HLL, it will be compressed when export to bytes + return (int) (current().maxLength() * 0.75); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java new file mode 100644 index 0000000..471bc8a --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.measure.hllc; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.kylin.common.hll.HyperLogLogPlusCounter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author xjiang + */ +public class HLLDistinctCountAggFunc { + + private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class); + + public static HyperLogLogPlusCounter init() { + return null; + } + + public static HyperLogLogPlusCounter initAdd(Object v) { + if (v instanceof Long) { // holistic case + long l = (Long) v; + return new FixedValueHLLCMockup(l); + } else { + HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; + return new HyperLogLogPlusCounter(c); + } + } + + public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) { + if (v instanceof Long) { // holistic case + long l = (Long) v; + if (counter == null) { + return new FixedValueHLLCMockup(l); + } else { + if (!(counter instanceof FixedValueHLLCMockup)) + throw new IllegalStateException("counter is not FixedValueHLLCMockup"); + + ((FixedValueHLLCMockup) counter).set(l); + return counter; + } + } else { + HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v; + if (counter == null) { + return new HyperLogLogPlusCounter(c); + } else { + counter.merge(c); + return counter; + } + } + } + + public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) { + return add(counter0, counter1); + } + + public static long result(HyperLogLogPlusCounter counter) { + return counter == null ? 0L : counter.getCountEstimate(); + } + + private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter { + + private Long value = null; + + FixedValueHLLCMockup(long value) { + this.value = value; + } + + public void set(long value) { + if (this.value == null) { + this.value = value; + } else { + long oldValue = Math.abs(this.value.longValue()); + long take = Math.max(oldValue, value); + logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take); + this.value = -take; // make it obvious that this value is wrong + } + } + + @Override + public void clear() { + this.value = null; + } + + @Override + protected void add(long hash) { + throw new UnsupportedOperationException(); + } + + @Override + public void merge(HyperLogLogPlusCounter another) { + throw new UnsupportedOperationException(); + } + + @Override + public long getCountEstimate() { + return value; + } + + @Override + public void writeRegisters(ByteBuffer out) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void readRegisters(ByteBuffer in) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (value ^ (value >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (getClass() != obj.getClass()) + return false; + FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj; + if (!value.equals(other.value)) + return false; + return true; + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java new file mode 100644 index 0000000..468e4b0 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.datatype; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; + +import org.apache.kylin.common.util.BytesUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yangli9 + * + */ +public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> { + + private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class); + + final DataType type; + final int maxLength; + + int avoidVerbose = 0; + + public BigDecimalSerializer(DataType type) { + this.type = type; + // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte + this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2; + } + + @Override + public void serialize(BigDecimal value, ByteBuffer out) { + if (value.scale() > type.getScale()) { + if (avoidVerbose % 10000 == 0) { + logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++)); + } + value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN); + } + byte[] bytes = value.unscaledValue().toByteArray(); + if (bytes.length + 2 > maxLength) { + throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type); + } + + BytesUtil.writeVInt(value.scale(), out); + BytesUtil.writeVInt(bytes.length, out); + out.put(bytes); + } + + @Override + public BigDecimal deserialize(ByteBuffer in) { + int scale = BytesUtil.readVInt(in); + int n = BytesUtil.readVInt(in); + + byte[] bytes = new byte[n]; + in.get(bytes); + + return new BigDecimal(new BigInteger(bytes), scale); + } + + @Override + public int peekLength(ByteBuffer in) { + int mark = in.position(); + + @SuppressWarnings("unused") + int scale = BytesUtil.readVInt(in); + int n = BytesUtil.readVInt(in); + int len = in.position() - mark + n; + + in.position(mark); + return len; + } + + @Override + public int maxLength() { + return maxLength; + } + + @Override + public int getStorageBytesEstimate() { + return 8; + } + + @Override + public BigDecimal valueOf(String str) { + return new BigDecimal(str); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java new file mode 100644 index 0000000..0ac07e1 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.datatype; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.measure.MeasureTypeFactory; +import org.apache.kylin.metadata.model.TblColRef; + +/** + * @author yangli9 + * + */ +public class DataType { + private static final LinkedHashSet<String> VALID_TYPES = new LinkedHashSet<String>(); + + private static Pattern TYPE_PATTERN = null; + private static final String TYPE_PATTEN_TAIL = "\\s*" // + + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?"; + + public static synchronized void register(String... typeNames) { + for (String typeName : typeNames) { + VALID_TYPES.add(typeName); + } + + TYPE_PATTERN = Pattern.compile( // + "(" + StringUtils.join(VALID_TYPES, "|") + ")" // + + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE); + } + + // standard sql types, ref: http://www.w3schools.com/sql/sql_datatypes_general.asp + static { + register("any", "char", "varchar", "string", // + "boolean", "byte", "binary", // + "int", "short", "long", "integer", "tinyint", "smallint", "bigint", // + "float", "real", "double", "decimal", "numeric", // + "date", "time", "datetime", "timestamp", // + TblColRef.InnerDataTypeEnum.LITERAL.getDataType(), TblColRef.InnerDataTypeEnum.DERIVED.getDataType()); + } + + public static final Set<String> INTEGER_FAMILY = new HashSet<String>(); + public static final Set<String> NUMBER_FAMILY = new HashSet<String>(); + public static final Set<String> DATETIME_FAMILY = new HashSet<String>(); + public static final Set<String> STRING_FAMILY = new HashSet<String>(); + private static final Map<String, String> LEGACY_TYPE_MAP = new HashMap<String, String>(); + static { + INTEGER_FAMILY.add("tinyint"); + INTEGER_FAMILY.add("smallint"); + INTEGER_FAMILY.add("integer"); + INTEGER_FAMILY.add("bigint"); + + NUMBER_FAMILY.addAll(INTEGER_FAMILY); + NUMBER_FAMILY.add("float"); + NUMBER_FAMILY.add("double"); + NUMBER_FAMILY.add("decimal"); + NUMBER_FAMILY.add("real"); + NUMBER_FAMILY.add("numeric"); + + DATETIME_FAMILY.add("date"); + DATETIME_FAMILY.add("time"); + DATETIME_FAMILY.add("datetime"); + DATETIME_FAMILY.add("timestamp"); + + STRING_FAMILY.add("varchar"); + STRING_FAMILY.add("char"); + + LEGACY_TYPE_MAP.put("byte", "tinyint"); + LEGACY_TYPE_MAP.put("int", "integer"); + LEGACY_TYPE_MAP.put("short", "smallint"); + LEGACY_TYPE_MAP.put("long", "bigint"); + LEGACY_TYPE_MAP.put("string", "varchar"); + LEGACY_TYPE_MAP.put("hllc10", "hllc(10)"); + LEGACY_TYPE_MAP.put("hllc12", "hllc(12)"); + LEGACY_TYPE_MAP.put("hllc14", "hllc(14)"); + LEGACY_TYPE_MAP.put("hllc15", "hllc(15)"); + LEGACY_TYPE_MAP.put("hllc16", "hllc(16)"); + + } + + private static final ConcurrentMap<DataType, DataType> CACHE = new ConcurrentHashMap<DataType, DataType>(); + + public static final DataType ANY = DataType.getInstance("any"); + + + static { + MeasureTypeFactory.init(); + } + + public static DataType getInstance(String type) { + if (type == null) + return null; + + DataType dataType = new DataType(type); + DataType cached = CACHE.get(dataType); + if (cached == null) { + CACHE.put(dataType, dataType); + cached = dataType; + } + return cached; + } + + // ============================================================================ + + private String name; + private int precision; + private int scale; + + + private DataType(String datatype) { + datatype = datatype.trim().toLowerCase(); + datatype = replaceLegacy(datatype); + + Pattern pattern = TYPE_PATTERN; + Matcher m = pattern.matcher(datatype); + if (m.matches() == false) + throw new IllegalArgumentException("bad data type -- " + datatype + ", does not match " + pattern); + + name = replaceLegacy(m.group(1)); + precision = -1; + scale = -1; + + String leftover = m.group(2); + if (leftover != null) { + String[] parts = leftover.split("\\s*,\\s*"); + for (int i = 0; i < parts.length; i++) { + int n; + try { + n = Integer.parseInt(parts[i]); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("bad data type -- " + datatype + ", precision/scale not numeric"); + } + if (i == 0) + precision = n; + else if (i == 1) + scale = n; + else + throw new IllegalArgumentException("bad data type -- " + datatype + ", too many precision/scale parts"); + } + } + + // FIXME 256 for unknown string precision + if ((name.equals("char") || name.equals("varchar")) && precision == -1) { + precision = 256; // to save memory at frontend, e.g. tableau will + // allocate memory according to this + } + + // FIXME (19,4) for unknown decimal precision + if ((name.equals("decimal") || name.equals("numeric")) && precision == -1) { + precision = 19; + scale = 4; + } + } + + private String replaceLegacy(String str) { + String replace = LEGACY_TYPE_MAP.get(str); + return replace == null ? str : replace; + } + + public int getStorageBytesEstimate() { + return DataTypeSerializer.create(this).getStorageBytesEstimate(); + } + + + public boolean isStringFamily() { + return STRING_FAMILY.contains(name); + } + + public boolean isIntegerFamily() { + return INTEGER_FAMILY.contains(name); + } + + public boolean isNumberFamily() { + return NUMBER_FAMILY.contains(name); + } + + public boolean isDateTimeFamily() { + return DATETIME_FAMILY.contains(name); + } + + public boolean isTinyInt() { + return name.equals("tinyint"); + } + + public boolean isSmallInt() { + return name.equals("smallint"); + } + + public boolean isInt() { + return name.equals("integer"); + } + + public boolean isBigInt() { + return name.equals("bigint"); + } + + public boolean isFloat() { + return name.equals("float"); + } + + public boolean isDouble() { + return name.equals("double"); + } + + public boolean isDecimal() { + return name.equals("decimal"); + } + + public String getName() { + return name; + } + + public int getPrecision() { + return precision; + } + + public int getScale() { + return scale; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + precision; + result = prime * result + scale; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + DataType other = (DataType) obj; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + if (precision != other.precision) + return false; + if (scale != other.scale) + return false; + return true; + } + + @Override + public String toString() { + if (precision < 0 && scale < 0) + return name; + else if (scale < 0) + return name + "(" + precision + ")"; + else + return name + "(" + precision + "," + scale + ")"; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java new file mode 100644 index 0000000..601925b --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.datatype; + +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.kylin.common.util.BytesSerializer; + +import com.google.common.collect.Maps; + +/** + * Note: the implementations MUST be thread-safe. + */ +abstract public class DataTypeSerializer<T> implements BytesSerializer<T> { + + final static Map<String, Class<?>> implementations = Maps.newHashMap(); + static { + implementations.put("varchar", StringSerializer.class); + implementations.put("decimal", BigDecimalSerializer.class); + implementations.put("double", DoubleSerializer.class); + implementations.put("float", DoubleSerializer.class); + implementations.put("bigint", LongSerializer.class); + implementations.put("long", LongSerializer.class); + implementations.put("integer", LongSerializer.class); + implementations.put("int", LongSerializer.class); + implementations.put("smallint", LongSerializer.class); + implementations.put("date", DateTimeSerializer.class); + implementations.put("datetime", DateTimeSerializer.class); + implementations.put("timestamp", DateTimeSerializer.class); + } + + public static void register(String dataTypeName, Class<? extends DataTypeSerializer<?>> impl) { + implementations.put(dataTypeName, impl); + } + + public static DataTypeSerializer<?> create(String dataType) { + return create(DataType.getInstance(dataType)); + } + + public static DataTypeSerializer<?> create(DataType type) { + Class<?> clz = implementations.get(type.getName()); + if (clz == null) + throw new RuntimeException("No DataTypeSerializer for type " + type); + + try { + return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type); + } catch (Exception e) { + throw new RuntimeException(e); // never happen + } + } + + /** peek into buffer and return the length of serialization */ + abstract public int peekLength(ByteBuffer in); + + /** return the max number of bytes to the longest serialization */ + abstract public int maxLength(); + + /** get an estimate of size in bytes of the serialized data */ + abstract public int getStorageBytesEstimate(); + + /** an optional convenient method that converts a string to this data type (for dimensions) */ + public T valueOf(String str) { + throw new UnsupportedOperationException(); + } + + /** convert from obj to string */ + public String toString(T value) { + if (value == null) + return "NULL"; + else + return value.toString(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java new file mode 100644 index 0000000..f14481a --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java @@ -0,0 +1,57 @@ +package org.apache.kylin.metadata.datatype; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.LongWritable; +import org.apache.kylin.common.util.DateFormat; + +public class DateTimeSerializer extends DataTypeSerializer<LongWritable> { + + // be thread-safe and avoid repeated obj creation + private ThreadLocal<LongWritable> current = new ThreadLocal<LongWritable>(); + + public DateTimeSerializer(DataType type) { + } + + @Override + public void serialize(LongWritable value, ByteBuffer out) { + out.putLong(value.get()); + } + + private LongWritable current() { + LongWritable l = current.get(); + if (l == null) { + l = new LongWritable(); + current.set(l); + } + return l; + } + + @Override + public LongWritable deserialize(ByteBuffer in) { + LongWritable l = current(); + l.set(in.getLong()); + return l; + } + + @Override + public int peekLength(ByteBuffer in) { + return 8; + } + + @Override + public int maxLength() { + return 8; + } + + @Override + public int getStorageBytesEstimate() { + return 8; + } + + @Override + public LongWritable valueOf(String str) { + return new LongWritable(DateFormat.stringToMillis(str)); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java new file mode 100644 index 0000000..04b43cf --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.datatype; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.DoubleWritable; + +/** + */ +public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> { + + // be thread-safe and avoid repeated obj creation + private ThreadLocal<DoubleWritable> current = new ThreadLocal<DoubleWritable>(); + + public DoubleSerializer(DataType type) { + } + + @Override + public void serialize(DoubleWritable value, ByteBuffer out) { + out.putDouble(value.get()); + } + + private DoubleWritable current() { + DoubleWritable d = current.get(); + if (d == null) { + d = new DoubleWritable(); + current.set(d); + } + return d; + } + + @Override + public DoubleWritable deserialize(ByteBuffer in) { + DoubleWritable d = current(); + d.set(in.getDouble()); + return d; + } + + @Override + public int peekLength(ByteBuffer in) { + return 8; + } + + @Override + public int maxLength() { + return 8; + } + + @Override + public int getStorageBytesEstimate() { + return 8; + } + + @Override + public DoubleWritable valueOf(String str) { + return new DoubleWritable(Double.parseDouble(str)); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java new file mode 100644 index 0000000..d893b29 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.metadata.datatype; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.LongWritable; +import org.apache.kylin.common.util.BytesUtil; + +/** + */ +public class LongSerializer extends DataTypeSerializer<LongWritable> { + + // be thread-safe and avoid repeated obj creation + private ThreadLocal<LongWritable> current = new ThreadLocal<LongWritable>(); + + public LongSerializer(DataType type) { + } + + @Override + public void serialize(LongWritable value, ByteBuffer out) { + BytesUtil.writeVLong(value.get(), out); + } + + private LongWritable current() { + LongWritable l = current.get(); + if (l == null) { + l = new LongWritable(); + current.set(l); + } + return l; + } + + @Override + public LongWritable deserialize(ByteBuffer in) { + LongWritable l = current(); + l.set(BytesUtil.readVLong(in)); + return l; + } + + @Override + public int peekLength(ByteBuffer in) { + int mark = in.position(); + + BytesUtil.readVLong(in); + int len = in.position() - mark; + + in.position(mark); + return len; + } + + @Override + public int maxLength() { + return 9; // vlong: 1 + 8 + } + + @Override + public int getStorageBytesEstimate() { + return 5; + } + + @Override + public LongWritable valueOf(String str) { + return new LongWritable(Long.parseLong(str)); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java new file mode 100644 index 0000000..eef2868 --- /dev/null +++ b/metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java @@ -0,0 +1,52 @@ +package org.apache.kylin.metadata.datatype; + +import java.nio.ByteBuffer; + +import org.apache.kylin.common.util.BytesUtil; + +public class StringSerializer extends DataTypeSerializer<String> { + + final DataType type; + final int maxLength; + + public StringSerializer(DataType type) { + this.type = type; + // see serialize(): 2 byte length, rest is String.toBytes() + this.maxLength = 2 + type.getPrecision(); + } + + @Override + public void serialize(String value, ByteBuffer out) { + int start = out.position(); + + BytesUtil.writeUTFString(value, out); + + if (out.position() - start > maxLength) + throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type); + } + + @Override + public String deserialize(ByteBuffer in) { + return BytesUtil.readUTFString(in); + } + + @Override + public int peekLength(ByteBuffer in) { + return BytesUtil.peekByteArrayLength(in); + } + + @Override + public int maxLength() { + return maxLength; + } + + @Override + public int getStorageBytesEstimate() { + return maxLength; + } + + @Override + public String valueOf(String str) { + return str; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java ---------------------------------------------------------------------- diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java deleted file mode 100644 index 56222f1..0000000 --- a/metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.metadata.measure; - -import java.math.BigDecimal; - -/** - * @author yangli9 - * - */ -public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> { - - BigDecimal max = null; - - @Override - public void reset() { - max = null; - } - - @Override - public void aggregate(BigDecimal value) { - if (max == null) - max = value; - else if (max.compareTo(value) < 0) - max = value; - } - - @Override - public BigDecimal getState() { - return max; - } - - @Override - public int getMemBytes() { - return guessBigDecimalMemBytes(); - } -}