erikerlandson commented on a change in pull request #28983:
URL: https://github.com/apache/spark/pull/28983#discussion_r452476304
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
##########
@@ -517,3 +519,18 @@ case class ScalaAggregator[IN, BUF, OUT](
override def nodeName: String = agg.getClass.getSimpleName
}
+
+/**
+ * An extension rule to resolve encoder expressions from a [[ScalaAggregator]]
+ */
+object ResolveEncodersInScalaAgg extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp
{
+ case p if !p.resolved => p
+ case p => p.transformExpressionsUp {
+ case agg: ScalaAggregator[_, _, _] =>
+ agg.copy(
+ inputEncoder = agg.inputEncoder.resolveAndBind(),
Review comment:
@cloud-fan what I had done earlier was:
```scala
object ResolveEncodersInScalaAgg extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUp {
case p if !p.resolved => p
case p => p.transformExpressionsUp {
case agg: ScalaAggregator[_, _, _] =>
val children = agg.children
require(children.length > 0, "Missing aggregator input")
val dataType: DataType = if (children.length == 1)
children.head.dataType else {
StructType(children.map(_.dataType).zipWithIndex.map { case (dt,
j) =>
StructField(s"_$j", dt, true)
})
}
val attrs = if (agg.inputEncoder.isSerializedAsStructForTopLevel) {
dataType.asInstanceOf[StructType].toAttributes
} else {
(new StructType().add("input", dataType)).toAttributes
}
agg.copy(
inputEncoder = agg.inputEncoder.resolveAndBind(attrs),
bufferEncoder = agg.bufferEncoder.resolveAndBind())
}
}
}
```
This also passes the unit tests, but it would still fail if I gave it `Float`
input data, so the input is not being cast automatically.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]