Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8874#discussion_r40253494
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
    @@ -586,47 +523,75 @@ private[hive] case class HiveGenericUDTF(
     
     private[hive] case class HiveUDAFFunction(
         funcWrapper: HiveFunctionWrapper,
    -    exprs: Seq[Expression],
    -    base: AggregateExpression1,
    +    children: Seq[Expression],
         isUDAFBridgeRequired: Boolean = false)
    -  extends AggregateFunction1
    -  with HiveInspectors {
    +  extends AggregateFunction2 with HiveInspectors {
     
    -  def this() = this(null, null, null)
    +  def this() = this(null, null)
     
    -  private val resolver =
    +  @transient
    +  private lazy val resolver =
         if (isUDAFBridgeRequired) {
           new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
         } else {
           funcWrapper.createFunction[AbstractGenericUDAFResolver]()
         }
     
    -  private val inspectors = exprs.map(toInspector).toArray
    +  @transient
    +  private lazy val inspectors = children.map(toInspector).toArray
     
    -  private val function = {
    +  @transient
    +  private lazy val functionAndInspector = {
         val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, 
false, false)
    -    resolver.getEvaluator(parameterInfo)
    +    val f = resolver.getEvaluator(parameterInfo)
    +    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
       }
     
    -  private val returnInspector = 
function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
    +  @transient
    +  private lazy val function = functionAndInspector._1
     
    -  private val buffer =
    -    function.getNewAggregationBuffer
    +  @transient
    +  private lazy val returnInspector = functionAndInspector._2
    +
    +  @transient
    +  private lazy val buffer = function.getNewAggregationBuffer
     
       override def eval(input: InternalRow): Any = 
unwrap(function.evaluate(buffer), returnInspector)
     
       @transient
    -  val inputProjection = new InterpretedProjection(exprs)
    +  private lazy val inputProjection = new InterpretedProjection(children)
     
       @transient
    -  protected lazy val cached = new Array[AnyRef](exprs.length)
    +  private lazy val cached = new Array[AnyRef](children.length)
     
       @transient
    -  private lazy val inputDataTypes: Array[DataType] = 
exprs.map(_.dataType).toArray
    +  private lazy val inputDataTypes: Array[DataType] = 
children.map(_.dataType).toArray
    +
    +  // Hive UDAF has its own buffer
    +  override def bufferSchema: StructType = StructType(Nil)
     
    -  def update(input: InternalRow): Unit = {
    -    val inputs = inputProjection(input)
    +  override def update(_buffer: MutableRow, _input: InternalRow): Unit = {
    +    val inputs = inputProjection(_input)
         function.iterate(buffer, wrap(inputs, inspectors, cached, 
inputDataTypes))
       }
    +
    +  override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
    +    throw new UnsupportedOperationException(
    +      "Hive UDAF doesn't support partial aggregate")
    +  }
    +
    +  override def cloneBufferAttributes: Seq[Attribute] = Nil
    +
    +  override def initialize(buffer: MutableRow): Unit = {}
    +
    +  override def bufferAttributes: Seq[AttributeReference] = Nil
    +
    +  override def inputTypes: Seq[AbstractDataType] = Nil
    --- End diff --
    
    Using `Nil` here means we don't need to check input types for it, which 
is the same as using `Seq.fill(children.length)(AnyDataType)`. See 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L705-L710


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to