This is an automated email from the ASF dual-hosted git repository. xinrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 501999a834ea [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling 501999a834ea is described below commit 501999a834ea7761a792b823c543e40fba84231d Author: Xinrong Meng <xinr...@apache.org> AuthorDate: Thu Mar 7 13:20:39 2024 -0800 [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling ### What changes were proposed in this pull request? Introduce `spark.profile.clear` for SparkSession-based profiling. ### Why are the changes needed? A straightforward and unified interface for managing and resetting profiling results for SparkSession-based profilers. ### Does this PR introduce _any_ user-facing change? Yes. `spark.profile.clear` is supported as shown below. Preparation: ```py >>> from pyspark.sql.functions import pandas_udf >>> df = spark.range(3) >>> @pandas_udf("long") ... def add1(x): ... return x + 1 ... >>> added = df.select(add1("id")) >>> spark.conf.set("spark.sql.pyspark.udf.profiler", "perf") >>> added.show() +--------+ |add1(id)| +--------+ ... +--------+ >>> spark.profile.show() ============================================================ Profile of UDF<id=2> ============================================================ 1410 function calls (1374 primitive calls) in 0.004 seconds ... ``` Example usage: ```py >>> spark.profile.profiler_collector._profile_results {2: (<pstats.Stats object at 0x7ff6484d22e0>, None)} >>> spark.profile.clear(1) # id mismatch >>> spark.profile.profiler_collector._profile_results {2: (<pstats.Stats object at 0x7ff6484d22e0>, None)} >>> spark.profile.clear(type="memory") # type mismatch >>> spark.profile.profiler_collector._profile_results {2: (<pstats.Stats object at 0x7ff6484d22e0>, None)} >>> spark.profile.clear() # clear all >>> spark.profile.profiler_collector._profile_results {} >>> spark.profile.show() >>> ``` ### How was this patch tested? Unit tests. 
### Was this patch authored or co-authored using generative AI tooling? No. Closes #45378 from xinrong-meng/profile_clear. Authored-by: Xinrong Meng <xinr...@apache.org> Signed-off-by: Xinrong Meng <xinr...@apache.org> --- python/pyspark/sql/profiler.py | 79 +++++++++++++++++++++++++++ python/pyspark/sql/tests/test_session.py | 27 +++++++++ python/pyspark/sql/tests/test_udf_profiler.py | 26 +++++++++ python/pyspark/tests/test_memory_profiler.py | 59 ++++++++++++++++++++ 4 files changed, 191 insertions(+) diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py index 5ab27bce2582..711e39de4723 100644 --- a/python/pyspark/sql/profiler.py +++ b/python/pyspark/sql/profiler.py @@ -224,6 +224,56 @@ class ProfilerCollector(ABC): for id in sorted(code_map.keys()): dump(id) + def clear_perf_profiles(self, id: Optional[int] = None) -> None: + """ + Clear perf profile results; entries left without memory results are dropped. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + id : int, optional + The UDF ID whose profiling results should be cleared. + If not specified, all the results will be cleared. + """ + with self._lock: + if id is not None: + if id in self._profile_results: + perf, mem, *_ = self._profile_results[id] + self._profile_results[id] = (None, mem, *_) + if mem is None: + self._profile_results.pop(id, None) + else: + for id, (perf, mem, *_) in list(self._profile_results.items()): + self._profile_results[id] = (None, mem, *_) + if mem is None: + self._profile_results.pop(id, None) + + def clear_memory_profiles(self, id: Optional[int] = None) -> None: + """ + Clear memory profile results; entries left without perf results are dropped. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + id : int, optional + The UDF ID whose profiling results should be cleared. + If not specified, all the results will be cleared. 
+ """ + with self._lock: + if id is not None: + if id in self._profile_results: + perf, mem, *_ = self._profile_results[id] + self._profile_results[id] = (perf, None, *_) + if perf is None: + self._profile_results.pop(id, None) + else: + for id, (perf, mem, *_) in list(self._profile_results.items()): + self._profile_results[id] = (perf, None, *_) + if perf is None: + self._profile_results.pop(id, None) + class AccumulatorProfilerCollector(ProfilerCollector): def __init__(self) -> None: @@ -309,3 +359,32 @@ class Profile: "allowed_values": str(["perf", "memory"]), }, ) + + def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None: + """ + Clear the profile results; an unsupported ``type`` raises PySparkValueError. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + id : int, optional + The UDF ID whose profiling results should be cleared. + If not specified, all the results will be cleared. + type : str, optional + The profiler type to clear, "perf" or "memory"; both if not specified. + """ + if type == "memory": + self.profiler_collector.clear_memory_profiles(id) + elif type == "perf" or type is None: + self.profiler_collector.clear_perf_profiles(id) + if type is None: # Clear both perf and memory profiles + self.profiler_collector.clear_memory_profiles(id) + else: + raise PySparkValueError( + error_class="VALUE_NOT_ALLOWED", + message_parameters={ + "arg_name": "type", + "allowed_values": str(["perf", "memory"]), + }, + ) diff --git a/python/pyspark/sql/tests/test_session.py b/python/pyspark/sql/tests/test_session.py index b95e9de9e3f3..5f102d770c6a 100644 --- a/python/pyspark/sql/tests/test_session.py +++ b/python/pyspark/sql/tests/test_session.py @@ -531,6 +531,33 @@ class SparkSessionProfileTests(unittest.TestCase, PySparkErrorTestUtils): }, ) + def test_clear_memory_type(self): + self.profile.clear(type="memory") + self.profiler_collector_mock.clear_memory_profiles.assert_called_once() + self.profiler_collector_mock.clear_perf_profiles.assert_not_called() + + def 
test_clear_perf_type(self): + self.profile.clear(type="perf") + self.profiler_collector_mock.clear_perf_profiles.assert_called_once() + self.profiler_collector_mock.clear_memory_profiles.assert_not_called() + + def test_clear_no_type(self): + self.profile.clear() + self.profiler_collector_mock.clear_perf_profiles.assert_called_once() + self.profiler_collector_mock.clear_memory_profiles.assert_called_once() + + def test_clear_invalid_type(self): + with self.assertRaises(PySparkValueError) as e: + self.profile.clear(type="invalid") + self.check_error( + exception=e.exception, + error_class="VALUE_NOT_ALLOWED", + message_parameters={ + "arg_name": "type", + "allowed_values": str(["perf", "memory"]), + }, + ) + class SparkExtensionsTest(unittest.TestCase): # These tests are separate because it uses 'spark.sql.extensions' which is diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 557b4daa8550..a66503bc0213 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -521,6 +521,32 @@ class UDFProfiler2TestsMixin: io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}" ) + def test_perf_profiler_clear(self): + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + for id in self.profile_results: + self.spark.profile.clear(id) + self.assertNotIn(id, self.profile_results) + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + self.spark.profile.clear(type="memory") + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + self.spark.profile.clear(type="perf") + self.assertEqual(0, 
len(self.profile_results), str(list(self.profile_results))) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + self.spark.profile.clear() + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase): def setUp(self) -> None: diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py index f0abdd03e243..046dd3621c42 100644 --- a/python/pyspark/tests/test_memory_profiler.py +++ b/python/pyspark/tests/test_memory_profiler.py @@ -221,6 +221,10 @@ class MemoryProfiler2TestsMixin: def profile_results(self): return self.spark._profiler_collector._memory_profile_results + @property + def perf_profile_results(self): + return self.spark._profiler_collector._perf_profile_results + def test_memory_profiler_udf(self): _do_computation(self.spark) @@ -571,6 +575,61 @@ class MemoryProfiler2TestsMixin: io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}" ) + def test_memory_profiler_clear(self): + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + for id in list(self.profile_results.keys()): + self.spark.profile.clear(id) + self.assertNotIn(id, self.profile_results) + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + self.spark.profile.clear(type="perf") + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + self.spark.profile.clear(type="memory") + self.assertEqual(0, len(self.profile_results), 
str(list(self.profile_results))) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + _do_computation(self.spark) + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + self.spark.profile.clear() + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + + def test_profilers_clear(self): + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): + _do_computation(self.spark) + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + _do_computation(self.spark) + + self.assertEqual(3, len(self.profile_results), str(list(self.profile_results))) + + # clear a specific memory profile + some_id = next(iter(self.profile_results)) + self.spark.profile.clear(some_id, type="memory") + self.assertEqual(2, len(self.profile_results), str(list(self.profile_results))) + self.assertEqual(3, len(self.perf_profile_results), str(list(self.perf_profile_results))) + + # clear a specific perf profile + some_id = next(iter(self.perf_profile_results)) + self.spark.profile.clear(some_id, type="perf") + self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results))) + self.assertEqual(2, len(self.profile_results), str(list(self.profile_results))) + + # clear all memory profiles + self.spark.profile.clear(type="memory") + self.assertEqual(0, len(self.profile_results), str(list(self.profile_results))) + self.assertEqual(2, len(self.perf_profile_results), str(list(self.perf_profile_results))) + + # clear all perf profiles + self.spark.profile.clear(type="perf") + self.assertEqual(0, len(self.perf_profile_results), str(list(self.perf_profile_results))) + class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase): def setUp(self) -> None: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org