Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22674#discussion_r223910477
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 
---
    @@ -75,95 +76,74 @@ trait QueryExecutionListener {
      */
     @Experimental
     @InterfaceStability.Evolving
    -class ExecutionListenerManager private extends Logging {
    -
    -  private[sql] def this(conf: SparkConf) = {
    -    this()
    +// The `session` is used to indicate which session carries this listener 
manager, and we only
    +// catch SQL executions which are launched by the same session.
    +// The `loadExtensions` flag is used to indicate whether we should load 
the pre-defined,
    +// user-specified listeners during construction. We should not do it when 
cloning this listener
    +// manager, as we will copy all listeners to the cloned listener manager.
    +class ExecutionListenerManager private[sql](session: SparkSession, 
loadExtensions: Boolean)
    +  extends SparkListener with Logging {
    +
    +  private[this] val listeners = new 
CopyOnWriteArrayList[QueryExecutionListener]
    +
    +  if (loadExtensions) {
    +    val conf = session.sparkContext.conf
         conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
           Utils.loadExtensions(classOf[QueryExecutionListener], classNames, 
conf).foreach(register)
         }
       }
     
    +  session.sparkContext.listenerBus.addToSharedQueue(this)
    +
       /**
        * Registers the specified [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def register(listener: QueryExecutionListener): Unit = writeLock {
    -    listeners += listener
    +  def register(listener: QueryExecutionListener): Unit = {
    +    listeners.add(listener)
       }
     
       /**
        * Unregisters the specified [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def unregister(listener: QueryExecutionListener): Unit = writeLock {
    -    listeners -= listener
    +  def unregister(listener: QueryExecutionListener): Unit = {
    +    listeners.remove(listener)
       }
     
       /**
        * Removes all the registered [[QueryExecutionListener]].
        */
       @DeveloperApi
    -  def clear(): Unit = writeLock {
    +  def clear(): Unit = {
         listeners.clear()
       }
     
       /**
        * Get an identical copy of this listener manager.
        */
    -  @DeveloperApi
    -  override def clone(): ExecutionListenerManager = writeLock {
    -    val newListenerManager = new ExecutionListenerManager
    -    listeners.foreach(newListenerManager.register)
    +  private[sql] def clone(session: SparkSession): ExecutionListenerManager 
= {
    +    val newListenerManager = new ExecutionListenerManager(session, 
loadExtensions = false)
    +    listeners.iterator().asScala.foreach(newListenerManager.register)
         newListenerManager
       }
     
    -  private[sql] def onSuccess(funcName: String, qe: QueryExecution, 
duration: Long): Unit = {
    -    readLock {
    -      withErrorHandling { listener =>
    -        listener.onSuccess(funcName, qe, duration)
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
    +    case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
    +      val funcName = e.executionName.get
    +      e.executionFailure match {
    +        case Some(ex) =>
    +          listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, 
ex))
    --- End diff --
    
    `ExecutionListenerManager` is already a listener: it runs in a 
separate thread and receives events from `LiveListenerBus`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to