allisonwang-db commented on code in PR #47253:
URL: https://github.com/apache/spark/pull/47253#discussion_r1672808771


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -954,6 +954,16 @@
           "The input of <functionName> can't be <dataType> type data."
         ]
       },
+      "UNSUPPORTED_UDF_INPUT_TYPE" : {
+        "message" : [
+          "UDFs do not support <dataType> type input data."
+        ]
+      },
+      "UNSUPPORTED_UDF_OUTPUT_TYPE" : {
+        "message" : [
+          "UDFs do not support <dataType> type output data."

Review Comment:
   ```suggestion
             "UDFs do not support '<dataType>' as an output data type."
   ```



##########
python/pyspark/sql/types.py:
##########
@@ -194,16 +194,7 @@ def fromDDL(cls, ddl: str) -> "DataType":
         >>> DataType.fromDDL("b: string, a: int")
         StructType([StructField('b', StringType(), True), StructField('a', IntegerType(), True)])
         """
-        from pyspark.sql import SparkSession
-        from pyspark.sql.functions import udf
-
-        # Intentionally uses SparkSession so one implementation can be shared with/without
-        # Spark Connect.
-        schema = (
-            SparkSession.active().range(0).select(udf(lambda x: x, returnType=ddl)("id")).schema
-        )
-        assert len(schema) == 1
-        return schema[0].dataType
+        return _parse_datatype_string(ddl)

Review Comment:
   Why is this change needed?



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -954,6 +954,16 @@
           "The input of <functionName> can't be <dataType> type data."
         ]
       },
+      "UNSUPPORTED_UDF_INPUT_TYPE" : {
+        "message" : [
+          "UDFs do not support <dataType> type input data."

Review Comment:
   ```suggestion
             "UDFs do not support '<dataType>' as an input data type."
   ```



##########
python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py:
##########
@@ -748,6 +750,31 @@ def check_vectorized_udf_return_scalar(self):
             with self.assertRaisesRegex(Exception, "Return.*type.*Series"):
                 df.select(f(col("id"))).collect()
 
+    def test_udf_with_variant_input(self):
+        df = self.spark.range(0, 10).selectExpr("parse_json(cast(id as string)) v")
+        from pyspark.sql.functions import col
+
+        scalar_f = pandas_udf(lambda u: str(u), StringType())
+        iter_f = pandas_udf(
+            lambda it: map(lambda u: str(u), it), StringType(), PandasUDFType.SCALAR_ITER
+        )
+        expectedErrorStr = 'UDFs do not support "VARIANT" type input data'
+        for f in [scalar_f, iter_f]:
+            with self.assertRaisesRegex(AnalysisException, expectedErrorStr):
+                df.select(f(col("v"))).collect()

Review Comment:
   Can we check the error class instead of the error string here?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -63,6 +63,32 @@ trait PythonFuncExpression extends NonSQLExpression with UserDefinedExpression {
   override def toString: String = s"$name(${children.mkString(", ")})#${resultId.id}$typeSuffix"
 
   override def nullable: Boolean = true
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else {
+      val exprReturningVariant = children.collectFirst {
+        case e: Expression if typeContainsVariant(e.dataType) => e
+      }
+      exprReturningVariant match {
+        case Some(e) => TypeCheckResult.DataTypeMismatch(
+          errorSubClass = "UNSUPPORTED_UDF_INPUT_TYPE",
+          messageParameters = Map("dataType" -> s"\"${e.dataType.sql}\""))
+        case None => TypeCheckResult.TypeCheckSuccess
+      }
+    }
+  }
+
+  def typeContainsVariant(dt: DataType): Boolean = dt match {

Review Comment:
   Do we have a helper function for this? This might be needed to check for other expressions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to