erikerlandson opened a new pull request #28983:
URL: https://github.com/apache/spark/pull/28983


   
   Context: The fix for SPARK-27296 introduced by #25024 allows `Aggregator` 
objects to appear in queries. This works fine for aggregators with atomic input 
types, e.g. `Aggregator[Double, _, _]`. 
   
   However, it can cause a `NullPointerException` if the input type is `Array[_]`. Serializing an `UnresolvedMapObjects` to executors has historically been treated as a harmless mistake (at worst a performance issue), as its Scaladoc explains:
   
   ```scala
   /**
    * When constructing [[MapObjects]], the element type must be given, which may not be available
    * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
    * [[MapObjects]] during analysis after the input data is resolved.
    * Note that, ideally we should not serialize and send unresolved expressions to executors, but
    * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
    * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
    * not serializable. Then even users mistakenly reference unresolved expression and serialize it,
    * it's just a performance issue(more network traffic), and will not fail.
    */
   case class UnresolvedMapObjects(
       @transient function: Expression => Expression,
       child: Expression,
       customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
     override lazy val resolved = false
   
     override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse {
       throw new UnsupportedOperationException("not resolved")
     }
   }
   ```
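   The effect of `@transient` can be reproduced outside Spark. The sketch below assumes only the Scala standard library and plain Java serialization, which is roughly analogous to how closures reach executors; `Placeholder` and `TransientDemo` are hypothetical names, not Spark classes. It round-trips a case class whose function field is transient:
   
   ```scala
   import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
   
   // Hypothetical stand-in for UnresolvedMapObjects: like `function` there,
   // this field is @transient, so Java serialization does not write it.
   case class Placeholder(@transient function: Int => Int, child: Int)
   
   object TransientDemo {
     // Serialize and deserialize a value with plain Java serialization.
     def roundTrip[T](value: T): T = {
       val bytes = new ByteArrayOutputStream()
       val out = new ObjectOutputStream(bytes)
       try out.writeObject(value) finally out.close()
       val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
       try in.readObject().asInstanceOf[T] finally in.close()
     }
   
     def main(args: Array[String]): Unit = {
       val copy = roundTrip(Placeholder(_ + 1, 41))
       println(copy.child)            // prints 41: ordinary fields survive
       println(copy.function == null) // prints true: the transient field does not
     }
   }
   ```
   
   No exception is raised by the round trip itself; the `null` only becomes a problem once something tries to call the function.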
   
   Now these aggregators are serialized to the executors, and the `function` field above deserializes as `null`, so using `functions.udaf()` with an aggregator of the form `Aggregator[Array[_], _, _]` fails with a `NullPointerException` here:
   
   ```scala
   object MapObjects {
     def apply(
         function: Expression => Expression,
         inputData: Expression,
         elementType: DataType,
         elementNullable: Boolean = true,
         customCollectionCls: Option[Class[_]] = None): MapObjects = {
       val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
       MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
     }
   }
   ```
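   When `function` arrives as `null`, the expression `function(loopVar)` above dereferences a null `Function1`. A minimal pure-Scala sketch of just that step follows; `MiniMapObjects` and `LoopVar` are hypothetical simplifications, not Catalyst types:
   
   ```scala
   // Hypothetical stand-in for Catalyst's LambdaVariable.
   final case class LoopVar(name: String)
   
   // Hypothetical simplification of MapObjects.apply: only the step that
   // invokes the possibly-null function is kept.
   object MiniMapObjects {
     def apply(function: LoopVar => String): String = {
       val loopVar = LoopVar("MapObject")
       function(loopVar) // throws NullPointerException when function is null
     }
   
     def main(args: Array[String]): Unit = {
       println(apply(v => s"lambda(${v.name})")) // prints lambda(MapObject)
       val failed =
         try { apply(null); false }
         catch { case _: NullPointerException => true }
       println(failed) // prints true
     }
   }
   ```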
   
   ### What changes were proposed in this pull request?
   This PR proposes a fix that intercepts `null` values of the `function` field in `UnresolvedMapObjects` after the expression has been serialized and sent to an executor. The fix should be evaluated for robustness across all the ways `UnresolvedMapObjects` can be used.
   
   ### Why are the changes needed?
   Applying an aggregator of the form `Aggregator[Array[_], _, _]` with `functions.udaf()` currently causes a `NullPointerException` in Catalyst.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   So far, testing has been done in the REPL. Once a fix is finalized, a unit test that combines `functions.udaf()` with an `Aggregator[Array[_], _, _]` should be added.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
