cloud-fan commented on a change in pull request #28645:
URL: https://github.com/apache/spark/pull/28645#discussion_r430521074



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
##########
@@ -103,17 +102,31 @@ case class ScalaUDF(
     }
   }
 
-  private def createToScalaConverter(i: Int, dataType: DataType): Any => Any = 
{
+  private def scalaConverter(i: Int, dataType: DataType): Any => Any = {
     if (inputEncoders.isEmpty) {
       // for untyped Scala UDF
-      CatalystTypeConverters.createToScalaConverter(dataType)
+      createToScalaConverter(dataType)
+    } else if (isPrimitive(dataType)) {
+      identity
     } else {
       val encoder = inputEncoders(i)
-      if (encoder.isDefined && encoder.get.isSerializedAsStructForTopLevel) {
-        val fromRow = encoder.get.resolveAndBind().createDeserializer()
-        row: Any => fromRow(row.asInstanceOf[InternalRow])
-      } else {
-        CatalystTypeConverters.createToScalaConverter(dataType)
+      encoder match {
+        case Some(enc) =>
+          if (enc.isSerializedAsStructForTopLevel) {
+            val fromRow = enc.resolveAndBind().createDeserializer()
+            row: Any => fromRow(row.asInstanceOf[InternalRow])
+          } else {
+            val child = children(i)
+            val attrs = new StructType().add(s"$child", 
child.dataType).toAttributes

Review comment:
       `child.toString` can be expensive. how about `"child"`? The name doesn't 
matter anyway.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
##########
@@ -103,17 +102,31 @@ case class ScalaUDF(
     }
   }
 
-  private def createToScalaConverter(i: Int, dataType: DataType): Any => Any = 
{
+  private def scalaConverter(i: Int, dataType: DataType): Any => Any = {
     if (inputEncoders.isEmpty) {
       // for untyped Scala UDF
-      CatalystTypeConverters.createToScalaConverter(dataType)
+      createToScalaConverter(dataType)
+    } else if (isPrimitive(dataType)) {
+      identity
     } else {
       val encoder = inputEncoders(i)
-      if (encoder.isDefined && encoder.get.isSerializedAsStructForTopLevel) {
-        val fromRow = encoder.get.resolveAndBind().createDeserializer()
-        row: Any => fromRow(row.asInstanceOf[InternalRow])
-      } else {
-        CatalystTypeConverters.createToScalaConverter(dataType)
+      encoder match {
+        case Some(enc) =>
+          if (enc.isSerializedAsStructForTopLevel) {
+            val fromRow = enc.resolveAndBind().createDeserializer()

Review comment:
       to be consistent, shall we bind with 
`child.dataType.asInstanceOf[StructType].toAttributes`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
##########
@@ -581,4 +581,40 @@ class UDFSuite extends QueryTest with SharedSparkSession {
       .toDF("col1", "col2")
     checkAnswer(df.select(myUdf(Column("col1"), Column("col2"))), Row(2020) :: 
Nil)
   }
+
+  test("case class as element type of Seq/Array") {

Review comment:
       can we also test some special cases:
   1. the catalyst schema has more fields than the case class. e.g. 
`struct<key: int, value: string, col: int>` and `case class TestData(key: Int, 
value: String)`
   2. the fields order doesn't match, e.g. `struct<value: string, key: int>` 
and `case class TestData(key: Int, value: String)`
   3. the catalyst schema has missing fields, e.g. `struct<key: int>` and `case 
class TestData(key: Int, value: String)`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to