Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22674#discussion_r223911185
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 
---
    @@ -75,95 +76,74 @@ trait QueryExecutionListener {
      */
     @Experimental
     @InterfaceStability.Evolving
    -class ExecutionListenerManager private extends Logging {
    -
    -  private[sql] def this(conf: SparkConf) = {
    -    this()
    +// The `session` is used to indicate which session carries this listener 
manager, and we only
    +// catch SQL executions which are launched by the same session.
    +// The `loadExtensions` flag is used to indicate whether we should load 
the pre-defined,
    +// user-specified listeners during construction. We should not do it when 
cloning this listener
    +// manager, as we will copy all listeners to the cloned listener manager.
    +class ExecutionListenerManager private[sql](session: SparkSession, 
loadExtensions: Boolean)
    +  extends SparkListener with Logging {
    +
    +  private[this] val listeners = new 
CopyOnWriteArrayList[QueryExecutionListener]
    +
    +  if (loadExtensions) {
    +    val conf = session.sparkContext.conf
         conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
           Utils.loadExtensions(classOf[QueryExecutionListener], classNames, 
conf).foreach(register)
         }
       }
     
    +  session.sparkContext.listenerBus.addToSharedQueue(this)
    +
       /**
        * Registers the specified [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def register(listener: QueryExecutionListener): Unit = writeLock {
    -    listeners += listener
    +  def register(listener: QueryExecutionListener): Unit = {
    +    listeners.add(listener)
       }
     
       /**
        * Unregisters the specified [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def unregister(listener: QueryExecutionListener): Unit = writeLock {
    -    listeners -= listener
    +  def unregister(listener: QueryExecutionListener): Unit = {
    +    listeners.remove(listener)
       }
     
       /**
        * Removes all the registered [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def clear(): Unit = writeLock {
    +  def clear(): Unit = {
         listeners.clear()
       }
     
       /**
        * Get an identical copy of this listener manager.
        */
    -  @DeveloperApi
    -  override def clone(): ExecutionListenerManager = writeLock {
    -    val newListenerManager = new ExecutionListenerManager
    -    listeners.foreach(newListenerManager.register)
    +  private[sql] def clone(session: SparkSession): ExecutionListenerManager 
= {
    +    val newListenerManager = new ExecutionListenerManager(session, 
loadExtensions = false)
    +    listeners.iterator().asScala.foreach(newListenerManager.register)
         newListenerManager
       }
     
    -  private[sql] def onSuccess(funcName: String, qe: QueryExecution, 
duration: Long): Unit = {
    -    readLock {
    -      withErrorHandling { listener =>
    -        listener.onSuccess(funcName, qe, duration)
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
    +    case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
    +      val funcName = e.executionName.get
    +      e.executionFailure match {
    +        case Some(ex) =>
    +          listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, 
ex))
    +        case _ =>
    +          listeners.iterator().asScala.foreach(_.onSuccess(funcName, e.qe, 
e.duration))
           }
    -    }
    -  }
     
    -  private[sql] def onFailure(funcName: String, qe: QueryExecution, 
exception: Exception): Unit = {
    -    readLock {
    -      withErrorHandling { listener =>
    -        listener.onFailure(funcName, qe, exception)
    -      }
    -    }
    +    case _ => // Ignore
       }
     
    -  private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
    -
    -  /** A lock to prevent updating the list of listeners while we are 
traversing through them. */
    -  private[this] val lock = new ReentrantReadWriteLock()
    -
    -  private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = 
{
    -    for (listener <- listeners) {
    -      try {
    -        f(listener)
    -      } catch {
    -        case NonFatal(e) => logWarning("Error executing query execution 
listener", e)
    -      }
    -    }
    -  }
    -
    -  /** Acquires a read lock on the cache for the duration of `f`. */
    -  private def readLock[A](f: => A): A = {
    -    val rl = lock.readLock()
    -    rl.lock()
    -    try f finally {
    -      rl.unlock()
    -    }
    -  }
    -
    -  /** Acquires a write lock on the cache for the duration of `f`. */
    -  private def writeLock[A](f: => A): A = {
    -    val wl = lock.writeLock()
    -    wl.lock()
    -    try f finally {
    -      wl.unlock()
    -    }
    +  private def shouldCatchEvent(e: SparkListenerSQLExecutionEnd): Boolean = 
{
    +    // Only catch SQL execution with a name, and triggered by the same 
spark session that this
    --- End diff --
    
    Yes. Suppose we have many Spark sessions running queries at the same 
time. Each session sends query execution events to the central event bus, and 
sets up a listener that asynchronously watches for its own query execution events.
    
    To make this work, the most straightforward approach is to carry the session 
identifier in the events, and have each listener only handle events with the 
expected session identifier.
    
    A better approach might be to introduce the concept of a session into Spark 
core, so that the listener framework can automatically dispatch events to the 
listeners of the session that produced them. But that would be a lot of work.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to