Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20771#discussion_r177016698

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -599,8 +610,86 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil
 
-  override def eval(input: InternalRow): Any =
-    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+    case u: UserDefinedType[_] => u.sqlType
+    case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+    inputCollection.map { element =>
+      val row = InternalRow.fromSeq(Seq(element))
+      lambdaFunction.eval(row)
+    }
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+    case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+      x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+    case ObjectType(cls) if cls.isArray =>
+      x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+    case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
+      x => executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+    case ObjectType(cls) if cls == classOf[Object] =>
+      if (cls.isArray) {
+        x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+      } else {
+        x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+      }
+    case ArrayType(et, _) =>
+      x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
+  }
+
+  // Converts the processed collection to custom collection class if any.
+  private lazy val getResults: Seq[_] => Any = customCollectionCls match {
+    case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+      // Scala sequence
+      identity _
+    case Some(cls) if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
+      // Scala set
+      _.toSet
+    case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
+      // Java list
+      if (cls == classOf[java.util.List[_]] || cls == classOf[java.util.AbstractList[_]] ||
+          cls == classOf[java.util.AbstractSequentialList[_]]) {
+        // Specifying non concrete implementations of `java.util.List`
+        _.asJava
+      } else {
+        // Specifying concrete implementations of `java.util.List`
+        (results) => {
+          val constructors = cls.getConstructors()
--- End diff --

Is there a way we can move the constructor resolution out of the closure? I am fine with some code duplication here :)...
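
For illustration, a minimal sketch of what the suggestion could look like; `buildConcreteJavaList` is a hypothetical helper name for this sketch only, not code from the PR. The point is that the reflective constructor lookup runs once, when the converter is created, and only the cheap per-row work stays inside the returned closure.

// Sketch only, assuming `cls` is a concrete `java.util.List` implementation
// with a public no-arg constructor; the actual PR may resolve and invoke
// constructors differently.
private def buildConcreteJavaList(cls: Class[_]): Seq[_] => Any = {
  // Constructor resolution happens here, outside the returned closure.
  val noArgCtor = cls.getConstructors.find(_.getParameterCount == 0).getOrElse {
    throw new RuntimeException(s"`${cls.getName}` must have a public no-arg constructor")
  }
  results => {
    // Only per-row work remains in the closure.
    val list = noArgCtor.newInstance().asInstanceOf[java.util.List[Any]]
    results.foreach(list.add)
    list
  }
}

Hoisting the lookup trades a little code duplication for not paying the `getConstructors` reflection cost on every evaluated row, which is the concern raised above.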