bogao007 commented on code in PR #41752:
URL: https://github.com/apache/spark/pull/41752#discussion_r1260145680
##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -292,3 +331,31 @@ class TestForeachWriter[T] extends ForeachWriter[T] {
case class TestClass(value: Int) {
override def toString: String = value.toString
}
+
+class EventCollector extends StreamingQueryListener {
+ @volatile var startEvent: QueryStartedEvent = null
+ @volatile var terminationEvent: QueryTerminatedEvent = null
+ @volatile var idleEvent: QueryIdleEvent = null
+
+ private val _progressEvents = new mutable.Queue[StreamingQueryProgress]
+
+ def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
+ _progressEvents.clone().toSeq
+ }
+
+ override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+ startEvent = event
+ }
+
+ override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+ _progressEvents += event.progress
Review Comment:
I've done a manual test for `spark.write()`; you can check the testing
section in the description of this PR. I'll add a unit test for it.
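
For illustration, a minimal sketch of what such a unit test could look like,
reusing the `EventCollector` from this diff (the source/sink choices, the
timeout, and ScalaTest's `eventually` helper are assumptions, not the final
test):

```scala
// Hypothetical test shape; assumes a suite with a `spark` session and
// ScalaTest's Eventually/SpanSugar in scope.
test("streaming query listener receives events") {
  val listener = new EventCollector
  spark.streams.addListener(listener)
  try {
    val q = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .writeStream
      .format("memory")
      .queryName("listener_test")
      .start()
    try {
      q.processAllAvailable()
    } finally {
      q.stop()
    }
    // Listener events are delivered asynchronously, so poll rather than
    // asserting once.
    eventually(timeout(30.seconds)) {
      assert(listener.startEvent != null)
      assert(listener.progressEvents.nonEmpty)
      assert(listener.terminationEvent != null)
    }
  } finally {
    spark.streams.removeListener(listener)
  }
}
```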
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2909,6 +2918,36 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
session.streams.resetTerminated()
respBuilder.setResetTerminated(true)
+ case StreamingQueryManagerCommand.CommandCase.ADD_LISTENER =>
+ val listenerPacket = Utils
+ .deserialize[StreamingListenerPacket](
+ command.getAddListener.getListenerPayload.toByteArray,
+ Utils.getContextOrSparkClassLoader)
+ val listener: StreamingQueryListener = listenerPacket.listener
+ .asInstanceOf[StreamingQueryListener]
+ val id: String = listenerPacket.id
+ sessionHolder.cacheListenerById(id, listener)
+ session.streams.addListener(listener)
Review Comment:
Are you talking about the cache we introduced here? We'll have to do this
for `removeListener()` to work properly.
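
For context, a rough sketch of the `removeListener()` path this cache
enables; the `REMOVE_LISTENER` command accessors and the `SessionHolder`
lookup/removal helpers are assumed names, mirroring the `ADD_LISTENER` case
above:

```scala
// Hypothetical handler clause; accessor and helper names are assumptions.
case StreamingQueryManagerCommand.CommandCase.REMOVE_LISTENER =>
  val id: String = command.getRemoveListener.getId
  // The cache recovers the exact instance registered in ADD_LISTENER.
  // A freshly deserialized copy would not be reference-equal, so
  // session.streams.removeListener() would silently remove nothing.
  Option(sessionHolder.getCachedListener(id)).foreach { listener =>
    session.streams.removeListener(listener)
    sessionHolder.removeCachedListener(id)
  }
```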
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -47,6 +48,9 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
// foreachBatch() in Streaming. Lazy since most sessions don't need it.
private lazy val dataFrameCache: ConcurrentMap[String, DataFrame] = new ConcurrentHashMap()
+ private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] =
Review Comment:
Do you mean we cache the UUID-to-listener mapping in
`StreamingQueryManager` on the server side? I forgot to add a remove method
for that; we should remove the cache entry when `removeListener()` is
called. I'll add that.
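
A minimal sketch of the `SessionHolder` side implied here; `cacheListenerById`
appears in the planner diff above, while `removeCachedListener` is an assumed
name for the cleanup helper:

```scala
// cacheListenerById is called from the ADD_LISTENER handler above.
private[connect] def cacheListenerById(id: String, listener: StreamingQueryListener): Unit = {
  listenerCache.putIfAbsent(id, listener)
}

// Assumed cleanup helper: drop the entry when removeListener() is handled
// so the map doesn't retain listeners for the lifetime of the session.
private[connect] def removeCachedListener(id: String): StreamingQueryListener = {
  listenerCache.remove(id)
}
```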
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.scheduler.SparkListenerEvent
* @since 2.0.0
*/
@Evolving
-abstract class StreamingQueryListener {
+abstract class StreamingQueryListener extends Serializable {
Review Comment:
They are the same: in order to serialize the class, we need both to extend
`Serializable`.
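
To illustrate why both classes need it: the listener travels from client to
server as Java-serialized bytes, so every object in the serialized graph
must be `Serializable`. A sketch of the round trip (the
`StreamingListenerPacket` constructor shape is an assumption;
`Utils.serialize`/`Utils.deserialize` are existing Spark utilities):

```scala
import org.apache.spark.util.Utils

// Client side: wrap the user's listener with its id and serialize the pair.
// The constructor shape of StreamingListenerPacket is assumed here.
val packet = StreamingListenerPacket(id, listener)
val payload: Array[Byte] = Utils.serialize(packet)

// Server side, as in SparkConnectPlanner above: restore and register.
val restored = Utils
  .deserialize[StreamingListenerPacket](payload, Utils.getContextOrSparkClassLoader)
  .listener
  .asInstanceOf[StreamingQueryListener]
```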
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]