This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 4b396ab  KYLIN-4751 Fix NPE issue when run test case TestTopNUDAF
4b396ab is described below

commit 4b396ab86152a7f72aecb65e69f87441394d7373
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Thu Sep 10 09:51:38 2020 +0800

    KYLIN-4751 Fix NPE issue when run test case TestTopNUDAF
---
 .../org/apache/kylin/measure/topn/Counter.java     | 34 ++++++--
 .../org/apache/kylin/measure/topn/TopNCounter.java | 94 +++++++++++++++-------
 .../apache/kylin/measure/topn/TopNMeasureType.java | 22 ++---
 .../kylin/measure/topn/TopNCounterBasicTest.java   |  4 +-
 .../sql_topn/{query45.sql.disable => query45.sql}  |  2 +-
 .../src/test/resources/query/sql_topn/query81.sql  |  4 +-
 .../kylin/engine/spark/job/TestTopNUDAF.scala      |  3 +-
 7 files changed, 107 insertions(+), 56 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
index 219c712..42422df 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
@@ -18,6 +18,10 @@
 
 package org.apache.kylin.measure.topn;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.Serializable;
 
 /**
@@ -25,10 +29,10 @@ import java.io.Serializable;
  * 
  * @param <T>
  */
-public class Counter<T> implements Serializable{
+public class Counter<T> implements Externalizable, Serializable {
 
     protected T item;
-    protected double count;
+    protected Double count;
 
     /**
      * For de-serialization
@@ -37,30 +41,46 @@ public class Counter<T> implements Serializable{
     }
 
     public Counter(T item) {
-        this.count = 0;
+        this.count = 0d;
         this.item = item;
     }
 
-    public Counter(T item, double count) {
+    public Counter(T item, Double count) {
         this.item = item;
         this.count = count;
     }
 
-
     public T getItem() {
         return item;
     }
 
-    public double getCount() {
+    public Double getCount() {
         return count;
     }
 
-    public void setCount(double count) {
+    public void setItem(T item) {
+        this.item = item;
+    }
+
+    public void setCount(Double count) {
         this.count = count;
     }
+
     @Override
     public String toString() {
         return item + ":" + count;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        item = (T) in.readObject();
+        count = in.readDouble();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(item);
+        out.writeDouble(count);
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index 979d591..2352bbd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.measure.topn;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -46,7 +45,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
 
     protected int capacity;
     private HashMap<T, Counter<T>> counterMap;
-    protected LinkedList<Counter<T>> counterList; //a linked list, first the is the toppest element
+    protected LinkedList<Counter<T>> counterList; //a linked list, first one is the toppest element
     private boolean ordered = true;
     private boolean descending = true;
 
@@ -75,17 +74,30 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
      * Algorithm: <i>Space-Saving</i>
      *
      * @param item stream element (<i>e</i>)
-     * @return false if item was already in the stream summary, true otherwise
      */
-    public void offer(T item, double incrementCount) {
+    public void offer(T item, Double incrementCount) {
         Counter<T> counterNode = counterMap.get(item);
+
         if (counterNode == null) {
-            counterNode = new Counter<T>(item, incrementCount);
+            if (size() < capacity) {
+                counterNode = new Counter<>(item, null);
+                if (this.descending) {
+                    counterList.addLast(counterNode);
+                } else {
+                    counterList.addFirst(counterNode);
+                }
+            } else {
+                // the min item should be dropped
+                if (!ordered) {
+                    sort();
+                }
+                counterNode = this.descending ? counterList.getLast() : counterList.getFirst();
+                counterMap.remove(counterNode.getItem());
+                counterNode.setItem(item);
+            }
             counterMap.put(item, counterNode);
-            counterList.add(counterNode);
-        } else {
-            counterNode.setCount(counterNode.getCount() + incrementCount);
         }
+        incrementCounter(counterNode, incrementCount);
         ordered = false;
     }
 
@@ -93,9 +105,8 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
      * Sort and keep the expected size;
      */
     public void sortAndRetain() {
-        Collections.sort(counterList, this.descending ? DESC_COMPARATOR : ASC_COMPARATOR);
+        sort();
         retain(capacity);
-        ordered = true;
     }
 
     public List<Counter<T>> topK(int k) {
@@ -143,7 +154,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
      * @param item
      * @param count
      */
-    public void offerToHead(T item, double count) {
+    public void offerToHead(T item, Double count) {
         Counter<T> c = new Counter<T>(item, count);
         counterList.addFirst(c);
         counterMap.put(c.item, c);
@@ -160,26 +171,20 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
         double m1 = thisFull ? this.counterList.getLast().count : 0.0;
         double m2 = anotherFull ? another.counterList.getLast().count : 0.0;
 
-        if (anotherFull == true) {
+        if (anotherFull) {
             for (Counter<T> entry : this.counterMap.values()) {
                 entry.count += m2;
             }
         }
 
         for (Map.Entry<T, Counter<T>> entry : another.counterMap.entrySet()) {
-            Counter<T> counter = this.counterMap.get(entry.getKey());
-            if (counter != null) {
-                //                this.offer(entry.getValue().getItem(), (entry.getValue().count - m2));
-                counter.setCount(counter.getCount() + (entry.getValue().count - m2));
+            if (this.counterMap.containsKey(entry.getKey())) {
+                this.offer(entry.getValue().getItem(), (entry.getValue().count - m2));
             } else {
-                //                this.offer(entry.getValue().getItem(), entry.getValue().count + m1);
-                counter = new Counter<T>(entry.getValue().getItem(), entry.getValue().count + m1);
-                this.counterMap.put(entry.getValue().getItem(), counter);
-                this.counterList.add(counter);
+                this.offer(entry.getValue().getItem(), entry.getValue().count + m1);
             }
         }
         this.ordered = false;
-
         this.sortAndRetain();
         return this;
     }
@@ -226,33 +231,60 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
     public TopNCounter<T> copy() {
         TopNCounter result = new TopNCounter(capacity);
         result.counterMap = Maps.newHashMap(counterMap);
+        result.counterList = Lists.newLinkedList(counterList);
         return result;
     }
 
     @Override
     public Iterator<Counter<T>> iterator() {
-        if (this.descending == true) {
+        if (this.descending) {
             return this.counterList.descendingIterator();
         } else {
             throw new IllegalStateException(); // support in future
         }
     }
 
-    static final Comparator ASC_COMPARATOR = new Comparator<Counter>() {
-        @Override
-        public int compare(Counter o1, Counter o2) {
-            return Double.compare(o1.getCount(), o2.getCount());
+    static final Comparator<Counter> ASC_COMPARATOR = (Counter o1, Counter o2) -> {
+        if (o1.getCount() == null) {
+            if (o2.getCount() == null)
+                return 0;
+            else
+                return -1;
         }
+        if (o2.getCount() == null) {
+            return 1;
+        }
+        return Double.compare(o1.getCount(), o2.getCount());
+    };
 
+    static final Comparator<Counter> DESC_COMPARATOR = (Counter o1, Counter o2) -> {
+        if (o1.getCount() == null) {
+            if (o2.getCount() == null)
+                return 0;
+            else
+                return 1;
+        }
+        if (o2.getCount() == null) {
+            return -1;
+        }
+        return Double.compare(o2.getCount(), o1.getCount());
     };
 
-    static final Comparator DESC_COMPARATOR = new Comparator<Counter>() {
-        @Override
-        public int compare(Counter o1, Counter o2) {
-            return Double.compare(o2.getCount(), o1.getCount());
+    private void incrementCounter(Counter<T> counterNode, Double incrementCount) {
+        if (incrementCount == null) {
+            return;
+        }
+        if (counterNode.getCount() == null) {
+            counterNode.setCount(incrementCount);
+        } else {
+            counterNode.setCount(counterNode.getCount() + incrementCount);
         }
+    }
 
-    };
+    private void sort() {
+        counterList.sort(this.descending ? DESC_COMPARATOR : ASC_COMPARATOR);
+        ordered = true;
+    }
 
     public void reset() {
         counterList.clear();
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 518272e..31d7fb4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -106,10 +106,10 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
     }
 
     private void validate(String funcName, DataType dataType, boolean checkDataType) {
-        if (FUNC_TOP_N.equals(funcName) == false)
+        if (!FUNC_TOP_N.equals(funcName))
             throw new IllegalArgumentException();
 
-        if (DATATYPE_TOPN.equals(dataType.getName()) == false)
+        if (!DATATYPE_TOPN.equals(dataType.getName()))
             throw new IllegalArgumentException();
 
         if (dataType.getPrecision() < 1 || dataType.getPrecision() > 10000)
@@ -186,7 +186,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
                     }
                 }
 
-                if (needReEncode == false) {
+                if (!needReEncode) {
                     // no need re-encode
                     return topNCounter;
                 }
@@ -312,7 +312,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
             };
         }
 
-        if (digest.aggregations.size() == 0) {
+        if (digest.aggregations.isEmpty()) {
             // directly query the UHC column without sorting
             boolean b = unmatchedDimensions.removeAll(literalCol);
             if (b) {
@@ -377,7 +377,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
             return false;
         }
 
-        if (sum.isSum() == false)
+        if (!sum.isSum())
             return false;
 
         if (sum.getParameter() == null || sum.getParameter().getColRefs() == null
@@ -411,18 +411,18 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
             FunctionDesc topnFunc = measureDesc.getFunction();
             List<TblColRef> topnLiteralCol = getTopNLiteralColumn(topnFunc);
 
-            if (sqlDigest.groupbyColumns.containsAll(topnLiteralCol) == false) {
+            if (!sqlDigest.groupbyColumns.containsAll(topnLiteralCol)) {
                 continue;
             }
 
-            if (sqlDigest.aggregations.size() > 0) {
+            if (!sqlDigest.aggregations.isEmpty()) {
                 FunctionDesc origFunc = sqlDigest.aggregations.iterator().next();
-                if (origFunc.isSum() == false && origFunc.isCount() == false) {
+                if (!origFunc.isSum() && !origFunc.isCount()) {
                     logger.warn("When query with topN, only SUM/Count function is allowed.");
                     return;
                 }
 
-                if (isTopNCompatibleSum(measureDesc.getFunction(), origFunc) == false) {
+                if (!isTopNCompatibleSum(measureDesc.getFunction(), origFunc)) {
                     continue;
                 }
 
@@ -549,7 +549,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
     }
 
     private TblColRef getTopNNumericColumn(FunctionDesc functionDesc) {
-        if (functionDesc.getParameter().isColumnType() == true) {
+        if (functionDesc.getParameter().isColumnType()) {
             return functionDesc.getParameter().getColRefs().get(0);
         }
         return null;
@@ -557,7 +557,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
 
     private List<TblColRef> getTopNLiteralColumn(FunctionDesc functionDesc) {
         List<TblColRef> allColumns = functionDesc.getParameter().getColRefs();
-        if (functionDesc.getParameter().isColumnType() == false) {
+        if (!functionDesc.getParameter().isColumnType()) {
             return allColumns;
         }
         return allColumns.subList(1, allColumns.size());
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
index 506ecf3..48bf678 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
@@ -65,7 +65,7 @@ public class TopNCounterBasicTest {
         TopNCounter<String> vs = new TopNCounter<String>(3);
         String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", "C", "A", "A" };
         for (String i : stream) {
-            vs.offer(i, 10);
+            vs.offer(i, 10d);
         }
         List<Counter<String>> topK = vs.topK(3);
         for (Counter<String> c : topK) {
@@ -78,7 +78,7 @@ public class TopNCounterBasicTest {
         TopNCounter<String> vs_increment = new TopNCounter<String>(3);
         TopNCounter<String> vs_single = new TopNCounter<String>(3);
         String[] stream = { "A", "B", "C", "D", "A" };
-        Integer[] increments = { 15, 20, 25, 30, 1 };
+        Double[] increments = { 15d, 20d, 25d, 30d, 1d };
 
         for (int i = 0; i < stream.length; i++) {
             vs_increment.offer(stream[i], increments[i]);
diff --git a/kylin-it/src/test/resources/query/sql_topn/query45.sql.disable b/kylin-it/src/test/resources/query/sql_topn/query45.sql
similarity index 95%
rename from kylin-it/src/test/resources/query/sql_topn/query45.sql.disable
rename to kylin-it/src/test/resources/query/sql_topn/query45.sql
index 39f9571..8940c2e 100644
--- a/kylin-it/src/test/resources/query/sql_topn/query45.sql.disable
+++ b/kylin-it/src/test/resources/query/sql_topn/query45.sql
@@ -20,4 +20,4 @@
 
 select seller_id, sum(price) as s from test_kylin_fact
   where lstg_format_name='FP-GTC' 
-  group by seller_id
+  group by seller_id order by s desc limit 10
diff --git a/kylin-it/src/test/resources/query/sql_topn/query81.sql b/kylin-it/src/test/resources/query/sql_topn/query81.sql
index 93868e7..1fedef8 100644
--- a/kylin-it/src/test/resources/query/sql_topn/query81.sql
+++ b/kylin-it/src/test/resources/query/sql_topn/query81.sql
@@ -17,7 +17,7 @@
 --
 
 select test_cal_dt.week_beg_dt, sum(price) as GMV
- from test_kylin_fact 
+ from test_kylin_fact
 inner JOIN edw.test_cal_dt as test_cal_dt
  ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
  inner JOIN test_category_groupings
@@ -25,4 +25,4 @@ inner JOIN edw.test_cal_dt as test_cal_dt
  inner JOIN edw.test_sites as test_sites
  ON test_kylin_fact.lstg_site_id = test_sites.site_id
  where test_cal_dt.week_beg_dt between DATE '2013-09-01' and DATE '2013-10-01' and (lstg_format_name='FP-GTC' or 'a' = 'b')
- group by test_cal_dt.week_beg_dt
\ No newline at end of file
+ group by test_cal_dt.week_beg_dt, test_kylin_fact.seller_id order by GMV desc limit 10
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala
index e799193..ff4f63c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala
@@ -27,8 +27,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
 class TestTopNUDAF extends SparderBaseFunSuite with SharedSparkSession {
-  //ignore temporary
-  ignore("basic") {
+  test("basic") {
 
     val schema = StructType(Array(
         StructField("rowKey", IntegerType, nullable = true),

Reply via email to