http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/org/apache/toree/utils/ScheduledTaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/test/scala/org/apache/toree/utils/ScheduledTaskManagerSpec.scala b/kernel-api/src/test/scala/org/apache/toree/utils/ScheduledTaskManagerSpec.scala new file mode 100644 index 0000000..8db4535 --- /dev/null +++ b/kernel-api/src/test/scala/org/apache/toree/utils/ScheduledTaskManagerSpec.scala @@ -0,0 +1,129 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.utils + +import java.util.Calendar +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{TimeUnit, CountDownLatch} + +import org.scalatest.concurrent.Eventually +import org.scalatest.time.{Milliseconds, Span} +import org.scalatest.{BeforeAndAfter, Matchers, FunSpec} + +class ScheduledTaskManagerSpec extends FunSpec with Matchers with BeforeAndAfter + with Eventually +{ + private val TestTimeInterval = 30 + private val MaximumChecks = 3 + private val TimeoutScale = 3 + private var scheduledTaskManager: ScheduledTaskManager = _ + private var scheduleVerifier: ScheduleVerifier = _ + + implicit override val patienceConfig = PatienceConfig( + timeout = scaled(Span( + TestTimeInterval * MaximumChecks * TimeoutScale, Milliseconds)), + interval = scaled(Span(TestTimeInterval / 2, Milliseconds)) + ) + + private class ScheduleVerifier { + @volatile private var checkinTimes: List[Long] = Nil + private val taskRun = new AtomicBoolean(false) + + def task() = { + if (checkinTimes.length < MaximumChecks) + checkinTimes = checkinTimes :+ Calendar.getInstance().getTimeInMillis + taskRun.set(true) + } + + def shouldNotRunAnymore(milliseconds: Long) = eventually { + val offset: Int = (milliseconds * 0.5).toInt + + // Clean the state and wait to see if the task is executed again + taskRun.set(false) + Thread.sleep(milliseconds + offset) + + taskRun.get() should be (false) + } + + def shouldRunEvery(milliseconds: Long) = { + // 50% +/- + val offset: Int = (milliseconds * 0.5).toInt + + eventually { + // Assert we have the desired number of checks + checkinTimes.length should be (MaximumChecks) + + checkinTimes.take(checkinTimes.length - 1).zip( + checkinTimes.takeRight(checkinTimes.length - 1) + ).foreach({ times => + val firstTime = times._1 + val secondTime = times._2 + (secondTime - firstTime) should ( + be >= milliseconds - offset and + be <= milliseconds + offset) + }) + } + } + } + + before { + scheduledTaskManager = new ScheduledTaskManager + scheduleVerifier = new ScheduleVerifier + } + + after { + scheduledTaskManager.stop() + } + + describe("ScheduledTaskManager") { + describe("#addTask") { + // TODO: This is failing frequently due to some sort of timing problem + ignore("should add a new task to be executed periodically") { + scheduledTaskManager.addTask(timeInterval = TestTimeInterval, + task = scheduleVerifier.task()) + + scheduleVerifier.shouldRunEvery(TestTimeInterval) + } + } + + describe("#removeTask") { + it("should stop and remove the task if it exists") { + val taskId = scheduledTaskManager.addTask( + timeInterval = TestTimeInterval, + task = scheduleVerifier.task() + ) + + scheduledTaskManager.removeTask(taskId) + + scheduleVerifier.shouldNotRunAnymore(TestTimeInterval) + } + + it("should return true if the task was removed") { + val taskId = scheduledTaskManager.addTask( + timeInterval = TestTimeInterval, + task = scheduleVerifier.task() + ) + + scheduledTaskManager.removeTask(taskId) should be (true) + } + + it("should return false if the task does not exist") { + scheduledTaskManager.removeTask("") should be (false) + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/test/scala/org/apache/toree/utils/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/test/scala/org/apache/toree/utils/TaskManagerSpec.scala b/kernel-api/src/test/scala/org/apache/toree/utils/TaskManagerSpec.scala new file mode 100644 index 0000000..3456d98 --- /dev/null +++ b/kernel-api/src/test/scala/org/apache/toree/utils/TaskManagerSpec.scala @@ -0,0 +1,301 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.utils + +import java.util.concurrent.{RejectedExecutionException, ExecutionException} + +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.concurrent.{Timeouts, Eventually, ScalaFutures} +import org.scalatest.mock.MockitoSugar +import org.scalatest.time.{Milliseconds, Seconds, Span} +import org.scalatest.{BeforeAndAfter, FunSpec, Matchers} +import test.utils.UncaughtExceptionSuppression + +import scala.concurrent.Future +import scala.runtime.BoxedUnit + +class TaskManagerSpec extends FunSpec with Matchers with MockitoSugar + with BeforeAndAfter with ScalaFutures with UncaughtExceptionSuppression + with Eventually with Timeouts +{ + implicit override val patienceConfig = PatienceConfig( + timeout = scaled(Span(200, Milliseconds)), + interval = scaled(Span(5, Milliseconds)) + ) + private val MaxTestTasks = 50000 + private var taskManager: TaskManager = _ + + before { + taskManager = new TaskManager + } + + after { + taskManager = null + } + + describe("TaskManager") { + describe("#add") { + it("should throw an exception if not started") { + intercept[AssertionError] { + taskManager.add {} + } + } + + it("should throw an exception if more tasks are added than max task size") { + val taskManager = new TaskManager(maximumWorkers = 1, maxTasks = 1) + + taskManager.start() + + // Should fail from having too many tasks added + intercept[RejectedExecutionException] { + for (i <- 1 to MaxTestTasks) taskManager.add {} + } + } + + it("should return a Future[_] based on task provided") { + taskManager.start() + + // Cannot check inner Future type due to type erasure + taskManager.add { } shouldBe an [Future[_]] + + taskManager.stop() + } + + it("should work for a task that returns nothing") { + taskManager.start() + + val f = taskManager.add { } + + whenReady(f) { result => + result shouldBe a [BoxedUnit] + taskManager.stop() + } + } + + it("should construct a Runnable that invokes a Promise on success") { + taskManager.start() + + val returnValue = 3 + val f = taskManager.add { returnValue } + + whenReady(f) { result => + result should be (returnValue) + taskManager.stop() + } + } + + it("should construct a Runnable that invokes a Promise on failure") { + taskManager.start() + + val error = new Throwable("ERROR") + val f = taskManager.add { throw error } + + whenReady(f.failed) { result => + result should be (error) + taskManager.stop() + } + } + + it("should not block when adding more tasks than available threads") { + val taskManager = new TaskManager(maximumWorkers = 1) + + taskManager.start() + + failAfter(Span(100, Milliseconds)) { + taskManager.add { while (true) { Thread.sleep(1) } } + taskManager.add { while (true) { Thread.sleep(1) } } + } + } + } + + describe("#size") { + it("should be zero when no tasks have been added") { + taskManager.size should be (0) + } + + it("should reflect queued tasks and executing tasks") { + val taskManager = new TaskManager(maximumWorkers = 1) + taskManager.start() + + // Fill up the task manager and then add another task to the queue + taskManager.add { while (true) { Thread.sleep(1000) } } + taskManager.add { while (true) { Thread.sleep(1000) } } + + taskManager.size should be (2) + } + + it("should be one if there is only one executing task and no queued ones") { + taskManager.start() + + taskManager.add { while (true) { Thread.sleep(1000) } } + + // Wait until task is being executed to check if the task is still in + // the queue + while (!taskManager.isExecutingTask) Thread.sleep(1) + + taskManager.size should be (1) + + taskManager.stop() + } + } + + describe("#hasTaskInQueue") { + it("should be false when no task has been added") { + taskManager.hasTaskInQueue should be (false) + } + + it("should be true where there are tasks remaining in the queue") { + val taskManager = new TaskManager(maximumWorkers = 1) + taskManager.start() + + // Fill up the task manager and then add another task to the queue + taskManager.add { while (true) { Thread.sleep(1000) } } + taskManager.add { while (true) { Thread.sleep(1000) } } + + taskManager.hasTaskInQueue should be (true) + } + + it("should be false when the only task is currently being executed") { + taskManager.start() + + taskManager.add { while (true) { Thread.sleep(1000) } } + + // Wait until task is being executed to check if the task is still in + // the queue + while (!taskManager.isExecutingTask) Thread.sleep(1) + + taskManager.hasTaskInQueue should be (false) + + taskManager.stop() + } + } + + describe("#isExecutingTask") { + it("should be true when a task is being executed") { + taskManager.start() + taskManager.add { while (true) { Thread.sleep(1000) } } + + eventually { + taskManager.isExecutingTask should be (true) + } + + taskManager.stop() + } + + it("should be false when no tasks have been added") { + taskManager.isExecutingTask should be (false) + } + + // TODO: Timing issue on Travis CI needs to be resolved + ignore("should be false when all tasks have finished") { + taskManager.start() + val f = taskManager.add { } // Really fast execution + + // Wait for up to 1 second for the task to finish + whenReady(f, Timeout(Span(1, Seconds))) { result => + taskManager.isExecutingTask should be (false) + taskManager.stop() + } + } + } + + describe("#await") { + it("should block until all tasks are completed") { + val taskManager = new TaskManager( + maximumWorkers = 1, + maxTasks = MaxTestTasks + ) + + taskManager.start() + + // TODO: Need better way to ensure tasks are still running while + // awaiting their return + for (x <- 1 to MaxTestTasks) taskManager.add { Thread.sleep(1) } + + assume(taskManager.hasTaskInQueue) + taskManager.await() + + taskManager.hasTaskInQueue should be (false) + taskManager.isExecutingTask should be (false) + + taskManager.stop() + } + } + + describe("#start") { + it("should create an internal thread pool executor") { + taskManager.start() + + taskManager.executor should not be (None) + + taskManager.stop() + } + } + + describe("#restart") { + it("should stop & erase the old internal thread and create a new one") { + taskManager.start() + + val oldExecutor = taskManager.executor + + taskManager.restart() + + taskManager.executor should not be (oldExecutor) + + taskManager.stop() + } + } + + describe("#stop") { + it("should attempt to interrupt the currently-running task") { + taskManager.start() + val f = taskManager.add { while (true) { Thread.sleep(1000) } } + + // Wait for the task to start + while (!taskManager.isExecutingTask) Thread.sleep(1) + + // Cancel the task + taskManager.stop() + + // Future should return an InterruptedException + whenReady(f.failed) { result => + result shouldBe an [ExecutionException] + result.getCause shouldBe an [InterruptedException] + } + } + + // TODO: Refactoring task manager to be parallelizable broke this ability + // so this will need to be reimplemented or abandoned + ignore("should kill the thread if interrupts failed and kill enabled") { + taskManager.start() + val f = taskManager.add { var x = 0; while (true) { x += 1 } } + + // Wait for the task to start + while (!taskManager.isExecutingTask) Thread.sleep(1) + + // Kill the task + taskManager.stop() + + // Future should return ThreadDeath when killed + whenReady(f.failed) { result => + result shouldBe an [ExecutionException] + result.getCause shouldBe a [ThreadDeath] + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/SparkKernel.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/SparkKernel.scala b/kernel/src/main/scala/com/ibm/spark/SparkKernel.scala deleted file mode 100644 index f532de9..0000000 --- a/kernel/src/main/scala/com/ibm/spark/SparkKernel.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark - -import com.ibm.spark.boot.layer._ -import com.ibm.spark.boot.{CommandLineOptions, KernelBootstrap} -import com.ibm.spark.kernel.BuildInfo - -object SparkKernel extends App { - private val options = new CommandLineOptions(args) - - if (options.help) { - options.printHelpOn(System.out) - } else if (options.version) { - println(s"Kernel Version: ${BuildInfo.version}") - println(s"Build Date: ${BuildInfo.buildDate}") - println(s"Scala Version: ${BuildInfo.scalaVersion}") - println(s"Apache Spark Version: ${BuildInfo.sparkVersion}") - } else { - (new KernelBootstrap(options.toConfig) - with StandardBareInitialization - with StandardComponentInitialization - with StandardHandlerInitialization - with StandardHookInitialization) - .initialize() - .waitForTermination() - .shutdown() - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala b/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala deleted file mode 100644 index a5acbc2..0000000 --- a/kernel/src/main/scala/com/ibm/spark/boot/CommandLineOptions.scala +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.boot - -import java.io.{File, OutputStream} - -import com.ibm.spark.utils.KeyValuePairUtils -import com.typesafe.config.{Config, ConfigFactory} -import joptsimple.util.KeyValuePair -import joptsimple.{OptionParser, OptionSpec} - -import scala.collection.JavaConverters._ - -class CommandLineOptions(args: Seq[String]) { - private val parser = new OptionParser() - parser.allowsUnrecognizedOptions() - - /* - * Options supported by our kernel. - */ - private val _help = - parser.acceptsAll(Seq("help", "h").asJava, "display help information").forHelp() - - private val _version = - parser.acceptsAll(Seq("version", "v").asJava, "display version information") - - private val _profile = - parser.accepts("profile", "path to IPython JSON connection file") - .withRequiredArg().ofType(classOf[File]) - - private val _ip = - parser.accepts("ip", "ip used to bind sockets") - .withRequiredArg().ofType(classOf[String]) - - private val _stdin_port = parser.accepts( - "stdin-port", "port of the stdin socket" - ).withRequiredArg().ofType(classOf[Int]) - - private val _shell_port = parser.accepts( - "shell-port", "port of the shell socket" - ).withRequiredArg().ofType(classOf[Int]) - - private val _iopub_port = parser.accepts( - "iopub-port", "port of the iopub socket" - ).withRequiredArg().ofType(classOf[Int]) - - private val _control_port = parser.accepts( - "control-port", "port of the control socket" - ).withRequiredArg().ofType(classOf[Int]) - - private val _heartbeat_port = parser.accepts( - "heartbeat-port", "port of the heartbeat socket" - ).withRequiredArg().ofType(classOf[Int]) - - private val _spark_configuration = parser.acceptsAll( - Seq("spark-configuration", "S").asJava, - "configuration setting for Apache Spark" - ).withRequiredArg().ofType(classOf[KeyValuePair]) - - private val _magic_url = - parser.accepts("magic-url", "path to a magic jar") - .withRequiredArg().ofType(classOf[String]) - - private val _max_interpreter_threads = parser.accepts( - "max-interpreter-threads", - "total number of worker threads to use to execute code" - ).withRequiredArg().ofType(classOf[Int]) - - private val _jar_dir = parser.accepts( - "jar-dir", - "directory where user added jars are stored (MUST EXIST)" - ).withRequiredArg().ofType(classOf[String]) - - private val _default_interpreter = - parser.accepts("default-interpreter", "default interpreter for the kernel") - .withRequiredArg().ofType(classOf[String]) - - private val _nosparkcontext = - parser.accepts("nosparkcontext", "kernel should not create a spark context") - - private val _interpreter_plugin = parser.accepts( - "interpreter-plugin" - ).withRequiredArg().ofType(classOf[String]) - - private val options = parser.parse(args.map(_.trim): _*) - - /* - * Helpers to determine if an option is provided and the value with which it - * was provided. - */ - - private def has[T](spec: OptionSpec[T]): Boolean = - options.has(spec) - - private def get[T](spec: OptionSpec[T]): Option[T] = - Some(options.valueOf(spec)).filter(_ != null) - - private def getAll[T](spec: OptionSpec[T]): Option[List[T]] = - Some(options.valuesOf(spec).asScala.toList).filter(_ != null) - - /* - * Expose options in terms of their existence/value. - */ - - val help: Boolean = has(_help) - - val version: Boolean = has(_version) - - /* - * Config object has 3 levels and fallback in this order - * 1. Comandline Args - * 2. --profile file - * 3. Defaults - */ - def toConfig: Config = { - val profileConfig: Config = get(_profile) match { - case Some(x) => - ConfigFactory.parseFile(x) - case None => - ConfigFactory.empty() - } - - val commandLineConfig: Config = ConfigFactory.parseMap(Map( - "stdin_port" -> get(_stdin_port), - "shell_port" -> get(_shell_port), - "iopub_port" -> get(_iopub_port), - "control_port" -> get(_control_port), - "hb_port" -> get(_heartbeat_port), - "ip" -> get(_ip), - "interpreter_args" -> interpreterArgs, - "magic_urls" -> getAll(_magic_url).map(_.asJava) - .flatMap(list => if (list.isEmpty) None else Some(list)), - "spark_configuration" -> getAll(_spark_configuration) - .map(list => KeyValuePairUtils.keyValuePairSeqToString(list)) - .flatMap(str => if (str.nonEmpty) Some(str) else None), - "max_interpreter_threads" -> get(_max_interpreter_threads), - "jar_dir" -> get(_jar_dir), - "default_interpreter" -> get(_default_interpreter), - "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false)), - "interpreter_plugins" -> interpreterPlugins - ).flatMap(removeEmptyOptions).asInstanceOf[Map[String, AnyRef]].asJava) - - commandLineConfig.withFallback(profileConfig).withFallback(ConfigFactory.load) - } - - private val removeEmptyOptions: ((String, Option[Any])) => Iterable[(String, Any)] = { - pair => if (pair._2.isDefined) Some((pair._1, pair._2.get)) else None - } - - /** - * - * @return - */ - private def interpreterArgs: Option[java.util.List[String]] = { - args.dropWhile(_ != "--").drop(1).toList match { - case Nil => None - case list: List[String] => Some(list.asJava) - } - } - - private def interpreterPlugins: Option[java.util.List[String]] = { - //val defaults = getAll(_default_interpreter_plugin).getOrElse(List()) - //val defaults = List[String]( - // "PySpark:com.ibm.spark.kernel.interpreter.pyspark.PySparkInterpreter", - // "SparkR:com.ibm.spark.kernel.interpreter.sparkr.SparkRInterpreter", - // "SQL:com.ibm.spark.kernel.interpreter.sql.SqlInterpreter" - //) - - val userDefined = getAll(_interpreter_plugin) match { - case Some(l) => l - case _ => List[String]() - } - - //val p = defaults ++ userDefined - Some(userDefined.asJava) - } - - /** - * Prints the help message to the output stream provided. - * @param out The output stream to direct the help message - */ - def printHelpOn(out: OutputStream) = - parser.printHelpOn(out) -} - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/boot/KernelBootstrap.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/boot/KernelBootstrap.scala b/kernel/src/main/scala/com/ibm/spark/boot/KernelBootstrap.scala deleted file mode 100644 index 1e7927c..0000000 --- a/kernel/src/main/scala/com/ibm/spark/boot/KernelBootstrap.scala +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.boot - -import akka.actor.{ActorRef, ActorSystem} -import com.ibm.spark.boot.layer._ -import com.ibm.spark.interpreter.Interpreter -import com.ibm.spark.kernel.api.Kernel -import com.ibm.spark.kernel.protocol.v5.KernelStatusType._ -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import com.ibm.spark.security.KernelSecurityManager -import com.ibm.spark.utils.LogLike -import com.typesafe.config.Config -import org.apache.spark.SparkContext -import org.zeromq.ZMQ - -import scala.util.Try - -class KernelBootstrap(config: Config) extends LogLike { - this: BareInitialization with ComponentInitialization - with HandlerInitialization with HookInitialization => - - private val DefaultAppName = SparkKernelInfo.banner - private val DefaultActorSystemName = "spark-kernel-actor-system" - - private var actorSystem: ActorSystem = _ - private var actorLoader: ActorLoader = _ - private var kernelMessageRelayActor: ActorRef = _ - private var statusDispatch: ActorRef = _ - private var kernel: Kernel = _ - - private var sparkContext: SparkContext = _ - private var interpreters: Seq[Interpreter] = Nil - - /** - * Initializes all kernel systems. - */ - def initialize() = { - // TODO: Investigate potential to initialize System out/err/in to capture - // Console DynamicVariable initialization (since takes System fields) - // and redirect it to a workable location (like an actor) with the - // thread's current information attached - // - // E.G. System.setOut(customPrintStream) ... all new threads will have - // customPrintStream as their initial Console.out value - // - - displayVersionInfo() - - // Do this first to support shutting down quickly before entire system - // is ready - initializeShutdownHook() - - // Initialize the bare minimum to report a starting message - val (actorSystem, actorLoader, kernelMessageRelayActor, statusDispatch) = - initializeBare( - config = config, - actorSystemName = DefaultActorSystemName - ) - - this.actorSystem = actorSystem - this.actorLoader = actorLoader - this.kernelMessageRelayActor = kernelMessageRelayActor - this.statusDispatch = statusDispatch - - // Indicate that the kernel is now starting - publishStatus(KernelStatusType.Starting) - - // Initialize components needed elsewhere - val (commStorage, commRegistrar, commManager, interpreter, - kernel, dependencyDownloader, - magicLoader, responseMap) = - initializeComponents( - config = config, - appName = DefaultAppName, - actorLoader = actorLoader - ) - //this.sparkContext = sparkContext - this.interpreters ++= Seq(interpreter) - - this.kernel = kernel - - // Initialize our handlers that take care of processing messages - initializeHandlers( - actorSystem = actorSystem, - actorLoader = actorLoader, - kernel = kernel, - interpreter = interpreter, - commRegistrar = commRegistrar, - commStorage = commStorage, - magicLoader = magicLoader, - responseMap = responseMap - ) - - // Initialize our non-shutdown hooks that handle various JVM events - initializeHooks( - interpreter = interpreter - ) - - logger.debug("Initializing security manager") - System.setSecurityManager(new KernelSecurityManager) - - logger.info("Marking relay as ready for receiving messages") - kernelMessageRelayActor ! true - - this - } - - /** - * Shuts down all kernel systems. - */ - def shutdown() = { - logger.info("Shutting down Spark Context") - Try(kernel.sparkContext.stop()).failed.foreach( - logger.error("Failed to shutdown Spark Context", _: Throwable) - ) - - logger.info("Shutting down interpreters") - Try(interpreters.foreach(_.stop())).failed.foreach( - logger.error("Failed to shutdown interpreters", _: Throwable) - ) - - logger.info("Shutting down actor system") - Try(actorSystem.shutdown()).failed.foreach( - logger.error("Failed to shutdown actor system", _: Throwable) - ) - - this - } - - /** - * Waits for the main actor system to terminate. - */ - def waitForTermination() = { - logger.debug("Waiting for actor system to terminate") - actorSystem.awaitTermination() - - this - } - - private def publishStatus( - status: KernelStatusType, - parentHeader: Option[ParentHeader] = None - ): Unit = { - parentHeader match { - case Some(header) => statusDispatch ! ((status, header)) - case None => statusDispatch ! status - } - } - - @inline private def displayVersionInfo() = { - logger.info("Kernel version: " + SparkKernelInfo.implementationVersion) - logger.info("Scala version: " + SparkKernelInfo.languageVersion) - logger.info("ZeroMQ (JeroMQ) version: " + ZMQ.getVersionString) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/boot/layer/BareInitialization.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/BareInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/BareInitialization.scala deleted file mode 100644 index d2d6ab9..0000000 --- a/kernel/src/main/scala/com/ibm/spark/boot/layer/BareInitialization.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.boot.layer - -import akka.actor.{ActorRef, Props, ActorSystem} -import com.ibm.spark.kernel.protocol.v5.dispatch.StatusDispatch -import com.ibm.spark.kernel.protocol.v5.handler.{GenericSocketMessageHandler, ShutdownHandler} -import com.ibm.spark.kernel.protocol.v5.kernel.{SimpleActorLoader, ActorLoader} -import com.ibm.spark.communication.security.{SecurityActorType, SignatureManagerActor} -import com.ibm.spark.kernel.protocol.v5.kernel.socket._ -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.kernel.protocol.v5.content.{CommClose, CommMsg, CommOpen} -import com.ibm.spark.kernel.protocol.v5.relay.KernelMessageRelay -import com.ibm.spark.utils.LogLike -import com.typesafe.config.Config -import play.api.libs.json.Json - -/** - * Represents the raw initialization needed to send a "starting" message. - */ -trait BareInitialization { - /** - * Initializes and registers all objects needed to get the kernel to send a - * "starting" message. - * - * @param config The config used for initialization - * @param actorSystemName The name to use for the actor system - */ - def initializeBare(config: Config, actorSystemName: String): - (ActorSystem, ActorLoader, ActorRef, ActorRef) -} - -/** - * Represents the standard implementation of BareInitialization. - */ -trait StandardBareInitialization extends BareInitialization { this: LogLike => - /** - * Initializes and registers all objects needed to get the kernel to send a - * "starting" message. - * - * @param config The config used for initialization - * @param actorSystemName The name to use for the actor system - */ - def initializeBare(config: Config, actorSystemName: String) = { - val actorSystem = createActorSystem(actorSystemName) - val actorLoader = createActorLoader(actorSystem) - val (kernelMessageRelayActor, _, statusDispatch, _, _) = - initializeCoreActors(config, actorSystem, actorLoader) - createSockets(config, actorSystem, actorLoader) - - (actorSystem, actorLoader, kernelMessageRelayActor, statusDispatch) - } - - protected def createActorSystem(actorSystemName: String): ActorSystem = { - logger.info("Initializing internal actor system") - ActorSystem(actorSystemName) - } - - protected def createActorLoader(actorSystem: ActorSystem): ActorLoader = { - logger.debug("Creating Simple Actor Loader") - SimpleActorLoader(actorSystem) - } - - /** - * Does minimal setup in order to send the "starting" status message over - * the IOPub socket - */ - protected def initializeCoreActors( - config: Config, actorSystem: ActorSystem, actorLoader: ActorLoader - ) = { - logger.debug("Creating kernel message relay actor") - val kernelMessageRelayActor = actorSystem.actorOf( - Props( - classOf[KernelMessageRelay], actorLoader, true, - Map( - CommOpen.toTypeString -> MessageType.Incoming.CommOpen.toString, - CommMsg.toTypeString -> MessageType.Incoming.CommMsg.toString, - CommClose.toTypeString -> MessageType.Incoming.CommClose.toString - ), - Map( - CommOpen.toTypeString -> MessageType.Outgoing.CommOpen.toString, - CommMsg.toTypeString -> MessageType.Outgoing.CommMsg.toString, - CommClose.toTypeString -> MessageType.Outgoing.CommClose.toString - ) - ), - name = SystemActorType.KernelMessageRelay.toString - ) - - logger.debug("Creating signature manager actor") - val sigKey = config.getString("key") - val sigScheme = config.getString("signature_scheme") - logger.debug("Key = " + sigKey) - logger.debug("Scheme = " + sigScheme) - val signatureManagerActor = actorSystem.actorOf( - Props( - classOf[SignatureManagerActor], sigKey, sigScheme.replace("-", "") - ), - name = SecurityActorType.SignatureManager.toString - ) - - logger.debug("Creating status dispatch actor") - val statusDispatch = actorSystem.actorOf( - Props(classOf[StatusDispatch], actorLoader), - name = SystemActorType.StatusDispatch.toString - ) - - logger.debug("Creating shutdown handler and sender actors") - val shutdownHandler = actorSystem.actorOf( - Props(classOf[ShutdownHandler], actorLoader), - name = MessageType.Incoming.ShutdownRequest.toString - ) - val shutdownSender = actorSystem.actorOf( - Props(classOf[GenericSocketMessageHandler], actorLoader, SocketType.Control), - name = MessageType.Outgoing.ShutdownReply.toString - ) - - (kernelMessageRelayActor, signatureManagerActor, statusDispatch, shutdownHandler, shutdownSender) - } - - protected def createSockets( - config: Config, actorSystem: ActorSystem, actorLoader: ActorLoader - ): Unit = { - logger.debug("Creating sockets") - - val socketConfig: SocketConfig = SocketConfig.fromConfig(config) - logger.info("Connection Profile: " - + Json.prettyPrint(Json.toJson(socketConfig))) - - logger.debug("Constructing ServerSocketFactory") - val socketFactory = new SocketFactory(socketConfig) - - logger.debug("Initializing Heartbeat on port " + - socketConfig.hb_port) - val heartbeatActor = actorSystem.actorOf( - Props(classOf[Heartbeat], socketFactory), - name = SocketType.Heartbeat.toString - ) - - logger.debug("Initializing Stdin on port " + - socketConfig.stdin_port) - val stdinActor = actorSystem.actorOf( - Props(classOf[Stdin], socketFactory, actorLoader), - name = SocketType.StdIn.toString - ) - - logger.debug("Initializing Shell on port " + - socketConfig.shell_port) - val shellActor = actorSystem.actorOf( - Props(classOf[Shell], socketFactory, actorLoader), - name = SocketType.Shell.toString - ) - - logger.debug("Initializing Control on port " + - socketConfig.control_port) - val controlActor = actorSystem.actorOf( - Props(classOf[Control], socketFactory, actorLoader), - name = SocketType.Control.toString - ) - - logger.debug("Initializing IOPub on port " + - socketConfig.iopub_port) - val ioPubActor = actorSystem.actorOf( - Props(classOf[IOPub], socketFactory), - name = SocketType.IOPub.toString - ) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala deleted file mode 100644 index 939b896..0000000 --- a/kernel/src/main/scala/com/ibm/spark/boot/layer/ComponentInitialization.scala +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.boot.layer - -import java.util -import java.util.concurrent.ConcurrentHashMap - -import akka.actor.ActorRef -import com.ibm.spark.comm.{CommManager, KernelCommManager, CommRegistrar, CommStorage} -import com.ibm.spark.dependencies.{DependencyDownloader, IvyDependencyDownloader} -import com.ibm.spark.global -import com.ibm.spark.interpreter._ -import com.ibm.spark.kernel.api.{KernelLike, Kernel} -import com.ibm.spark.kernel.protocol.v5.KMBuilder -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import com.ibm.spark.kernel.protocol.v5.stream.KernelOutputStream -import com.ibm.spark.magic.MagicLoader -import com.ibm.spark.magic.builtin.BuiltinLoader -import com.ibm.spark.magic.dependencies.DependencyMap -import com.ibm.spark.utils.{MultiClassLoader, TaskManager, KeyValuePairUtils, LogLike} -import com.typesafe.config.Config -import org.apache.spark.sql.SQLContext -import org.apache.spark.{SparkContext, SparkConf} - -import scala.collection.JavaConverters._ - -import scala.util.Try - -/** - * Represents the component initialization. All component-related pieces of the - * kernel (non-actors) should be created here. Limited items should be exposed. - */ -trait ComponentInitialization { - /** - * Initializes and registers all components (not needed by bare init). - * - * @param config The config used for initialization - * @param appName The name of the "application" for Spark - * @param actorLoader The actor loader to use for some initialization - */ - def initializeComponents( - config: Config, appName: String, actorLoader: ActorLoader - ): (CommStorage, CommRegistrar, CommManager, Interpreter, - Kernel, DependencyDownloader, MagicLoader, - collection.mutable.Map[String, ActorRef]) -} - -/** - * Represents the standard implementation of ComponentInitialization. - */ -trait StandardComponentInitialization extends ComponentInitialization { - this: LogLike => - - /** - * Initializes and registers all components (not needed by bare init). - * - * @param config The config used for initialization - * @param appName The name of the "application" for Spark - * @param actorLoader The actor loader to use for some initialization - */ - def initializeComponents( - config: Config, appName: String, actorLoader: ActorLoader - ) = { - val (commStorage, commRegistrar, commManager) = - initializeCommObjects(actorLoader) - - val manager = InterpreterManager(config) - val scalaInterpreter = manager.interpreters.get("Scala").orNull - - val dependencyDownloader = initializeDependencyDownloader(config) - val magicLoader = initializeMagicLoader( - config, scalaInterpreter, dependencyDownloader) - - val kernel = initializeKernel( - config, actorLoader, manager, commManager, magicLoader - ) - - val responseMap = initializeResponseMap() - - initializeSparkContext(config, kernel, appName) - - (commStorage, commRegistrar, commManager, - manager.defaultInterpreter.orNull, kernel, - dependencyDownloader, magicLoader, responseMap) - - } - - - def initializeSparkContext(config:Config, kernel:Kernel, appName:String) = { - if(!config.getBoolean("nosparkcontext")) { - kernel.createSparkContext(config.getString("spark.master"), appName) - } - } - - private def initializeCommObjects(actorLoader: ActorLoader) = { - logger.debug("Constructing Comm storage") - val commStorage = new CommStorage() - - logger.debug("Constructing Comm registrar") - val commRegistrar = new CommRegistrar(commStorage) - - logger.debug("Constructing Comm manager") - val commManager = new KernelCommManager( - actorLoader, KMBuilder(), commRegistrar) - - (commStorage, commRegistrar, commManager) - } - - private def initializeDependencyDownloader(config: Config) = { - val dependencyDownloader = new IvyDependencyDownloader( - "http://repo1.maven.org/maven2/", config.getString("ivy_local") - ) - - dependencyDownloader - } - - protected def initializeResponseMap(): collection.mutable.Map[String, ActorRef] = - new ConcurrentHashMap[String, ActorRef]().asScala - - private def initializeKernel( - config: Config, - actorLoader: ActorLoader, - interpreterManager: InterpreterManager, - commManager: CommManager, - magicLoader: MagicLoader - ) = { - val kernel = new Kernel( - config, - actorLoader, - interpreterManager, - commManager, - magicLoader - ) - /* - interpreter.doQuietly { - interpreter.bind( - "kernel", "com.ibm.spark.kernel.api.Kernel", - kernel, List( """@transient implicit""") - ) - } - */ - magicLoader.dependencyMap.setKernel(kernel) - - kernel - } - - private def initializeMagicLoader( - config: Config, interpreter: Interpreter, - dependencyDownloader: DependencyDownloader - ) = { - logger.debug("Constructing magic loader") - - logger.debug("Building dependency map") - val dependencyMap = new DependencyMap() - .setInterpreter(interpreter) - .setKernelInterpreter(interpreter) // This is deprecated - .setDependencyDownloader(dependencyDownloader) - .setConfig(config) - - logger.debug("Creating BuiltinLoader") - val builtinLoader = new BuiltinLoader() - - val magicUrlArray = config.getStringList("magic_urls").asScala - .map(s => new java.net.URL(s)).toArray - - if (magicUrlArray.isEmpty) - logger.warn("No external magics provided to MagicLoader!") - else - logger.info("Using magics from the following locations: " + - magicUrlArray.map(_.getPath).mkString(",")) - - val multiClassLoader = new MultiClassLoader( - builtinLoader, - interpreter.classLoader - ) - - logger.debug("Creating MagicLoader") - val magicLoader = new MagicLoader( - dependencyMap = dependencyMap, - urls = magicUrlArray, - parentLoader = multiClassLoader - ) - magicLoader.dependencyMap.setMagicLoader(magicLoader) - magicLoader - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/boot/layer/HandlerInitialization.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/HandlerInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/HandlerInitialization.scala deleted file mode 100644 index 1d16ac4..0000000 --- a/kernel/src/main/scala/com/ibm/spark/boot/layer/HandlerInitialization.scala +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.boot.layer - -import akka.actor.{ActorRef, ActorSystem, Props} -import com.ibm.spark.comm.{CommRegistrar, CommStorage} -import com.ibm.spark.interpreter.Interpreter -import com.ibm.spark.kernel.api.Kernel -import com.ibm.spark.kernel.protocol.v5.MessageType.MessageType -import com.ibm.spark.kernel.protocol.v5.SocketType.SocketType -import com.ibm.spark.kernel.protocol.v5.handler._ -import com.ibm.spark.kernel.protocol.v5.interpreter.InterpreterActor -import com.ibm.spark.kernel.protocol.v5.interpreter.tasks.InterpreterTaskFactory -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import com.ibm.spark.kernel.protocol.v5.magic.{MagicParser, PostProcessor} -import com.ibm.spark.kernel.protocol.v5.relay.ExecuteRequestRelay -import com.ibm.spark.kernel.protocol.v5.{MessageType, SocketType, SystemActorType} -import com.ibm.spark.magic.MagicLoader -import com.ibm.spark.utils.LogLike - -/** - * Represents the Akka handler initialization. All actors (not needed in bare - * initialization) should be constructed here. - */ -trait HandlerInitialization { - /** - * Initializes and registers all handlers. - * - * @param actorSystem The actor system needed for registration - * @param actorLoader The actor loader needed for registration - * @param kernel The kernel api needed for registration - * @param interpreter The main interpreter needed for registration - * @param magicLoader The magic loader needed for registration - * @param commRegistrar The comm registrar needed for registration - * @param commStorage The comm storage needed for registration - */ - def initializeHandlers( - actorSystem: ActorSystem, actorLoader: ActorLoader, - kernel: Kernel, - interpreter: Interpreter, magicLoader: MagicLoader, - commRegistrar: CommRegistrar, commStorage: CommStorage, - responseMap: collection.mutable.Map[String, ActorRef] - ): Unit -} - -/** - * Represents the standard implementation of HandlerInitialization. - */ -trait StandardHandlerInitialization extends HandlerInitialization { - this: LogLike => - - /** - * Initializes and registers all handlers. - * - * @param actorSystem The actor system needed for registration - * @param actorLoader The actor loader needed for registration - * @param kernel The kernel api needed for registration - * @param interpreter The main interpreter needed for registration - * @param magicLoader The magic loader needed for registration - * @param commRegistrar The comm registrar needed for registration - * @param commStorage The comm storage needed for registration - */ - def initializeHandlers( - actorSystem: ActorSystem, actorLoader: ActorLoader, - kernel: Kernel, - interpreter: Interpreter, magicLoader: MagicLoader, - commRegistrar: CommRegistrar, commStorage: CommStorage, - responseMap: collection.mutable.Map[String, ActorRef] - ): Unit = { - initializeKernelHandlers( - actorSystem, actorLoader, kernel, commRegistrar, commStorage, responseMap - ) - initializeSystemActors(actorSystem, actorLoader, interpreter, magicLoader) - } - - private def initializeSystemActors( - actorSystem: ActorSystem, actorLoader: ActorLoader, - interpreter: Interpreter, magicLoader: MagicLoader - ): Unit = { - logger.debug("Creating interpreter actor") - val interpreterActor = actorSystem.actorOf( - Props(classOf[InterpreterActor], new InterpreterTaskFactory(interpreter)), - name = SystemActorType.Interpreter.toString - ) - - logger.debug("Creating execute request relay actor") - val postProcessor = new PostProcessor(interpreter) - val magicParser = new MagicParser(magicLoader) - val executeRequestRelayActor = actorSystem.actorOf( - Props(classOf[ExecuteRequestRelay], - actorLoader, magicLoader, magicParser, postProcessor - ), - name = SystemActorType.ExecuteRequestRelay.toString - ) - } - - private def initializeKernelHandlers( - actorSystem: ActorSystem, actorLoader: ActorLoader, - kernel: Kernel, - commRegistrar: CommRegistrar, commStorage: CommStorage, - responseMap: collection.mutable.Map[String, ActorRef] - ): Unit = { - def initializeRequestHandler[T](clazz: Class[T], messageType: MessageType, extraArguments: AnyRef*) = { - logger.debug("Creating %s handler".format(messageType.toString)) - actorSystem.actorOf( - Props(clazz, actorLoader +: extraArguments: _*), - name = messageType.toString - ) - } - - def initializeInputHandler[T]( - clazz: Class[T], - messageType: MessageType - ): Unit = { - logger.debug("Creating %s handler".format(messageType.toString)) - actorSystem.actorOf( - Props(clazz, actorLoader, responseMap), - name = messageType.toString - ) - } - - // TODO: Figure out how to pass variable number of arguments to actor - def initializeCommHandler[T](clazz: Class[T], messageType: MessageType) = { - logger.debug("Creating %s handler".format(messageType.toString)) - actorSystem.actorOf( - Props(clazz, actorLoader, commRegistrar, commStorage), - name = messageType.toString - ) - } - - def initializeSocketHandler(socketType: SocketType, messageType: MessageType): Unit = { - logger.debug("Creating %s to %s socket handler ".format(messageType.toString ,socketType.toString)) - actorSystem.actorOf( - Props(classOf[GenericSocketMessageHandler], actorLoader, socketType), - name = messageType.toString - ) - } - - // These are the handlers for messages coming into the - initializeRequestHandler(classOf[ExecuteRequestHandler], - MessageType.Incoming.ExecuteRequest, kernel) - initializeRequestHandler(classOf[KernelInfoRequestHandler], - MessageType.Incoming.KernelInfoRequest) - initializeRequestHandler(classOf[CodeCompleteHandler], - MessageType.Incoming.CompleteRequest) - initializeInputHandler(classOf[InputRequestReplyHandler], - MessageType.Incoming.InputReply) - initializeCommHandler(classOf[CommOpenHandler], - MessageType.Incoming.CommOpen) - initializeCommHandler(classOf[CommMsgHandler], - MessageType.Incoming.CommMsg) - initializeCommHandler(classOf[CommCloseHandler], - MessageType.Incoming.CommClose) - - // These are handlers for messages leaving the kernel through the sockets - initializeSocketHandler(SocketType.Shell, MessageType.Outgoing.KernelInfoReply) - initializeSocketHandler(SocketType.Shell, MessageType.Outgoing.ExecuteReply) - initializeSocketHandler(SocketType.Shell, MessageType.Outgoing.CompleteReply) - - initializeSocketHandler(SocketType.StdIn, MessageType.Outgoing.InputRequest) - - initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.ExecuteResult) - initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.Stream) - initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.ExecuteInput) - initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.Status) - initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.Error) - initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.CommOpen) - initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.CommMsg) - initializeSocketHandler(SocketType.IOPub, MessageType.Outgoing.CommClose) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/boot/layer/HookInitialization.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/HookInitialization.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/HookInitialization.scala deleted file mode 100644 index c590394..0000000 --- a/kernel/src/main/scala/com/ibm/spark/boot/layer/HookInitialization.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.boot.layer - -import com.ibm.spark.boot.KernelBootstrap -import com.ibm.spark.interpreter.Interpreter -import com.ibm.spark.utils.LogLike - -/** - * Represents the hook (interrupt/shutdown) initialization. All JVM-related - * hooks should be constructed here. - */ -trait HookInitialization { - /** - * Initializes and registers all hooks except shutdown. - * - * @param interpreter The main interpreter - */ - def initializeHooks(interpreter: Interpreter): Unit - - /** - * Initializes the shutdown hook. - */ - def initializeShutdownHook(): Unit -} - -/** - * Represents the standard implementation of HookInitialization. - */ -trait StandardHookInitialization extends HookInitialization { - this: KernelBootstrap with LogLike => - - /** - * Initializes and registers all hooks. - * - * @param interpreter The main interpreter - */ - def initializeHooks(interpreter: Interpreter): Unit = { - registerInterruptHook(interpreter) - } - - /** - * Initializes the shutdown hook. - */ - def initializeShutdownHook(): Unit = { - registerShutdownHook() - } - - private def registerInterruptHook(interpreter: Interpreter): Unit = { - val self = this - - import sun.misc.{Signal, SignalHandler} - - // TODO: Signals are not a good way to handle this since JVM only has the - // proprietary sun API that is not necessarily available on all platforms - Signal.handle(new Signal("INT"), new SignalHandler() { - private val MaxSignalTime: Long = 3000 // 3 seconds - var lastSignalReceived: Long = 0 - - def handle(sig: Signal) = { - val currentTime = System.currentTimeMillis() - if (currentTime - lastSignalReceived > MaxSignalTime) { - logger.info("Resetting code execution!") - interpreter.interrupt() - - // TODO: Cancel group representing current code execution - //sparkContext.cancelJobGroup() - - logger.info("Enter Ctrl-C twice to shutdown!") - lastSignalReceived = currentTime - } else { - logger.info("Shutting down kernel") - self.shutdown() - } - } - }) - } - - private def registerShutdownHook(): Unit = { - logger.debug("Registering shutdown hook") - val self = this - val mainThread = Thread.currentThread() - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() = { - logger.info("Shutting down kernel") - self.shutdown() - // TODO: Check if you can magically access the spark context to stop it - // TODO: inside a different thread - if (mainThread.isAlive) mainThread.join() - } - }) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala b/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala deleted file mode 100644 index 520d68c..0000000 --- a/kernel/src/main/scala/com/ibm/spark/boot/layer/InterpreterManager.scala +++ /dev/null @@ -1,66 +0,0 @@ -package com.ibm.spark.boot.layer - -import com.ibm.spark.kernel.api.KernelLike -import com.typesafe.config.Config -import com.ibm.spark.interpreter._ -import scala.collection.JavaConverters._ - -import org.slf4j.LoggerFactory - -case class InterpreterManager( - default: String = "Scala", - interpreters: Map[String, Interpreter] = Map[String, Interpreter]() -) { - - - def initializeInterpreters(kernel: KernelLike): Unit = { - interpreters.values.foreach(interpreter => - interpreter.init(kernel) - ) - } - - def addInterpreter( - name:String, - interpreter: Interpreter - ): InterpreterManager = { - copy(interpreters = interpreters + (name -> interpreter)) - } - - def defaultInterpreter: Option[Interpreter] = { - interpreters.get(default) - } -} - -object InterpreterManager { - - protected val logger = LoggerFactory.getLogger(this.getClass.getName) - - def apply(config: Config): InterpreterManager = { - val ip = config.getStringList("interpreter_plugins").asScala ++ - config.getStringList("default_interpreter_plugin").asScala - - val m = ip.foldLeft(Map[String, Interpreter]())( (acc, v) => { - v.split(":") match { - case Array(name, className) => - try { - val i = Class - .forName(className) - .newInstance() - .asInstanceOf[Interpreter] - acc + (name -> i) - } - catch { - case e:Throwable => - logger.error("Error loading interpreter class " + className) - logger.error(e.getMessage()) - acc - } - case _ => acc - } - }) - - val default = config.getString("default_interpreter") - - InterpreterManager(interpreters = m, default = default) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/comm/KernelCommManager.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/comm/KernelCommManager.scala b/kernel/src/main/scala/com/ibm/spark/comm/KernelCommManager.scala deleted file mode 100644 index 9706ce7..0000000 --- a/kernel/src/main/scala/com/ibm/spark/comm/KernelCommManager.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.comm - -import com.ibm.spark.annotations.Experimental -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import com.ibm.spark.kernel.protocol.v5.{KMBuilder, UUID} - -/** - * Represents a CommManager that uses a KernelCommWriter for its underlying - * open implementation. - * - * @param actorLoader The actor loader to use with the ClientCommWriter - * @param kmBuilder The KMBuilder to use with the ClientCommWriter - * @param commRegistrar The registrar to use for callback registration - */ -@Experimental -class KernelCommManager( - private val actorLoader: ActorLoader, - private val kmBuilder: KMBuilder, - private val commRegistrar: CommRegistrar -) extends CommManager(commRegistrar) -{ - /** - * Creates a new CommWriter instance given the Comm id. - * - * @param commId The Comm id to use with the Comm writer - * - * @return The new CommWriter instance - */ - override protected def newCommWriter(commId: UUID): CommWriter = - new KernelCommWriter(actorLoader, kmBuilder, commId) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/comm/KernelCommWriter.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/comm/KernelCommWriter.scala b/kernel/src/main/scala/com/ibm/spark/comm/KernelCommWriter.scala deleted file mode 100644 index e494663..0000000 --- a/kernel/src/main/scala/com/ibm/spark/comm/KernelCommWriter.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.comm - -import com.ibm.spark.annotations.Experimental -import com.ibm.spark.kernel.protocol.v5 -import com.ibm.spark.kernel.protocol.v5.content.{CommMsg, CommOpen, CommClose, CommContent} -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader - -/** - * Represents a CommWriter to send messages from the Kernel to the Client. - * - * @param actorLoader The actor loader to use for loading actors responsible for - * communication - * @param kmBuilder The kernel message builder used to construct kernel messages - * @param commId The comm id associated with this writer (defaults to a - * random UUID) - */ -@Experimental -class KernelCommWriter( - private val actorLoader: ActorLoader, - private val kmBuilder: KMBuilder, - override private[comm] val commId: v5.UUID -) extends CommWriter(commId) { - /** - * Sends the comm message (open/msg/close) to the actor responsible for - * relaying messages. - * - * @param commContent The message to relay (will be packaged) - * - * @tparam T Either CommOpen, CommMsg, or CommClose - */ - override protected[comm] def sendCommKernelMessage[ - T <: KernelMessageContent with CommContent - ](commContent: T): Unit = { - val messageType = commContent match { - case _: CommOpen => CommOpen.toTypeString - case _: CommMsg => CommMsg.toTypeString - case _: CommClose => CommClose.toTypeString - case _ => throw new Throwable("Invalid kernel message type!") - } - actorLoader.load(SystemActorType.KernelMessageRelay) ! - kmBuilder.withHeader(messageType).withContentString(commContent).build - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/global/ExecuteRequestState.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/global/ExecuteRequestState.scala b/kernel/src/main/scala/com/ibm/spark/global/ExecuteRequestState.scala deleted file mode 100644 index 389cf82..0000000 --- a/kernel/src/main/scala/com/ibm/spark/global/ExecuteRequestState.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.global - -import com.ibm.spark.kernel.api.Kernel -import com.ibm.spark.kernel.protocol.v5.KernelMessage - -/** - * Represents the state of the kernel messages being received containing - * execute requests. - */ -object ExecuteRequestState { - private var _lastKernelMessage: Option[KernelMessage] = None - - /** - * Processes the incoming kernel message and updates any associated state. - * - * @param kernelMessage The kernel message to process - */ - def processIncomingKernelMessage(kernelMessage: KernelMessage) = - _lastKernelMessage = Some(kernelMessage) - - /** - * Returns the last kernel message funneled through the KernelMessageRelay - * if any. - * - * @return Some KernelMessage instance if the relay has processed one, - * otherwise None - */ - def lastKernelMessage: Option[KernelMessage] = _lastKernelMessage - - /** - * Resets the state of the ExecuteRequestState to the default. - */ - def reset() = _lastKernelMessage = None -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/global/ExecutionCounter.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/global/ExecutionCounter.scala b/kernel/src/main/scala/com/ibm/spark/global/ExecutionCounter.scala deleted file mode 100644 index c00f937..0000000 --- a/kernel/src/main/scala/com/ibm/spark/global/ExecutionCounter.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.global - -import com.ibm.spark.kernel.protocol.v5.UUID - -import scala.collection.concurrent._ - -/** - * A class to keep track of execution counts for sessions. - */ -object ExecutionCounter { - private val executionCounts: Map[UUID, Int] = TrieMap[UUID, Int]() - - /** - * This function will increase, by 1, an integer value associated with the given key. The incremented value - * will be returned. If the key has no value associated value, 1 will be returned. - * @param key The key for incrementing the value. - * @return The incremented value - */ - def incr(key: UUID): Int = { - (executionCounts += (key -> (executionCounts.getOrElse(key, 0) + 1))) (key) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/global/ScheduledTaskManager.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/global/ScheduledTaskManager.scala b/kernel/src/main/scala/com/ibm/spark/global/ScheduledTaskManager.scala deleted file mode 100644 index 83d2cea..0000000 --- a/kernel/src/main/scala/com/ibm/spark/global/ScheduledTaskManager.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.global - -import com.ibm.spark.utils - -object ScheduledTaskManager { - lazy val instance = new utils.ScheduledTaskManager -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/kernel/api/FactoryMethods.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/api/FactoryMethods.scala b/kernel/src/main/scala/com/ibm/spark/kernel/api/FactoryMethods.scala deleted file mode 100644 index 36c5c5c..0000000 --- a/kernel/src/main/scala/com/ibm/spark/kernel/api/FactoryMethods.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.kernel.api - -import java.io.{InputStream, OutputStream} - -import com.ibm.spark.kernel.protocol.v5 -import com.ibm.spark.kernel.protocol.v5.{KMBuilder, KernelMessage} -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import com.ibm.spark.kernel.protocol.v5.stream.{KernelOutputStream, KernelInputStream} -import com.typesafe.config.Config - -/** - * Represents the methods available to stream data from the kernel to the - * client. - * - * @param config The kernel configuration to use during object creation - * @param actorLoader The actor loader to use when retrieve actors needed for - * object creation - * @param parentMessage The parent message to use when needed for object - * creation - * @param kernelMessageBuilder The builder to use when constructing kernel - * messages inside objects created - */ -class FactoryMethods( - private val config: Config, - private val actorLoader: ActorLoader, - private val parentMessage: KernelMessage, - private val kernelMessageBuilder: KMBuilder -) extends FactoryMethodsLike { - require(parentMessage != null, "Parent message cannot be null!") - - private[api] val kmBuilder = kernelMessageBuilder.withParent(parentMessage) - - /** - * Creates a new kernel input stream. - * - * @param prompt The text to use as a prompt - * @param password If true, should treat input as a password field - * - * @return The new KernelInputStream instance - */ - override def newKernelInputStream( - prompt: String = KernelInputStream.DefaultPrompt, - password: Boolean = KernelInputStream.DefaultPassword - ): InputStream = { - new KernelInputStream( - actorLoader, - kmBuilder.withIds(parentMessage.ids), - prompt = prompt, - password = password - ) - } - - /** - * Creates a new kernel output stream. - * - * @param streamType The type of output stream (stdout/stderr) - * @param sendEmptyOutput If true, will send message even if output is empty - * - * @return The new KernelOutputStream instance - */ - override def newKernelOutputStream( - streamType: String = KernelOutputStream.DefaultStreamType, - sendEmptyOutput: Boolean = config.getBoolean("send_empty_output") - ): OutputStream = { - new v5.stream.KernelOutputStream( - actorLoader, - kmBuilder, - com.ibm.spark.global.ScheduledTaskManager.instance, - streamType = streamType, - sendEmptyOutput = sendEmptyOutput - ) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala ---------------------------------------------------------------------- diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala b/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala deleted file mode 100644 index 219804a..0000000 --- a/kernel/src/main/scala/com/ibm/spark/kernel/api/Kernel.scala +++ /dev/null @@ -1,448 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.kernel.api - -import java.io.{OutputStream, InputStream, PrintStream} -import java.util.concurrent.ConcurrentHashMap - -import com.ibm.spark.annotations.Experimental -import com.ibm.spark.boot.layer.InterpreterManager -import com.ibm.spark.comm.CommManager -import com.ibm.spark.global -import com.ibm.spark.interpreter.Results.Result -import com.ibm.spark.interpreter._ -import com.ibm.spark.kernel.protocol.v5 -import com.ibm.spark.kernel.protocol.v5.{KMBuilder, KernelMessage} -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import com.ibm.spark.kernel.protocol.v5.magic.MagicParser -import com.ibm.spark.kernel.protocol.v5.stream.{KernelOutputStream, KernelInputStream} -import com.ibm.spark.magic.{MagicLoader, MagicExecutor} -import com.ibm.spark.utils.{KeyValuePairUtils, LogLike} -import com.typesafe.config.Config -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.SQLContext -import org.apache.spark.{SparkContext, SparkConf} -import scala.util.{Try, DynamicVariable} - -import scala.reflect.runtime.universe._ - -import scala.language.dynamics -import com.ibm.spark.global.ExecuteRequestState - -/** - * Represents the main kernel API to be used for interaction. - * - * @param _config The configuration used when starting the kernel - * @param interpreterManager The interpreter manager to expose in this instance - * @param comm The Comm manager to expose in this instance - * @param actorLoader The actor loader to use for message relaying - */ -@Experimental -class Kernel ( - private val _config: Config, - private val actorLoader: ActorLoader, - val interpreterManager: InterpreterManager, - val comm: CommManager, - val magicLoader: MagicLoader -) extends KernelLike with LogLike { - - /** - * Represents the current input stream used by the kernel for the specific - * thread. - */ - private val currentInputStream = - new DynamicVariable[InputStream](null) - private val currentInputKernelMessage = - new DynamicVariable[KernelMessage](null) - - /** - * Represents the current output stream used by the kernel for the specific - * thread. - */ - private val currentOutputStream = - new DynamicVariable[PrintStream](null) - private val currentOutputKernelMessage = - new DynamicVariable[KernelMessage](null) - - /** - * Represents the current error stream used by the kernel for the specific - * thread. - */ - private val currentErrorStream = - new DynamicVariable[PrintStream](null) - private val currentErrorKernelMessage = - new DynamicVariable[KernelMessage](null) - - private var _sparkContext:SparkContext = null; - private var _sparkConf:SparkConf = null; - private var _javaSparkContext:JavaSparkContext = null; - private var _sqlContext:SQLContext = null; - - /** - * Represents magics available through the kernel. - */ - val magics = new MagicExecutor(magicLoader) - - /** - * Represents magic parsing functionality. - */ - val magicParser = new MagicParser(magicLoader) - - /** - * Represents the data that can be shared using the kernel as the middleman. - * - * @note Using Java structure to enable other languages to have easy access! - */ - val data: java.util.Map[String, Any] = new ConcurrentHashMap[String, Any]() - - - interpreterManager.initializeInterpreters(this) - - val interpreter = interpreterManager.defaultInterpreter.get - - /** - * Handles the output of interpreting code. - * @param output the output of the interpreter - * @return (success, message) or (failure, message) - */ - private def handleInterpreterOutput( - output: (Result, Either[ExecuteOutput, ExecuteFailure]) - ): (Boolean, String) = { - val (success, result) = output - success match { - case Results.Success => - (true, result.left.getOrElse("").asInstanceOf[String]) - case Results.Error => - (false, result.right.getOrElse("").toString) - case Results.Aborted => - (false, "Aborted!") - case Results.Incomplete => - // If we get an incomplete it's most likely a syntax error, so - // let the user know. - (false, "Syntax Error!") - } - } - - override def config:Config = { - _config - } - - /** - * Executes a block of code represented as a string and returns the result. - * - * @param code The code as an option to execute - * @return A tuple containing the result (true/false) and the output as a - * string - */ - def eval(code: Option[String]): (Boolean, String) = { - code.map(c => { - magicParser.parse(c) match { - case Left(parsedCode) => - val output = interpreter.interpret(parsedCode) - handleInterpreterOutput(output) - case Right(errMsg) => - (false, errMsg) - } - }).getOrElse((false, "Error!")) - } - - /** - * Constructs a new instance of the stream methods using the latest - * kernel message instance. - * - * @return The collection of stream methods - */ - override def stream: StreamMethods = stream() - - /** - * Constructs a new instance of the stream methods using the specified - * kernel message instance. - * - * @param parentMessage The message to serve as the parent of outgoing - * messages sent as a result of using streaming methods - * - * @return The collection of streaming methods - */ - private[spark] def stream( - parentMessage: v5.KernelMessage = lastKernelMessage() - ): StreamMethods = { - new StreamMethods(actorLoader, parentMessage) - } - - /** - * Constructs a new instance of the factory methods using the latest - * kernel message instance. - * - * @return The collection of factory methods - */ - override def factory: FactoryMethods = factory() - - /** - * Constructs a new instance of the factory methods using the specified - * kernel message and kernel message builder. - * - * @param parentMessage The message to serve as the parent of outgoing - * messages sent as a result of using an object created - * by the factory methods - * @param kmBuilder The builder to be used by objects created by factory - * methods - * - * @return The collection of factory methods - */ - private[spark] def factory( - parentMessage: v5.KernelMessage = lastKernelMessage(), - kmBuilder: v5.KMBuilder = v5.KMBuilder() - ): FactoryMethods = { - new FactoryMethods(_config, actorLoader, parentMessage, kmBuilder) - } - - /** - * Returns a print stream to be used for communication back to clients - * via standard out. - * - * @return The print stream instance or an error if the stream info is - * not found - */ - override def out: PrintStream = { - val kernelMessage = lastKernelMessage() - - constructStream(currentOutputStream, currentOutputKernelMessage, kernelMessage, { kernelMessage => - val outputStream = this.factory(parentMessage = kernelMessage) - .newKernelOutputStream("stdout") - - new PrintStream(outputStream) - }) - } - - /** - * Returns a print stream to be used for communication back to clients - * via standard error. - * - * @return The print stream instance or an error if the stream info is - * not found - */ - override def err: PrintStream = { - val kernelMessage = lastKernelMessage() - - constructStream(currentErrorStream, currentErrorKernelMessage, kernelMessage, { kernelMessage => - val outputStream = this.factory(parentMessage = kernelMessage) - .newKernelOutputStream("stderr") - - new PrintStream(outputStream) - }) - } - - /** - * Returns an input stream to be used to receive information from the client. - * - * @return The input stream instance or an error if the stream info is - * not found - */ - override def in: InputStream = { - val kernelMessage = lastKernelMessage() - - constructStream(currentInputStream, currentInputKernelMessage, kernelMessage, { kernelMessage => - this.factory(parentMessage = kernelMessage).newKernelInputStream() - }) - } - - /** - * Constructs or uses an existing stream. - * - * @param dynamicStream The DynamicVariable containing the stream to modify - * or use - * @param dynamicKernelMessage The DynamicVariable containing the KernelMessage to - * check against the new KernelMessage - * @param newKernelMessage The potentially-new KernelMessage - * @param streamConstructionFunc The function used to create a new stream - * @param typeTag The type information associated with the stream - * @tparam T The stream type - * @return The new stream or existing stream - */ - private def constructStream[T]( - dynamicStream: DynamicVariable[T], - dynamicKernelMessage: DynamicVariable[KernelMessage], - newKernelMessage: KernelMessage, - streamConstructionFunc: (KernelMessage) => T - )(implicit typeTag: TypeTag[T]) = { - // Update the stream being used only if the information has changed - // or if the stream has not been initialized - if (updateKernelMessage(dynamicKernelMessage, newKernelMessage) || - dynamicStream.value == null) - { - logger.trace("Creating new kernel " + typeTag.tpe.toString + "!") - dynamicStream.value = streamConstructionFunc(newKernelMessage) - } - - dynamicStream.value - } - - /** - * Updates the last stream info returning the status of whether or not the - * new stream info was different than the last stream info. - * - * @param dynamicKernelMessage The dynamic variable containing the current - * stream info - * @param kernelMessage The new stream info - * @return True if the new stream info is different from the last (therefore - * replaced), otherwise false - */ - private def updateKernelMessage( - dynamicKernelMessage: DynamicVariable[KernelMessage], - kernelMessage: KernelMessage - ): Boolean = - if (kernelMessage != null && !kernelMessage.equals(dynamicKernelMessage.value)) { - dynamicKernelMessage.value = kernelMessage - true - } else { - false - } - - /** - * Retrieves the last kernel message received by the kernel. - * - * @throws IllegalArgumentException If no kernel message has been received - * - * @return The kernel message instance - */ - private def lastKernelMessage() = { - val someKernelMessage = ExecuteRequestState.lastKernelMessage - require(someKernelMessage.nonEmpty, "No kernel message received!") - someKernelMessage.get - } - - override def createSparkContext(conf: SparkConf): SparkContext = { - _sparkConf = createSparkConf(conf) - _sparkContext = initializeSparkContext(sparkConf) - _javaSparkContext = new JavaSparkContext(_sparkContext) - _sqlContext = initializeSqlContext(_sparkContext) - - val sparkMaster = _sparkConf.getOption("spark.master").getOrElse("not_set") - logger.info( s"Connecting to spark.master $sparkMaster") - - updateInterpreterWithSparkContext(interpreter, sparkContext) - updateInterpreterWithSqlContext(interpreter, sqlContext) - - magicLoader.dependencyMap = - magicLoader.dependencyMap.setSparkContext(_sparkContext) - - _sparkContext - } - - override def createSparkContext( - master: String, appName: String - ): SparkContext = { - createSparkContext(new SparkConf().setMaster(master).setAppName(appName)) - } - - // TODO: Think of a better way to test without exposing this - protected[kernel] def createSparkConf(conf: SparkConf) = { - - logger.info("Setting deployMode to client") - conf.set("spark.submit.deployMode", "client") - - KeyValuePairUtils.stringToKeyValuePairSeq( - _config.getString("spark_configuration") - ).foreach { keyValuePair => - logger.info(s"Setting ${keyValuePair.key} to ${keyValuePair.value}") - Try(conf.set(keyValuePair.key, keyValuePair.value)) - } - - // TODO: Move SparkIMain to private and insert in a different way - logger.warn("Locked to Scala interpreter with SparkIMain until decoupled!") - - // TODO: Construct class server outside of SparkIMain - logger.warn("Unable to control initialization of REPL class server!") - logger.info("REPL Class Server Uri: " + interpreter.classServerURI) - conf.set("spark.repl.class.uri", interpreter.classServerURI) - - conf - } - - // TODO: Think of a better way to test without exposing this - protected[kernel] def initializeSparkContext(sparkConf: SparkConf): SparkContext = { - - logger.debug("Constructing new Spark Context") - // TODO: Inject stream redirect headers in Spark dynamically - var sparkContext: SparkContext = null - val outStream = new KernelOutputStream( - actorLoader, KMBuilder(), global.ScheduledTaskManager.instance, - sendEmptyOutput = _config.getBoolean("send_empty_output") - ) - - // Update global stream state and use it to set the Console local variables - // for threads in the Spark threadpool - global.StreamState.setStreams(System.in, outStream, outStream) - global.StreamState.withStreams { - sparkContext = new SparkContext(sparkConf) - } - - sparkContext - } - - // TODO: Think of a better way to test without exposing this - protected[kernel] def updateInterpreterWithSparkContext( - interpreter: Interpreter, sparkContext: SparkContext - ) = { - - interpreter.bindSparkContext(sparkContext) - } - - protected[kernel] def initializeSqlContext( - sparkContext: SparkContext - ): SQLContext = { - val sqlContext: SQLContext = try { - logger.info("Attempting to create Hive Context") - val hiveContextClassString = - "org.apache.spark.sql.hive.HiveContext" - - logger.debug(s"Looking up $hiveContextClassString") - val hiveContextClass = Class.forName(hiveContextClassString) - - val sparkContextClass = classOf[SparkContext] - val sparkContextClassName = sparkContextClass.getName - - logger.debug(s"Searching for constructor taking $sparkContextClassName") - val hiveContextContructor = - hiveContextClass.getConstructor(sparkContextClass) - - logger.debug("Invoking Hive Context constructor") - hiveContextContructor.newInstance(sparkContext).asInstanceOf[SQLContext] - } catch { - case _: Throwable => - logger.warn("Unable to create Hive Context! Defaulting to SQL Context!") - new SQLContext(sparkContext) - } - - sqlContext - } - - protected[kernel] def updateInterpreterWithSqlContext( - interpreter: Interpreter, sqlContext: SQLContext - ): Unit = { - interpreter.bindSqlContext(sqlContext) - } - - override def interpreter(name: String): Option[Interpreter] = { - interpreterManager.interpreters.get(name) - } - - override def sparkContext: SparkContext = _sparkContext - override def sparkConf: SparkConf = _sparkConf - override def javaSparkContext: JavaSparkContext = _javaSparkContext - override def sqlContext: SQLContext = _sqlContext -}