Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/21082#discussion_r194129021
--- Diff: python/pyspark/sql/tests.py ---
@@ -5181,6 +5190,235 @@ def test_invalid_args(self):
                 'mixture.*aggregate function.*group aggregate pandas UDF'):
             df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
+
+@unittest.skipIf(
+    not _have_pandas or not _have_pyarrow,
+    _pandas_requirement_message or _pyarrow_requirement_message)
+class WindowPandasUDFTests(ReusedSQLTestCase):
+    @property
+    def data(self):
+        from pyspark.sql.functions import array, explode, col, lit
+        return self.spark.range(10).toDF('id') \
+            .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
+            .withColumn("v", explode(col('vs'))) \
+            .drop('vs') \
+            .withColumn('w', lit(1.0))
+
+    @property
+    def python_plus_one(self):
+        from pyspark.sql.functions import udf
+        return udf(lambda v: v + 1, 'double')
+
+    @property
+    def pandas_scalar_time_two(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+        return pandas_udf(lambda v: v * 2, 'double')
+
+    @property
+    def pandas_agg_mean_udf(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+        @pandas_udf('double', PandasUDFType.GROUPED_AGG)
+        def avg(v):
+            return v.mean()
+        return avg
+
+    @property
+    def pandas_agg_max_udf(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+        @pandas_udf('double', PandasUDFType.GROUPED_AGG)
+        def max(v):
+            return v.max()
+        return max
+
+    @property
+    def pandas_agg_min_udf(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+        @pandas_udf('double', PandasUDFType.GROUPED_AGG)
+        def min(v):
+            return v.min()
+        return min
+
+    @property
+    def unbounded_window(self):
+        return Window.partitionBy('id') \
+            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+
+    @property
+    def ordered_window(self):
+        return Window.partitionBy('id').orderBy('v')
+
+    @property
+    def unpartitioned_window(self):
+        return Window.partitionBy()
--- End diff ---
I think we can rely on `Window.partitionBy()` returning an unbounded window
here; otherwise there would be too many combinations to test. But I am OK
with adding tests for
`Window.partitionBy().rowsBetween(Window.unboundedPreceding,
Window.unboundedFollowing)` in addition to the existing ones. WDYT?
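
For reference, such an additional case could look roughly like the sketch
below (the test name is hypothetical; it reuses the helper properties above
and assumes the `Window` import and the `assertPandasEqual` helper already
present in this test file):

```python
def test_unpartitioned_unbounded_window(self):
    from pyspark.sql.functions import mean

    df = self.data
    # Explicitly unbounded frame over an unpartitioned window, assumed
    # equivalent to the plain Window.partitionBy() used above.
    w = Window.partitionBy() \
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    mean_udf = self.pandas_agg_mean_udf

    # Compare the group aggregate pandas UDF against the built-in mean.
    result = df.withColumn('mean_v', mean_udf(df['v']).over(w))
    expected = df.withColumn('mean_v', mean(df['v']).over(w))
    self.assertPandasEqual(expected.toPandas(), result.toPandas())
```

If I remember correctly, a window with no ORDER BY and no explicit frame
defaults to an unbounded frame anyway, which is why `Window.partitionBy()`
alone should behave the same as the explicit `rowsBetween(...)` variant.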
---