This is an automated email from the ASF dual-hosted git repository.

xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 501999a834ea [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling
501999a834ea is described below

commit 501999a834ea7761a792b823c543e40fba84231d
Author: Xinrong Meng <xinr...@apache.org>
AuthorDate: Thu Mar 7 13:20:39 2024 -0800

    [SPARK-47276][PYTHON][CONNECT] Introduce `spark.profile.clear` for SparkSession-based profiling
    
    ### What changes were proposed in this pull request?
    Introduce `spark.profile.clear` for SparkSession-based profiling.
    
    ### Why are the changes needed?
    This provides a straightforward and unified interface for managing and resetting profiling results for SparkSession-based profilers.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. `spark.profile.clear` is supported as shown below.
    
    Preparation:
    ```py
    >>> from pyspark.sql.functions import pandas_udf
    >>> df = spark.range(3)
    >>> @pandas_udf("long")
    ... def add1(x):
    ...   return x + 1
    ...
    >>> added = df.select(add1("id"))
    >>> spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
    >>> added.show()
    +--------+
    |add1(id)|
    +--------+
    ...
    +--------+
    >>> spark.profile.show()
    ============================================================
    Profile of UDF<id=2>
    ============================================================
             1410 function calls (1374 primitive calls) in 0.004 seconds
    ...
    ```
    
    Example usage:
    ```py
    >>> spark.profile.profiler_collector._profile_results
    {2: (<pstats.Stats object at 0x7ff6484d22e0>, None)}
    
    >>> spark.profile.clear(1)  # id mismatch
    >>> spark.profile.profiler_collector._profile_results
    {2: (<pstats.Stats object at 0x7ff6484d22e0>, None)}
    
    >>> spark.profile.clear(type="memory")  # type mismatch
    >>> spark.profile.profiler_collector._profile_results
    {2: (<pstats.Stats object at 0x7ff6484d22e0>, None)}
    
    >>> spark.profile.clear()  # clear all
    >>> spark.profile.profiler_collector._profile_results
    {}
    >>> spark.profile.show()
    >>>
    ```
    
    ### How was this patch tested?
    Unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45378 from xinrong-meng/profile_clear.
    
    Authored-by: Xinrong Meng <xinr...@apache.org>
    Signed-off-by: Xinrong Meng <xinr...@apache.org>
---
 python/pyspark/sql/profiler.py                | 79 +++++++++++++++++++++++++++
 python/pyspark/sql/tests/test_session.py      | 27 +++++++++
 python/pyspark/sql/tests/test_udf_profiler.py | 26 +++++++++
 python/pyspark/tests/test_memory_profiler.py  | 59 ++++++++++++++++++++
 4 files changed, 191 insertions(+)

diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py
index 5ab27bce2582..711e39de4723 100644
--- a/python/pyspark/sql/profiler.py
+++ b/python/pyspark/sql/profiler.py
@@ -224,6 +224,56 @@ class ProfilerCollector(ABC):
             for id in sorted(code_map.keys()):
                 dump(id)
 
+    def clear_perf_profiles(self, id: Optional[int] = None) -> None:
+        """
+        Clear the perf profile results.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        id : int, optional
+            The UDF ID whose profiling results should be cleared.
+            If not specified, all the results will be cleared.
+        """
+        with self._lock:
+            if id is not None:
+                if id in self._profile_results:
+                    perf, mem, *_ = self._profile_results[id]
+                    self._profile_results[id] = (None, mem, *_)
+                    if mem is None:
+                        self._profile_results.pop(id, None)
+            else:
+                for id, (perf, mem, *_) in list(self._profile_results.items()):
+                    self._profile_results[id] = (None, mem, *_)
+                    if mem is None:
+                        self._profile_results.pop(id, None)
+
+    def clear_memory_profiles(self, id: Optional[int] = None) -> None:
+        """
+        Clear the memory profile results.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        id : int, optional
+            The UDF ID whose profiling results should be cleared.
+            If not specified, all the results will be cleared.
+        """
+        with self._lock:
+            if id is not None:
+                if id in self._profile_results:
+                    perf, mem, *_ = self._profile_results[id]
+                    self._profile_results[id] = (perf, None, *_)
+                    if perf is None:
+                        self._profile_results.pop(id, None)
+            else:
+                for id, (perf, mem, *_) in list(self._profile_results.items()):
+                    self._profile_results[id] = (perf, None, *_)
+                    if perf is None:
+                        self._profile_results.pop(id, None)
+
 
 class AccumulatorProfilerCollector(ProfilerCollector):
     def __init__(self) -> None:
@@ -309,3 +359,32 @@ class Profile:
                     "allowed_values": str(["perf", "memory"]),
                 },
             )
+
+    def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) 
-> None:
+        """
+        Clear the profile results.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        id : int, optional
+            The UDF ID whose profiling results should be cleared.
+            If not specified, all the results will be cleared.
+        type : str, optional
+            The profiler type to clear results for, which can be either "perf" 
or "memory".
+        """
+        if type == "memory":
+            self.profiler_collector.clear_memory_profiles(id)
+        elif type == "perf" or type is None:
+            self.profiler_collector.clear_perf_profiles(id)
+            if type is None:  # Clear both perf and memory profiles
+                self.profiler_collector.clear_memory_profiles(id)
+        else:
+            raise PySparkValueError(
+                error_class="VALUE_NOT_ALLOWED",
+                message_parameters={
+                    "arg_name": "type",
+                    "allowed_values": str(["perf", "memory"]),
+                },
+            )
diff --git a/python/pyspark/sql/tests/test_session.py 
b/python/pyspark/sql/tests/test_session.py
index b95e9de9e3f3..5f102d770c6a 100644
--- a/python/pyspark/sql/tests/test_session.py
+++ b/python/pyspark/sql/tests/test_session.py
@@ -531,6 +531,33 @@ class SparkSessionProfileTests(unittest.TestCase, 
PySparkErrorTestUtils):
             },
         )
 
+    def test_clear_memory_type(self):
+        self.profile.clear(type="memory")
+        self.profiler_collector_mock.clear_memory_profiles.assert_called_once()
+        self.profiler_collector_mock.clear_perf_profiles.assert_not_called()
+
+    def test_clear_perf_type(self):
+        self.profile.clear(type="perf")
+        self.profiler_collector_mock.clear_perf_profiles.assert_called_once()
+        self.profiler_collector_mock.clear_memory_profiles.assert_not_called()
+
+    def test_clear_no_type(self):
+        self.profile.clear()
+        self.profiler_collector_mock.clear_perf_profiles.assert_called_once()
+        self.profiler_collector_mock.clear_memory_profiles.assert_called_once()
+
+    def test_clear_invalid_type(self):
+        with self.assertRaises(PySparkValueError) as e:
+            self.profile.clear(type="invalid")
+        self.check_error(
+            exception=e.exception,
+            error_class="VALUE_NOT_ALLOWED",
+            message_parameters={
+                "arg_name": "type",
+                "allowed_values": str(["perf", "memory"]),
+            },
+        )
+
 
 class SparkExtensionsTest(unittest.TestCase):
     # These tests are separate because it uses 'spark.sql.extensions' which is
diff --git a/python/pyspark/sql/tests/test_udf_profiler.py 
b/python/pyspark/sql/tests/test_udf_profiler.py
index 557b4daa8550..a66503bc0213 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -521,6 +521,32 @@ class UDFProfiler2TestsMixin:
                 io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    def test_perf_profiler_clear(self):
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), 
str(list(self.profile_results)))
+
+        for id in self.profile_results:
+            self.spark.profile.clear(id)
+            self.assertNotIn(id, self.profile_results)
+        self.assertEqual(0, len(self.profile_results), 
str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), 
str(list(self.profile_results)))
+
+        self.spark.profile.clear(type="memory")
+        self.assertEqual(3, len(self.profile_results), 
str(list(self.profile_results)))
+        self.spark.profile.clear(type="perf")
+        self.assertEqual(0, len(self.profile_results), 
str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), 
str(list(self.profile_results)))
+
+        self.spark.profile.clear()
+        self.assertEqual(0, len(self.profile_results), 
str(list(self.profile_results)))
+
 
 class UDFProfiler2Tests(UDFProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None:
diff --git a/python/pyspark/tests/test_memory_profiler.py 
b/python/pyspark/tests/test_memory_profiler.py
index f0abdd03e243..046dd3621c42 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -221,6 +221,10 @@ class MemoryProfiler2TestsMixin:
     def profile_results(self):
         return self.spark._profiler_collector._memory_profile_results
 
+    @property
+    def perf_profile_results(self):
+        return self.spark._profiler_collector._perf_profile_results
+
     def test_memory_profiler_udf(self):
         _do_computation(self.spark)
 
@@ -571,6 +575,61 @@ class MemoryProfiler2TestsMixin:
                 io.getvalue(), 
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    def test_memory_profiler_clear(self):
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), 
str(list(self.profile_results)))
+
+        for id in list(self.profile_results.keys()):
+            self.spark.profile.clear(id)
+            self.assertNotIn(id, self.profile_results)
+        self.assertEqual(0, len(self.profile_results), 
str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), 
str(list(self.profile_results)))
+
+        self.spark.profile.clear(type="perf")
+        self.assertEqual(3, len(self.profile_results), 
str(list(self.profile_results)))
+        self.spark.profile.clear(type="memory")
+        self.assertEqual(0, len(self.profile_results), 
str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            _do_computation(self.spark)
+        self.assertEqual(3, len(self.profile_results), 
str(list(self.profile_results)))
+
+        self.spark.profile.clear()
+        self.assertEqual(0, len(self.profile_results), 
str(list(self.profile_results)))
+
+    def test_profilers_clear(self):
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            _do_computation(self.spark)
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+
+        self.assertEqual(3, len(self.profile_results), 
str(list(self.profile_results)))
+
+        # clear a specific memory profile
+        some_id = next(iter(self.profile_results))
+        self.spark.profile.clear(some_id, type="memory")
+        self.assertEqual(2, len(self.profile_results), 
str(list(self.profile_results)))
+        self.assertEqual(3, len(self.perf_profile_results), 
str(list(self.perf_profile_results)))
+
+        # clear a specific perf profile
+        some_id = next(iter(self.perf_profile_results))
+        self.spark.profile.clear(some_id, type="perf")
+        self.assertEqual(2, len(self.perf_profile_results), 
str(list(self.perf_profile_results)))
+        self.assertEqual(2, len(self.profile_results), 
str(list(self.profile_results)))
+
+        # clear all memory profiles
+        self.spark.profile.clear(type="memory")
+        self.assertEqual(0, len(self.profile_results), 
str(list(self.profile_results)))
+        self.assertEqual(2, len(self.perf_profile_results), 
str(list(self.perf_profile_results)))
+
+        # clear all perf profiles
+        self.spark.profile.clear(type="perf")
+        self.assertEqual(0, len(self.perf_profile_results), 
str(list(self.perf_profile_results)))
+
 
 class MemoryProfiler2Tests(MemoryProfiler2TestsMixin, ReusedSQLTestCase):
     def setUp(self) -> None:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to