This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1dcd5c2 [FLINK-13128][hive] make HiveGenericUDAF expose accumulator type in order to create its corresponding AggregateFunctionDefinition 1dcd5c2 is described below commit 1dcd5c2eb9a9eeab8146136a3ed40fc1c185a823 Author: bowen.li <bowenl...@gmail.com> AuthorDate: Fri Jul 5 15:34:42 2019 -0700 [FLINK-13128][hive] make HiveGenericUDAF expose accumulator type in order to create its corresponding AggregateFunctionDefinition --- .../table/functions/hive/HiveGenericUDAF.java | 45 +++++++++++++--------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java index 5782455..6015ef6 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java @@ -58,7 +58,8 @@ public class HiveGenericUDAF private transient GenericUDAFEvaluator partialEvaluator; private transient GenericUDAFEvaluator finalEvaluator; - private transient ObjectInspector finalResult; + private transient ObjectInspector partialResultObjectInspector; + private transient ObjectInspector finalResultObjectInspector; private transient HiveObjectConversion[] conversions; private transient boolean allIdentityConverter; private transient boolean initialized; @@ -89,13 +90,13 @@ public class HiveGenericUDAF // PARTIAL1: from original data to partial aggregation data: // iterate() and terminatePartial() will be called. 
this.partialEvaluator = createEvaluator(inputInspectors); - ObjectInspector partialResult = partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors); + this.partialResultObjectInspector = partialEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors); // FINAL: from partial aggregation to full aggregation: // merge() and terminate() will be called. this.finalEvaluator = createEvaluator(inputInspectors); - this.finalResult = finalEvaluator.init( - GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ partialResult }); + this.finalResultObjectInspector = finalEvaluator.init( + GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{ partialResultObjectInspector }); conversions = new HiveObjectConversion[inputInspectors.length]; for (int i = 0; i < inputInspectors.length; i++) { @@ -175,7 +176,7 @@ public class HiveGenericUDAF @Override public Object getValue(GenericUDAFEvaluator.AggregationBuffer accumulator) { try { - return HiveInspectors.toFlinkObject(finalResult, finalEvaluator.terminate(accumulator)); + return HiveInspectors.toFlinkObject(finalResultObjectInspector, finalEvaluator.terminate(accumulator)); } catch (HiveException e) { throw new FlinkHiveUDFException( String.format("Failed to get final result on %s", hiveFunctionWrapper.getClassName()), e); @@ -191,21 +192,12 @@ public class HiveGenericUDAF @Override public DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes) { try { - ObjectInspector[] inputs = HiveInspectors.toInspectors(constantArguments, argTypes); - GenericUDAFEvaluator evaluator = createEvaluator(inputs); - - // The ObjectInspector for the parameters: - // In PARTIAL1 mode, the parameters are original data; - // In FINAL mode, the parameters are just partial aggregations - // (in that case, the array will always have a single element). 
- - ObjectInspector partialObjectInspector = evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputs); - - ObjectInspector finalObjectInspector = evaluator.init( - GenericUDAFEvaluator.Mode.FINAL, - new ObjectInspector[]{ partialObjectInspector }); + if (!initialized) { + setArgumentTypesAndConstants(constantArguments, argTypes); + init(); + } - return HiveTypeUtil.toFlinkType(finalObjectInspector); + return HiveTypeUtil.toFlinkType(finalResultObjectInspector); } catch (Exception e) { throw new FlinkHiveUDFException( String.format("Failed to get Hive result type from %s", hiveFunctionWrapper.getClassName()), e); @@ -217,4 +209,19 @@ public class HiveGenericUDAF return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo( getHiveResultType(this.constantArguments, this.argTypes)); } + + @Override + public TypeInformation getAccumulatorType() { + try { + if (!initialized) { + init(); + } + + return LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo( + HiveTypeUtil.toFlinkType(partialResultObjectInspector)); + } catch (Exception e) { + throw new FlinkHiveUDFException( + String.format("Failed to get Hive accumulator type from %s", hiveFunctionWrapper.getClassName()), e); + } + } }