GitHub user pgandhi999 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15703#discussion_r203828759
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
    @@ -365,4 +394,74 @@ private[hive] case class HiveUDAFFunction(
         val distinct = if (isDistinct) "DISTINCT " else " "
         s"$name($distinct${children.map(_.sql).mkString(", ")})"
       }
    +
    +  override def createAggregationBuffer(): AggregationBuffer =
    +    partial1ModeEvaluator.getNewAggregationBuffer
    +
    +  @transient
    +  private lazy val inputProjection = UnsafeProjection.create(children)
    +
    +  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
    +    partial1ModeEvaluator.iterate(
    +      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
    +  }
    +
    +  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
    +    // The 2nd argument of the Hive `GenericUDAFEvaluator.merge()` method is an input
    +    // aggregation buffer in the 3rd format mentioned in the ScalaDoc of this class.
    +    // Originally, Hive converts these `AggregationBuffer`s into this format before
    +    // shuffling partial aggregation results, and calls
    +    // `GenericUDAFEvaluator.terminatePartial()` to do the conversion.
    +    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
    --- End diff ---
    
    If we follow the code flow from interfaces.scala, we see that in PARTIAL2 mode the
    evaluator's `merge()` is fed the output of the PARTIAL1 evaluator's
    `terminatePartial()` rather than the raw aggregation buffer. I am new to Spark and
    Hive, so I just wanted to understand the reason behind this behaviour. If there are
    any docs describing it, please do let me know. Thank you.
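    
    For reference, Hive's `GenericUDAFEvaluator.merge()` is defined to take a partial
    result in the `terminatePartial()` output format rather than a raw
    `AggregationBuffer`, and a Hive evaluator is initialized for exactly one mode via
    `GenericUDAFEvaluator.init()`, which is why the wrapper keeps separate
    `partial1ModeEvaluator` and `partial2ModeEvaluator` instances. Below is a minimal,
    self-contained sketch of that handoff; `Buffer`, `Partial`, `Partial1Evaluator`,
    and `Partial2Evaluator` are hypothetical stand-ins, not the real Hive API:
    
        // A sketch of the evaluator-mode handoff with hypothetical stub types:
        // Buffer stands in for Hive's AggregationBuffer, and Partial for the
        // format produced by terminatePartial().
        object UdafModeFlowSketch {
          final class Buffer(var sum: Long = 0L, var count: Long = 0L)
          type Partial = (Long, Long)
    
          // PARTIAL1 mode: consume raw input rows, emit a partial result.
          class Partial1Evaluator {
            def getNewAggregationBuffer: Buffer = new Buffer
            def iterate(buf: Buffer, input: Long): Unit = {
              buf.sum += input
              buf.count += 1
            }
            // Convert the in-memory buffer into the format shuffled between stages.
            def terminatePartial(buf: Buffer): Partial = (buf.sum, buf.count)
          }
    
          // PARTIAL2 mode: merge() consumes a partial result, never a raw Buffer,
          // which is why the merge() override above calls terminatePartial(input).
          class Partial2Evaluator {
            def merge(buf: Buffer, partial: Partial): Unit = {
              buf.sum += partial._1
              buf.count += partial._2
            }
          }
    
          def main(args: Array[String]): Unit = {
            val p1 = new Partial1Evaluator
            val p2 = new Partial2Evaluator
    
            // Two map-side partial aggregations (PARTIAL1 mode).
            val left = p1.getNewAggregationBuffer
            Seq(1L, 2L).foreach(p1.iterate(left, _))
            val right = p1.getNewAggregationBuffer
            Seq(3L, 4L).foreach(p1.iterate(right, _))
    
            // Reduce side (PARTIAL2 mode): merge `right` into `left` through its
            // terminatePartial() form, mirroring the quoted merge() override.
            p2.merge(left, p1.terminatePartial(right))
            println(s"sum=${left.sum}, count=${left.count}") // sum=10, count=4
          }
        }
    
    In this reading, the in-memory buffer and the shuffled partial result are simply
    two representations of the same intermediate state, and `terminatePartial()` is
    the bridge between them.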

