HeartSaVioR commented on code in PR #44736:
URL: https://github.com/apache/spark/pull/44736#discussion_r1452117819


##########
python/pyspark/sql/tests/connect/streaming/test_parity_listener.py:
##########
@@ -26,77 +26,102 @@
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
-class TestListener(StreamingQueryListener):
+# V1: Initial interface of StreamingQueryListener containing methods 
`onQueryStarted`,
+# `onQueryProgress`, `onQueryTerminated`. It is prior to Spark 3.5.
+class TestListenerV1(StreamingQueryListener):
     def onQueryStarted(self, event):
         e = pyspark.cloudpickle.dumps(event)
         df = self.spark.createDataFrame(data=[(e,)])
-        df.write.mode("append").saveAsTable("listener_start_events")
+        df.write.mode("append").saveAsTable("listener_start_events_v1")
 
     def onQueryProgress(self, event):
         e = pyspark.cloudpickle.dumps(event)
         df = self.spark.createDataFrame(data=[(e,)])
-        df.write.mode("append").saveAsTable("listener_progress_events")
+        df.write.mode("append").saveAsTable("listener_progress_events_v1")
+
+    def onQueryTerminated(self, event):
+        e = pyspark.cloudpickle.dumps(event)
+        df = self.spark.createDataFrame(data=[(e,)])
+        df.write.mode("append").saveAsTable("listener_terminated_events_v1")
+
+
+# V2: The interface after the method `onQueryIdle` is added. It is Spark 3.5+.
+class TestListenerV2(StreamingQueryListener):
+    def onQueryStarted(self, event):
+        e = pyspark.cloudpickle.dumps(event)
+        df = self.spark.createDataFrame(data=[(e,)])
+        df.write.mode("append").saveAsTable("listener_start_events_v2")
+
+    def onQueryProgress(self, event):
+        e = pyspark.cloudpickle.dumps(event)
+        df = self.spark.createDataFrame(data=[(e,)])
+        df.write.mode("append").saveAsTable("listener_progress_events_v2")
 
     def onQueryIdle(self, event):
         pass
 
     def onQueryTerminated(self, event):
         e = pyspark.cloudpickle.dumps(event)
         df = self.spark.createDataFrame(data=[(e,)])
-        df.write.mode("append").saveAsTable("listener_terminated_events")
+        df.write.mode("append").saveAsTable("listener_terminated_events_v2")
 
 
 class StreamingListenerParityTests(StreamingListenerTestsMixin, 
ReusedConnectTestCase):
     def test_listener_events(self):
-        test_listener = TestListener()
-
-        try:
-            self.spark.streams.addListener(test_listener)
-
-            # This ensures the read socket on the server won't crash (i.e. 
because of timeout)
-            # when there hasn't been a new event for a long time
-            time.sleep(30)
-
-            df = self.spark.readStream.format("rate").option("rowsPerSecond", 
10).load()
-            df_observe = df.observe("my_event", count(lit(1)).alias("rc"))
-            df_stateful = df_observe.groupBy().count()  # make query stateful
-            q = (
-                df_stateful.writeStream.format("noop")
-                .queryName("test")
-                .outputMode("complete")
-                .start()
-            )
-
-            self.assertTrue(q.isActive)
-            # ensure at least one batch is ran
-            while q.lastProgress is None or q.lastProgress["batchId"] == 0:
-                time.sleep(5)
-            q.stop()
-            self.assertFalse(q.isActive)
-
-            time.sleep(60)  # Sleep to make sure listener_terminated_events is 
written successfully
-
-            start_event = pyspark.cloudpickle.loads(
-                self.spark.read.table("listener_start_events").collect()[0][0]
-            )
-
-            progress_event = pyspark.cloudpickle.loads(
-                
self.spark.read.table("listener_progress_events").collect()[0][0]
-            )
-
-            terminated_event = pyspark.cloudpickle.loads(
-                
self.spark.read.table("listener_terminated_events").collect()[0][0]
-            )
-
-            self.check_start_event(start_event)
-            self.check_progress_event(progress_event)
-            self.check_terminated_event(terminated_event)
-
-        finally:
-            self.spark.streams.removeListener(test_listener)
-
-            # Remove again to verify this won't throw any error
-            self.spark.streams.removeListener(test_listener)
+        def verify(test_listener, table_postfix):

Review Comment:
   This is just an indentation change; `test_listener` is now received from outside the method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to