[TOREE-470] Config option to control SparkContext initialization

--spark-context-initialization-mode eager (disable lazy initialization)
--spark-context-initialization-mode lazy (default, enable lazy initialization)


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/9c929f51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/9c929f51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/9c929f51

Branch: refs/heads/master
Commit: 9c929f517ba0617ada3a782aa9c28826f82a8e9f
Parents: 5f4d77a
Author: Luciano Resende <lrese...@apache.org>
Authored: Thu Apr 19 20:12:59 2018 -0700
Committer: Luciano Resende <lrese...@apache.org>
Committed: Fri Apr 20 12:19:31 2018 -0700

----------------------------------------------------------------------
 .../apache/toree/boot/CommandLineOptions.scala  | 10 ++++
 .../boot/layer/ComponentInitialization.scala    |  8 +++
 .../org/apache/toree/kernel/api/Kernel.scala    | 57 +++++++++++--------
 .../toree/boot/CommandLineOptionsSpec.scala     | 60 ++++++++++++++++++++
 resources/compile/application.conf              |  1 +
 resources/compile/reference.conf                |  5 +-
 resources/test/application.conf                 |  1 +
 resources/test/reference.conf                   |  3 +
 8 files changed, 119 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
----------------------------------------------------------------------
diff --git 
a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala 
b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
index 87e9578..ae7f220 100644
--- a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
+++ b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
@@ -21,6 +21,7 @@ import java.io.{File, OutputStream}
 
 import com.typesafe.config.{Config, ConfigFactory}
 import joptsimple.{OptionParser, OptionSpec}
+import joptsimple.util.RegexMatcher._
 
 import scala.collection.JavaConverters._
 
@@ -94,6 +95,13 @@ class CommandLineOptions(args: Seq[String]) {
   private val _nosparkcontext =
     parser.accepts("nosparkcontext", "kernel should not create a spark 
context")
 
+  private val _spark_context_initialization_mode = parser.accepts(
+    "spark-context-initialization-mode",
+    "Identify how the Spark context initialization occurs. " +
+      "EAGER initialization will happen during runtime initialization, " +
+      "LAZY initialization will happen when the context is used for the first 
time."
+  ).withRequiredArg().ofType(classOf[String]).withValuesConvertedBy( 
regex("(lazy)|(eager)")).defaultsTo("lazy")
+
   private val _spark_context_initialization_timeout = parser.accepts(
     "spark-context-initialization-timeout",
     "The time (in milliseconds) allowed for creation of the spark context. " +
@@ -164,6 +172,8 @@ class CommandLineOptions(args: Seq[String]) {
       "jar_dir" -> get(_jar_dir),
       "default_interpreter" -> get(_default_interpreter),
       "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else 
Some(false)),
+      "spark_context_initialization_mode" -> (if( 
has(_spark_context_initialization_mode))
+        get(_spark_context_initialization_mode) else Some("lazy")),
       "spark_context_initialization_timeout" -> 
get(_spark_context_initialization_timeout),
       "interpreter_plugins" -> interpreterPlugins,
       "default_repositories" -> getAll(_default_repositories).map(_.asJava)

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
----------------------------------------------------------------------
diff --git 
a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
 
b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
index 42999d7..46f0796 100644
--- 
a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
+++ 
b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala
@@ -82,6 +82,8 @@ trait StandardComponentInitialization extends 
ComponentInitialization {
 
     initializePlugins(config, pluginManager)
 
+    initializeSparkContext(config, kernel)
+
     interpreterManager.initializeInterpreters(kernel)
     
     pluginManager.fireEvent(AllInterpretersReady)
@@ -108,6 +110,12 @@ trait StandardComponentInitialization extends 
ComponentInitialization {
     (commStorage, commRegistrar, commManager)
   }
 
+  def initializeSparkContext(config:Config, kernel:Kernel) = {
+    if(config.getString("spark_context_initialization_mode") == "eager") {
+      kernel.sparkSession
+    }
+  }
+
   private def initializeDependencyDownloader(config: Config) = {
     val depsDir = {
       if(config.hasPath("deps_dir") && 
Files.exists(Paths.get(config.getString("deps_dir")))) {

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala 
b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
index df6fed5..cb90b81 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
@@ -410,32 +410,39 @@ class Kernel (
   private lazy val defaultSparkConf: SparkConf = createSparkConf(new 
SparkConf())
 
   override def sparkSession: SparkSession = {
-    defaultSparkConf.getOption("spark.master") match {
-      case Some(master) if !master.contains("local") =>
-        // When connecting to a remote cluster, the first call to getOrCreate
-        // may create a session and take a long time, so this starts a future
-        // to get the session. If it take longer than specified timeout, then
-        // print a message to the user that Spark is starting. Note, the
-        // default timeout is 100ms and it is specified in reference.conf.
-        import scala.concurrent.ExecutionContext.Implicits.global
-        val sessionFuture = Future {
+
+    if(config.getString("spark_context_initialization_mode") == "eager") {
+      // explicitly enable eager initialization of spark context
+      SparkSession.builder.config(defaultSparkConf).getOrCreate
+    } else {
+      // default lazy initialization of spark context
+      defaultSparkConf.getOption("spark.master") match {
+        case Some(master) if !master.contains("local") =>
+          // When connecting to a remote cluster, the first call to getOrCreate
+          // may create a session and take a long time, so this starts a future
+          // to get the session. If it take longer than specified timeout, then
+          // print a message to the user that Spark is starting. Note, the
+          // default timeout is 100ms and it is specified in reference.conf.
+          import scala.concurrent.ExecutionContext.Implicits.global
+          val sessionFuture = Future {
+            SparkSession.builder.config(defaultSparkConf).getOrCreate
+          }
+
+          try {
+            val timeout = getSparkContextInitializationTimeout
+            Await.result(sessionFuture, Duration(timeout, 
TimeUnit.MILLISECONDS))
+          } catch {
+            case timeout: TimeoutException =>
+              // getting the session is taking a long time, so assume that 
Spark
+              // is starting and print a message
+              display.content(
+                MIMEType.PlainText, "Waiting for a Spark session to start...")
+              Await.result(sessionFuture, Duration.Inf)
+          }
+
+        case _ =>
           SparkSession.builder.config(defaultSparkConf).getOrCreate
-        }
-
-        try {
-          val timeout = getSparkContextInitializationTimeout
-          Await.result(sessionFuture, Duration(timeout, TimeUnit.MILLISECONDS))
-        } catch {
-          case timeout: TimeoutException =>
-            // getting the session is taking a long time, so assume that Spark
-            // is starting and print a message
-            display.content(
-              MIMEType.PlainText, "Waiting for a Spark session to start...")
-            Await.result(sessionFuture, Duration.Inf)
-        }
-
-      case _ =>
-        SparkSession.builder.config(defaultSparkConf).getOrCreate
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
----------------------------------------------------------------------
diff --git 
a/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala 
b/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
index 1363a92..029f46b 100644
--- a/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
@@ -356,6 +356,66 @@ class CommandLineOptionsSpec extends FunSpec with Matchers 
{
       }
     }
 
+    describe("when dealing with --spark-context-initialization-mode") {
+      val key = "spark_context_initialization_mode"
+
+      it("when none of the options are specified, it should default to lazy") {
+        val options = new CommandLineOptions(Nil)
+        val config: Config = options.toConfig
+        config.getString(key) should be("lazy")
+      }
+
+      it("when other options are specified, it should default to lazy") {
+        val options = new CommandLineOptions(Seq(
+          "--interpreter-plugin",
+          "dummy:test.utils.DummyInterpreter"
+        ))
+        val config: Config = options.toConfig
+        config.getString(key) should be("lazy")
+      }
+
+      it("when the option is specified, it should return the specified 
value") {
+        val options = new CommandLineOptions(List(
+          "--stdin-port", "99999",
+          "--shell-port", "88888",
+          "--iopub-port", "77777",
+          "--control-port", "55555",
+          "--heartbeat-port", "44444",
+          "--spark-context-initialization-mode", "eager"
+        ))
+        val config: Config = options.toConfig
+        config.getString(key) should be("eager")
+      }
+
+      it("when an invalid value is specified, an exception must be thrown") {
+        intercept [OptionException] {
+          val options = new CommandLineOptions(List(
+            "--stdin-port", "99999",
+            "--shell-port", "88888",
+            "--iopub-port", "77777",
+            "--control-port", "55555",
+            "--heartbeat-port", "44444",
+            "--spark-context-initialization-mode", "foo"
+          ))
+          val config: Config = options.toConfig
+        }
+      }
+
+      it("when a value is not specified, an exception must be thrown") {
+        intercept [OptionException] {
+          val options = new CommandLineOptions(List(
+            "--stdin-port", "99999",
+            "--shell-port", "88888",
+            "--iopub-port", "77777",
+            "--control-port", "55555",
+            "--heartbeat-port", "44444",
+            "--spark-context-initialization-mode", ""
+          ))
+          val config: Config = options.toConfig
+        }
+      }
+    }
+
     describe("when dealing with --alternate-sigint") {
       val key = "alternate_sigint"
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/resources/compile/application.conf
----------------------------------------------------------------------
diff --git a/resources/compile/application.conf 
b/resources/compile/application.conf
index 1e0f5cc..3f74bd9 100644
--- a/resources/compile/application.conf
+++ b/resources/compile/application.conf
@@ -26,4 +26,5 @@ control_port  = ${?PORT1}
 hb_port       = ${?PORT2}
 shell_port    = ${?PORT3}
 iopub_port    = ${?PORT4}
+spark_context_initialization_mode = ${?SPARK_CONTEXT_INITIALIZATION_MODE}
 spark_context_initialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/resources/compile/reference.conf
----------------------------------------------------------------------
diff --git a/resources/compile/reference.conf b/resources/compile/reference.conf
index b99f703..1c98865 100644
--- a/resources/compile/reference.conf
+++ b/resources/compile/reference.conf
@@ -55,8 +55,11 @@ deps_dir = ${?DEPS_DIR}
 default_interpreter = "Scala"
 default_interpreter = ${?DEFAULT_INTERPRETER}
 
+spark_context_initialization_mode = "lazy"
+spark_context_initialization_mode = ${?SPARK_CONTEXT_INITIALIZATION_MODE}
+
 spark_context_initialization_timeout = 100
-spark_context_initialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}
+spark_context_initialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}
 
 default_interpreter_plugin = [
   "Scala:org.apache.toree.kernel.interpreter.scala.ScalaInterpreter",

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/resources/test/application.conf
----------------------------------------------------------------------
diff --git a/resources/test/application.conf b/resources/test/application.conf
index 1e0f5cc..3f74bd9 100644
--- a/resources/test/application.conf
+++ b/resources/test/application.conf
@@ -26,4 +26,5 @@ control_port  = ${?PORT1}
 hb_port       = ${?PORT2}
 shell_port    = ${?PORT3}
 iopub_port    = ${?PORT4}
+spark_context_initialization_mode = ${?SPARK_CONTEXT_INITIALIZATION_MODE}
 spark_context_initialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/resources/test/reference.conf
----------------------------------------------------------------------
diff --git a/resources/test/reference.conf b/resources/test/reference.conf
index 3324442..66741d4 100644
--- a/resources/test/reference.conf
+++ b/resources/test/reference.conf
@@ -54,6 +54,9 @@ send_empty_output = ${?SEND_EMPTY_OUTPUT}
 default_interpreter = "Scala"
 default_interpreter = ${?DEFAULT_INTERPRETER}
 
+spark_context_initialization_mode = "lazy"
+spark_context_initialization_mode = ${?SPARK_CONTEXT_INITIALIZATION_MODE}
+
 spark_context_initialization_timeout = 100
 spark_context_initialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}
 

Reply via email to