HyukjinKwon commented on code in PR #46037:
URL: https://github.com/apache/spark/pull/46037#discussion_r1607514868
##########
python/pyspark/sql/tests/connect/streaming/test_parity_listener.py:
##########
@@ -65,8 +66,140 @@ def onQueryTerminated(self, event):
df.write.mode("append").saveAsTable("listener_terminated_events_v2")
+class TestListenerLocal(StreamingQueryListener):
+ def __init__(self):
+ self.start = []
+ self.progress = []
+ self.terminated = []
+
+ def onQueryStarted(self, event):
+ self.start.append(event)
+
+ def onQueryProgress(self, event):
+ self.progress.append(event)
+
+ def onQueryIdle(self, event):
+ pass
+
+ def onQueryTerminated(self, event):
+ self.terminated.append(event)
+
+
class StreamingListenerParityTests(StreamingListenerTestsMixin, ReusedConnectTestCase):
- def test_listener_events(self):
+ def test_listener_management(self):
+ listener1 = TestListenerLocal()
+ listener2 = TestListenerLocal()
+
+ try:
+ self.spark.streams.addListener(listener1)
+ self.spark.streams.addListener(listener2)
+ q = self.spark.readStream.format("rate").load().writeStream.format("noop").start()
+
+ # Both listeners should have listener events already because onQueryStarted
+ # is always called before DataStreamWriter.start() returns
+ self.assertEqual(len(listener1.start), 1)
+ self.assertEqual(len(listener2.start), 1)
+
+ # removeListener is a blocking call, resources are cleaned up by the time it returns
+ self.spark.streams.removeListener(listener1)
+ self.spark.streams.removeListener(listener2)
+
+ # Add back the listener and stop the query, now should see a terminated event
+ self.spark.streams.addListener(listener1)
+ q.stop()
+
+ # need to wait a while before QueryTerminatedEvent reaches client
+ time.sleep(15)
+ self.assertEqual(len(listener1.terminated), 1)
+
+ self.check_start_event(listener1.start[0])
+ for event in listener1.progress:
+ self.check_progress_event(event)
+ self.check_terminated_event(listener1.terminated[0])
+
+ finally:
+ for listener in self.spark.streams._sqlb._listener_bus:
+ self.spark.streams.removeListener(listener)
+ for q in self.spark.streams.active:
+ q.stop()
+
+ def test_slow_query(self):
Review Comment:
This seems flaky:
```
======================================================================
FAIL [20.288s]: test_slow_query (pyspark.sql.tests.connect.streaming.test_parity_listener.StreamingListenerParityTests.test_slow_query)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/__w/spark/spark/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py", line 164, in test_slow_query
self.assertTrue(fast_query.id in [str(e.id) for e in listener.terminated])
AssertionError: False is not true
----------------------------------------------------------------------
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]