rangadi commented on code in PR #41752:
URL: https://github.com/apache/spark/pull/41752#discussion_r1261765008
##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.scheduler.SparkListenerEvent
* @since 2.0.0
*/
@Evolving
-abstract class StreamingQueryListener {
+abstract class StreamingQueryListener extends Serializable {
Review Comment:
I see. Sure.
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2909,6 +2918,36 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
session.streams.resetTerminated()
respBuilder.setResetTerminated(true)
+ case StreamingQueryManagerCommand.CommandCase.ADD_LISTENER =>
+ val listenerPacket = Utils
+ .deserialize[StreamingListenerPacket](
+ command.getAddListener.getListenerPayload.toByteArray,
+ Utils.getContextOrSparkClassLoader)
+ val listener: StreamingQueryListener = listenerPacket.listener
+ .asInstanceOf[StreamingQueryListener]
+ val id: String = listenerPacket.id
+ sessionHolder.cacheListenerById(id, listener)
+ session.streams.addListener(listener)
Review Comment:
The cache is OK. What happens if users want to access the Spark session
from inside the listener? How would they do that? If we create a session
and make it available, would that be a Connect session or a legacy session?
We don't necessarily need to solve this in this PR, but I think we should
add a detailed TODO here (and file a follow-up task).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]