Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/21650#discussion_r199167215
--- Diff: python/pyspark/sql/tests.py ---
@@ -5060,6 +5049,138 @@ def test_type_annotation(self):
df = self.spark.range(1).select(pandas_udf(f=_locals['noop'],
returnType='bigint')('id'))
self.assertEqual(df.first()[0], 0)
+ def test_mixed_udf(self):
+ import pandas as pd
+ from pyspark.sql.functions import udf, pandas_udf
+
+ df = self.spark.range(0, 1).toDF('v')
+
+ @udf('int')
+ def f1(x):
+ assert type(x) == int
+ return x + 1
+
+ @pandas_udf('int')
+ def f2(x):
+ assert type(x) == pd.Series
+ return x + 10
+
+ @udf('int')
+ def f3(x):
+ assert type(x) == int
+ return x + 100
+
+ @pandas_udf('int')
+ def f4(x):
+ assert type(x) == pd.Series
+ return x + 1000
+
+ # Test mixed udfs in a single projection
+ df1 = df.withColumn('f1', f1(df['v']))
+ df1 = df1.withColumn('f2', f2(df1['v']))
+ df1 = df1.withColumn('f3', f3(df1['v']))
+ df1 = df1.withColumn('f4', f4(df1['v']))
+ df1 = df1.withColumn('f2_f1', f2(df1['f1']))
+ df1 = df1.withColumn('f3_f1', f3(df1['f1']))
+ df1 = df1.withColumn('f4_f1', f4(df1['f1']))
+ df1 = df1.withColumn('f3_f2', f3(df1['f2']))
+ df1 = df1.withColumn('f4_f2', f4(df1['f2']))
+ df1 = df1.withColumn('f4_f3', f4(df1['f3']))
+ df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1']))
+ df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1']))
+ df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1']))
+ df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2']))
+ df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1']))
+
+ # Test mixed udfs in a single expression
+ df2 = df.withColumn('f1', f1(df['v']))
+ df2 = df2.withColumn('f2', f2(df['v']))
+ df2 = df2.withColumn('f3', f3(df['v']))
+ df2 = df2.withColumn('f4', f4(df['v']))
+ df2 = df2.withColumn('f2_f1', f2(f1(df['v'])))
+ df2 = df2.withColumn('f3_f1', f3(f1(df['v'])))
+ df2 = df2.withColumn('f4_f1', f4(f1(df['v'])))
+ df2 = df2.withColumn('f3_f2', f3(f2(df['v'])))
+ df2 = df2.withColumn('f4_f2', f4(f2(df['v'])))
+ df2 = df2.withColumn('f4_f3', f4(f3(df['v'])))
+ df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
+ df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
+ df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
+ df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v']))))
+ df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
+
+ df3 = df.withColumn('f1', df['v'] + 1)
+ df3 = df3.withColumn('f2', df['v'] + 10)
+ df3 = df3.withColumn('f3', df['v'] + 100)
+ df3 = df3.withColumn('f4', df['v'] + 1000)
+ df3 = df3.withColumn('f2_f1', df['v'] + 11)
+ df3 = df3.withColumn('f3_f1', df['v'] + 101)
+ df3 = df3.withColumn('f4_f1', df['v'] + 1001)
+ df3 = df3.withColumn('f3_f2', df['v'] + 110)
+ df3 = df3.withColumn('f4_f2', df['v'] + 1010)
+ df3 = df3.withColumn('f4_f3', df['v'] + 1100)
+ df3 = df3.withColumn('f3_f2_f1', df['v'] + 111)
+ df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
+ df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
+ df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
+ df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
+
+ self.assertEquals(df3.collect(), df1.collect())
+ self.assertEquals(df3.collect(), df2.collect())
+
+ def test_mixed_udf_and_sql(self):
+ import pandas as pd
+ from pyspark.sql.functions import udf, pandas_udf
+
+ df = self.spark.range(0, 1).toDF('v')
+
+ @udf('int')
+ def f1(x):
+ assert type(x) == int
+ return x + 1
+
+ def f2(x):
--- End diff --
Yes, the purpose is to test mixing udf, pandas_udf and sql expression. I
will add comments to make it clearer.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]