GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13200#discussion_r63973275
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---
    @@ -738,26 +734,127 @@ object SparkSession {
          * Gets an existing [[SparkSession]] or, if there is no existing one, creates a new one
          * based on the options set in this builder.
          *
    +     * This method first checks whether there is a valid thread-local SparkSession, and if yes,
    +     * returns that one. It then checks whether there is a valid global default SparkSession,
    +     * and if yes, returns that one. If no valid global default SparkSession exists, the method
    +     * creates a new SparkSession and assigns the newly created SparkSession as the global default.
    +     *
    +     * In case an existing SparkSession is returned, the config options specified in this builder
    +     * will be applied to the existing SparkSession.
    +     *
          * @since 2.0.0
          */
         def getOrCreate(): SparkSession = synchronized {
    -      // Step 1. Create a SparkConf
    -      // Step 2. Get a SparkContext
    -      // Step 3. Get a SparkSession
    -      val sparkConf = new SparkConf()
    -      options.foreach { case (k, v) => sparkConf.set(k, v) }
    -      val sparkContext = SparkContext.getOrCreate(sparkConf)
    -
    -      SQLContext.getOrCreate(sparkContext).sparkSession
    +      // Get the session from current thread's active session.
    +      var session = activeThreadSession.get()
    +      if ((session ne null) && !session.sparkContext.isStopped) {
    +        options.foreach { case (k, v) => session.conf.set(k, v) }
    +        return session
    +      }
    +
    +      // Global synchronization so we will only set the default session once.
    +      SparkSession.synchronized {
    +        // If the current thread does not have an active session, get it from the global session.
    +        session = defaultSession.get()
    +        if ((session ne null) && !session.sparkContext.isStopped) {
    +          options.foreach { case (k, v) => session.conf.set(k, v) }
    +          return session
    +        }
    +
    +        // No active or global default session. Create a new one.
    +        val sparkContext = userSuppliedContext.getOrElse {
    +          // set app name if not given
    +          if (!options.contains("spark.app.name")) {
    +            options += "spark.app.name" -> java.util.UUID.randomUUID().toString
    +          }
    +
    +          val sparkConf = new SparkConf()
    +          options.foreach { case (k, v) => sparkConf.set(k, v) }
    +          SparkContext.getOrCreate(sparkConf)
    +        }
    +        session = new SparkSession(sparkContext)
    +        options.foreach { case (k, v) => session.conf.set(k, v) }
    +        defaultSession.set(session)
    +
    +        // Register a successfully instantiated context to the singleton. This should be at the
    +        // end of the class definition so that the singleton is updated only if there is no
    +        // exception in the construction of the instance.
    +        sparkContext.addSparkListener(new SparkListener {
    +          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    +            defaultSession.set(null)
    +            // TODO(rxin): Do we need to also clear SQL listener?
    --- End diff ---
    
    We need to clear it. Otherwise, after stopping the SparkContext, we leak the listener in object SQLContext.
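
    One way the clearing could look (a rough sketch only; `clearSqlListener` is a
    hypothetical helper standing in for whatever mechanism drops the object-level
    listener reference, not an API this PR already has):

        sparkContext.addSparkListener(new SparkListener {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
            // Drop the global default session so a stopped context is not handed out again.
            defaultSession.set(null)
            // Hypothetical helper: clear the object-level SQLListener reference so it
            // does not outlive the stopped SparkContext; otherwise object SQLContext
            // keeps it reachable forever.
            SQLContext.clearSqlListener()
          }
        })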

