This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dcd5c2  [FLINK-13128][hive] make HiveGenericUDAF expose accumulator 
type in order to create its corresponding AggregateFunctionDefinition
1dcd5c2 is described below

commit 1dcd5c2eb9a9eeab8146136a3ed40fc1c185a823
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Fri Jul 5 15:34:42 2019 -0700

    [FLINK-13128][hive] make HiveGenericUDAF expose accumulator type in order 
to create its corresponding AggregateFunctionDefinition
---
 .../table/functions/hive/HiveGenericUDAF.java      | 45 +++++++++++++---------
 1 file changed, 26 insertions(+), 19 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 5782455..6015ef6 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -58,7 +58,8 @@ public class HiveGenericUDAF
 
        private transient GenericUDAFEvaluator partialEvaluator;
        private transient GenericUDAFEvaluator finalEvaluator;
-       private transient ObjectInspector finalResult;
+       private transient ObjectInspector partialResultObjectInspector;
+       private transient ObjectInspector finalResultObjectInspector;
        private transient HiveObjectConversion[] conversions;
        private transient boolean allIdentityConverter;
        private transient boolean initialized;
@@ -89,13 +90,13 @@ public class HiveGenericUDAF
                // PARTIAL1: from original data to partial aggregation data:
                //              iterate() and terminatePartial() will be called.
                this.partialEvaluator = createEvaluator(inputInspectors);
-               ObjectInspector partialResult = 
partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors);
+               this.partialResultObjectInspector = 
partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors);
 
                // FINAL: from partial aggregation to full aggregation:
                //              merge() and terminate() will be called.
                this.finalEvaluator = createEvaluator(inputInspectors);
-               this.finalResult = finalEvaluator.init(
-                       GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ 
partialResult });
+               this.finalResultObjectInspector = finalEvaluator.init(
+                       GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ 
partialResultObjectInspector });
 
                conversions = new HiveObjectConversion[inputInspectors.length];
                for (int i = 0; i < inputInspectors.length; i++) {
@@ -175,7 +176,7 @@ public class HiveGenericUDAF
        @Override
        public Object getValue(GenericUDAFEvaluator.AggregationBuffer 
accumulator) {
                try {
-                       return HiveInspectors.toFlinkObject(finalResult, 
finalEvaluator.terminate(accumulator));
+                       return 
HiveInspectors.toFlinkObject(finalResultObjectInspector, 
finalEvaluator.terminate(accumulator));
                } catch (HiveException e) {
                        throw new FlinkHiveUDFException(
                                String.format("Failed to get final result on 
%s", hiveFunctionWrapper.getClassName()), e);
@@ -191,21 +192,12 @@ public class HiveGenericUDAF
        @Override
        public DataType getHiveResultType(Object[] constantArguments, 
DataType[] argTypes) {
                try {
-                       ObjectInspector[] inputs = 
HiveInspectors.toInspectors(constantArguments, argTypes);
-                       GenericUDAFEvaluator evaluator = 
createEvaluator(inputs);
-
-                       // The ObjectInspector for the parameters:
-                       // In PARTIAL1 mode, the parameters are original data;
-                       // In FINAL mode, the parameters are just partial 
aggregations
-                       // (in that case, the array will always have a single 
element).
-
-                       ObjectInspector partialObjectInspector = 
evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputs);
-
-                       ObjectInspector finalObjectInspector = evaluator.init(
-                               GenericUDAFEvaluator.Mode.FINAL,
-                               new ObjectInspector[]{ partialObjectInspector 
});
+                       if (!initialized) {
+                               setArgumentTypesAndConstants(constantArguments, 
argTypes);
+                               init();
+                       }
 
-                       return HiveTypeUtil.toFlinkType(finalObjectInspector);
+                       return 
HiveTypeUtil.toFlinkType(finalResultObjectInspector);
                } catch (Exception e) {
                        throw new FlinkHiveUDFException(
                                String.format("Failed to get Hive result type 
from %s", hiveFunctionWrapper.getClassName()), e);
@@ -217,4 +209,19 @@ public class HiveGenericUDAF
                return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
                        getHiveResultType(this.constantArguments, 
this.argTypes));
        }
+
+       @Override
+       public TypeInformation getAccumulatorType() {
+               try {
+                       if (!initialized) {
+                               init();
+                       }
+
+                       return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(
+                               
HiveTypeUtil.toFlinkType(partialResultObjectInspector));
+               } catch (Exception e) {
+                       throw new FlinkHiveUDFException(
+                               String.format("Failed to get Hive accumulator 
type from %s", hiveFunctionWrapper.getClassName()), e);
+               }
+       }
 }

Reply via email to