Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22305#discussion_r232485348
--- Diff: python/pyspark/sql/tests.py ---
@@ -7064,12 +7098,104 @@ def test_invalid_args(self):
foo_udf = pandas_udf(lambda x: x, 'v double',
PandasUDFType.GROUPED_MAP)
df.withColumn('v2', foo_udf(df['v']).over(w))
- with QuietTest(self.sc):
- with self.assertRaisesRegexp(
- AnalysisException,
- '.*Only unbounded window frame is supported.*'):
- df.withColumn('mean_v', mean_udf(df['v']).over(ow))
+ def test_bounded_simple(self):
+ from pyspark.sql.functions import mean, max, min, count
+
+ df = self.data
+ w1 = self.sliding_row_window
+ w2 = self.shrinking_range_window
+
+ plus_one = self.python_plus_one
+ count_udf = self.pandas_agg_count_udf
+ mean_udf = self.pandas_agg_mean_udf
+ max_udf = self.pandas_agg_max_udf
+ min_udf = self.pandas_agg_min_udf
+
+ result1 = df.withColumn('mean_v',
mean_udf(plus_one(df['v'])).over(w1))\
+ .withColumn('count_v', count_udf(df['v']).over(w2)) \
+ .withColumn('max_v', max_udf(df['v']).over(w2)) \
+ .withColumn('min_v', min_udf(df['v']).over(w1)) \
+
+ expected1 = df.withColumn('mean_v',
mean(plus_one(df['v'])).over(w1))\
+ .withColumn('count_v', count(df['v']).over(w2)) \
+ .withColumn('max_v', max(df['v']).over(w2)) \
+ .withColumn('min_v', min(df['v']).over(w1)) \
+
+ self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+ def test_growing_window(self):
+ from pyspark.sql.functions import mean
+
+ df = self.data
+ w1 = self.growing_row_window
+ w2 = self.growing_range_window
+ mean_udf = self.pandas_agg_mean_udf
+
+ result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+ .withColumn('m2', mean_udf(df['v']).over(w2))
+
+ expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+ .withColumn('m2', mean(df['v']).over(w2))
+
+ self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
+
+ def test_sliding_window(self):
+ from pyspark.sql.functions import mean
+
+ df = self.data
+ w1 = self.sliding_row_window
+ w2 = self.sliding_range_window
+
+ mean_udf = self.pandas_agg_mean_udf
+
+ result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
+ .withColumn('m2', mean_udf(df['v']).over(w2))
+
+ expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
+ .withColumn('m2', mean(df['v']).over(w2))
+
+ result1.show()
+ expected1.show()
--- End diff --
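nit: this looks like a mistake — the `result1.show()` / `expected1.show()` calls look like leftover debugging output and should probably be removed.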
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]