[TOREE-470] Config option to control SparkContext initialization: --spark-context-initialization-mode eager (disable lazy initialization), --spark-context-initialization-mode lazy (default, enable lazy initialization)
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/9c929f51 Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/9c929f51 Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/9c929f51 Branch: refs/heads/master Commit: 9c929f517ba0617ada3a782aa9c28826f82a8e9f Parents: 5f4d77a Author: Luciano Resende <lrese...@apache.org> Authored: Thu Apr 19 20:12:59 2018 -0700 Committer: Luciano Resende <lrese...@apache.org> Committed: Fri Apr 20 12:19:31 2018 -0700 ---------------------------------------------------------------------- .../apache/toree/boot/CommandLineOptions.scala | 10 ++++ .../boot/layer/ComponentInitialization.scala | 8 +++ .../org/apache/toree/kernel/api/Kernel.scala | 57 +++++++++++-------- .../toree/boot/CommandLineOptionsSpec.scala | 60 ++++++++++++++++++++ resources/compile/application.conf | 1 + resources/compile/reference.conf | 5 +- resources/test/application.conf | 1 + resources/test/reference.conf | 3 + 8 files changed, 119 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala index 87e9578..ae7f220 100644 --- a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala +++ b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala @@ -21,6 +21,7 @@ import java.io.{File, OutputStream} import com.typesafe.config.{Config, ConfigFactory} import joptsimple.{OptionParser, OptionSpec} +import joptsimple.util.RegexMatcher._ import scala.collection.JavaConverters._ @@ -94,6 +95,13 @@ class CommandLineOptions(args: 
Seq[String]) { private val _nosparkcontext = parser.accepts("nosparkcontext", "kernel should not create a spark context") + private val _spark_context_initialization_mode = parser.accepts( + "spark-context-initialization-mode", + "Identify how the Spark context initialization occurs. " + + "With 'eager', initialization will happen during kernel initialization; " + + "with 'lazy' (the default), initialization will happen when the context is used for the first time." + ).withRequiredArg().ofType(classOf[String]).withValuesConvertedBy( regex("(lazy)|(eager)")).defaultsTo("lazy") + private val _spark_context_initialization_timeout = parser.accepts( "spark-context-initialization-timeout", "The time (in milliseconds) allowed for creation of the spark context. " + @@ -164,6 +172,8 @@ class CommandLineOptions(args: Seq[String]) { "jar_dir" -> get(_jar_dir), "default_interpreter" -> get(_default_interpreter), "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false)), + "spark_context_initialization_mode" -> (if( has(_spark_context_initialization_mode)) + get(_spark_context_initialization_mode) else Some("lazy")), "spark_context_initialization_timeout" -> get(_spark_context_initialization_timeout), "interpreter_plugins" -> interpreterPlugins, "default_repositories" -> getAll(_default_repositories).map(_.asJava) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala index 42999d7..46f0796 100644 --- a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala +++ b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala @@ -82,6 +82,8 @@ trait StandardComponentInitialization extends ComponentInitialization {
initializePlugins(config, pluginManager) + initializeSparkContext(config, kernel) + interpreterManager.initializeInterpreters(kernel) pluginManager.fireEvent(AllInterpretersReady) @@ -108,6 +110,12 @@ trait StandardComponentInitialization extends ComponentInitialization { (commStorage, commRegistrar, commManager) } + def initializeSparkContext(config:Config, kernel:Kernel) = { + if(config.getString("spark_context_initialization_mode") == "eager") { + kernel.sparkSession + } + } + private def initializeDependencyDownloader(config: Config) = { val depsDir = { if(config.hasPath("deps_dir") && Files.exists(Paths.get(config.getString("deps_dir")))) { http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala index df6fed5..cb90b81 100644 --- a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala +++ b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala @@ -410,32 +410,39 @@ class Kernel ( private lazy val defaultSparkConf: SparkConf = createSparkConf(new SparkConf()) override def sparkSession: SparkSession = { - defaultSparkConf.getOption("spark.master") match { - case Some(master) if !master.contains("local") => - // When connecting to a remote cluster, the first call to getOrCreate - // may create a session and take a long time, so this starts a future - // to get the session. If it take longer than specified timeout, then - // print a message to the user that Spark is starting. Note, the - // default timeout is 100ms and it is specified in reference.conf. 
- import scala.concurrent.ExecutionContext.Implicits.global - val sessionFuture = Future { + + if(config.getString("spark_context_initialization_mode") == "eager") { + // explicitly enable eager initialization of spark context + SparkSession.builder.config(defaultSparkConf).getOrCreate + } else { + // default lazy initialization of spark context + defaultSparkConf.getOption("spark.master") match { + case Some(master) if !master.contains("local") => + // When connecting to a remote cluster, the first call to getOrCreate + // may create a session and take a long time, so this starts a future + // to get the session. If it take longer than specified timeout, then + // print a message to the user that Spark is starting. Note, the + // default timeout is 100ms and it is specified in reference.conf. + import scala.concurrent.ExecutionContext.Implicits.global + val sessionFuture = Future { + SparkSession.builder.config(defaultSparkConf).getOrCreate + } + + try { + val timeout = getSparkContextInitializationTimeout + Await.result(sessionFuture, Duration(timeout, TimeUnit.MILLISECONDS)) + } catch { + case timeout: TimeoutException => + // getting the session is taking a long time, so assume that Spark + // is starting and print a message + display.content( + MIMEType.PlainText, "Waiting for a Spark session to start...") + Await.result(sessionFuture, Duration.Inf) + } + + case _ => SparkSession.builder.config(defaultSparkConf).getOrCreate - } - - try { - val timeout = getSparkContextInitializationTimeout - Await.result(sessionFuture, Duration(timeout, TimeUnit.MILLISECONDS)) - } catch { - case timeout: TimeoutException => - // getting the session is taking a long time, so assume that Spark - // is starting and print a message - display.content( - MIMEType.PlainText, "Waiting for a Spark session to start...") - Await.result(sessionFuture, Duration.Inf) - } - - case _ => - SparkSession.builder.config(defaultSparkConf).getOrCreate + } } } 
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala b/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala index 1363a92..029f46b 100644 --- a/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala +++ b/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala @@ -356,6 +356,66 @@ class CommandLineOptionsSpec extends FunSpec with Matchers { } } + describe("when dealing with --spark-context-initialization-mode") { + val key = "spark_context_initialization_mode" + + it("when none of the options are specified, it should default to lazy") { + val options = new CommandLineOptions(Nil) + val config: Config = options.toConfig + config.getString(key) should be("lazy") + } + + it("when other options are specified, it should default to lazy") { + val options = new CommandLineOptions(Seq( + "--interpreter-plugin", + "dummy:test.utils.DummyInterpreter" + )) + val config: Config = options.toConfig + config.getString(key) should be("lazy") + } + + it("when the options is specified, it should return the specified value") { + val options = new CommandLineOptions(List( + "--stdin-port", "99999", + "--shell-port", "88888", + "--iopub-port", "77777", + "--control-port", "55555", + "--heartbeat-port", "44444", + "--spark-context-initialization-mode", "eager" + )) + val config: Config = options.toConfig + config.getString(key) should be("eager") + } + + it("when an invalid value is specified, an exception must be thrown") { + intercept [OptionException] { + val options = new CommandLineOptions(List( + "--stdin-port", "99999", + "--shell-port", "88888", + "--iopub-port", "77777", + "--control-port", "55555", + "--heartbeat-port", "44444", + "--spark-context-initialization-mode", "foo" + )) + 
val config: Config = options.toConfig + } + } + + it("when a value is not specified, an exception must be thrown") { + intercept [OptionException] { + val options = new CommandLineOptions(List( + "--stdin-port", "99999", + "--shell-port", "88888", + "--iopub-port", "77777", + "--control-port", "55555", + "--heartbeat-port", "44444", + "--spark-context-initialization-mode", "" + )) + val config: Config = options.toConfig + } + } + } + describe("when dealing with --alternate-sigint") { val key = "alternate_sigint" http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/resources/compile/application.conf ---------------------------------------------------------------------- diff --git a/resources/compile/application.conf b/resources/compile/application.conf index 1e0f5cc..3f74bd9 100644 --- a/resources/compile/application.conf +++ b/resources/compile/application.conf @@ -26,4 +26,5 @@ control_port = ${?PORT1} hb_port = ${?PORT2} shell_port = ${?PORT3} iopub_port = ${?PORT4} +spark_context_initialization_mode = ${?SPARK_CONTEXT_INITIALIZATION_MODE} spark_context_initialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/resources/compile/reference.conf ---------------------------------------------------------------------- diff --git a/resources/compile/reference.conf b/resources/compile/reference.conf index b99f703..1c98865 100644 --- a/resources/compile/reference.conf +++ b/resources/compile/reference.conf @@ -55,8 +55,11 @@ deps_dir = ${?DEPS_DIR} default_interpreter = "Scala" default_interpreter = ${?DEFAULT_INTERPRETER} +spark_context_initialization_mode = "lazy" +spark_context_initialization_mode = ${?SPARK_CONTEXT_INITIALIZATION_MODE} + spark_context_initialization_timeout = 100 spark_context_initialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT} default_interpreter_plugin = [
"Scala:org.apache.toree.kernel.interpreter.scala.ScalaInterpreter", http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/resources/test/application.conf ---------------------------------------------------------------------- diff --git a/resources/test/application.conf b/resources/test/application.conf index 1e0f5cc..3f74bd9 100644 --- a/resources/test/application.conf +++ b/resources/test/application.conf @@ -26,4 +26,5 @@ control_port = ${?PORT1} hb_port = ${?PORT2} shell_port = ${?PORT3} iopub_port = ${?PORT4} +spark_context_initialization_mode = ${?SPARK_CONTEXT_INITIALIZATION_MODE} spark_context_initialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/9c929f51/resources/test/reference.conf ---------------------------------------------------------------------- diff --git a/resources/test/reference.conf b/resources/test/reference.conf index 3324442..66741d4 100644 --- a/resources/test/reference.conf +++ b/resources/test/reference.conf @@ -54,6 +54,9 @@ send_empty_output = ${?SEND_EMPTY_OUTPUT} default_interpreter = "Scala" default_interpreter = ${?DEFAULT_INTERPRETER} +spark_context_initialization_mode = "lazy" +spark_context_initialization_mode = ${?SPARK_CONTEXT_INITIALIZATION_MODE} + spark_context_initialization_timeout = 100 spark_context_initialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}