Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/15703#discussion_r87309805
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
---
@@ -289,73 +302,77 @@ private[hive] case class HiveUDAFFunction(
funcWrapper.createFunction[AbstractGenericUDAFResolver]()
}
+ // Hive `ObjectInspector`s for all child expressions (input parameters
of the function).
@transient
- private lazy val inspectors = children.map(toInspector).toArray
+ private lazy val inputInspectors = children.map(toInspector).toArray
+ // Spark SQL data types of input parameters.
@transient
- private lazy val functionAndInspector = {
- val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors,
false, false)
- val f = resolver.getEvaluator(parameterInfo)
- f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
- }
+ private lazy val inputDataTypes: Array[DataType] =
children.map(_.dataType).toArray
+ // The UDAF evaluator used to consume raw input rows and produce partial
aggregation results.
@transient
- private lazy val function = functionAndInspector._1
+ private lazy val partial1ModeEvaluator = {
+ val parameterInfo = new
SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+ resolver.getEvaluator(parameterInfo)
+ }
+ // Hive `ObjectInspector` used to inspect partial aggregation results.
@transient
- private lazy val wrappers = children.map(x => wrapperFor(toInspector(x),
x.dataType)).toArray
+ private val partialResultInspector = partial1ModeEvaluator.init(
+ GenericUDAFEvaluator.Mode.PARTIAL1,
+ inputInspectors
+ )
+ // The UDAF evaluator used to merge partial aggregation results.
@transient
- private lazy val returnInspector = functionAndInspector._2
+ private lazy val partial2ModeEvaluator = {
+ val parameterInfo = new
SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+ val evaluator = resolver.getEvaluator(parameterInfo)
+ evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2,
Array(partialResultInspector))
+ evaluator
+ }
+ // Spark SQL data type of partial aggregation results
@transient
- private lazy val unwrapper = unwrapperFor(returnInspector)
+ private lazy val partialResultDataType =
inspectorToDataType(partialResultInspector)
+ // The UDAF evaluator used to compute the final result from partial
aggregation result objects.
@transient
- private[this] var buffer: GenericUDAFEvaluator.AggregationBuffer = _
-
- override def eval(input: InternalRow): Any =
unwrapper(function.evaluate(buffer))
+ private lazy val finalModeEvaluator = {
+ val parameterInfo = new
SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+ resolver.getEvaluator(parameterInfo)
--- End diff --
Good point, thanks!
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]