This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a80579bbcf74 [SPARK-48002][PYTHON][SS] Add test for observed metrics in PySpark StreamingQueryListener
a80579bbcf74 is described below

commit a80579bbcf74c7bcfe60cb6d74a68d4c1574c14f
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Sun Apr 28 17:46:42 2024 +0900

    [SPARK-48002][PYTHON][SS] Add test for observed metrics in PySpark StreamingQueryListener
    
    ### What changes were proposed in this pull request?
    
    Following the doc test revisit in PR https://github.com/apache/spark/pull/46189, for extra safety, add a unit test that verifies observed metrics work with StreamingQueryListeners for both classic Spark and Spark Connect.
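    
    For reference (not part of this change), here is a minimal sketch of the end-to-end pattern the new test exercises; the names MyListener, my_event, rc and erc are illustrative and simply mirror the test code in the diff below:
    
    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, count, lit
    from pyspark.sql.streaming import StreamingQueryListener
    
    spark = SparkSession.builder.getOrCreate()
    
    class MyListener(StreamingQueryListener):
        def onQueryStarted(self, event):
            pass
    
        def onQueryProgress(self, event):
            # Metrics registered via DataFrame.observe() are surfaced per micro-batch,
            # keyed by the name passed to observe().
            row = event.progress.observedMetrics.get("my_event")
            if row is not None:
                print(row["rc"], row["erc"])
    
        def onQueryIdle(self, event):
            pass
    
        def onQueryTerminated(self, event):
            pass
    
    spark.streams.addListener(MyListener())
    
    # Rate source plus a derived "error" column, observed as two aggregate metrics.
    sdf = spark.readStream.format("rate").load().withColumn("error", col("value"))
    observed = sdf.observe("my_event", count(lit(1)).alias("rc"), count(col("error")).alias("erc"))
    
    query = observed.writeStream.format("console").start()
    query.awaitTermination(10)  # let a few micro-batches run
    query.stop()
    ```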
    
    ### Why are the changes needed?
    
    Additional test coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Test only addition
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46237 from WweiL/test-observed-metrics.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/tests/streaming/test_streaming_listener.py | 46 ++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
index 1920f8255744..9e4325e3c6ab 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py
@@ -30,6 +30,7 @@ from pyspark.sql.streaming.listener import (
     StateOperatorProgress,
     StreamingQueryProgress,
 )
+from pyspark.sql.functions import count, col, lit
 from pyspark.testing.sqlutils import ReusedSQLTestCase
 
 
@@ -193,6 +194,51 @@ class StreamingListenerTestsMixin:
         self.assertTrue(isinstance(progress.numOutputRows, int))
         self.assertTrue(isinstance(progress.metrics, dict))
 
+    # This is a generic test that works for both classic Spark and Spark Connect
+    def test_listener_observed_metrics(self):
+        class MyErrorListener(StreamingQueryListener):
+            def __init__(self):
+                self.num_rows = -1
+                self.num_error_rows = -1
+
+            def onQueryStarted(self, event):
+                pass
+
+            def onQueryProgress(self, event):
+                row = event.progress.observedMetrics.get("my_event")
+                # Save observed metrics for later verification
+                self.num_rows = row["rc"]
+                self.num_error_rows = row["erc"]
+
+            def onQueryIdle(self, event):
+                pass
+
+            def onQueryTerminated(self, event):
+                pass
+
+        try:
+            error_listener = MyErrorListener()
+            self.spark.streams.addListener(error_listener)
+
+            sdf = self.spark.readStream.format("rate").load().withColumn("error", col("value"))
+
+            # Observe row count (rc) and error row count (erc) in the streaming Dataset
+            observed_ds = sdf.observe(
+                "my_event", count(lit(1)).alias("rc"), 
count(col("error")).alias("erc")
+            )
+
+            q = observed_ds.writeStream.format("console").start()
+
+            while q.lastProgress is None or q.lastProgress["batchId"] == 0:
+                q.awaitTermination(0.5)
+
+            self.assertTrue(error_listener.num_rows > 0)
+            self.assertTrue(error_listener.num_error_rows > 0)
+
+        finally:
+            q.stop()
+            self.spark.streams.removeListener(error_listener)
+
 
 class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase):
     def test_number_of_public_methods(self):

