richardc-db commented on code in PR #47253:
URL: https://github.com/apache/spark/pull/47253#discussion_r1673072226
##########
python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py:
##########
@@ -748,6 +750,31 @@ def check_vectorized_udf_return_scalar(self):
         with self.assertRaisesRegex(Exception, "Return.*type.*Series"):
             df.select(f(col("id"))).collect()
+
+    def test_udf_with_variant_input(self):
+        df = self.spark.range(0, 10).selectExpr("parse_json(cast(id as string)) v")
+        from pyspark.sql.functions import col
+
+        scalar_f = pandas_udf(lambda u: str(u), StringType())
+        iter_f = pandas_udf(
+            lambda it: map(lambda u: str(u), it), StringType(), PandasUDFType.SCALAR_ITER
+        )
+        expectedErrorStr = 'UDFs do not support "VARIANT" type input data'
+        for f in [scalar_f, iter_f]:
+            with self.assertRaisesRegex(AnalysisException, expectedErrorStr):
+                df.select(f(col("v"))).collect()
Review Comment:
done!
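For context, a minimal sketch (mine, not code from the PR) of the user-facing behavior this test pins down: applying a pandas UDF to a VARIANT column should fail during analysis rather than at execution. The session setup and the lambda body are illustrative assumptions.

```python
from pyspark.errors import AnalysisException
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# Same setup as the test: build a VARIANT column with parse_json.
df = spark.range(0, 10).selectExpr("parse_json(cast(id as string)) v")
f = pandas_udf(lambda s: s.astype(str), StringType())

try:
    df.select(f(col("v"))).collect()
except AnalysisException as e:
    # Expected message: UDFs do not support "VARIANT" type input data
    print(e)
```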
##########
python/pyspark/sql/types.py:
##########
@@ -194,16 +194,7 @@ def fromDDL(cls, ddl: str) -> "DataType":
        >>> DataType.fromDDL("b: string, a: int")
        StructType([StructField('b', StringType(), True), StructField('a', IntegerType(), True)])
        """
-        from pyspark.sql import SparkSession
-        from pyspark.sql.functions import udf
-
-        # Intentionally uses SparkSession so one implementation can be shared with/without
-        # Spark Connect.
-        schema = (
-            SparkSession.active().range(0).select(udf(lambda x: x, returnType=ddl)("id")).schema
-        )
-        assert len(schema) == 1
-        return schema[0].dataType
+        return _parse_datatype_string(ddl)
Review Comment:
If we want to do something like `fromDDL("v variant")`, `fromDDL` actually calls a `udf` just to get the output schema. However, this PR disables variant-output UDFs during planning, so we have to get the data type another way.

I discussed this a bit with @HyukjinKwon offline last night - he suggested that we instead block variant ser/de. However, I took a second look today and, unless I'm missing something, I believe this route is preferable:
- This fails earlier, during planning rather than execution.
- Blocking during ser/de may require more code changes, because pandas UDFs don't seem to share a codepath with non-pandas UDFs.
- `_parse_datatype_string` was also made to work with Spark Connect (which was @HyukjinKwon's initial concern) in [this](https://github.com/apache/spark/commit/fa8aa571ad18441622bb7e3ac66032ab9e7cbc0a) PR.
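To make the new behavior concrete, here is a minimal sketch (my illustration, not code from the PR). It assumes a Spark build where `VariantType` is available and an active session for DDL parsing; the expected result shape is an assumption based on the docstring's struct examples.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import DataType, StructField, StructType, VariantType

# DDL parsing likely still needs an active session, but no UDF is planned,
# so the new planning-time variant check never fires.
spark = SparkSession.builder.getOrCreate()

parsed = DataType.fromDDL("v variant")

# Assumed result shape: a one-field struct wrapping the variant type.
assert parsed == StructType([StructField("v", VariantType(), True)])
```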