bogao007 commented on code in PR #41752:
URL: https://github.com/apache/spark/pull/41752#discussion_r1261541326
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala:
##########
@@ -36,6 +41,13 @@ import org.apache.spark.sql.SparkSession
@Evolving
class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends
Logging {
+ // Mapping from StreamingQueryListener to id. There's another mapping from
id to
+ // StreamingQueryListener on server side. This is used by removeListener()
to find the id
+ // of previously added StreamingQueryListener and pass it to server side to
find the
+ // corresponding listener on server side.
+ private lazy val listenerCache: ConcurrentMap[StreamingQueryListener,
String] =
Review Comment:
Thanks, I changed the client side cache to id -> listener mapping to make
sure there's no hash collision, also updated the test case to cover this
scenario. There were some issues when the same listener instance were added and
removed multiple times, this change should fix this as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]