Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/20771#discussion_r173436038
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
---
@@ -599,8 +610,79 @@ case class MapObjects private(
override def children: Seq[Expression] = lambdaFunction :: inputData ::
Nil
- override def eval(input: InternalRow): Any =
- throw new UnsupportedOperationException("Only code-generated
evaluation is supported")
+ // The data with UserDefinedType are actually stored with the data type
of its sqlType.
+ // When we want to apply MapObjects on it, we have to use it.
+ lazy private val inputDataType = inputData.dataType match {
+ case u: UserDefinedType[_] => u.sqlType
+ case _ => inputData.dataType
+ }
+
+ private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+ inputCollection.map { element =>
+ val row = InternalRow.fromSeq(Seq(element))
+ lambdaFunction.eval(row)
+ }
+ }
+
+ // Executes lambda function on input collection.
+ private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+ case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+ x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+ case ObjectType(cls) if cls.isArray =>
+ x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+ case ObjectType(cls) if
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+ x =>
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+ case ObjectType(cls) if cls == classOf[Object] =>
+ (inputCollection) => {
+ if (inputCollection.getClass.isArray) {
--- End diff --
(I am sorry for sounding like a broken record) But can we move this check
out of the function closure?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]