erikerlandson opened a new pull request #28983:
URL: https://github.com/apache/spark/pull/28983
Context: The fix for SPARK-27296, introduced by #25024, allows `Aggregator`
objects to appear in queries. This works for aggregators with atomic input
types, e.g. `Aggregator[Double, _, _]`.
However, it can cause a null pointer exception if the input type is
`Array[_]`. Serialization of `UnresolvedMapObjects` was historically
considered an ignorable case, as described in its doc comment:
```scala
/**
 * When constructing [[MapObjects]], the element type must be given, which may not be available
 * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
 * [[MapObjects]] during analysis after the input data is resolved.
 * Note that, ideally we should not serialize and send unresolved expressions to executors, but
 * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
 * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
 * not serializable. Then even users mistakenly reference unresolved expression and serialize it,
 * it's just a performance issue(more network traffic), and will not fail.
 */
case class UnresolvedMapObjects(
    @transient function: Expression => Expression,
    child: Expression,
    customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
  override lazy val resolved = false
  override def dataType: DataType =
    customCollectionCls.map(ObjectType.apply).getOrElse {
      throw new UnsupportedOperationException("not resolved")
    }
}
```
Now these aggregators are serialized and sent to the executors, and the
`function` field above, being `@transient`, deserializes as `null`. As a
result, attempting to use `functions.udaf()` with an aggregator of the form
`Aggregator[Array[_], _, _]` fails with a null pointer exception here:
```scala
object MapObjects {
  def apply(
      function: Expression => Expression,
      inputData: Expression,
      elementType: DataType,
      elementNullable: Boolean = true,
      customCollectionCls: Option[Class[_]] = None): MapObjects = {
    val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
    MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
  }
}
```
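The underlying mechanism, a `@transient` function field coming back as `null` after a serialization round trip, can be seen outside Spark. The following is a minimal sketch using plain Java serialization; `Holder`, `TransientDemo`, and `roundTrip` are hypothetical names used only for illustration and are not part of Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for UnresolvedMapObjects: a case class whose
// function-typed constructor field is marked @transient.
case class Holder(@transient function: Int => Int, child: Int)

object TransientDemo {
  // Serialize a value to bytes and read it back, as happens when an
  // object is shipped from the driver to an executor.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = Holder(_ + 1, 41)
    val copy = roundTrip(original)
    // The transient field is skipped during serialization, so it is null
    // in the copy; the non-transient field survives.
    println(s"function is null: ${copy.function == null}")
    println(s"child: ${copy.child}")
    // Calling copy.function(1) at this point would throw a
    // NullPointerException, analogous to function(loopVar) above.
  }
}
```

Any code path that invokes the field after deserialization, as `MapObjects.apply` does with `function(loopVar)`, therefore needs either a non-null value or an explicit guard.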
### What changes were proposed in this pull request?
This PR proposes a fix that intercepts `null` values for the `function` field
in `UnresolvedMapObjects` after it has been serialized over the wire to an
executor. The fix should be evaluated for robustness across all possible use
cases.
### Why are the changes needed?
Applying an aggregator of the form `Aggregator[Array[_], _, _]` using
`functions.udaf()` currently causes a null pointer exception in Catalyst.
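For illustration, the usage pattern in question looks roughly like the sketch below. It assumes Spark 3.0 on the classpath; `ArraySum`, `UdafArraySketch`, and the column name `xs` are hypothetical, and whether this exact snippet reproduces the failure has not been verified here.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Hypothetical aggregator with an Array input type: sums every element
// of every input array into a single Double.
object ArraySum extends Aggregator[Array[Double], Double, Double] {
  def zero: Double = 0.0
  def reduce(b: Double, a: Array[Double]): Double = b + a.sum
  def merge(b1: Double, b2: Double): Double = b1 + b2
  def finish(reduction: Double): Double = reduction
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object UdafArraySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("udaf-array-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(Array(1.0, 2.0), Array(3.0)).toDF("xs")
    // Wrap the Aggregator as a user-defined aggregate function and apply
    // it to an array-typed column.
    val sumArrays = udaf(ArraySum)
    df.agg(sumArrays($"xs")).show()

    spark.stop()
  }
}
```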
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
So far, testing has been done only in the REPL. Once a fix is finalized, a unit
test combining `functions.udaf()` and `Aggregator[Array[_], _, _]` should be
added.