Github user pgandhi999 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15703#discussion_r203828759

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -365,4 +394,74 @@ private[hive] case class HiveUDAFFunction(
         val distinct = if (isDistinct) "DISTINCT " else " "
         s"$name($distinct${children.map(_.sql).mkString(", ")})"
       }
    +
    +  override def createAggregationBuffer(): AggregationBuffer =
    +    partial1ModeEvaluator.getNewAggregationBuffer
    +
    +  @transient
    +  private lazy val inputProjection = UnsafeProjection.create(children)
    +
    +  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
    +    partial1ModeEvaluator.iterate(
    +      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
    +  }
    +
    +  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
    +    // The 2nd argument of the Hive `GenericUDAFEvaluator.merge()` method is an input aggregation
    +    // buffer in the 3rd format mentioned in the ScalaDoc of this class. Originally, Hive converts
    +    // these `AggregationBuffer`s into this format before shuffling partial aggregation results, and
    +    // calls `GenericUDAFEvaluator.terminatePartial()` to do the conversion.
    +    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
    --- End diff --

    If we follow the code flow from interfaces.scala, we see that the aggregation buffer in PARTIAL2 mode is merged with the aggregation buffer from PARTIAL1 mode. I am new to Spark and Hive, so I just wanted to understand the reason behind this behaviour. If there are any docs explaining it, please let me know. Thank you.
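    To illustrate the question above, here is a minimal self-contained sketch (not Hive's actual API; the `ToyUDAFModes` object and its toy SUM aggregation are invented for illustration) of why `merge()` calls `terminatePartial()` first: PARTIAL1 consumes raw rows via `iterate()` and emits a serialized partial result via `terminatePartial()`, while PARTIAL2 only understands that serialized partial format, not the in-memory buffer.

    ```scala
    // Toy two-phase aggregation mimicking Hive's GenericUDAFEvaluator modes.
    object ToyUDAFModes {
      // In-memory aggregation buffer (analogue of Hive's AggregationBuffer)
      final class Buffer(var sum: Long = 0L)

      // PARTIAL1: original rows in, partial aggregation out
      def iterate(buf: Buffer, row: Long): Unit = buf.sum += row
      def terminatePartial(buf: Buffer): Long = buf.sum // "serialize" the buffer

      // PARTIAL2 / FINAL: partial aggregations in, not raw rows
      def merge(buf: Buffer, partial: Long): Unit = buf.sum += partial
      def terminate(buf: Buffer): Long = buf.sum

      def main(args: Array[String]): Unit = {
        // Map side: two partitions each run PARTIAL1 over their raw rows.
        val p1 = new Buffer; Seq(1L, 2L).foreach(iterate(p1, _))
        val p2 = new Buffer; Seq(3L, 4L).foreach(iterate(p2, _))
        // Shuffle boundary: each buffer is converted with terminatePartial()
        // before merge() sees it, matching the code in the diff above.
        val merged = new Buffer
        merge(merged, terminatePartial(p1))
        merge(merged, terminatePartial(p2))
        println(terminate(merged)) // 10
      }
    }
    ```

    Under this reading, the diff's `partial1ModeEvaluator.terminatePartial(input)` is simply the conversion step that Hive would normally perform before the shuffle.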