This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new 4b396ab KYLIN-4751 Fix NPE issue when run test case TestTopNUDAF 4b396ab is described below commit 4b396ab86152a7f72aecb65e69f87441394d7373 Author: Zhichao Zhang <441586...@qq.com> AuthorDate: Thu Sep 10 09:51:38 2020 +0800 KYLIN-4751 Fix NPE issue when run test case TestTopNUDAF --- .../org/apache/kylin/measure/topn/Counter.java | 34 ++++++-- .../org/apache/kylin/measure/topn/TopNCounter.java | 94 +++++++++++++++------- .../apache/kylin/measure/topn/TopNMeasureType.java | 22 ++--- .../kylin/measure/topn/TopNCounterBasicTest.java | 4 +- .../sql_topn/{query45.sql.disable => query45.sql} | 2 +- .../src/test/resources/query/sql_topn/query81.sql | 4 +- .../kylin/engine/spark/job/TestTopNUDAF.scala | 3 +- 7 files changed, 107 insertions(+), 56 deletions(-) diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java index 219c712..42422df 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java @@ -18,6 +18,10 @@ package org.apache.kylin.measure.topn; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.io.Serializable; /** @@ -25,10 +29,10 @@ import java.io.Serializable; * * @param <T> */ -public class Counter<T> implements Serializable{ +public class Counter<T> implements Externalizable, Serializable { protected T item; - protected double count; + protected Double count; /** * For de-serialization @@ -37,30 +41,46 @@ public class Counter<T> implements Serializable{ } public Counter(T item) { - this.count = 0; + this.count = 0d; this.item = item; } - public Counter(T item, double count) { + public Counter(T item, Double count) { this.item = item; this.count = count; } - public T getItem() { return item; } - public double getCount() { + public Double getCount() { return count; } - public void setCount(double count) { + public void setItem(T item) { + this.item = item; + } + + public void setCount(Double count) { this.count = count; } + @Override public String toString() { return item + ":" + count; } + @SuppressWarnings("unchecked") + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + item = (T) in.readObject(); + count = in.readDouble(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(item); + out.writeDouble(count); + } } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java index 979d591..2352bbd 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java @@ -19,7 +19,6 @@ package org.apache.kylin.measure.topn; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -46,7 +45,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl protected int capacity; private HashMap<T, Counter<T>> counterMap; - protected LinkedList<Counter<T>> counterList; //a linked list, first the is the toppest element + protected LinkedList<Counter<T>> counterList; //a linked list, first one is the toppest element private boolean ordered = true; private boolean descending = true; @@ -75,17 +74,30 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl * Algorithm: <i>Space-Saving</i> * * @param item stream element (<i>e</i>) - * @return false if item was already in the stream summary, true otherwise */ - public void offer(T item, double incrementCount) { + public void offer(T item, Double incrementCount) { Counter<T> counterNode = counterMap.get(item); + if (counterNode == null) { - counterNode = new Counter<T>(item, incrementCount); + if (size() < capacity) { + counterNode = new Counter<>(item, null); + if (this.descending) { + counterList.addLast(counterNode); + } else { + counterList.addFirst(counterNode); + } + } else { + // the min item should be dropped + if (!ordered) { + sort(); + } + counterNode = this.descending ? counterList.getLast() : counterList.getFirst(); + counterMap.remove(counterNode.getItem()); + counterNode.setItem(item); + } counterMap.put(item, counterNode); - counterList.add(counterNode); - } else { - counterNode.setCount(counterNode.getCount() + incrementCount); } + incrementCounter(counterNode, incrementCount); ordered = false; } @@ -93,9 +105,8 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl * Sort and keep the expected size; */ public void sortAndRetain() { - Collections.sort(counterList, this.descending ? DESC_COMPARATOR : ASC_COMPARATOR); + sort(); retain(capacity); - ordered = true; } public List<Counter<T>> topK(int k) { @@ -143,7 +154,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl * @param item * @param count */ - public void offerToHead(T item, double count) { + public void offerToHead(T item, Double count) { Counter<T> c = new Counter<T>(item, count); counterList.addFirst(c); counterMap.put(c.item, c); @@ -160,26 +171,20 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl double m1 = thisFull ? this.counterList.getLast().count : 0.0; double m2 = anotherFull ? another.counterList.getLast().count : 0.0; - if (anotherFull == true) { + if (anotherFull) { for (Counter<T> entry : this.counterMap.values()) { entry.count += m2; } } for (Map.Entry<T, Counter<T>> entry : another.counterMap.entrySet()) { - Counter<T> counter = this.counterMap.get(entry.getKey()); - if (counter != null) { - // this.offer(entry.getValue().getItem(), (entry.getValue().count - m2)); - counter.setCount(counter.getCount() + (entry.getValue().count - m2)); + if (this.counterMap.containsKey(entry.getKey())) { + this.offer(entry.getValue().getItem(), (entry.getValue().count - m2)); } else { - // this.offer(entry.getValue().getItem(), entry.getValue().count + m1); - counter = new Counter<T>(entry.getValue().getItem(), entry.getValue().count + m1); - this.counterMap.put(entry.getValue().getItem(), counter); - this.counterList.add(counter); + this.offer(entry.getValue().getItem(), entry.getValue().count + m1); } } this.ordered = false; - this.sortAndRetain(); return this; } @@ -226,33 +231,60 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl public TopNCounter<T> copy() { TopNCounter result = new TopNCounter(capacity); result.counterMap = Maps.newHashMap(counterMap); + result.counterList = Lists.newLinkedList(counterList); return result; } @Override public Iterator<Counter<T>> iterator() { - if (this.descending == true) { + if (this.descending) { return this.counterList.descendingIterator(); } else { throw new IllegalStateException(); // support in future } } - static final Comparator ASC_COMPARATOR = new Comparator<Counter>() { - @Override - public int compare(Counter o1, Counter o2) { - return Double.compare(o1.getCount(), o2.getCount()); + static final Comparator<Counter> ASC_COMPARATOR = (Counter o1, Counter o2) -> { + if (o1.getCount() == null) { + if (o2.getCount() == null) + return 0; + else + return -1; } + if (o2.getCount() == null) { + return 1; + } + return Double.compare(o1.getCount(), o2.getCount()); + }; + static final Comparator<Counter> DESC_COMPARATOR = (Counter o1, Counter o2) -> { + if (o1.getCount() == null) { + if (o2.getCount() == null) + return 0; + else + return 1; + } + if (o2.getCount() == null) { + return -1; + } + return Double.compare(o2.getCount(), o1.getCount()); }; - static final Comparator DESC_COMPARATOR = new Comparator<Counter>() { - @Override - public int compare(Counter o1, Counter o2) { - return Double.compare(o2.getCount(), o1.getCount()); + private void incrementCounter(Counter<T> counterNode, Double incrementCount) { + if (incrementCount == null) { + return; + } + if (counterNode.getCount() == null) { + counterNode.setCount(incrementCount); + } else { + counterNode.setCount(counterNode.getCount() + incrementCount); } + } - }; + private void sort() { + counterList.sort(this.descending ? DESC_COMPARATOR : ASC_COMPARATOR); + ordered = true; + } public void reset() { counterList.clear(); diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java index 518272e..31d7fb4 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java @@ -106,10 +106,10 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { } private void validate(String funcName, DataType dataType, boolean checkDataType) { - if (FUNC_TOP_N.equals(funcName) == false) + if (!FUNC_TOP_N.equals(funcName)) throw new IllegalArgumentException(); - if (DATATYPE_TOPN.equals(dataType.getName()) == false) + if (!DATATYPE_TOPN.equals(dataType.getName())) throw new IllegalArgumentException(); if (dataType.getPrecision() < 1 || dataType.getPrecision() > 10000) @@ -186,7 +186,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { } } - if (needReEncode == false) { + if (!needReEncode) { // no need re-encode return topNCounter; } @@ -312,7 +312,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { }; } - if (digest.aggregations.size() == 0) { + if (digest.aggregations.isEmpty()) { // directly query the UHC column without sorting boolean b = unmatchedDimensions.removeAll(literalCol); if (b) { @@ -377,7 +377,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { return false; } - if (sum.isSum() == false) + if (!sum.isSum()) return false; if (sum.getParameter() == null || sum.getParameter().getColRefs() == null @@ -411,18 +411,18 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { FunctionDesc topnFunc = measureDesc.getFunction(); List<TblColRef> topnLiteralCol = getTopNLiteralColumn(topnFunc); - if (sqlDigest.groupbyColumns.containsAll(topnLiteralCol) == false) { + if (!sqlDigest.groupbyColumns.containsAll(topnLiteralCol)) { continue; } - if (sqlDigest.aggregations.size() > 0) { + if (!sqlDigest.aggregations.isEmpty()) { FunctionDesc origFunc = sqlDigest.aggregations.iterator().next(); - if (origFunc.isSum() == false && origFunc.isCount() == false) { + if (!origFunc.isSum() && !origFunc.isCount()) { logger.warn("When query with topN, only SUM/Count function is allowed."); return; } - if (isTopNCompatibleSum(measureDesc.getFunction(), origFunc) == false) { + if (!isTopNCompatibleSum(measureDesc.getFunction(), origFunc)) { continue; } @@ -549,7 +549,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { } private TblColRef getTopNNumericColumn(FunctionDesc functionDesc) { - if (functionDesc.getParameter().isColumnType() == true) { + if (functionDesc.getParameter().isColumnType()) { return functionDesc.getParameter().getColRefs().get(0); } return null; @@ -557,7 +557,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { private List<TblColRef> getTopNLiteralColumn(FunctionDesc functionDesc) { List<TblColRef> allColumns = functionDesc.getParameter().getColRefs(); - if (functionDesc.getParameter().isColumnType() == false) { + if (!functionDesc.getParameter().isColumnType()) { return allColumns; } return allColumns.subList(1, allColumns.size()); diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java index 506ecf3..48bf678 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java @@ -65,7 +65,7 @@ public class TopNCounterBasicTest { TopNCounter<String> vs = new TopNCounter<String>(3); String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" }; for (String i : stream) { - vs.offer(i, 10); + vs.offer(i, 10d); } List<Counter<String>> topK = vs.topK(3); for (Counter<String> c : topK) { @@ -78,7 +78,7 @@ public class TopNCounterBasicTest { TopNCounter<String> vs_increment = new TopNCounter<String>(3); TopNCounter<String> vs_single = new TopNCounter<String>(3); String[] stream = { "A", "B", "C", "D", "A" }; - Integer[] increments = { 15, 20, 25, 30, 1 }; + Double[] increments = { 15d, 20d, 25d, 30d, 1d }; for (int i = 0; i < stream.length; i++) { vs_increment.offer(stream[i], increments[i]); diff --git a/kylin-it/src/test/resources/query/sql_topn/query45.sql.disable b/kylin-it/src/test/resources/query/sql_topn/query45.sql similarity index 95% rename from kylin-it/src/test/resources/query/sql_topn/query45.sql.disable rename to kylin-it/src/test/resources/query/sql_topn/query45.sql index 39f9571..8940c2e 100644 --- a/kylin-it/src/test/resources/query/sql_topn/query45.sql.disable +++ b/kylin-it/src/test/resources/query/sql_topn/query45.sql @@ -20,4 +20,4 @@ select seller_id, sum(price) as s from test_kylin_fact where lstg_format_name='FP-GTC' - group by seller_id + group by seller_id order by s desc limit 10 diff --git a/kylin-it/src/test/resources/query/sql_topn/query81.sql b/kylin-it/src/test/resources/query/sql_topn/query81.sql index 93868e7..1fedef8 100644 --- a/kylin-it/src/test/resources/query/sql_topn/query81.sql +++ b/kylin-it/src/test/resources/query/sql_topn/query81.sql @@ -17,7 +17,7 @@ -- select test_cal_dt.week_beg_dt, sum(price) as GMV - from test_kylin_fact + from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt inner JOIN test_category_groupings @@ -25,4 +25,4 @@ inner JOIN edw.test_cal_dt as test_cal_dt inner JOIN edw.test_sites as test_sites ON test_kylin_fact.lstg_site_id = test_sites.site_id where test_cal_dt.week_beg_dt between DATE '2013-09-01' and DATE '2013-10-01' and (lstg_format_name='FP-GTC' or 'a' = 'b') - group by test_cal_dt.week_beg_dt \ No newline at end of file + group by test_cal_dt.week_beg_dt, test_kylin_fact.seller_id order by GMV desc limit 10 \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala index e799193..ff4f63c 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala +++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala @@ -27,8 +27,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ class TestTopNUDAF extends SparderBaseFunSuite with SharedSparkSession { - //ignore temporary - ignore("basic") { + test("basic") { val schema = StructType(Array( StructField("rowKey", IntegerType, nullable = true),