Repository: hive
Updated Branches:
  refs/heads/master e1e68b29a -> 15bdce43d


HIVE-13453: Support ORDER BY and windowing clause in partitioning clause with 
distinct function (Reviewed by Yongzhi Chen)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/15bdce43
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/15bdce43
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/15bdce43

Branch: refs/heads/master
Commit: 15bdce43db4624a63be1f648e46d1f2baa1c67de
Parents: e1e68b2
Author: Aihua Xu <aihu...@apache.org>
Authored: Fri May 6 11:00:20 2016 -0400
Committer: Aihua Xu <aihu...@apache.org>
Committed: Sat May 28 20:36:59 2016 -0400

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FunctionRegistry.java   |   2 +-
 .../apache/hadoop/hive/ql/exec/Registry.java    |   8 +-
 .../hadoop/hive/ql/parse/WindowingSpec.java     |  14 --
 .../hive/ql/plan/ptf/WindowFunctionDef.java     |   2 +-
 .../hive/ql/udf/generic/GenericUDAFAverage.java |  68 ++++++++--
 .../hive/ql/udf/generic/GenericUDAFCount.java   |  57 +++++---
 .../udf/generic/GenericUDAFParameterInfo.java   |   7 +
 .../hive/ql/udf/generic/GenericUDAFSum.java     | 134 +++++++++++++------
 .../generic/SimpleGenericUDAFParameterInfo.java |   9 +-
 .../hive/ql/udf/ptf/WindowingTableFunction.java |   9 +-
 .../queries/clientpositive/windowing_distinct.q |  18 +++
 .../clientpositive/windowing_distinct.q.out     |  66 +++++++++
 .../objectinspector/ObjectInspectorUtils.java   |  38 ++++++
 13 files changed, 333 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index fa90242..8217ad3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -902,7 +902,7 @@ public final class FunctionRegistry {
 
     GenericUDAFParameterInfo paramInfo =
         new SimpleGenericUDAFParameterInfo(
-            args, isDistinct, isAllColumns);
+            args, false, isDistinct, isAllColumns);
 
     GenericUDAFEvaluator udafEvaluator;
     if (udafResolver instanceof GenericUDAFResolver2) {

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
index 891514b..86df74d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
@@ -395,7 +395,7 @@ public class Registry {
    */
   @SuppressWarnings("deprecation")
   public GenericUDAFEvaluator getGenericUDAFEvaluator(String name,
-      List<ObjectInspector> argumentOIs, boolean isDistinct,
+      List<ObjectInspector> argumentOIs, boolean isWindowing, boolean 
isDistinct,
       boolean isAllColumns) throws SemanticException {
 
     GenericUDAFResolver udafResolver = getGenericUDAFResolver(name);
@@ -413,7 +413,7 @@ public class Registry {
 
     GenericUDAFParameterInfo paramInfo =
         new SimpleGenericUDAFParameterInfo(
-            args, isDistinct, isAllColumns);
+            args, isWindowing, isDistinct, isAllColumns);
     if (udafResolver instanceof GenericUDAFResolver2) {
       udafEvaluator =
           ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
@@ -433,14 +433,14 @@ public class Registry {
     }
     if (!functionName.equals(FunctionRegistry.LEAD_FUNC_NAME) &&
         !functionName.equals(FunctionRegistry.LAG_FUNC_NAME)) {
-      return getGenericUDAFEvaluator(functionName, argumentOIs, isDistinct, 
isAllColumns);
+      return getGenericUDAFEvaluator(functionName, argumentOIs, true, 
isDistinct, isAllColumns);
     }
 
     // this must be lead/lag UDAF
     ObjectInspector args[] = new ObjectInspector[argumentOIs.size()];
     GenericUDAFResolver udafResolver = info.getGenericUDAFResolver();
     GenericUDAFParameterInfo paramInfo = new SimpleGenericUDAFParameterInfo(
-        argumentOIs.toArray(args), isDistinct, isAllColumns);
+        argumentOIs.toArray(args), true, isDistinct, isAllColumns);
     return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
index 5ce7200..ef5186a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
@@ -124,9 +124,6 @@ public class WindowingSpec {
       WindowFunctionSpec wFn = (WindowFunctionSpec) expr;
       WindowSpec wdwSpec = wFn.getWindowSpec();
 
-      // 0. Precheck supported syntax
-      precheckSyntax(wFn, wdwSpec);
-
       // 1. For Wdw Specs that refer to Window Defns, inherit missing 
components
       if ( wdwSpec != null ) {
         ArrayList<String> sources = new ArrayList<String>();
@@ -153,14 +150,6 @@ public class WindowingSpec {
     }
   }
 
-  private void precheckSyntax(WindowFunctionSpec wFn, WindowSpec wdwSpec) 
throws SemanticException {
-    if (wdwSpec != null ) {
-      if (wFn.isDistinct && (wdwSpec.windowFrame != null || wdwSpec.getOrder() 
!= null) ) {
-        throw new SemanticException("Function with DISTINCT cannot work with 
partition ORDER BY or windowing clause.");
-      }
-    }
-  }
-
   private void fillInWindowSpec(String sourceId, WindowSpec dest, 
ArrayList<String> visited)
       throws SemanticException
   {
@@ -509,9 +498,6 @@ public class WindowingSpec {
       if ( getOrder() == null ) {
         OrderSpec order = new OrderSpec();
         order.prefixBy(getPartition());
-        if (wFn.isDistinct) {
-          order.addExpressions(wFn.getArgs());
-        }
         setOrder(order);
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
index ed6c671..84ac614 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ptf/WindowFunctionDef.java
@@ -124,4 +124,4 @@ public class WindowFunctionDef extends WindowExpressionDef {
     this.pivotResult = pivotResult;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
index 3c1ce26..6799978 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFAverage.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.udf.generic;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
@@ -106,6 +108,7 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
     AbstractGenericUDAFAverageEvaluator eval =
         (AbstractGenericUDAFAverageEvaluator) 
getEvaluator(paramInfo.getParameters());
     eval.avgDistinct = paramInfo.isDistinct();
+    eval.isWindowing = paramInfo.isWindowing();
     return eval;
   }
 
@@ -115,7 +118,7 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
     public void doReset(AverageAggregationBuffer<Double> aggregation) throws 
HiveException {
       aggregation.count = 0;
       aggregation.sum = new Double(0);
-      aggregation.previousValue = null;
+      aggregation.uniqueObjects = new HashSet<ObjectInspectorObject>();
     }
 
     @Override
@@ -145,6 +148,12 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
     }
 
     @Override
+    protected void doMergeAdd(Double sum,
+        ObjectInspectorObject obj) {
+      sum += PrimitiveObjectInspectorUtils.getDouble(obj.getValues()[0], 
copiedOI);
+    }
+
+    @Override
     protected void doTerminatePartial(AverageAggregationBuffer<Double> 
aggregation) {
       if(partialResult[1] == null) {
         partialResult[1] = new DoubleWritable(0);
@@ -172,6 +181,10 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
 
     @Override
     public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
+      // Don't use streaming for distinct cases
+      if (isWindowingDistinct()) {
+        return null;
+      }
 
       return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, 
Object[]>(this, wFrameDef) {
 
@@ -212,6 +225,7 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
     public void doReset(AverageAggregationBuffer<HiveDecimal> aggregation) 
throws HiveException {
       aggregation.count = 0;
       aggregation.sum = HiveDecimal.ZERO;
+      aggregation.uniqueObjects = new HashSet<ObjectInspectorObject>();
     }
 
     @Override
@@ -263,6 +277,14 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
       }
     }
 
+
+    @Override
+    protected void doMergeAdd(
+        HiveDecimal sum,
+        ObjectInspectorObject obj) {
+      sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(obj.getValues()[0], 
copiedOI));
+    }
+
     @Override
     protected void doTerminatePartial(AverageAggregationBuffer<HiveDecimal> 
aggregation) {
       if(partialResult[1] == null && aggregation.sum != null) {
@@ -296,6 +318,10 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
 
     @Override
     public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
+      // Don't use streaming for distinct cases
+      if (isWindowingDistinct()) {
+        return null;
+      }
 
       return new 
GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, Object[]>(
           this, wFrameDef) {
@@ -333,18 +359,18 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
   }
 
   private static class AverageAggregationBuffer<TYPE> implements 
AggregationBuffer {
-    private Object previousValue;
+    private HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows.
     private long count;
     private TYPE sum;
   };
 
   @SuppressWarnings("unchecked")
   public static abstract class AbstractGenericUDAFAverageEvaluator<TYPE> 
extends GenericUDAFEvaluator {
+    protected boolean isWindowing;
     protected boolean avgDistinct;
-
     // For PARTIAL1 and COMPLETE
     protected transient PrimitiveObjectInspector inputOI;
-    protected transient ObjectInspector copiedOI;
+    protected transient PrimitiveObjectInspector copiedOI;
     // For PARTIAL2 and FINAL
     private transient StructObjectInspector soi;
     private transient StructField countField;
@@ -363,6 +389,7 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
         PrimitiveObjectInspector inputOI, Object parameter);
     protected abstract void doMerge(AverageAggregationBuffer<TYPE> 
aggregation, Long partialCount,
         ObjectInspector sumFieldOI, Object partialSum);
+    protected abstract void doMergeAdd(TYPE sum, ObjectInspectorObject obj);
     protected abstract void doTerminatePartial(AverageAggregationBuffer<TYPE> 
aggregation);
     protected abstract Object doTerminate(AverageAggregationBuffer<TYPE> 
aggregation);
     protected abstract void doReset(AverageAggregationBuffer<TYPE> 
aggregation) throws HiveException;
@@ -376,7 +403,7 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
       // init input
       if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
         inputOI = (PrimitiveObjectInspector) parameters[0];
-        copiedOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+        copiedOI = 
(PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
             ObjectInspectorCopyOption.JAVA);
       } else {
         soi = (StructObjectInspector) parameters[0];
@@ -410,6 +437,10 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
       }
     }
 
+    protected boolean isWindowingDistinct() {
+      return isWindowing && avgDistinct;
+    }
+
     @AggregationType(estimable = true)
     static class AverageAgg extends AbstractAggregationBuffer {
       long count;
@@ -432,12 +463,15 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
         AverageAggregationBuffer<TYPE> averageAggregation = 
(AverageAggregationBuffer<TYPE>) aggregation;
         try {
           // Skip the same value if avgDistinct is true
-          if (this.avgDistinct &&
-              ObjectInspectorUtils.compare(parameter, inputOI, 
averageAggregation.previousValue, copiedOI) == 0) {
-            return;
+          if (isWindowingDistinct()) {
+            ObjectInspectorObject obj = new ObjectInspectorObject(
+                ObjectInspectorUtils.copyToStandardObject(parameter, inputOI, 
ObjectInspectorCopyOption.JAVA),
+                copiedOI);
+            if (averageAggregation.uniqueObjects.contains(obj)) {
+              return;
+            }
+            averageAggregation.uniqueObjects.add(obj);
           }
-          averageAggregation.previousValue = 
ObjectInspectorUtils.copyToStandardObject(
-              parameter, inputOI, ObjectInspectorCopyOption.JAVA);
 
           doIterate(averageAggregation, inputOI, parameter);
         } catch (NumberFormatException e) {
@@ -451,6 +485,10 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
 
     @Override
     public Object terminatePartial(AggregationBuffer aggregation) throws 
HiveException {
+      if (isWindowingDistinct()) {
+        throw new HiveException("Distinct windowing UDAF doesn't support merge 
and terminatePartial");
+      }
+
       doTerminatePartial((AverageAggregationBuffer<TYPE>) aggregation);
       return partialResult;
     }
@@ -459,9 +497,13 @@ public class GenericUDAFAverage extends 
AbstractGenericUDAFResolver {
     public void merge(AggregationBuffer aggregation, Object partial)
         throws HiveException {
       if (partial != null) {
-        doMerge((AverageAggregationBuffer<TYPE>)aggregation,
-            countFieldOI.get(soi.getStructFieldData(partial, countField)),
-            sumFieldOI, soi.getStructFieldData(partial, sumField));
+        if (isWindowingDistinct()) {
+          throw new HiveException("Distinct windowing UDAF doesn't support 
merge and terminatePartial");
+        } else {
+          doMerge((AverageAggregationBuffer<TYPE>)aggregation,
+              countFieldOI.get(soi.getStructFieldData(partial, countField)),
+              sumFieldOI, soi.getStructFieldData(partial, sumField));
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
index 2825045..d1d0131 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.udf.generic;
 
+import java.util.HashSet;
+
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -25,6 +27,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -70,6 +73,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 
{
     }
 
     GenericUDAFCountEvaluator countEvaluator = new GenericUDAFCountEvaluator();
+    countEvaluator.setWindowing(paramInfo.isWindowing());
     countEvaluator.setCountAllColumns(paramInfo.isAllColumns());
     countEvaluator.setCountDistinct(paramInfo.isDistinct());
 
@@ -81,6 +85,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 
{
    *
    */
   public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {
+    private boolean isWindowing = false;
     private boolean countAllColumns = false;
     private boolean countDistinct = false;
     private LongObjectInspector partialCountAggOI;
@@ -99,9 +104,14 @@ public class GenericUDAFCount implements 
GenericUDAFResolver2 {
             ObjectInspectorCopyOption.JAVA);
       }
       result = new LongWritable(0);
+
       return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
     }
 
+    public void setWindowing(boolean isWindowing) {
+      this.isWindowing = isWindowing;
+    }
+
     private void setCountAllColumns(boolean countAllCols) {
       countAllColumns = countAllCols;
     }
@@ -110,10 +120,14 @@ public class GenericUDAFCount implements 
GenericUDAFResolver2 {
       this.countDistinct = countDistinct;
     }
 
+    private boolean isWindowingDistinct() {
+      return isWindowing && countDistinct;
+    }
+
     /** class for storing count value. */
     @AggregationType(estimable = true)
     static class CountAgg extends AbstractAggregationBuffer {
-      Object[] prevColumns = null;    // Column values from previous row. Used 
to compare with current row for the case of COUNT(DISTINCT).
+      HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows
       long value;
       @Override
       public int estimate() { return JavaDataModel.PRIMITIVES2; }
@@ -128,8 +142,8 @@ public class GenericUDAFCount implements 
GenericUDAFResolver2 {
 
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
-      ((CountAgg) agg).prevColumns = null;
       ((CountAgg) agg).value = 0;
+      ((CountAgg) agg).uniqueObjects = new HashSet<ObjectInspectorObject>();
     }
 
     @Override
@@ -151,19 +165,16 @@ public class GenericUDAFCount implements 
GenericUDAFResolver2 {
           }
         }
 
-        // Skip the counting if the values are the same for COUNT(DISTINCT) 
case
-        if (countThisRow && countDistinct) {
-          Object[] prevColumns = ((CountAgg) agg).prevColumns;
-          if (prevColumns == null) {
-            ((CountAgg) agg).prevColumns = new Object[parameters.length];
-          } else if (ObjectInspectorUtils.compare(parameters, inputOI, 
prevColumns, outputOI) == 0) {
-             countThisRow = false;
-          }
-
-          // We need to keep a copy of values from previous row.
-          if (countThisRow) {
-            ((CountAgg) agg).prevColumns = 
ObjectInspectorUtils.copyToStandardObject(
-                  parameters, inputOI, ObjectInspectorCopyOption.JAVA);
+        // Skip the counting if the values are the same for windowing 
COUNT(DISTINCT) case
+        if (countThisRow && isWindowingDistinct()) {
+          HashSet<ObjectInspectorObject> uniqueObjs = ((CountAgg) 
agg).uniqueObjects;
+          ObjectInspectorObject obj = new ObjectInspectorObject(
+              ObjectInspectorUtils.copyToStandardObject(parameters, inputOI, 
ObjectInspectorCopyOption.JAVA),
+              outputOI);
+          if (!uniqueObjs.contains(obj)) {
+            uniqueObjs.add(obj);
+          } else {
+            countThisRow = false;
           }
         }
 
@@ -177,8 +188,14 @@ public class GenericUDAFCount implements 
GenericUDAFResolver2 {
     public void merge(AggregationBuffer agg, Object partial)
       throws HiveException {
       if (partial != null) {
-        long p = partialCountAggOI.get(partial);
-        ((CountAgg) agg).value += p;
+        CountAgg countAgg = (CountAgg) agg;
+
+        if (isWindowingDistinct()) {
+          throw new HiveException("Distinct windowing UDAF doesn't support 
merge and terminatePartial");
+        } else {
+          long p = partialCountAggOI.get(partial);
+          countAgg.value += p;
+        }
       }
     }
 
@@ -190,7 +207,11 @@ public class GenericUDAFCount implements 
GenericUDAFResolver2 {
 
     @Override
     public Object terminatePartial(AggregationBuffer agg) throws HiveException 
{
-      return terminate(agg);
+      if (isWindowingDistinct()) {
+        throw new HiveException("Distinct windowing UDAF doesn't support merge 
and terminatePartial");
+      } else {
+        return terminate(agg);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
index 6a62d7c..675d9f3 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFParameterInfo.java
@@ -67,6 +67,13 @@ public interface GenericUDAFParameterInfo {
   boolean isDistinct();
 
   /**
+   * The flag to indicate if the UDAF invocation was from the windowing 
function
+   * call or not.
+   * @return <tt>true</tt> if the UDAF invocation was from the windowing 
function
+   * call.
+   */
+  boolean isWindowing();
+  /**
    * Returns <tt>true</tt> if the UDAF invocation was done via the wildcard
    * syntax <tt>FUNCTION(*)</tt>. Note that this is provided for informational
    * purposes only and the function implementation is not expected to ensure

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
index 7b1d6e5..f53554c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFSum.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.udf.generic;
 
+import java.util.HashSet;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -32,6 +34,7 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorObject;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -39,6 +42,7 @@ import 
org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -93,6 +97,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     TypeInfo[] parameters = info.getParameters();
 
     GenericUDAFSumEvaluator eval = (GenericUDAFSumEvaluator) 
getEvaluator(parameters);
+    eval.setWindowing(info.isWindowing());
     eval.setSumDistinct(info.isDistinct());
 
     return eval;
@@ -125,44 +130,69 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
    * The base type for sum operator evaluator
    *
    */
-  public static abstract class GenericUDAFSumEvaluator<ResultType> extends 
GenericUDAFEvaluator {
+  public static abstract class GenericUDAFSumEvaluator<ResultType extends 
Writable> extends GenericUDAFEvaluator {
     static abstract class SumAgg<T> extends AbstractAggregationBuffer {
       boolean empty;
       T sum;
-      Object previousValue = null;
+      HashSet<ObjectInspectorObject> uniqueObjects; // Unique rows.
     }
 
     protected PrimitiveObjectInspector inputOI;
-    protected ObjectInspector outputOI;
+    protected PrimitiveObjectInspector outputOI;
     protected ResultType result;
+    protected boolean isWindowing;
     protected boolean sumDistinct;
 
-    public boolean sumDistinct() {
-      return sumDistinct;
+    public void setWindowing(boolean isWindowing) {
+      this.isWindowing = isWindowing;
     }
 
     public void setSumDistinct(boolean sumDistinct) {
       this.sumDistinct = sumDistinct;
     }
 
+    protected boolean isWindowingDistinct() {
+      return isWindowing && sumDistinct;
+    }
+
+    @Override
+    public Object terminatePartial(AggregationBuffer agg) throws HiveException 
{
+      if (isWindowingDistinct()) {
+        throw new HiveException("Distinct windowing UDAF doesn't support merge 
and terminatePartial");
+      } else {
+        return terminate(agg);
+      }
+    }
+
     /**
-     * Check if the input object is the same as the previous one for the case 
of
-     * SUM(DISTINCT).
+     * Check if the input object is eligible to contribute to the sum. If it's 
null
+     * or the same value as the previous one for the case of SUM(DISTINCT). 
Then
+     * skip it.
      * @param input the input object
-     * @return True if sumDistinct is false or the input is different from the 
previous object
+     * @return True if sumDistinct is false or the non-null input is different 
from the previous object
      */
-    protected boolean checkDistinct(SumAgg agg, Object input) {
-      if (this.sumDistinct &&
-          ObjectInspectorUtils.compare(input, inputOI, agg.previousValue, 
outputOI) == 0) {
+    protected boolean isEligibleValue(SumAgg agg, Object input) {
+      if (input == null) {
         return false;
       }
 
-      agg.previousValue = ObjectInspectorUtils.copyToStandardObject(
-          input, inputOI, ObjectInspectorCopyOption.JAVA);
-      return true;
-    }
+      if (isWindowingDistinct()) {
+        HashSet<ObjectInspectorObject> uniqueObjs = agg.uniqueObjects;
+        ObjectInspectorObject obj = input instanceof ObjectInspectorObject ?
+            (ObjectInspectorObject)input :
+            new ObjectInspectorObject(
+            ObjectInspectorUtils.copyToStandardObject(input, inputOI, 
ObjectInspectorCopyOption.JAVA),
+            outputOI);
+        if (!uniqueObjs.contains(obj)) {
+          uniqueObjs.add(obj);
+          return true;
+        }
 
+        return false;
+      }
 
+      return true;
+    }
   }
 
   /**
@@ -177,7 +207,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
       super.init(m, parameters);
       result = new HiveDecimalWritable(HiveDecimal.ZERO);
       inputOI = (PrimitiveObjectInspector) parameters[0];
-      outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+      outputOI = (PrimitiveObjectInspector) 
ObjectInspectorUtils.getStandardObjectInspector(inputOI,
           ObjectInspectorCopyOption.JAVA);
       // The output precision is 10 greater than the input which should cover 
at least
       // 10b rows. The scale is the same as the input.
@@ -208,6 +238,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
       SumAgg<HiveDecimal> bdAgg = (SumAgg<HiveDecimal>) agg;
       bdAgg.empty = true;
       bdAgg.sum = HiveDecimal.ZERO;
+      bdAgg.uniqueObjects = new HashSet<ObjectInspectorObject>();
     }
 
     boolean warned = false;
@@ -216,8 +247,10 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     public void iterate(AggregationBuffer agg, Object[] parameters) throws 
HiveException {
       assert (parameters.length == 1);
       try {
-        if (checkDistinct((SumAgg) agg, parameters[0])) {
-          merge(agg, parameters[0]);
+        if (isEligibleValue((SumHiveDecimalAgg) agg, parameters[0])) {
+          ((SumHiveDecimalAgg)agg).empty = false;
+          ((SumHiveDecimalAgg)agg).sum = ((SumHiveDecimalAgg)agg).sum.add(
+              PrimitiveObjectInspectorUtils.getHiveDecimal(parameters[0], 
inputOI));
         }
       } catch (NumberFormatException e) {
         if (!warned) {
@@ -232,11 +265,6 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     }
 
     @Override
-    public Object terminatePartial(AggregationBuffer agg) throws HiveException 
{
-      return terminate(agg);
-    }
-
-    @Override
     public void merge(AggregationBuffer agg, Object partial) throws 
HiveException {
       if (partial != null) {
         SumHiveDecimalAgg myagg = (SumHiveDecimalAgg) agg;
@@ -245,7 +273,11 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
         }
 
         myagg.empty = false;
-        myagg.sum = 
myagg.sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(partial, inputOI));
+        if (isWindowingDistinct()) {
+          throw new HiveException("Distinct windowing UDAF doesn't support 
merge and terminatePartial");
+        } else {
+          myagg.sum = 
myagg.sum.add(PrimitiveObjectInspectorUtils.getHiveDecimal(partial, inputOI));
+        }
       }
     }
 
@@ -261,6 +293,11 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
 
     @Override
     public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
+      // Don't use streaming for distinct cases
+      if (sumDistinct) {
+        return null;
+      }
+
       return new 
GenericUDAFStreamingEvaluator.SumAvgEnhancer<HiveDecimalWritable, HiveDecimal>(
           this, wFrameDef) {
 
@@ -301,7 +338,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
       super.init(m, parameters);
       result = new DoubleWritable(0);
       inputOI = (PrimitiveObjectInspector) parameters[0];
-      outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+      outputOI = 
(PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
           ObjectInspectorCopyOption.JAVA);
       return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
     }
@@ -325,6 +362,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
       SumDoubleAgg myagg = (SumDoubleAgg) agg;
       myagg.empty = true;
       myagg.sum = 0.0;
+      myagg.uniqueObjects = new HashSet<ObjectInspectorObject>();
     }
 
     boolean warned = false;
@@ -333,8 +371,9 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     public void iterate(AggregationBuffer agg, Object[] parameters) throws 
HiveException {
       assert (parameters.length == 1);
       try {
-        if (checkDistinct((SumAgg) agg, parameters[0])) {
-          merge(agg, parameters[0]);
+        if (isEligibleValue((SumDoubleAgg) agg, parameters[0])) {
+          ((SumDoubleAgg)agg).empty = false;
+          ((SumDoubleAgg)agg).sum += 
PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputOI);
         }
       } catch (NumberFormatException e) {
         if (!warned) {
@@ -349,16 +388,15 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     }
 
     @Override
-    public Object terminatePartial(AggregationBuffer agg) throws HiveException 
{
-      return terminate(agg);
-    }
-
-    @Override
     public void merge(AggregationBuffer agg, Object partial) throws 
HiveException {
       if (partial != null) {
         SumDoubleAgg myagg = (SumDoubleAgg) agg;
         myagg.empty = false;
-        myagg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, inputOI);
+        if (isWindowingDistinct()) {
+          throw new HiveException("Distinct windowing UDAF doesn't support 
merge and terminatePartial");
+        } else {
+          myagg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, 
inputOI);
+        }
       }
     }
 
@@ -374,6 +412,11 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
 
     @Override
     public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
+      // Don't use streaming for distinct cases
+      if (sumDistinct) {
+        return null;
+      }
+
       return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<DoubleWritable, 
Double>(this,
           wFrameDef) {
 
@@ -415,7 +458,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
       super.init(m, parameters);
       result = new LongWritable(0);
       inputOI = (PrimitiveObjectInspector) parameters[0];
-      outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+      outputOI = 
(PrimitiveObjectInspector)ObjectInspectorUtils.getStandardObjectInspector(inputOI,
           ObjectInspectorCopyOption.JAVA);
       return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
     }
@@ -439,6 +482,7 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
       SumLongAgg myagg = (SumLongAgg) agg;
       myagg.empty = true;
       myagg.sum = 0L;
+      myagg.uniqueObjects = new HashSet<ObjectInspectorObject>();
     }
 
     private boolean warned = false;
@@ -447,8 +491,9 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     public void iterate(AggregationBuffer agg, Object[] parameters) throws 
HiveException {
       assert (parameters.length == 1);
       try {
-        if (checkDistinct((SumAgg) agg, parameters[0])) {
-          merge(agg, parameters[0]);
+        if (isEligibleValue((SumLongAgg) agg, parameters[0])) {
+          ((SumLongAgg)agg).empty = false;
+          ((SumLongAgg)agg).sum += 
PrimitiveObjectInspectorUtils.getLong(parameters[0], inputOI);
         }
       } catch (NumberFormatException e) {
         if (!warned) {
@@ -460,16 +505,15 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
     }
 
     @Override
-    public Object terminatePartial(AggregationBuffer agg) throws HiveException 
{
-      return terminate(agg);
-    }
-
-    @Override
     public void merge(AggregationBuffer agg, Object partial) throws 
HiveException {
       if (partial != null) {
         SumLongAgg myagg = (SumLongAgg) agg;
-        myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
         myagg.empty = false;
+        if (isWindowingDistinct()) {
+          throw new HiveException("Distinct windowing UDAF doesn't support 
merge and terminatePartial");
+        } else {
+            myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, 
inputOI);
+        }
       }
     }
 
@@ -485,6 +529,11 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
 
     @Override
     public GenericUDAFEvaluator getWindowingEvaluator(WindowFrameDef 
wFrameDef) {
+      // Don't use streaming for distinct cases
+      if (isWindowingDistinct()) {
+        return null;
+      }
+
       return new GenericUDAFStreamingEvaluator.SumAvgEnhancer<LongWritable, 
Long>(this,
           wFrameDef) {
 
@@ -509,7 +558,6 @@ public class GenericUDAFSum extends 
AbstractGenericUDAFResolver {
           SumLongAgg myagg = (SumLongAgg) ss.wrappedBuf;
           return myagg.empty ? null : new Long(myagg.sum);
         }
-
       };
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
index 1a1b570..728964d 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/SimpleGenericUDAFParameterInfo.java
@@ -29,12 +29,14 @@ public class SimpleGenericUDAFParameterInfo implements 
GenericUDAFParameterInfo
 {
 
   private final ObjectInspector[] parameters;
+  private final boolean isWindowing;
   private final boolean distinct;
   private final boolean allColumns;
 
-  public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean 
distinct,
+  public SimpleGenericUDAFParameterInfo(ObjectInspector[] params, boolean 
isWindowing, boolean distinct,
       boolean allColumns) {
     this.parameters = params;
+    this.isWindowing = isWindowing;
     this.distinct = distinct;
     this.allColumns = allColumns;
   }
@@ -63,4 +65,9 @@ public class SimpleGenericUDAFParameterInfo implements 
GenericUDAFParameterInfo
   public boolean isAllColumns() {
     return allColumns;
   }
+
+  @Override
+  public boolean isWindowing() {
+    return isWindowing;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
index 858b47a..b89c14e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.ptf.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
+import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFStreamingEvaluator.SumAvgEnhancer;
 import org.apache.hadoop.hive.ql.udf.generic.ISupportStreamingModeForWindowing;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -392,8 +393,6 @@ public class WindowingTableFunction extends 
TableFunctionEvaluator {
     }
 
     streamingState.rollingPart.append(row);
-    row = streamingState.rollingPart
-        .getAt(streamingState.rollingPart.size() - 1);
 
     WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
 
@@ -408,7 +407,8 @@ public class WindowingTableFunction extends 
TableFunctionEvaluator {
         }
       }
 
-      if (fnEval instanceof ISupportStreamingModeForWindowing) {
+      if (fnEval != null &&
+          fnEval instanceof ISupportStreamingModeForWindowing) {
         fnEval.aggregate(streamingState.aggBuffers[i], 
streamingState.funcArgs[i]);
         Object out = ((ISupportStreamingModeForWindowing) fnEval)
             .getNextResult(streamingState.aggBuffers[i]);
@@ -472,7 +472,8 @@ public class WindowingTableFunction extends 
TableFunctionEvaluator {
       GenericUDAFEvaluator fnEval = wFn.getWFnEval();
 
       int numRowsRemaining = wFn.getWindowFrame().getEnd().getRelativeOffset();
-      if (fnEval instanceof ISupportStreamingModeForWindowing) {
+      if (fnEval != null &&
+          fnEval instanceof ISupportStreamingModeForWindowing) {
         fnEval.terminate(streamingState.aggBuffers[i]);
 
         WindowingFunctionInfoHelper wFnInfo = 
getWindowingFunctionInfoHelper(wFn.getName());

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/test/queries/clientpositive/windowing_distinct.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing_distinct.q 
b/ql/src/test/queries/clientpositive/windowing_distinct.q
index bb192a7..6b49978 100644
--- a/ql/src/test/queries/clientpositive/windowing_distinct.q
+++ b/ql/src/test/queries/clientpositive/windowing_distinct.q
@@ -44,3 +44,21 @@ SELECT AVG(DISTINCT t) OVER (PARTITION BY index),
        AVG(DISTINCT ts) OVER (PARTITION BY index),
        AVG(DISTINCT dec) OVER (PARTITION BY index)
 FROM windowing_distinct;
+
+-- count
+select index, f, count(distinct f) over (partition by index order by f rows 
between 2 preceding and 1 preceding),
+                 count(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 preceding),
+                 count(distinct f) over (partition by index order by f rows 
between 1 following and 2 following),
+                 count(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 following) from windowing_distinct;
+
+-- sum
+select index, f, sum(distinct f) over (partition by index order by f rows 
between 2 preceding and 1 preceding),
+                 sum(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 preceding),
+                 sum(distinct f) over (partition by index order by f rows 
between 1 following and 2 following),
+                 sum(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 following) from windowing_distinct;
+
+-- avg
+select index, f, avg(distinct f) over (partition by index order by f rows 
between 2 preceding and 1 preceding),
+                 avg(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 preceding),
+                 avg(distinct f) over (partition by index order by f rows 
between 1 following and 2 following),
+                 avg(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 following) from windowing_distinct;

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/ql/src/test/results/clientpositive/windowing_distinct.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/windowing_distinct.q.out 
b/ql/src/test/results/clientpositive/windowing_distinct.q.out
index 074a594..86d1cdd 100644
--- a/ql/src/test/results/clientpositive/windowing_distinct.q.out
+++ b/ql/src/test/results/clientpositive/windowing_distinct.q.out
@@ -128,3 +128,69 @@ POSTHOOK: Input: default@windowing_distinct
 117.5  38.71   NULL    NULL    1.362157918703306E9     34.5000
 117.5  38.71   NULL    NULL    1.362157918703306E9     34.5000
 117.5  38.71   NULL    NULL    1.362157918703306E9     34.5000
+PREHOOK: query: -- count
+select index, f, count(distinct f) over (partition by index order by f rows 
between 2 preceding and 1 preceding),
+                 count(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 preceding),
+                 count(distinct f) over (partition by index order by f rows 
between 1 following and 2 following),
+                 count(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 following) from windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: -- count
+select index, f, count(distinct f) over (partition by index order by f rows 
between 2 preceding and 1 preceding),
+                 count(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 preceding),
+                 count(distinct f) over (partition by index order by f rows 
between 1 following and 2 following),
+                 count(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 following) from windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+1      26.43   0       0       2       1
+1      26.43   1       1       1       2
+1      96.91   1       1       0       2
+2      13.01   0       0       1       2
+2      74.72   1       1       1       2
+2      74.72   2       2       0       2
+PREHOOK: query: -- sum
+select index, f, sum(distinct f) over (partition by index order by f rows 
between 2 preceding and 1 preceding),
+                 sum(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 preceding),
+                 sum(distinct f) over (partition by index order by f rows 
between 1 following and 2 following),
+                 sum(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 following) from windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: -- sum
+select index, f, sum(distinct f) over (partition by index order by f rows 
between 2 preceding and 1 preceding),
+                 sum(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 preceding),
+                 sum(distinct f) over (partition by index order by f rows 
between 1 following and 2 following),
+                 sum(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 following) from windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+1      26.43   NULL    NULL    123.34000396728516      26.43000030517578
+1      26.43   26.43000030517578       26.43000030517578       
96.91000366210938       123.34000396728516
+1      96.91   26.43000030517578       26.43000030517578       NULL    
123.34000396728516
+2      13.01   NULL    NULL    74.72000122070312       87.73000144958496
+2      74.72   13.010000228881836      13.010000228881836      
74.72000122070312       87.73000144958496
+2      74.72   87.73000144958496       87.73000144958496       NULL    
87.73000144958496
+PREHOOK: query: -- avg
+select index, f, avg(distinct f) over (partition by index order by f rows 
between 2 preceding and 1 preceding),
+                 avg(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 preceding),
+                 avg(distinct f) over (partition by index order by f rows 
between 1 following and 2 following),
+                 avg(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 following) from windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: -- avg
+select index, f, avg(distinct f) over (partition by index order by f rows 
between 2 preceding and 1 preceding),
+                 avg(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 preceding),
+                 avg(distinct f) over (partition by index order by f rows 
between 1 following and 2 following),
+                 avg(distinct f) over (partition by index order by f rows 
between unbounded preceding and 1 following) from windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+1      26.43   NULL    NULL    61.67000198364258       26.43000030517578
+1      26.43   26.43000030517578       26.43000030517578       
96.91000366210938       61.67000198364258
+1      96.91   26.43000030517578       26.43000030517578       NULL    
61.67000198364258
+2      13.01   NULL    NULL    74.72000122070312       43.86500072479248
+2      74.72   13.010000228881836      13.010000228881836      
74.72000122070312       43.86500072479248
+2      74.72   43.86500072479248       43.86500072479248       NULL    
43.86500072479248

http://git-wip-us.apache.org/repos/asf/hive/blob/15bdce43/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git 
a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
 
b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index c58e8ed..1ac72c6 100644
--- 
a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ 
b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -117,6 +117,44 @@ public final class ObjectInspectorUtils {
   }
 
   /**
+   * This class can be used to wrap Hive objects and put in HashMap or HashSet.
+   * The objects will be compared using ObjectInspectors.
+   *
+   */
+  public static class ObjectInspectorObject {
+    private final Object[] objects;
+    private final ObjectInspector[] oi;
+
+    public ObjectInspectorObject(Object object, ObjectInspector oi) {
+      this.objects = new Object[] { object };
+      this.oi = new ObjectInspector[] { oi };
+    }
+
+    public ObjectInspectorObject(Object[] objects, ObjectInspector[] oi) {
+      this.objects = objects;
+      this.oi = oi;
+    }
+
+    public Object[] getValues() {
+      return objects;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null || obj.getClass() != this.getClass()) { return false; }
+
+      ObjectInspectorObject comparedObject = (ObjectInspectorObject)obj;
+      return ObjectInspectorUtils.compare(objects, oi, comparedObject.objects, 
comparedObject.oi) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return ObjectInspectorUtils.getBucketHashCode(objects, oi);
+    }
+  }
+
+  /**
    * Calculates the hash code for array of Objects that contains writables. 
This is used
    * to work around the buggy Hadoop DoubleWritable hashCode implementation. 
This should
    * only be used for process-local hash codes; don't replace stored hash 
codes like bucketing.

Reply via email to