cloud-fan commented on a change in pull request #24144: [SPARK-24935][SQL] fix
Hive UDAF with two aggregation buffers
URL: https://github.com/apache/spark/pull/24144#discussion_r267165795
##########
File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
##########
@@ -401,29 +391,47 @@ private[hive] case class HiveUDAFFunction(
s"$name($distinct${children.map(_.sql).mkString(", ")})"
}
- override def createAggregationBuffer(): AggregationBuffer =
- partial1HiveEvaluator.evaluator.getNewAggregationBuffer
+ // The hive UDAF may create different buffers to handle different inputs:
original data or
+ // aggregate buffer. However, the Spark UDAF framework does not expose this
information when
+ // creating the buffer. Here we return null, and create the buffer in
`update` and `merge`
+ // on demand, so that we can know what input we are dealing with.
+ override def createAggregationBuffer(): AggregationBuffer = null
@transient
private lazy val inputProjection = UnsafeProjection.create(children)
override def update(buffer: AggregationBuffer, input: InternalRow):
AggregationBuffer = {
+ // The input is original data, we create buffer with the partial1
evaluator.
+ val actualBuffer = if (buffer == null) {
+ partial1HiveEvaluator.evaluator.getNewAggregationBuffer
+ } else {
+ buffer
+ }
+
partial1HiveEvaluator.evaluator.iterate(
- buffer, wrap(inputProjection(input), inputWrappers, cached,
inputDataTypes))
- buffer
+ actualBuffer, wrap(inputProjection(input), inputWrappers, cached,
inputDataTypes))
+ actualBuffer
}
override def merge(buffer: AggregationBuffer, input: AggregationBuffer):
AggregationBuffer = {
+ // The input is aggregate buffer, we create buffer with the partial2
evaluator.
+ val actualBuffer = if (buffer == null) {
Review comment:
It can happen if Spark initializes a UDAF, run `update` and then run
`merge`. I don't think that will happen in Spark.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]