Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/22674#discussion_r223885742
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
---
@@ -75,95 +76,74 @@ trait QueryExecutionListener {
*/
@Experimental
@InterfaceStability.Evolving
-class ExecutionListenerManager private extends Logging {
-
- private[sql] def this(conf: SparkConf) = {
- this()
+// The `session` is used to indicate which session carries this listener manager, and we only
+// catch SQL executions which are launched by the same session.
+// The `loadExtensions` flag is used to indicate whether we should load the pre-defined,
+// user-specified listeners during construction. We should not do it when cloning this listener
+// manager, as we will copy all listeners to the cloned listener manager.
+class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean)
+  extends SparkListener with Logging {
+
+  private[this] val listeners = new CopyOnWriteArrayList[QueryExecutionListener]
+
+ if (loadExtensions) {
+ val conf = session.sparkContext.conf
conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
Utils.loadExtensions(classOf[QueryExecutionListener], classNames,
conf).foreach(register)
}
}
+ session.sparkContext.listenerBus.addToSharedQueue(this)
+
/**
* Registers the specified [[QueryExecutionListener]].
*/
@DeveloperApi
- def register(listener: QueryExecutionListener): Unit = writeLock {
- listeners += listener
+ def register(listener: QueryExecutionListener): Unit = {
+ listeners.add(listener)
}
/**
* Unregisters the specified [[QueryExecutionListener]].
*/
@DeveloperApi
- def unregister(listener: QueryExecutionListener): Unit = writeLock {
- listeners -= listener
+ def unregister(listener: QueryExecutionListener): Unit = {
+ listeners.remove(listener)
}
/**
* Removes all the registered [[QueryExecutionListener]].
*/
@DeveloperApi
- def clear(): Unit = writeLock {
+ def clear(): Unit = {
listeners.clear()
}
/**
* Get an identical copy of this listener manager.
*/
- @DeveloperApi
- override def clone(): ExecutionListenerManager = writeLock {
- val newListenerManager = new ExecutionListenerManager
- listeners.foreach(newListenerManager.register)
+ private[sql] def clone(session: SparkSession): ExecutionListenerManager
= {
+ val newListenerManager = new ExecutionListenerManager(session,
loadExtensions = false)
+ listeners.iterator().asScala.foreach(newListenerManager.register)
newListenerManager
}
- private[sql] def onSuccess(funcName: String, qe: QueryExecution,
duration: Long): Unit = {
- readLock {
- withErrorHandling { listener =>
- listener.onSuccess(funcName, qe, duration)
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match
{
+ case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
+ val funcName = e.executionName.get
+ e.executionFailure match {
+ case Some(ex) =>
+ listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe,
ex))
+ case _ =>
+ listeners.iterator().asScala.foreach(_.onSuccess(funcName, e.qe,
e.duration))
}
- }
- }
- private[sql] def onFailure(funcName: String, qe: QueryExecution,
exception: Exception): Unit = {
- readLock {
- withErrorHandling { listener =>
- listener.onFailure(funcName, qe, exception)
- }
- }
+ case _ => // Ignore
}
- private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
-
- /** A lock to prevent updating the list of listeners while we are
traversing through them. */
- private[this] val lock = new ReentrantReadWriteLock()
-
- private def withErrorHandling(f: QueryExecutionListener => Unit): Unit =
{
- for (listener <- listeners) {
- try {
- f(listener)
- } catch {
- case NonFatal(e) => logWarning("Error executing query execution
listener", e)
- }
- }
- }
-
- /** Acquires a read lock on the cache for the duration of `f`. */
- private def readLock[A](f: => A): A = {
- val rl = lock.readLock()
- rl.lock()
- try f finally {
- rl.unlock()
- }
- }
-
- /** Acquires a write lock on the cache for the duration of `f`. */
- private def writeLock[A](f: => A): A = {
- val wl = lock.writeLock()
- wl.lock()
- try f finally {
- wl.unlock()
- }
+ private def shouldCatchEvent(e: SparkListenerSQLExecutionEnd): Boolean =
{
+ // Only catch SQL execution with a name, and triggered by the same
spark session that this
--- End diff --
So this is what bugs me. You are adding separation between the SparkSession
and its listeners, only to undo that separation here. It seems like a bit of a
hassle to go through just because you basically need async execution.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]