Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22674#discussion_r223910477
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
---
@@ -75,95 +76,74 @@ trait QueryExecutionListener {
*/
@Experimental
@InterfaceStability.Evolving
-class ExecutionListenerManager private extends Logging {
-
- private[sql] def this(conf: SparkConf) = {
- this()
+// The `session` is used to indicate which session carries this listener
manager, and we only
+// catch SQL executions which are launched by the same session.
+// The `loadExtensions` flag is used to indicate whether we should load
the pre-defined,
+// user-specified listeners during construction. We should not do it when
cloning this listener
+// manager, as we will copy all listeners to the cloned listener manager.
+class ExecutionListenerManager private[sql](session: SparkSession,
loadExtensions: Boolean)
+ extends SparkListener with Logging {
+
+ private[this] val listeners = new
CopyOnWriteArrayList[QueryExecutionListener]
+
+ if (loadExtensions) {
+ val conf = session.sparkContext.conf
conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
Utils.loadExtensions(classOf[QueryExecutionListener], classNames,
conf).foreach(register)
}
}
+ session.sparkContext.listenerBus.addToSharedQueue(this)
+
/**
* Registers the specified [[QueryExecutionListener]].
*/
@DeveloperApi
- def register(listener: QueryExecutionListener): Unit = writeLock {
- listeners += listener
+ def register(listener: QueryExecutionListener): Unit = {
+ listeners.add(listener)
}
/**
* Unregisters the specified [[QueryExecutionListener]].
*/
@DeveloperApi
- def unregister(listener: QueryExecutionListener): Unit = writeLock {
- listeners -= listener
+ def unregister(listener: QueryExecutionListener): Unit = {
+ listeners.remove(listener)
}
/**
* Removes all the registered [[QueryExecutionListener]].
*/
@DeveloperApi
- def clear(): Unit = writeLock {
+ def clear(): Unit = {
listeners.clear()
}
/**
* Get an identical copy of this listener manager.
*/
- @DeveloperApi
- override def clone(): ExecutionListenerManager = writeLock {
- val newListenerManager = new ExecutionListenerManager
- listeners.foreach(newListenerManager.register)
+ private[sql] def clone(session: SparkSession): ExecutionListenerManager
= {
+ val newListenerManager = new ExecutionListenerManager(session,
loadExtensions = false)
+ listeners.iterator().asScala.foreach(newListenerManager.register)
newListenerManager
}
- private[sql] def onSuccess(funcName: String, qe: QueryExecution,
duration: Long): Unit = {
- readLock {
- withErrorHandling { listener =>
- listener.onSuccess(funcName, qe, duration)
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match
{
+ case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
+ val funcName = e.executionName.get
+ e.executionFailure match {
+ case Some(ex) =>
+ listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe,
ex))
--- End diff --
`ExecutionListenerManager` is already a listener: it runs in a
separate thread and receives events from `LiveListenerBus`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]