Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/20771#discussion_r173616476
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
---
@@ -599,8 +610,79 @@ case class MapObjects private(
override def children: Seq[Expression] = lambdaFunction :: inputData ::
Nil
- override def eval(input: InternalRow): Any =
- throw new UnsupportedOperationException("Only code-generated
evaluation is supported")
+ // The data with UserDefinedType are actually stored with the data type
of its sqlType.
+ // When we want to apply MapObjects on it, we have to use it.
+ lazy private val inputDataType = inputData.dataType match {
+ case u: UserDefinedType[_] => u.sqlType
+ case _ => inputData.dataType
+ }
+
+ private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+ inputCollection.map { element =>
+ val row = InternalRow.fromSeq(Seq(element))
+ lambdaFunction.eval(row)
+ }
+ }
+
+ // Executes lambda function on input collection.
+ private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+ case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+ x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+ case ObjectType(cls) if cls.isArray =>
+ x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+ case ObjectType(cls) if
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+ x =>
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+ case ObjectType(cls) if cls == classOf[Object] =>
+ (inputCollection) => {
+ if (inputCollection.getClass.isArray) {
+
executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
+ } else {
+ executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
+ }
+ }
+ case ArrayType(et, _) =>
+ x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
+ }
+
+ // Converts the processed collection to custom collection class if any.
+ private lazy val getResults: Seq[_] => Any = customCollectionCls match {
+ case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+ // Scala sequence
+ _.toSeq
+ case Some(cls) if
classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
+ // Scala set
+ _.toSet
+ case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
+ // Java list
+ if (cls == classOf[java.util.List[_]] || cls ==
classOf[java.util.AbstractList[_]] ||
+ cls == classOf[java.util.AbstractSequentialList[_]]) {
+ _.asJava
+ } else {
+ (results) => {
+ val builder = Try(cls.getConstructor(Integer.TYPE)).map {
constructor =>
--- End diff --
Not sure if I understand correctly. Please check update again.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]