/*
 * Copyright 2014 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ibm.spark.magic.builtin

import java.io.PrintStream

import com.ibm.spark.magic._
import com.ibm.spark.magic.dependencies.IncludeOutputStream

/**
 * Line magic that prints every line and cell magic currently discoverable
 * on the classpath, grouped by kind.
 */
class LSMagic extends LineMagic with IncludeOutputStream {

  // Wrap the injected output stream lazily so nothing is allocated until
  // the magic is actually executed.
  private lazy val printStream = new PrintStream(outputStream)

  /**
   * Lists all available magics.
   * @param code The single line of code (ignored by this magic)
   * @return The output of the magic
   */
  override def execute(code: String): Unit = {
    val loadedClasses = new BuiltinLoader().loadClasses().toList
    val lineMagics = magicNames("%", classOf[LineMagic], loadedClasses)
      .mkString(" ").toLowerCase
    val cellMagics = magicNames("%%", classOf[CellMagic], loadedClasses)
      .mkString(" ").toLowerCase
    val message =
      s"""|Available line magics:
          |$lineMagics
          |
          |Available cell magics:
          |$cellMagics
          |
          |Type %<magic_name> for usage info. 
       """.stripMargin

    printStream.println(message)
  }

  /**
   * Provides a list of class names from the given list that implement
   * the specified interface, with the specified prefix prepended.
   * @param prefix prepended to each name, e.g. "%%"
   * @param interface a magic interface, e.g. classOf[LineMagic]
   * @param classes a list of magic classes
   * @return list of class names with prefix
   */
  protected[magic] def magicNames(prefix: String, interface: Class[_],
                                  classes: List[Class[_]]): List[String] =
    // NOTE: getInterfaces only reports directly-implemented interfaces;
    // this matches the original behavior.
    classes.collect {
      case c if c.getInterfaces.contains(interface) =>
        s"${prefix}${c.getSimpleName}"
    }
}
/*
 * Copyright 2014 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ibm.spark.magic.builtin

import com.ibm.spark.interpreter.{ExecuteFailure, Results, ExecuteAborted, ExecuteError}
import com.ibm.spark.kernel.protocol.v5.MIMEType
import com.ibm.spark.magic._
import com.ibm.spark.magic.dependencies.{IncludeKernelInterpreter, IncludeInterpreter}
import com.ibm.spark.utils.LogLike
import com.ibm.spark.utils.json.RddToJson
import org.apache.spark.sql.SchemaRDD

import scala.util.control.NonFatal

/**
 * Temporary magic to show an RDD as JSON.
 */
class RDD extends CellMagic with IncludeKernelInterpreter with LogLike {

  /**
   * Interprets the given code and renders the resulting value as a JSON
   * table, falling back to plain-text error output.
   *
   * @param code code expected to evaluate to a SchemaRDD
   * @return cell output keyed by MIME type (JSON on success, text on failure)
   * @throws Exception if the interpretation was aborted or raised an error
   */
  private def convertToJson(code: String) = {
    val (result, message) = kernelInterpreter.interpret(code)
    result match {
      case Results.Success =>
        val rddVarName = kernelInterpreter.lastExecutionVariableName.getOrElse("")
        kernelInterpreter.read(rddVarName).map(rddVal => {
          try {
            CellMagicOutput(MIMEType.ApplicationJson -> RddToJson.convert(rddVal.asInstanceOf[SchemaRDD]))
          } catch {
            // Catch NonFatal instead of Throwable so fatal errors (OOM,
            // thread interrupts) still propagate instead of being swallowed.
            // Also fixes the user-facing typo "Could note convert".
            case NonFatal(_) =>
              CellMagicOutput(MIMEType.PlainText -> s"Could not convert RDD to JSON: ${rddVarName}->${rddVal}")
          }
        }).getOrElse(CellMagicOutput(MIMEType.PlainText -> "No RDD Value found!"))
      case _ =>
        // Aborts and execution errors are rethrown; only the "no info"
        // case produces a message that reaches the log/output below.
        val errorMessage = message.right.toOption match {
          case Some(executeFailure) => executeFailure match {
            case _: ExecuteAborted => throw new Exception("RDD magic aborted!")
            case executeError: ExecuteError => throw new Exception(executeError.value)
          }
          case _ => "No error information available!"
        }
        logger.error(s"Error retrieving RDD value: ${errorMessage}")
        CellMagicOutput(MIMEType.PlainText ->
          (s"An error occurred converting RDD to JSON.\n${errorMessage}"))
    }
  }

  override def execute(code: String): CellMagicOutput =
    convertToJson(code)
}
/*
 * Copyright 2015 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.ibm.spark.magic.builtin

import com.ibm.spark.magic.LineMagic
import com.ibm.spark.magic.dependencies.IncludeOutputStream
import java.io.PrintStream
import com.ibm.spark.kernel.api.KernelOptions

/**
 * Line magic that toggles whether result types are printed alongside
 * evaluated output. Accepts "on", "off", or "" (report current state).
 */
class ShowTypes extends LineMagic with IncludeOutputStream {
  private lazy val printStream = new PrintStream(outputStream)

  /**
   * Updates or reports the global show-types flag.
   * @param code one of "on", "off", or "" — anything else is rejected
   */
  override def execute(code: String): Unit = code match {
    case "on" =>
      printStream.println("Types will be printed.")
      KernelOptions.showTypes = true
    case "off" =>
      printStream.println("Types will not be printed")
      KernelOptions.showTypes = false
    case "" =>
      // Report-only mode; note the original trailing space is preserved.
      val state = if (KernelOptions.showTypes) "on" else "off"
      printStream.println(s"ShowTypes is currently ${state} ")
    case other =>
      printStream.println(s"${other} is not a valid option for the ShowTypes magic.")
  }
}
/*
 * Copyright 2015 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.ibm.spark.magic.builtin

import com.ibm.spark.magic.LineMagic
import com.ibm.spark.magic.dependencies.IncludeOutputStream
import java.io.PrintStream
import com.ibm.spark.kernel.api.KernelOptions

/**
 * Line magic that toggles output truncation. Accepts "on", "off", or ""
 * (report current state). Note the flag it drives is inverted:
 * truncation "on" means KernelOptions.noTruncation = false.
 */
class Truncation extends LineMagic with IncludeOutputStream {
  private lazy val printStream = new PrintStream(outputStream)

  /**
   * Updates or reports the global truncation flag.
   * @param code one of "on", "off", or "" — anything else is rejected
   */
  override def execute(code: String): Unit = {
    code match {
      case "on" =>
        printStream.println(s"Output WILL be truncated.")
        KernelOptions.noTruncation = false
      case "off" =>
        printStream.println(s"Output will NOT be truncated")
        KernelOptions.noTruncation = true
      case "" =>
        printStream.println(s"Truncation is currently ${if (KernelOptions.noTruncation) "off" else "on"} ")
      case other =>
        // Fixed: the message previously referred to a "NoTruncation" magic,
        // but this magic is invoked as %truncation — the wrong name made the
        // error unactionable for users.
        printStream.println(s"${other} is not a valid option for the Truncation magic.")
    }
  }
}
/*
 * Copyright 2014 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ibm.spark.utils

import com.ibm.spark.kernel.protocol.v5.{MessageType, KernelMessage}

trait MessageLogSupport extends LogLike {

  // Precomputed set of message types the kernel RECEIVES; used to decide
  // whether a missing parent header is expected (incoming) or suspicious
  // (outgoing). Replaces a hand-rolled chain of 10 `||`-ed equals calls.
  // TODO: Migrate this to a helper method in MessageType.Incoming
  private val IncomingMessageTypes: Set[String] = Set(
    MessageType.Incoming.CompleteRequest,
    MessageType.Incoming.ConnectRequest,
    MessageType.Incoming.ExecuteRequest,
    MessageType.Incoming.HistoryRequest,
    MessageType.Incoming.InspectRequest,
    MessageType.Incoming.ShutdownRequest,
    MessageType.Incoming.KernelInfoRequest,
    MessageType.Incoming.CommOpen,
    MessageType.Incoming.CommMsg,
    MessageType.Incoming.CommClose
  ).map(_.toString)

  /**
   * Logs various pieces of a KernelMessage at different levels of logging.
   * @param km the message to log
   */
  def logMessage(km: KernelMessage): Unit = {
    logger.trace(s"Kernel message ids: ${km.ids}")
    logger.trace(s"Kernel message signature: ${km.signature}")
    logger.debug(s"Kernel message header id: ${km.header.msg_id}")
    logger.debug(s"Kernel message header type: ${km.header.msg_type}")
    val incomingMessage = isIncomingMessage(km.header.msg_type)
    (km.parentHeader, incomingMessage) match {
      case (null, true) => // Don't do anything, this is expected
      case (null, false) => // Messages coming from the kernel should have parent headers
        logger.warn(s"Parent header is null for message ${km.header.msg_id} " +
          s"of type ${km.header.msg_type}")
      case _ =>
        logger.trace(s"Kernel message parent id: ${km.parentHeader.msg_id}")
        logger.trace(s"Kernel message parent type: ${km.parentHeader.msg_type}")
    }
    logger.trace(s"Kernel message metadata: ${km.metadata}")
    logger.trace(s"Kernel message content: ${km.contentString}")
  }

  /**
   * Logs an action, along with message id and type for a KernelMessage.
   * @param action short description of what is being done with the message
   * @param km the message the action applies to
   */
  def logKernelMessageAction(action: String, km: KernelMessage): Unit = {
    logger.debug(s"${action} KernelMessage ${km.header.msg_id} " +
      s"of type ${km.header.msg_type}")
  }

  /**
   * This method is used to determine if a message is being received by the
   * kernel or being sent from the kernel.
   * @return true if the message is received by the kernel, false otherwise.
   */
  private def isIncomingMessage(messageType: String): Boolean =
    IncomingMessageTypes.contains(messageType)
}
/*
 * Copyright 2014 IBM Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ibm.spark.utils.json

import org.apache.spark.sql.{DataFrame, SchemaRDD}
import play.api.libs.json.{JsObject, JsString, Json}

/**
 * Utility to convert RDD to JSON.
 */
object RddToJson {

  /**
   * Converts a schema RDD (now a DataFrame) to a JSON table format:
   * an object with a "type" tag, the column names, and up to `limit`
   * rows rendered as arrays of strings.
   *
   * @param rdd the DataFrame to convert
   * @param limit maximum number of rows to materialize (default 10)
   * @return the string representing the JSON
   */
  def convert(rdd: DataFrame, limit: Int = 10): String = {
    val columns = Json.toJson(rdd.schema.fieldNames)
    // Stringify every cell, then cap the number of rows pulled to the driver.
    val rows = Json.toJson(
      rdd.map(row => row.toSeq.map(_.toString).toArray).take(limit))
    JsObject(Seq(
      "type" -> JsString("rdd/schema"),
      "columns" -> columns,
      "rows" -> rows
    )).toString()
  }
}
- */ - -package com.ibm.spark.boot - -import java.io.File - -import com.typesafe.config.Config -import joptsimple.OptionException -import org.scalatest.{FunSpec, Matchers} - -import scala.collection.JavaConverters._ - -class CommandLineOptionsSpec extends FunSpec with Matchers { - - describe("CommandLineOptions") { - describe("when received --max-interpreter-threads=<int>") { - it("should set the configuration to the specified value") { - val expected = 999 - val options = new CommandLineOptions( - s"--max-interpreter-threads=$expected" :: Nil - ) - - val actual = options.toConfig.getInt("max_interpreter_threads") - - actual should be (expected) - } - } - - describe("when received --help") { - it("should set the help flag to true") { - val options = new CommandLineOptions("--help" :: Nil) - - options.help should be (true) - } - } - - describe("when received -h") { - it("should set the help flag to true") { - val options = new CommandLineOptions("-h" :: Nil) - - options.help should be (true) - } - } - - describe("when not received --help or -h") { - it("should set the help flag to false") { - val options = new CommandLineOptions(Nil) - - options.help should be (false) - } - } - - describe("when received --version") { - it("should set the version flag to true") { - val options = new CommandLineOptions("--version" :: Nil) - - options.version should be (true) - } - } - - describe("when received -v") { - it("should set the version flag to true") { - val options = new CommandLineOptions("-v" :: Nil) - - options.version should be (true) - } - } - - describe("when not received --version or -v") { - it("should set the version flag to false") { - val options = new CommandLineOptions(Nil) - - options.version should be (false) - } - } - - describe("when received --spark-conf=<key>=<value>") { - it("should add the key-value pair to the string representation") { - val expected = "key=value" - val options = new CommandLineOptions(s"--spark-conf=$expected" :: Nil) - val actual = 
options.toConfig.getString("spark_configuration") - - actual should be (expected) - } - - it("should append to existing command line key-value pairs") { - val expected = "key1=value1" :: "key2=value2" :: Nil - val options = new CommandLineOptions( - s"--spark-conf=${expected(0)}" :: - s"--spark-conf=${expected(1)}" :: - Nil - ) - val actual = options.toConfig.getString("spark_configuration") - - actual should be (expected.mkString(",")) - } - } - - describe("when received -S<key>=<value>") { - it("should add the key-value pair to the string representation") { - val expected = "key=value" - val options = new CommandLineOptions(s"-S$expected" :: Nil) - val actual = options.toConfig.getString("spark_configuration") - - actual should be(expected) - } - - it("should append to existing command line key-value pairs") { - val expected = "key1=value1" :: "key2=value2" :: Nil - val options = new CommandLineOptions( - s"-S${expected(0)}" :: - s"-S${expected(1)}" :: - Nil - ) - val actual = options.toConfig.getString("spark_configuration") - - actual should be (expected.mkString(",")) - } - } - - describe("when received --profile=<path>") { - it("should error if path is not set") { - intercept[OptionException] { - new CommandLineOptions(Seq("--profile")) - } - } - - describe("#toConfig") { - it("should include values specified in file") { - - val pathToProfileFixture: String = new File(getClass.getResource("/fixtures/profile.json").toURI).getAbsolutePath - val options = new CommandLineOptions(Seq("--profile="+pathToProfileFixture)) - - val config: Config = options.toConfig - - config.entrySet() should not be ('empty) - config.getInt("stdin_port") should be(12345) - config.getInt("shell_port") should be(54321) - config.getInt("iopub_port") should be(11111) - config.getInt("control_port") should be(22222) - config.getInt("hb_port") should be(33333) - } - } - } - - describe("when received --<protocol port name>=<value>"){ - it("should error if value is not set") { - 
intercept[OptionException] { - new CommandLineOptions(Seq("--stdin-port")) - } - intercept[OptionException] { - new CommandLineOptions(Seq("--shell-port")) - } - intercept[OptionException] { - new CommandLineOptions(Seq("--iopub-port")) - } - intercept[OptionException] { - new CommandLineOptions(Seq("--control-port")) - } - intercept[OptionException] { - new CommandLineOptions(Seq("--heartbeat-port")) - } - } - - describe("#toConfig") { - it("should return config with commandline option values") { - - val options = new CommandLineOptions(List( - "--stdin-port", "99999", - "--shell-port", "88888", - "--iopub-port", "77777", - "--control-port", "55555", - "--heartbeat-port", "44444" - )) - - val config: Config = options.toConfig - - config.entrySet() should not be ('empty) - config.getInt("stdin_port") should be(99999) - config.getInt("shell_port") should be(88888) - config.getInt("iopub_port") should be(77777) - config.getInt("control_port") should be(55555) - config.getInt("hb_port") should be(44444) - } - } - } - - describe("when received --profile and --<protocol port name>=<value>"){ - describe("#toConfig") { - it("should return config with <protocol port> argument value") { - - val pathToProfileFixture: String = (new File(getClass.getResource("/fixtures/profile.json").toURI)).getAbsolutePath - val options = new CommandLineOptions(List("--profile", pathToProfileFixture, "--stdin-port", "99999", "--shell-port", "88888")) - - val config: Config = options.toConfig - - config.entrySet() should not be ('empty) - config.getInt("stdin_port") should be(99999) - config.getInt("shell_port") should be(88888) - config.getInt("iopub_port") should be(11111) - config.getInt("control_port") should be(22222) - } - } - - } - - describe("when no arguments are received"){ - describe("#toConfig") { - it("should read default value set in reference.conf") { - - val options = new CommandLineOptions(Nil) - - val config: Config = options.toConfig - config.getInt("stdin_port") should 
be(48691) - config.getInt("shell_port") should be(40544) - config.getInt("iopub_port") should be(43462) - config.getInt("control_port") should be(44808) - } - } - } - - describe("when using -- to separate interpreter arguments"){ - describe("#toConfig") { - it("should return interpreter_args config property when there are args before --") { - - val options = new CommandLineOptions(List("--stdin-port", "99999", "--shell-port", "88888", "--", "someArg1", "someArg2", "someArg3")) - - val config: Config = options .toConfig - - config.entrySet() should not be ('empty) - config.getStringList("interpreter_args").asScala should be (List("someArg1", "someArg2", "someArg3")) - } - - it("should return interpreter_args config property when args is at the beginning") { - - val options = new CommandLineOptions(List("--", "someArg1", "someArg2", "someArg3")) - - val config: Config = options .toConfig - - config.entrySet() should not be ('empty) - config.getStringList("interpreter_args").asScala should be (List("someArg1", "someArg2", "someArg3")) - } - - it("should return interpreter_args config property as empty list when there is nothing after --") { - - val options = new CommandLineOptions(List("--stdin-port", "99999", "--shell-port", "88888", "--")) - - val config: Config = options .toConfig - - config.entrySet() should not be ('empty) - config.getStringList("interpreter_args").asScala should be ('empty) - } - } - } - - describe("when received --ip=<value>") { - it("should error if value is not set") { - intercept[OptionException] { - new CommandLineOptions(Seq("--ip")) - } - } - - describe("#toConfig") { - it("should set ip to specified value") { - val expected = "1.2.3.4" - val options = new CommandLineOptions(s"--ip=${expected}" :: Nil) - val config: Config = options.toConfig - - config.getString("ip") should be(expected) - } - - it("should set ip to 127.0.0.1") { - val options = new CommandLineOptions(Nil) - val config: Config = options.toConfig - - config.getString("ip") 
should be("127.0.0.1") - } - } - } - - describe("when received options with surrounding whitespace") { - it("should trim whitespace") { - val url1 = "url1" - val url2 = "url2" - - val options = new CommandLineOptions(Seq( - " --magic-url ", s" ${url1}\t", - "--magic-url", s" \t ${url2} \t" - )) - val config: Config = options.toConfig - - config.getList("magic_urls").unwrapped.asScala should - be (Seq(url1, url2)) - } - } - - describe("when received --interpreter-plugin") { - it("should return the interpreter-plugin along with the defaults") { - val options = new CommandLineOptions(Seq( - "--interpreter-plugin", - "dummy:test.utils.DummyInterpreter" - )) - - val config: Config = options.toConfig - - val p = config.getList("interpreter_plugins") - - p should not be empty - - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/comm/KernelCommManagerSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/comm/KernelCommManagerSpec.scala b/kernel/src/test/scala/com/ibm/spark/comm/KernelCommManagerSpec.scala deleted file mode 100644 index 7b4442c..0000000 --- a/kernel/src/test/scala/com/ibm/spark/comm/KernelCommManagerSpec.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ibm.spark.comm - -import com.ibm.spark.kernel.protocol.v5 -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.kernel.protocol.v5.content.CommContent -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import org.scalatest.mock.MockitoSugar -import org.mockito.Mockito._ -import org.mockito.Matchers._ -import org.scalatest.{BeforeAndAfter, FunSpec, Matchers} - -class KernelCommManagerSpec extends FunSpec with Matchers with BeforeAndAfter - with MockitoSugar -{ - private val TestTargetName = "some target" - - private var mockActorLoader: ActorLoader = _ - private var mockKMBuilder: KMBuilder = _ - private var mockCommRegistrar: CommRegistrar = _ - private var kernelCommManager: KernelCommManager = _ - - private var generatedCommWriter: CommWriter = _ - - before { - mockActorLoader = mock[ActorLoader] - mockKMBuilder = mock[KMBuilder] - mockCommRegistrar = mock[CommRegistrar] - - kernelCommManager = new KernelCommManager( - mockActorLoader, - mockKMBuilder, - mockCommRegistrar - ) { - override protected def newCommWriter(commId: UUID): CommWriter = { - val commWriter = super.newCommWriter(commId) - - generatedCommWriter = commWriter - - val spyCommWriter = spy(commWriter) - doNothing().when(spyCommWriter) - .sendCommKernelMessage(any[KernelMessageContent with CommContent]) - - spyCommWriter - } - } - } - - describe("KernelCommManager") { - describe("#open") { - it("should return a wrapped instance of KernelCommWriter") { - kernelCommManager.open(TestTargetName, v5.MsgData.Empty) - - // Exposed hackishly for testing - generatedCommWriter shouldBe a [KernelCommWriter] - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/comm/KernelCommWriterSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/comm/KernelCommWriterSpec.scala 
b/kernel/src/test/scala/com/ibm/spark/comm/KernelCommWriterSpec.scala deleted file mode 100644 index eb792bb..0000000 --- a/kernel/src/test/scala/com/ibm/spark/comm/KernelCommWriterSpec.scala +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.comm - -import java.util.UUID -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.kernel.protocol.v5.content._ -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import play.api.libs.json.Json -import scala.concurrent.duration._ - -import akka.actor.{ActorSelection, ActorSystem} -import akka.testkit.{TestProbe, TestKit} -import com.typesafe.config.ConfigFactory -import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers} -import org.mockito.Mockito._ -import org.mockito.Matchers._ - -object KernelCommWriterSpec { - val config =""" - akka { - loglevel = "WARNING" - }""" -} - -class KernelCommWriterSpec extends TestKit( - ActorSystem("KernelCommWriterSpec", - ConfigFactory.parseString(KernelCommWriterSpec.config)) -) with FunSpecLike with Matchers with BeforeAndAfter with MockitoSugar -{ - - private val commId = UUID.randomUUID().toString - private var kernelCommWriter: KernelCommWriter = _ - private var kernelMessageBuilder: KMBuilder = _ - - private var actorLoader: ActorLoader = _ - private var kernelMessageRelayProbe: TestProbe = _ - - /** - * Retrieves the next 
message available. - * - * @return The KernelMessage instance (or an error if timed out) - */ - private def getNextMessage = - kernelMessageRelayProbe.receiveOne(200.milliseconds) - .asInstanceOf[KernelMessage] - - /** - * Retrieves the next message available and returns its type. - * - * @return The type of the message (pulled from message header) - */ - private def getNextMessageType = getNextMessage.header.msg_type - - /** - * Retrieves the next message available and parses the content string. - * - * @tparam T The type to coerce the content string into - * - * @return The resulting KernelMessageContent instance - */ - private def getNextMessageContents[T <: KernelMessageContent] - (implicit fjs: play.api.libs.json.Reads[T], mf: Manifest[T]) = - { - val receivedMessage = getNextMessage - - Json.parse(receivedMessage.contentString).as[T] - } - - before { - kernelMessageBuilder = spy(KMBuilder()) - - // Construct path for kernel message relay - actorLoader = mock[ActorLoader] - kernelMessageRelayProbe = TestProbe() - val kernelMessageRelaySelection: ActorSelection = - system.actorSelection(kernelMessageRelayProbe.ref.path.toString) - doReturn(kernelMessageRelaySelection) - .when(actorLoader).load(SystemActorType.KernelMessageRelay) - - // Create a new writer to use for testing - kernelCommWriter = new KernelCommWriter(actorLoader, kernelMessageBuilder, commId) - } - - describe("KernelCommWriter") { - describe("#writeOpen") { - it("should send a comm_open message to the relay") { - kernelCommWriter.writeOpen(anyString()) - - getNextMessageType should be (CommOpen.toTypeString) - } - - it("should include the comm_id in the message") { - val expected = commId - kernelCommWriter.writeOpen(anyString()) - - val actual = getNextMessageContents[CommOpen].comm_id - - actual should be (expected) - } - - it("should include the target name in the message") { - val expected = "<TARGET_NAME>" - kernelCommWriter.writeOpen(expected) - - val actual = 
getNextMessageContents[CommOpen].target_name - - actual should be (expected) - } - - it("should provide empty data in the message if no data is provided") { - val expected = MsgData.Empty - kernelCommWriter.writeOpen(anyString()) - - val actual = getNextMessageContents[CommOpen].data - - actual should be (expected) - } - - it("should include the data in the message") { - val expected = MsgData("some key" -> "some value") - kernelCommWriter.writeOpen(anyString(), expected) - - val actual = getNextMessageContents[CommOpen].data - - actual should be (expected) - } - } - - describe("#writeMsg") { - it("should send a comm_msg message to the relay") { - kernelCommWriter.writeMsg(MsgData.Empty) - - getNextMessageType should be (CommMsg.toTypeString) - } - - it("should include the comm_id in the message") { - val expected = commId - kernelCommWriter.writeMsg(MsgData.Empty) - - val actual = getNextMessageContents[CommMsg].comm_id - - actual should be (expected) - } - - it("should fail a require if the data is null") { - intercept[IllegalArgumentException] { - kernelCommWriter.writeMsg(null) - } - } - - it("should include the data in the message") { - val expected = MsgData("some key" -> "some value") - kernelCommWriter.writeMsg(expected) - - val actual = getNextMessageContents[CommMsg].data - - actual should be (expected) - } - } - - describe("#writeClose") { - it("should send a comm_close message to the relay") { - kernelCommWriter.writeClose() - - getNextMessageType should be (CommClose.toTypeString) - } - - it("should include the comm_id in the message") { - val expected = commId - kernelCommWriter.writeClose() - - val actual = getNextMessageContents[CommClose].comm_id - - actual should be (expected) - } - - it("should provide empty data in the message if no data is provided") { - val expected = MsgData.Empty - kernelCommWriter.writeClose() - - val actual = getNextMessageContents[CommClose].data - - actual should be (expected) - } - - it("should include the data in the 
message") { - val expected = MsgData("some key" -> "some value") - kernelCommWriter.writeClose(expected) - - val actual = getNextMessageContents[CommClose].data - - actual should be (expected) - } - } - - describe("#write") { - it("should send a comm_msg message to the relay") { - kernelCommWriter.write(Array('a'), 0, 1) - - getNextMessageType should be (CommMsg.toTypeString) - } - - it("should include the comm_id in the message") { - val expected = commId - kernelCommWriter.write(Array('a'), 0, 1) - - val actual = getNextMessageContents[CommMsg].comm_id - - actual should be (expected) - } - - it("should package the string as part of the data with a 'message' key") { - val expected = MsgData("message" -> "a") - kernelCommWriter.write(Array('a'), 0, 1) - - val actual = getNextMessageContents[CommMsg].data - - actual should be (expected) - } - } - - describe("#flush") { - it("should do nothing") { - // TODO: Is this test necessary? It does nothing. - kernelCommWriter.flush() - } - } - - describe("#close") { - it("should send a comm_close message to the relay") { - kernelCommWriter.close() - - getNextMessageType should be (CommClose.toTypeString) - } - - it("should include the comm_id in the message") { - val expected = commId - kernelCommWriter.close() - - val actual = getNextMessageContents[CommClose].comm_id - - actual should be (expected) - } - - it("should provide empty data in the message") { - val expected = MsgData.Empty - kernelCommWriter.close() - - val actual = getNextMessageContents[CommClose].data - - actual should be (expected) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/global/ExecutionCounterSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/global/ExecutionCounterSpec.scala b/kernel/src/test/scala/com/ibm/spark/global/ExecutionCounterSpec.scala deleted file mode 100644 index 4d1641f..0000000 --- 
a/kernel/src/test/scala/com/ibm/spark/global/ExecutionCounterSpec.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.global - -import org.scalatest.{FunSpec, Matchers} - -class ExecutionCounterSpec extends FunSpec with Matchers { - describe("ExecutionCounter") { - describe("#increment( String )"){ - it("should increment value when key is not present"){ - ExecutionCounter incr "foo" should be(1) - } - it("should increment value for key when it is present"){ - ExecutionCounter incr "bar" should be(1) - ExecutionCounter incr "bar" should be(2) - } - - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala deleted file mode 100644 index 58ea0c5..0000000 --- a/kernel/src/test/scala/com/ibm/spark/kernel/api/KernelSpec.scala +++ /dev/null @@ -1,178 +0,0 @@ -package com.ibm.spark.kernel.api - -import java.io.{InputStream, PrintStream} - -import com.ibm.spark.boot.layer.InterpreterManager -import com.ibm.spark.comm.CommManager -import com.ibm.spark.interpreter._ -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import 
com.ibm.spark.magic.MagicLoader -import com.typesafe.config.Config -import org.apache.spark.{SparkConf, SparkContext} -import org.mockito.Mockito._ -import org.mockito.Matchers._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfter, FunSpec, Matchers} -import com.ibm.spark.global.ExecuteRequestState - -class KernelSpec extends FunSpec with Matchers with MockitoSugar - with BeforeAndAfter -{ - private val BadCode = Some("abc foo bar") - private val GoodCode = Some("val foo = 1") - private val ErrorCode = Some("val foo = bar") - private val ErrorMsg = "Name: error\n" + - "Message: bad\n" + - "StackTrace: 1" - - private var mockConfig: Config = _ - private var mockSparkContext: SparkContext = _ - private var mockSparkConf: SparkConf = _ - private var mockActorLoader: ActorLoader = _ - private var mockInterpreter: Interpreter = _ - private var mockInterpreterManager: InterpreterManager = _ - private var mockCommManager: CommManager = _ - private var mockMagicLoader: MagicLoader = _ - private var kernel: Kernel = _ - private var spyKernel: Kernel = _ - - before { - mockConfig = mock[Config] - mockInterpreter = mock[Interpreter] - mockInterpreterManager = mock[InterpreterManager] - mockSparkContext = mock[SparkContext] - mockSparkConf = mock[SparkConf] - when(mockInterpreterManager.defaultInterpreter) - .thenReturn(Some(mockInterpreter)) - when(mockInterpreterManager.interpreters) - .thenReturn(Map[String, com.ibm.spark.interpreter.Interpreter]()) - when(mockInterpreter.interpret(BadCode.get)) - .thenReturn((Results.Incomplete, null)) - when(mockInterpreter.interpret(GoodCode.get)) - .thenReturn((Results.Success, Left(new ExecuteOutput("ok")))) - when(mockInterpreter.interpret(ErrorCode.get)) - .thenReturn((Results.Error, Right(ExecuteError("error","bad", List("1"))))) - - - mockCommManager = mock[CommManager] - mockActorLoader = mock[ActorLoader] - mockMagicLoader = mock[MagicLoader] - - kernel = new Kernel( - mockConfig, mockActorLoader, 
mockInterpreterManager, mockCommManager, - mockMagicLoader - ) - - spyKernel = spy(kernel) - - } - - after { - ExecuteRequestState.reset() - } - - describe("Kernel") { - describe("#eval") { - it("should return syntax error") { - kernel eval BadCode should be((false, "Syntax Error!")) - } - - it("should return ok") { - kernel eval GoodCode should be((true, "ok")) - } - - it("should return error") { - kernel eval ErrorCode should be((false, ErrorMsg)) - } - - it("should return error on None") { - kernel eval None should be ((false, "Error!")) - } - } - - describe("#out") { - it("should throw an exception if the ExecuteRequestState has not been set") { - intercept[IllegalArgumentException] { - kernel.out - } - } - - it("should create a new PrintStream instance if the ExecuteRequestState has been set") { - ExecuteRequestState.processIncomingKernelMessage( - new KernelMessage(Nil, "", mock[Header], mock[ParentHeader], - mock[Metadata], "") - ) - kernel.out shouldBe a [PrintStream] - } - } - - describe("#err") { - it("should throw an exception if the ExecuteRequestState has not been set") { - intercept[IllegalArgumentException] { - kernel.err - } - } - - it("should create a new PrintStream instance if the ExecuteRequestState has been set") { - ExecuteRequestState.processIncomingKernelMessage( - new KernelMessage(Nil, "", mock[Header], mock[ParentHeader], - mock[Metadata], "") - ) - - // TODO: Access the underlying streamType field to assert stderr? 
- kernel.err shouldBe a [PrintStream] - } - } - - describe("#in") { - it("should throw an exception if the ExecuteRequestState has not been set") { - intercept[IllegalArgumentException] { - kernel.in - } - } - - it("should create a new InputStream instance if the ExecuteRequestState has been set") { - ExecuteRequestState.processIncomingKernelMessage( - new KernelMessage(Nil, "", mock[Header], mock[ParentHeader], - mock[Metadata], "") - ) - - kernel.in shouldBe a [InputStream] - } - } - - describe("#stream") { - it("should throw an exception if the ExecuteRequestState has not been set") { - intercept[IllegalArgumentException] { - kernel.stream - } - } - - it("should create a StreamMethods instance if the ExecuteRequestState has been set") { - ExecuteRequestState.processIncomingKernelMessage( - new KernelMessage(Nil, "", mock[Header], mock[ParentHeader], - mock[Metadata], "") - ) - - kernel.stream shouldBe a [StreamMethods] - } - } - - describe("when spark.master is set in config") { - - it("should create SparkConf") { - val expected = "some value" - doReturn(expected).when(mockConfig).getString("spark.master") - doReturn("").when(mockConfig).getString("spark_configuration") - - // Provide stub for interpreter classServerURI since also executed - doReturn("").when(mockInterpreter).classServerURI - - val sparkConf = kernel.createSparkConf(new SparkConf().setMaster(expected)) - - sparkConf.get("spark.master") should be (expected) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/api/StreamMethodsSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/api/StreamMethodsSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/api/StreamMethodsSpec.scala deleted file mode 100644 index fc87588..0000000 --- a/kernel/src/test/scala/com/ibm/spark/kernel/api/StreamMethodsSpec.scala +++ /dev/null @@ -1,70 +0,0 @@ -package 
com.ibm.spark.kernel.api - -import akka.actor.ActorSystem -import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import com.ibm.spark.kernel.protocol.v5 -import com.ibm.spark.kernel.protocol.v5.KernelMessage -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FunSpecLike, BeforeAndAfter, Matchers, FunSpec} -import play.api.libs.json.Json - -import scala.concurrent.duration._ - -import org.mockito.Mockito._ - -class StreamMethodsSpec extends TestKit( - ActorSystem("StreamMethodsSpec") -) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar - with BeforeAndAfter -{ - private val MaxDuration = 300.milliseconds - - private var kernelMessageRelayProbe: TestProbe = _ - private var mockParentHeader: v5.ParentHeader = _ - private var mockActorLoader: v5.kernel.ActorLoader = _ - private var mockKernelMessage: v5.KernelMessage = _ - private var streamMethods: StreamMethods = _ - - before { - kernelMessageRelayProbe = TestProbe() - - mockParentHeader = mock[v5.ParentHeader] - - mockActorLoader = mock[v5.kernel.ActorLoader] - doReturn(system.actorSelection(kernelMessageRelayProbe.ref.path)) - .when(mockActorLoader).load(v5.SystemActorType.KernelMessageRelay) - - mockKernelMessage = mock[v5.KernelMessage] - doReturn(mockParentHeader).when(mockKernelMessage).header - - streamMethods = new StreamMethods(mockActorLoader, mockKernelMessage) - } - - describe("StreamMethods") { - describe("#()") { - it("should put the header of the given message as the parent header") { - val expected = mockKernelMessage.header - val actual = streamMethods.kmBuilder.build.parentHeader - - actual should be (expected) - } - } - - describe("#sendAll") { - it("should send a message containing all of the given text") { - val expected = "some text" - - streamMethods.sendAll(expected) - - val outgoingMessage = kernelMessageRelayProbe.receiveOne(MaxDuration) - val kernelMessage = outgoingMessage.asInstanceOf[KernelMessage] - - val actual = 
Json.parse(kernelMessage.contentString) - .as[v5.content.StreamContent].text - - actual should be (expected) - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/dispatch/StatusDispatchSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/dispatch/StatusDispatchSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/dispatch/StatusDispatchSpec.scala deleted file mode 100644 index 60d3b42..0000000 --- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/dispatch/StatusDispatchSpec.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ibm.spark.kernel.protocol.v5.dispatch - -import akka.actor.{ActorRef, ActorSystem, Props} -import akka.testkit.{TestKit, TestProbe} -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.kernel.protocol.v5.content.KernelStatus -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers} -import play.api.libs.json.Json - -import scala.concurrent.duration._ - -class StatusDispatchSpec extends TestKit(ActorSystem("StatusDispatchSystem")) -with FunSpecLike with Matchers with MockitoSugar with BeforeAndAfter{ - var statusDispatchRef: ActorRef = _ - var relayProbe: TestProbe = _ - before { - // Mock the relay with a probe - relayProbe = TestProbe() - // Mock the ActorLoader - val mockActorLoader: ActorLoader = mock[ActorLoader] - when(mockActorLoader.load(SystemActorType.KernelMessageRelay)) - .thenReturn(system.actorSelection(relayProbe.ref.path.toString)) - - statusDispatchRef = system.actorOf(Props(classOf[StatusDispatch],mockActorLoader)) - } - - - describe("StatusDispatch") { - describe("#receive( KernelStatusType )") { - it("should send a status message to the relay") { - statusDispatchRef ! KernelStatusType.Busy - // Check the kernel message is the correct type - val statusMessage: KernelMessage = relayProbe.receiveOne(500.milliseconds).asInstanceOf[KernelMessage] - statusMessage.header.msg_type should be (MessageType.Outgoing.Status.toString) - // Check the status is what we sent - val status: KernelStatus = Json.parse(statusMessage.contentString).as[KernelStatus] - status.execution_state should be (KernelStatusType.Busy.toString) - } - } - - describe("#receive( KernelStatusType, Header )") { - it("should send a status message to the relay") { - val tuple = Tuple2(KernelStatusType.Busy, mock[Header]) - statusDispatchRef ! 
tuple - // Check the kernel message is the correct type - val statusMessage: KernelMessage = relayProbe.receiveOne(500.milliseconds).asInstanceOf[KernelMessage] - statusMessage.header.msg_type should be (MessageType.Outgoing.Status.toString) - // Check the status is what we sent - val status: KernelStatus = Json.parse(statusMessage.contentString).as[KernelStatus] - status.execution_state should be (KernelStatusType.Busy.toString) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CodeCompleteHandlerSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CodeCompleteHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CodeCompleteHandlerSpec.scala deleted file mode 100644 index b44ad1c..0000000 --- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CodeCompleteHandlerSpec.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ibm.spark.kernel.protocol.v5.handler - -import akka.actor._ -import akka.testkit.{TestProbe, ImplicitSender, TestKit} -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.kernel.protocol.v5.content.CompleteRequest -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import com.ibm.spark.kernel.protocol.v5Test._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FunSpecLike, BeforeAndAfter, Matchers} -import org.mockito.Mockito._ -import scala.concurrent.duration._ - -class CodeCompleteHandlerSpec extends TestKit( - ActorSystem("CodeCompleteHandlerSpec") -) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar - with BeforeAndAfter { - - var actorLoader: ActorLoader = _ - var handlerActor: ActorRef = _ - var kernelMessageRelayProbe: TestProbe = _ - var interpreterProbe: TestProbe = _ - var statusDispatchProbe: TestProbe = _ - - before { - actorLoader = mock[ActorLoader] - - handlerActor = system.actorOf(Props(classOf[CodeCompleteHandler], actorLoader)) - - kernelMessageRelayProbe = TestProbe() - when(actorLoader.load(SystemActorType.KernelMessageRelay)) - .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString)) - - interpreterProbe = new TestProbe(system) - when(actorLoader.load(SystemActorType.Interpreter)) - .thenReturn(system.actorSelection(interpreterProbe.ref.path.toString)) - - statusDispatchProbe = new TestProbe(system) - when(actorLoader.load(SystemActorType.StatusDispatch)) - .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString)) - } - - def replyToHandlerWithOkAndResult() = { - val expectedClass = classOf[CompleteRequest] - interpreterProbe.expectMsgClass(expectedClass) - interpreterProbe.reply((0, List[String]())) - } - - def replyToHandlerWithOkAndBadResult() = { - val expectedClass = classOf[CompleteRequest] - interpreterProbe.expectMsgClass(expectedClass) - interpreterProbe.reply("hello") - } - - describe("CodeCompleteHandler (ActorLoader)") { - 
it("should send a CompleteRequest") { - handlerActor ! MockCompleteRequestKernelMessage - replyToHandlerWithOkAndResult() - kernelMessageRelayProbe.fishForMessage(500.milliseconds) { - case KernelMessage(_, _, header, _, _, _) => - header.msg_type == MessageType.Outgoing.CompleteReply.toString - } - } - - it("should throw an error for bad JSON") { - handlerActor ! MockKernelMessageWithBadJSON - var result = false - try { - replyToHandlerWithOkAndResult() - } - catch { - case t: Throwable => result = true - } - result should be (true) - } - - it("should throw an error for bad code completion") { - handlerActor ! MockCompleteRequestKernelMessage - try { - replyToHandlerWithOkAndBadResult() - } - catch { - case error: Exception => error.getMessage should be ("Parse error in CodeCompleteHandler") - } - } - - it("should send an idle message") { - handlerActor ! MockCompleteRequestKernelMessage - replyToHandlerWithOkAndResult() - statusDispatchProbe.fishForMessage(500.milliseconds) { - case Tuple2(status, _) => - status == KernelStatusType.Idle - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommCloseHandlerSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommCloseHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommCloseHandlerSpec.scala deleted file mode 100644 index 49582f8..0000000 --- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommCloseHandlerSpec.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.kernel.protocol.v5.handler - -import java.util.UUID - -import akka.actor.{Props, ActorRef, ActorSystem} -import akka.testkit.{TestProbe, ImplicitSender, TestKit} -import com.ibm.spark.kernel.protocol.v5 -import com.ibm.spark.kernel.protocol.v5.content.{ClearOutput, CommClose} -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import com.ibm.spark.kernel.protocol.v5.{KernelMessage, SystemActorType, KMBuilder} -import com.ibm.spark.comm.{CommRegistrar, CommWriter, CommCallbacks, CommStorage} -import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers} -import org.mockito.Mockito._ -import org.mockito.Matchers._ - -import scala.concurrent.duration._ - -class CommCloseHandlerSpec extends TestKit( - ActorSystem("CommCloseHandlerSpec") -) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar - with BeforeAndAfter -{ - private val TestCommId = UUID.randomUUID().toString - private val TestTargetName = "some test target" - - private var kmBuilder: KMBuilder = _ - private var spyCommStorage: CommStorage = _ - private var mockCommCallbacks: CommCallbacks = _ - private var mockCommRegistrar: CommRegistrar = _ - private var mockActorLoader: ActorLoader = _ - private var commCloseHandler: ActorRef = _ - private var kernelMessageRelayProbe: TestProbe = _ - private var statusDispatchProbe: TestProbe = _ - - before { - kmBuilder = KMBuilder() - mockCommCallbacks = mock[CommCallbacks] - spyCommStorage = spy(new CommStorage()) - mockCommRegistrar = mock[CommRegistrar] - - 
mockActorLoader = mock[ActorLoader] - - commCloseHandler = system.actorOf(Props( - classOf[CommCloseHandler], - mockActorLoader, mockCommRegistrar, spyCommStorage - )) - - // Used to intercept responses - kernelMessageRelayProbe = TestProbe() - when(mockActorLoader.load(SystemActorType.KernelMessageRelay)) - .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString)) - - // Used to intercept busy/idle messages - statusDispatchProbe = new TestProbe(system) - when(mockActorLoader.load(SystemActorType.StatusDispatch)) - .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString)) - } - - describe("CommCloseHandler") { - describe("#process") { - it("should execute close callbacks if the id is registered") { - // Mark our id as registered - doReturn(Some(mockCommCallbacks)).when(spyCommStorage) - .getCommIdCallbacks(TestCommId) - - // Send a comm_open message with the test target - commCloseHandler ! kmBuilder - .withHeader(CommClose.toTypeString) - .withContentString(CommClose(TestCommId, v5.MsgData.Empty)) - .build - - // Should receive a busy and an idle message - statusDispatchProbe.receiveN(2, 200.milliseconds) - - // Verify that the msg callbacks were triggered along the way - verify(mockCommCallbacks).executeCloseCallbacks( - any[CommWriter], any[v5.UUID], any[v5.MsgData]) - } - - it("should not execute close callbacks if the id is not registered") { - // Mark our target as not registered - doReturn(None).when(spyCommStorage).getCommIdCallbacks(TestCommId) - - // Send a comm_msg message with the test id - commCloseHandler ! 
kmBuilder - .withHeader(CommClose.toTypeString) - .withContentString(CommClose(TestCommId, v5.MsgData.Empty)) - .build - - // Should receive a busy and an idle message - statusDispatchProbe.receiveN(2, 200.milliseconds) - - // Verify that the msg callbacks were NOT triggered along the way - verify(mockCommCallbacks, never()).executeCloseCallbacks( - any[CommWriter], any[v5.UUID], any[v5.MsgData]) - } - - it("should do nothing if there is a parsing error") { - // Send a comm_open message with an invalid content string - commCloseHandler ! kmBuilder - .withHeader(CommClose.toTypeString) - .withContentString(ClearOutput(_wait = true)) - .build - - // TODO: Is there a better way to test for this without an upper time - // limit? Is there a different logical approach? - kernelMessageRelayProbe.expectNoMsg(200.milliseconds) - } - - it("should include the parent's header in the parent header of " + - "outgoing messages"){ - - // Register a callback that sends a message using the comm writer - val closeCallback: CommCallbacks.CloseCallback = - new CommCallbacks.CloseCallback() { - def apply(v1: CommWriter, v2: v5.UUID, v4: v5.MsgData) = - v1.writeMsg(v5.MsgData.Empty) - } - val callbacks = (new CommCallbacks).addCloseCallback(closeCallback) - doReturn(Some(callbacks)).when(spyCommStorage) - .getCommIdCallbacks(TestCommId) - - // Send a comm close message - val msg = kmBuilder - .withHeader(CommClose.toTypeString) - .withContentString(CommClose(TestCommId, v5.MsgData.Empty)) - .build - commCloseHandler ! 
msg - - // Verify that the message sent by the handler has the desired property - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - case KernelMessage(_, _, _, parentHeader, _, _) => - parentHeader == msg.header - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala deleted file mode 100644 index 582a08e..0000000 --- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommMsgHandlerSpec.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ibm.spark.kernel.protocol.v5.handler - -import java.util.UUID - -import akka.actor.{ActorRef, ActorSystem, Props} -import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import com.ibm.spark.kernel.protocol.v5 -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.comm._ -import com.ibm.spark.kernel.protocol.v5.content.{CommMsg, ClearOutput, CommOpen} -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers} - -import scala.concurrent.duration._ - -class CommMsgHandlerSpec extends TestKit( - ActorSystem("CommMsgHandlerSpec") -) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar - with BeforeAndAfter -{ - private val TestCommId = UUID.randomUUID().toString - private val TestTargetName = "some test target" - - private var kmBuilder: KMBuilder = _ - private var spyCommStorage: CommStorage = _ - private var mockCommCallbacks: CommCallbacks = _ - private var mockActorLoader: ActorLoader = _ - private var commMsgHandler: ActorRef = _ - private var kernelMessageRelayProbe: TestProbe = _ - private var statusDispatchProbe: TestProbe = _ - - before { - kmBuilder = KMBuilder() - mockCommCallbacks = mock[CommCallbacks] - spyCommStorage = spy(new CommStorage()) - - mockActorLoader = mock[ActorLoader] - - commMsgHandler = system.actorOf(Props( - classOf[CommMsgHandler], - mockActorLoader, mock[CommRegistrar], spyCommStorage - )) - - // Used to intercept responses - kernelMessageRelayProbe = TestProbe() - when(mockActorLoader.load(SystemActorType.KernelMessageRelay)) - .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString)) - - // Used to intercept busy/idle messages - statusDispatchProbe = new TestProbe(system) - when(mockActorLoader.load(SystemActorType.StatusDispatch)) - .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString)) 
- } - - describe("CommMsgHandler") { - describe("#process") { - it("should execute msg callbacks if the id is registered") { - // Mark our id as registered - doReturn(Some(mockCommCallbacks)).when(spyCommStorage) - .getCommIdCallbacks(TestCommId) - - // Send a comm_open message with the test target - commMsgHandler ! kmBuilder - .withHeader(CommMsg.toTypeString) - .withContentString(CommMsg(TestCommId, v5.MsgData.Empty)) - .build - - // Should receive a busy and an idle message - statusDispatchProbe.receiveN(2, 200.milliseconds) - - // Verify that the msg callbacks were triggered along the way - verify(mockCommCallbacks).executeMsgCallbacks( - any[CommWriter], any[v5.UUID], any[v5.MsgData]) - } - - it("should not execute msg callbacks if the id is not registered") { - // Mark our target as not registered - doReturn(None).when(spyCommStorage).getCommIdCallbacks(TestCommId) - - // Send a comm_msg message with the test id - commMsgHandler ! kmBuilder - .withHeader(CommMsg.toTypeString) - .withContentString(CommMsg(TestCommId, v5.MsgData.Empty)) - .build - - // Should receive a busy and an idle message - statusDispatchProbe.receiveN(2, 200.milliseconds) - - // Verify that the msg callbacks were NOT triggered along the way - verify(mockCommCallbacks, never()).executeMsgCallbacks( - any[CommWriter], any[v5.UUID], any[v5.MsgData]) - } - - it("should do nothing if there is a parsing error") { - // Send a comm_open message with an invalid content string - commMsgHandler ! kmBuilder - .withHeader(CommMsg.toTypeString) - .withContentString(ClearOutput(_wait = true)) - .build - - // TODO: Is there a better way to test for this without an upper time - // limit? Is there a different logical approach? 
- kernelMessageRelayProbe.expectNoMsg(200.milliseconds) - } - - it("should include the parent's header in the parent header of " + - "outgoing messages"){ - - // Register a callback that sends a message using the comm writer - val msgCallback: CommCallbacks.MsgCallback = - new CommCallbacks.MsgCallback() { - def apply(v1: CommWriter, v2: v5.UUID, v3: v5.MsgData): Unit = - v1.writeMsg(MsgData.Empty) - } - val callbacks = (new CommCallbacks).addMsgCallback(msgCallback) - doReturn(Some(callbacks)).when(spyCommStorage) - .getCommIdCallbacks(TestCommId) - - // Send a comm_msg message with the test id - val msg = kmBuilder - .withHeader(CommMsg.toTypeString) - .withContentString(CommMsg(TestCommId, v5.MsgData.Empty)) - .build - commMsgHandler ! msg - - // Verify that the message sent by the handler has the desired property - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - case KernelMessage(_, _, _, parentHeader, _, _) => - parentHeader == msg.header - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommOpenHandlerSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommOpenHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommOpenHandlerSpec.scala deleted file mode 100644 index 64013e9..0000000 --- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/CommOpenHandlerSpec.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.kernel.protocol.v5.handler - -import java.util.UUID - -import com.ibm.spark.kernel.protocol.v5 - -import akka.actor.{Props, ActorRef, ActorSystem} -import akka.testkit.{TestProbe, ImplicitSender, TestKit} -import com.ibm.spark.kernel.protocol.v5.content.{CommClose, ClearOutput, CommOpen} -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.comm._ -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import org.mockito.Mockito._ -import org.mockito.Matchers._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers} - -import scala.concurrent.duration._ - -class CommOpenHandlerSpec extends TestKit( - ActorSystem("CommOpenHandlerSpec") -) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar - with BeforeAndAfter -{ - private val TestCommId = UUID.randomUUID().toString - private val TestTargetName = "some test target" - - private var kmBuilder: KMBuilder = _ - private var spyCommStorage: CommStorage = _ - private var mockCommCallbacks: CommCallbacks = _ - private var mockCommRegistrar: CommRegistrar = _ - private var mockActorLoader: ActorLoader = _ - private var commOpenHandler: ActorRef = _ - private var kernelMessageRelayProbe: TestProbe = _ - private var statusDispatchProbe: TestProbe = _ - - before { - kmBuilder = KMBuilder() - mockCommCallbacks = mock[CommCallbacks] - spyCommStorage = spy(new CommStorage()) - mockCommRegistrar = mock[CommRegistrar] - - mockActorLoader = mock[ActorLoader] - - commOpenHandler = system.actorOf(Props( - 
classOf[CommOpenHandler], - mockActorLoader, mockCommRegistrar, spyCommStorage - )) - - // Used to intercept responses - kernelMessageRelayProbe = TestProbe() - when(mockActorLoader.load(SystemActorType.KernelMessageRelay)) - .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString)) - - // Used to intercept busy/idle messages - statusDispatchProbe = new TestProbe(system) - when(mockActorLoader.load(SystemActorType.StatusDispatch)) - .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString)) - } - - describe("CommOpenHandler") { - describe("#process") { - it("should execute open callbacks if the target exists") { - // Mark our target as registered - doReturn(Some(mockCommCallbacks)).when(spyCommStorage) - .getTargetCallbacks(TestTargetName) - - // Send a comm_open message with the test target - commOpenHandler ! kmBuilder - .withHeader(CommOpen.toTypeString) - .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty)) - .build - - // Should receive a busy and an idle message - statusDispatchProbe.receiveN(2, 200.milliseconds) - - // Verify that the open callbacks were triggered along the way - verify(mockCommCallbacks).executeOpenCallbacks( - any[CommWriter], any[v5.UUID], anyString(), any[v5.MsgData]) - } - - it("should close the comm connection if the target does not exist") { - // Mark our target as not registered - doReturn(None).when(spyCommStorage).getTargetCallbacks(TestTargetName) - - // Send a comm_open message with the test target - commOpenHandler ! 
kmBuilder - .withHeader(CommOpen.toTypeString) - .withContentString(CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty)) - .build - - // Should receive a close message as a result of the target missing - kernelMessageRelayProbe.expectMsgPF(200.milliseconds) { - case KernelMessage(_, _, header, _, _, _) => - header.msg_type should be (CommClose.toTypeString) - } - } - - it("should do nothing if there is a parsing error") { - // Send a comm_open message with an invalid content string - commOpenHandler ! kmBuilder - .withHeader(CommOpen.toTypeString) - .withContentString(ClearOutput(_wait = true)) - .build - - // TODO: Is there a better way to test for this without an upper time - // limit? Is there a different logical approach? - kernelMessageRelayProbe.expectNoMsg(200.milliseconds) - } - - it("should include the parent's header in the parent header of " + - "outgoing messages"){ - - // Register a callback that sends a message using the comm writer - val openCallback: CommCallbacks.OpenCallback = - new CommCallbacks.OpenCallback() { - def apply(v1: CommWriter, v2: v5.UUID, v3: String, v4: v5.MsgData) = - v1.writeMsg(MsgData.Empty) - } - val callbacks = (new CommCallbacks).addOpenCallback(openCallback) - doReturn(Some(callbacks)).when(spyCommStorage) - .getCommIdCallbacks(TestCommId) - - // Send a comm_open message - val msg = kmBuilder - .withHeader(CommOpen.toTypeString) - .withContentString( - CommOpen(TestCommId, TestTargetName, v5.MsgData.Empty) - ) - .build - commOpenHandler ! 
msg - - // Verify that the message sent by the handler has the desired property - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - case KernelMessage(_, _, _, parentHeader, _, _) => - parentHeader == msg.header - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala ---------------------------------------------------------------------- diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala deleted file mode 100644 index c22bc41..0000000 --- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala +++ /dev/null @@ -1,282 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ibm.spark.kernel.protocol.v5.handler - -import java.io.OutputStream -import java.util.concurrent.atomic.AtomicInteger - -import akka.actor._ -import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import com.ibm.spark.kernel.api.{FactoryMethods, FactoryMethodsLike, Kernel} -import com.ibm.spark.kernel.protocol.v5._ -import com.ibm.spark.kernel.protocol.v5.content._ -import com.ibm.spark.kernel.protocol.v5.kernel.ActorLoader -import com.ibm.spark.kernel.protocol.v5Test._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers} -import play.api.libs.json.Json - -import org.mockito.Mockito._ -import org.mockito.Matchers._ -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent._ - -class ExecuteRequestHandlerSpec extends TestKit( - ActorSystem("ExecuteRequestHandlerSpec") -) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar - with BeforeAndAfter { - - private var mockActorLoader: ActorLoader = _ - private var mockFactoryMethods: FactoryMethods = _ - private var mockKernel: Kernel = _ - private var mockOutputStream: OutputStream = _ - private var handlerActor: ActorRef = _ - private var kernelMessageRelayProbe: TestProbe = _ - private var executeRequestRelayProbe: TestProbe = _ - private var statusDispatchProbe: TestProbe = _ - - before { - mockActorLoader = mock[ActorLoader] - mockFactoryMethods = mock[FactoryMethods] - mockKernel = mock[Kernel] - mockOutputStream = mock[OutputStream] - doReturn(mockFactoryMethods).when(mockKernel) - .factory(any[KernelMessage], any[KMBuilder]) - - doReturn(mockOutputStream).when(mockFactoryMethods) - .newKernelOutputStream(anyString(), anyBoolean()) - - // Add our handler and mock interpreter to the actor system - handlerActor = system.actorOf(Props( - classOf[ExecuteRequestHandler], - mockActorLoader, - mockKernel - )) - - kernelMessageRelayProbe = new TestProbe(system) - 
when(mockActorLoader.load(SystemActorType.KernelMessageRelay)) - .thenReturn(system.actorSelection(kernelMessageRelayProbe.ref.path.toString)) - - executeRequestRelayProbe = new TestProbe(system) - when(mockActorLoader.load(SystemActorType.ExecuteRequestRelay)) - .thenReturn(system.actorSelection(executeRequestRelayProbe.ref.path.toString)) - - statusDispatchProbe = new TestProbe(system) - when(mockActorLoader.load(SystemActorType.StatusDispatch)) - .thenReturn(system.actorSelection(statusDispatchProbe.ref.path.toString)) - } - - /** - * This method simulates the interpreter passing back an - * execute result and reply. - */ - def replyToHandlerWithOkAndResult() = { - // This stubs the behaviour of the interpreter executing code - val expectedClass = classOf[(ExecuteRequest, KernelMessage, OutputStream)] - executeRequestRelayProbe.expectMsgClass(expectedClass) - executeRequestRelayProbe.reply(( - ExecuteReplyOk(1, Some(Payloads()), Some(UserExpressions())), - ExecuteResult(1, Data("text/plain" -> "resulty result"), Metadata()) - )) - } - - def replyToHandlerWithOk() = { - // This stubs the behaviour of the interpreter executing code - val expectedClass = classOf[(ExecuteRequest, KernelMessage, OutputStream)] - executeRequestRelayProbe.expectMsgClass(expectedClass) - executeRequestRelayProbe.reply(( - ExecuteReplyOk(1, Some(Payloads()), Some(UserExpressions())), - ExecuteResult(1, Data("text/plain" -> ""), Metadata()) - )) - } - - /** - * This method simulates the interpreter passing back an - * execute result and reply - */ - def replyToHandlerWithErrorAndResult() = { - // This stubs the behaviour of the interpreter executing code - val expectedClass = classOf[(ExecuteRequest, KernelMessage, OutputStream)] - executeRequestRelayProbe.expectMsgClass(expectedClass) - executeRequestRelayProbe.reply(( - ExecuteReplyError(1, Some(""), Some(""), Some(Nil)), - ExecuteResult(1, Data("text/plain" -> "resulty result"), Metadata()) - )) - } - - describe("ExecuteRequestHandler( 
ActorLoader )") { - describe("#receive( KernelMessage ) when interpreter replies") { - - it("should send an execute result message if the result is not empty") { - handlerActor ! MockExecuteRequestKernelMessage - replyToHandlerWithOkAndResult() - kernelMessageRelayProbe.fishForMessage(100.milliseconds) { - case KernelMessage(_, _, header, _, _, _) => - header.msg_type == ExecuteResult.toTypeString - } - } - - it("should not send an execute result message if there is no result") { - handlerActor ! MockExecuteRequestKernelMessage - replyToHandlerWithOk() - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - case KernelMessage(_, _, header, _, _, _) => - header.msg_type != ExecuteResult.toTypeString - } - - } - - it("should send an execute reply message") { - handlerActor ! MockExecuteRequestKernelMessage - replyToHandlerWithOkAndResult() - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - case KernelMessage(_, _, header, _, _, _) => - header.msg_type == ExecuteResult.toTypeString - } - } - - it("should send a status idle message after the reply and result") { - handlerActor ! 
MockExecuteRequestKernelMessage - replyToHandlerWithOkAndResult() - - val msgCount = new AtomicInteger(0) - var statusMsgNum = -1 - var statusReceived = false - - val f1 = future { - kernelMessageRelayProbe.fishForMessage(4.seconds) { - case KernelMessage(_, _, header, _, _, _) => - if (header.msg_type == ExecuteResult.toTypeString && - !statusReceived) - msgCount.incrementAndGet() - else if (header.msg_type == ExecuteReply.toTypeString && - !statusReceived) - msgCount.incrementAndGet() - statusReceived || (msgCount.get() >= 2) - } - } - - val f2 = future { - statusDispatchProbe.fishForMessage(4.seconds) { - case (status, header) => - if (status == KernelStatusIdle.toString) - statusReceived = true - statusMsgNum = msgCount.get() - statusReceived || (msgCount.get() >= 2) - } - } - val fs = (f1 zip f2) - Await.ready(fs, 5.seconds) - - statusMsgNum should equal(2) - } - - it("should send an execute input message") { - handlerActor ! MockExecuteRequestKernelMessage - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - case KernelMessage(_, _, header, _, _, _) => - header.msg_type == ExecuteInput.toTypeString - } - } - - it("should send a message with ids equal to the incoming " + - "KernelMessage's ids") { - handlerActor ! MockExecuteRequestKernelMessage - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - case KernelMessage(ids, _, _, _, _, _) => - ids == MockExecuteRequestKernelMessage.ids - } - } - - it("should send a message with parent header equal to the incoming " + - "KernelMessage's header") { - handlerActor ! MockExecuteRequestKernelMessage - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - case KernelMessage(_, _, _, parentHeader, _, _) => - parentHeader == MockExecuteRequestKernelMessage.header - } - } - - // TODO: Investigate if this is still relevant at all -// it("should send a status busy and idle message") { -// handlerActor ! 
MockExecuteRequestKernelMessage -// replyToHandlerWithOkAndResult() -// var busy = false -// var idle = false -// -// statusDispatchProbe.receiveWhile(100.milliseconds) { -// case Tuple2(status: KernelStatusType, header: Header)=> -// if(status == KernelStatusType.Busy) -// busy = true -// if(status == KernelStatusType.Idle) -// idle = true -// } -// -// idle should be (true) -// busy should be (true) -// } - } - } - - // Testing error timeout for interpreter future - describe("ExecuteRequestHandler( ActorLoader )") { - describe("#receive( KernelMessage with bad JSON content )"){ - it("should respond with an execute_reply with status error") { - handlerActor ! MockKernelMessageWithBadExecuteRequest - - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - // Only mark as successful if this specific message was received - case KernelMessage(_, _, header, _, _, contentString) - if header.msg_type == ExecuteReply.toTypeString => - val reply = Json.parse(contentString).as[ExecuteReply] - reply.status == "error" - case _ => false - } - } - - it("should send error message to relay") { - handlerActor ! MockKernelMessageWithBadExecuteRequest - - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { - // Only mark as successful if this specific message was received - case KernelMessage(_, _, header, _, _, _) - if header.msg_type == ErrorContent.toTypeString => true - case _ => false - } - } - - // TODO: Investigate if this is still relevant at all -// it("should send a status idle message") { -// handlerActor ! MockKernelMessageWithBadExecuteRequest -// var busy = false -// var idle = false -// -// statusDispatchProbe.receiveWhile(100.milliseconds) { -// case Tuple2(status: KernelStatusType, header: Header)=> -// if(status == KernelStatusType.Busy) -// busy = true -// if(status == KernelStatusType.Idle) -// idle = true -// } -// -// idle should be (true) -// busy should be (false) -// } - } - } -}