cloud-fan commented on a change in pull request #25024: [SPARK-27296][SQL] Allows Aggregator to be registered as a UDF
URL: https://github.com/apache/spark/pull/25024#discussion_r362875671
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala
##########
@@ -136,3 +141,42 @@ private[sql] case class SparkUserDefinedFunction(
}
}
}
+
+private[sql] case class UserDefinedAggregator[IN, BUF, OUT](
+ aggregator: Aggregator[IN, BUF, OUT],
+ inputEncoder: Encoder[IN],
+ name: Option[String] = None,
+ nullable: Boolean = true,
+ deterministic: Boolean = true) extends UserDefinedFunction {
+
+ @scala.annotation.varargs
+ def apply(exprs: Column*): Column = {
+ Column(AggregateExpression(scalaAggregator(exprs.map(_.expr)), Complete,
isDistinct = false))
+ }
+
+ // This is also used by udf.register(...) when it detects a
UserDefinedAggregator
+ def scalaAggregator(exprs: Seq[Expression]): ScalaAggregator[IN, BUF, OUT] =
{
+ val iEncoder =
inputEncoder.asInstanceOf[ExpressionEncoder[IN]].resolveAndBind()
Review comment:
It's too early to resolve and bind the input encoder here. We should do it in
`ScalaAggregator`, where the actual schema of the children is known.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]