Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/13200#discussion_r63970867
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
---
@@ -738,26 +734,127 @@ object SparkSession {
* Gets an existing [[SparkSession]] or, if there is no existing one, creates a new one
* based on the options set in this builder.
*
+ * This method first checks whether there is a valid thread-local SparkSession, and if yes,
+ * return that one. It then checks whether there is a valid global default SparkSession,
+ * and if yes, return that one. If no valid global default SparkSession exists, the method
+ * creates a new SparkSession and assigns the newly created SparkSession as the global default.
+ *
+ * In case an existing SparkSession is returned, the config options specified in this builder
+ * will be applied to the existing SparkSession.
+ *
+ *
* @since 2.0.0
*/
def getOrCreate(): SparkSession = synchronized {
- // Step 1. Create a SparkConf
- // Step 2. Get a SparkContext
- // Step 3. Get a SparkSession
- val sparkConf = new SparkConf()
- options.foreach { case (k, v) => sparkConf.set(k, v) }
- val sparkContext = SparkContext.getOrCreate(sparkConf)
-
- SQLContext.getOrCreate(sparkContext).sparkSession
+ // Get the session from current thread's active session.
+ var session = activeThreadSession.get()
+ if ((session ne null) && !session.sparkContext.isStopped) {
+ options.foreach { case (k, v) => session.conf.set(k, v) }
+ return session
+ }
+
+ // Global synchronization so we will only set the default session once.
+ SparkSession.synchronized {
+ // If the current thread does not have an active session, get it from the global session.
+ session = defaultSession.get()
+ if ((session ne null) && !session.sparkContext.isStopped) {
+ options.foreach { case (k, v) => session.conf.set(k, v) }
+ return session
+ }
+
+ // No active nor global default session. Create a new one.
+ val sparkContext = userSuppliedContext.getOrElse {
+ // set app name if not given
+ if (!options.contains("spark.app.name")) {
+ options += "spark.app.name" -> java.util.UUID.randomUUID().toString
+ }
+
+ val sparkConf = new SparkConf()
+ options.foreach { case (k, v) => sparkConf.set(k, v) }
+ SparkContext.getOrCreate(sparkConf)
+ }
+ session = new SparkSession(sparkContext)
+ options.foreach { case (k, v) => session.conf.set(k, v) }
+ defaultSession.set(session)
+
+ // Register a successfully instantiated context to the singleton. This should be at the
+ // end of the class definition so that the singleton is updated only if there is no
+ // exception in the construction of the instance.
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ defaultSession.set(null)
+ // TODO(rxin): Do we need to also clear SQL listener?
--- End diff --
actually @zsxwing might be good for you to review the entire pr.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]