This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a80579bbcf74 [SPARK-48002][PYTHON][SS] Add test for observed metrics in PySpark StreamingQueryListener a80579bbcf74 is described below commit a80579bbcf74c7bcfe60cb6d74a68d4c1574c14f Author: Wei Liu <wei....@databricks.com> AuthorDate: Sun Apr 28 17:46:42 2024 +0900 [SPARK-48002][PYTHON][SS] Add test for observed metrics in PySpark StreamingQueryListener ### What changes were proposed in this pull request? Following this doc test revisit PR https://github.com/apache/spark/pull/46189, for extra safety, add a unit test that verifies observed metrics work for StreamingQueryListeners for both classic Spark and Spark Connect. ### Why are the changes needed? Additional test coverage ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Test only addition ### Was this patch authored or co-authored using generative AI tooling? No Closes #46237 from WweiL/test-observed-metrics. Authored-by: Wei Liu <wei....@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../sql/tests/streaming/test_streaming_listener.py | 46 ++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/python/pyspark/sql/tests/streaming/test_streaming_listener.py b/python/pyspark/sql/tests/streaming/test_streaming_listener.py index 1920f8255744..9e4325e3c6ab 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming_listener.py +++ b/python/pyspark/sql/tests/streaming/test_streaming_listener.py @@ -30,6 +30,7 @@ from pyspark.sql.streaming.listener import ( StateOperatorProgress, StreamingQueryProgress, ) +from pyspark.sql.functions import count, col, lit from pyspark.testing.sqlutils import ReusedSQLTestCase @@ -193,6 +194,51 @@ class StreamingListenerTestsMixin: self.assertTrue(isinstance(progress.numOutputRows, int)) self.assertTrue(isinstance(progress.metrics, dict)) + # This is a generic test work for both classic Spark and Spark Connect + def 
test_listener_observed_metrics(self): + class MyErrorListener(StreamingQueryListener): + def __init__(self): + self.num_rows = -1 + self.num_error_rows = -1 + + def onQueryStarted(self, event): + pass + + def onQueryProgress(self, event): + row = event.progress.observedMetrics.get("my_event") + # Save observed metrics for later verification + self.num_rows = row["rc"] + self.num_error_rows = row["erc"] + + def onQueryIdle(self, event): + pass + + def onQueryTerminated(self, event): + pass + + try: + error_listener = MyErrorListener() + self.spark.streams.addListener(error_listener) + + sdf = self.spark.readStream.format("rate").load().withColumn("error", col("value")) + + # Observe row count (rc) and error row count (erc) in the streaming Dataset + observed_ds = sdf.observe( + "my_event", count(lit(1)).alias("rc"), count(col("error")).alias("erc") + ) + + q = observed_ds.writeStream.format("console").start() + + while q.lastProgress is None or q.lastProgress["batchId"] == 0: + q.awaitTermination(0.5) + + self.assertTrue(error_listener.num_rows > 0) + self.assertTrue(error_listener.num_error_rows > 0) + + finally: + q.stop() + self.spark.streams.removeListener(error_listener) + class StreamingListenerTests(StreamingListenerTestsMixin, ReusedSQLTestCase): def test_number_of_public_methods(self): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org