Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22192#discussion_r215749556
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -136,6 +136,32 @@ private[spark] class Executor(
       // for fetching remote cached RDD blocks, so need to make sure it uses 
the right classloader too.
       env.serializerManager.setDefaultClassLoader(replClassLoader)
     
    +  private val pluginList = conf.get(EXECUTOR_PLUGINS)
    +  if (pluginList.nonEmpty) {
    +    logDebug(s"Initializing the following plugins: 
${pluginList.mkString(", ")}")
    +  }
    +
    +  val executorPluginThread = new Thread {
    +    var plugins: Seq[ExecutorPlugin] = Nil
    +
    +    override def run(): Unit = {
    +      plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, 
conf)
    +      plugins.foreach(_.init())
    +    }
    +
    +    override def interrupt(): Unit = {
    +      plugins.foreach(_.shutdown())
    +      super.interrupt()
    +    }
    +  }
    +
    +  executorPluginThread.setContextClassLoader(replClassLoader)
    +  executorPluginThread.start()
    +
    +  if (pluginList.nonEmpty) {
    +    logDebug("Finished initializing plugins")
    +  }
    +
    --- End diff --
    
    We should wait for the plugins to finish initialization (join() on the thread) 
before moving on (particularly if we are emitting a message indicating that 
initialization has finished).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to