xinrong-meng commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482315739
##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
)
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_group_apply_in_pandas(self):
+ # FlatMapGroupsInBatchExec
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+ )
+
+ def normalize(pdf):
+ v = pdf.v
+ return pdf.assign(v=(v - v.mean()) / v.std())
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df.groupby("id").applyInPandas(normalize, schema="id long, v
double").show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showPerfProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_cogroup_apply_in_pandas(self):
+ # FlatMapCoGroupsInBatchExec
+ import pandas as pd
+
+ df1 = self.spark.createDataFrame(
+ [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0),
(20000102, 2, 4.0)],
+ ("time", "id", "v1"),
+ )
+ df2 = self.spark.createDataFrame(
+ [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+ )
+
+ def asof_join(left, right):
+ return pd.merge_asof(left, right, on="time", by="id")
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+ asof_join, schema="time int, id int, v1 double, v2 string"
+ ).show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showPerfProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_group_apply_in_arrow(self):
+ # FlatMapGroupsInBatchExec
+ import pyarrow.compute as pc
+
+ df = self.spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+ )
+
+ def normalize(table):
+ v = table.column("v")
+ norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+ return table.set_column(1, "v", norm)
+
+ with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+ df.groupby("id").applyInArrow(normalize, schema="id long, v
double").show()
+
+ self.assertEqual(1, len(self.profile_results),
str(self.profile_results.keys()))
+
+ for id in self.profile_results:
+ with self.trap_stdout() as io:
+ self.spark.showPerfProfiles(id)
+
+ self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+ self.assertRegex(
+ io.getvalue(),
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+ )
+
+ @unittest.skipIf(
+ not have_pandas or not have_pyarrow,
+ cast(str, pandas_requirement_message or pyarrow_requirement_message),
+ )
+ def test_perf_profiler_cogroup_apply_in_arrow(self):
+ import pyarrow as pa
+
+ df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2,
4.0)], ("id", "v1"))
+ df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+ def summarize(l, r):
Review Comment:
I was wondering the reason and saw "In some fonts, these characters are
indistinguishable from the numerals one and zero. When tempted to use 'l', use
'L' instead." That's good to learn :p
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]