GitHub user pgandhi999 commented on a diff in the pull request:
https://github.com/apache/spark/pull/15703#discussion_r203828759
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +394,74 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = UnsafeProjection.create(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    // The 2nd argument of the Hive `GenericUDAFEvaluator.merge()` method is an input aggregation
+    // buffer in the 3rd format mentioned in the ScalaDoc of this class. Originally, Hive converts
+    // this `AggregationBuffer`s into this format before shuffling partial aggregation results, and
+    // calls `GenericUDAFEvaluator.terminatePartial()` to do the conversion.
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
--- End diff ---
If we follow the code flow from interfaces.scala, we see that the PARTIAL2-mode
evaluator merges its aggregation buffer with the partial result produced from
the PARTIAL1-mode evaluator's `terminatePartial()`. I am new to Spark and Hive,
so I just wanted to understand the reason behind this behaviour. If there are
any docs that explain it, please let me know. Thank you.
---