http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala ---------------------------------------------------------------------- diff --git a/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala new file mode 100644 index 0000000..90c509c --- /dev/null +++ b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala @@ -0,0 +1,259 @@ +/* + * Copyright 2015 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ibm.spark.communication.socket + +import org.scalatest.concurrent.Eventually +import org.scalatest.mock.MockitoSugar +import org.scalatest.time.{Milliseconds, Span} +import org.scalatest.{BeforeAndAfter, FunSpec, Matchers} +import org.zeromq.ZMQ +import org.zeromq.ZMQ.{Socket, Context} + +import scala.util.Try + +class ZeroMQSocketRunnableSpec extends FunSpec with Matchers + with MockitoSugar with Eventually with BeforeAndAfter { + + implicit override val patienceConfig = PatienceConfig( + timeout = scaled(Span(2000, Milliseconds)), + interval = scaled(Span(5, Milliseconds)) + ) + + private val TestAddress = "inproc://test-address" + private var mockSocketType: SocketType = _ + private var zmqContext: ZMQ.Context = _ + private var pubSocket: ZMQ.Socket = _ + + private class TestRunnable( + private val socket: ZMQ.Socket, + private val context: Context, + private val socketType: SocketType, + private val inboundMessageCallback: Option[(Seq[String]) => Unit], + private val socketOptions: SocketOption* + ) extends ZeroMQSocketRunnable( + context, + socketType, + inboundMessageCallback, + socketOptions: _* + ) { + override protected def newZmqSocket(zmqContext: Context, socketType: Int): Socket = socket + } + + before { + mockSocketType = mock[SocketType] + zmqContext = ZMQ.context(1) + pubSocket = zmqContext.socket(PubSocket.`type`) + } + + after { + Try(zmqContext.close()) + } + + describe("ZeroMQSocketRunnable") { + describe("constructor") { + it("should throw an exception if there is no bind or connect") { + intercept[IllegalArgumentException] { + new ZeroMQSocketRunnable(zmqContext, mockSocketType, None) + } + pubSocket.close() + } + + it("should throw an exception if there is more than one connect") { + intercept[IllegalArgumentException] { + new ZeroMQSocketRunnable( + zmqContext, + mockSocketType, + None, + Connect(TestAddress), + Connect(TestAddress) + ) + } + pubSocket.close() + } + + it("should throw an exception if there is more than one bind") { + intercept[IllegalArgumentException] { + new ZeroMQSocketRunnable( + zmqContext, + mockSocketType, + None, + Bind(TestAddress), + Bind(TestAddress) + ) + } + pubSocket.close() + } + + it("should throw an exception if there is a connect and bind") { + intercept[IllegalArgumentException] { + new ZeroMQSocketRunnable( + zmqContext, + mockSocketType, + None, + Connect(""), + Bind("") + ) + } + pubSocket.close() + } + } + + describe("#run"){ + it("should set the linger option when provided") { + val expected = 999 + + val runnable: TestRunnable = new TestRunnable( + pubSocket, + zmqContext, + PubSocket, + None, + Connect(TestAddress), + Linger(expected) + ) + val thread = new Thread(runnable) + + thread.start() + + eventually { + val actual = pubSocket.getLinger + actual should be (expected) + } + + runnable.close() + } + + it("should set the identity option when provided") { + val expected = "my identity".getBytes(ZMQ.CHARSET) + + val runnable: TestRunnable = new TestRunnable( + pubSocket, + zmqContext, + PubSocket, + None, + Connect(TestAddress), + Identity(expected) + ) + val thread = new Thread(runnable) + + thread.start() + + eventually { + val actual = pubSocket.getIdentity + actual should be (expected) + } + + runnable.close() + } + + it("should close the thread when closed"){ + val runnable = new TestRunnable( + pubSocket, + zmqContext, + PubSocket, + None, + Connect(TestAddress) + ) + + val thread = new Thread(runnable) + + thread.start() + + eventually { + runnable.isProcessing should be (true) + } + + runnable.close() + + eventually{ + thread.isAlive should be (false) + } + } + } + + describe("#isProcessing") { + it("should be false when the runnable is closed") { + val runnable = new TestRunnable( + pubSocket, + zmqContext, + PubSocket, + None, + Connect(TestAddress) + ) + + val thread = new Thread(runnable) + + thread.start() + + eventually { + runnable.isProcessing should be (true) + } + + runnable.close() + + eventually { + runnable.isProcessing should be (false) + } + } + + it("should eventually be true when the runnable is started") { + val runnable = new TestRunnable( + pubSocket, + zmqContext, + PubSocket, + None, + Connect(TestAddress) + ) + + val thread = new Thread(runnable) + + thread.start() + + eventually{ + runnable.isProcessing should be (true) + } + + runnable.close() + } + } + + describe("#close"){ + it("should close the thread"){ + val runnable = new TestRunnable( + pubSocket, + zmqContext, + PubSocket, + None, + Connect(TestAddress) + ) + + val thread = new Thread(runnable) + + thread.start() + + eventually { + runnable.isProcessing should be (true) + } + + runnable.close() + + eventually{ + thread.isAlive should be(false) + } + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/communication/src/test/scala/org/apache/toree/communication/utils/OrderedSupportSpec.scala ---------------------------------------------------------------------- diff --git a/communication/src/test/scala/org/apache/toree/communication/utils/OrderedSupportSpec.scala b/communication/src/test/scala/org/apache/toree/communication/utils/OrderedSupportSpec.scala new file mode 100644 index 0000000..b10d4cb --- /dev/null +++ b/communication/src/test/scala/org/apache/toree/communication/utils/OrderedSupportSpec.scala @@ -0,0 +1,101 @@ +/* + * Copyright 2014 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.spark.communication.utils + +import akka.actor._ +import akka.testkit.{ImplicitSender, TestKit} +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSpecLike, Matchers} + +case class OrderedType() +case class NotOrderedType() +case class FinishProcessingMessage() +case class ReceiveMessageCount(count: Int) + +class TestOrderedSupport extends OrderedSupport { + var receivedCounter = 0 + override def orderedTypes(): Seq[Class[_]] = Seq(classOf[OrderedType]) + + override def receive: Receive = { + case OrderedType() => + startProcessing() + receivedCounter = receivedCounter + 1 + sender ! ReceiveMessageCount(receivedCounter) + case NotOrderedType() => + receivedCounter = receivedCounter + 1 + sender ! ReceiveMessageCount(receivedCounter) + case FinishProcessingMessage() => + finishedProcessing() + } +} + +class OrderedSupportSpec extends TestKit(ActorSystem("OrderedSupportSystem")) + with ImplicitSender with Matchers with FunSpecLike + with MockitoSugar { + + describe("OrderedSupport"){ + describe("#waiting"){ + it("should wait for types defined in orderedTypes"){ + val testOrderedSupport = system.actorOf(Props[TestOrderedSupport]) + + // Send a message having a type in orderedTypes + // Starts processing and is handled with receive() + testOrderedSupport ! new OrderedType + // This message should be handled with waiting() + testOrderedSupport ! new OrderedType + + // Verify receive was not called for the second OrderedType + expectMsg(ReceiveMessageCount(1)) + + } + + it("should process types not defined in orderedTypes"){ + val testOrderedSupport = system.actorOf(Props[TestOrderedSupport]) + + // Send a message that starts the processing + testOrderedSupport ! new OrderedType + + // Send a message having a type not in orderedTypes + testOrderedSupport ! new NotOrderedType + + // Verify receive did get called for NotOrderedType + expectMsg(ReceiveMessageCount(1)) + expectMsg(ReceiveMessageCount(2)) + } + } + describe("#finishedProcessing"){ + it("should switch actor to receive method"){ + val testOrderedSupport = system.actorOf(Props[TestOrderedSupport]) + + // Switch actor to waiting mode + testOrderedSupport ! new OrderedType + + // Call finishedProcessing + testOrderedSupport ! new FinishProcessingMessage + + // Sending something that would match in receive, and is in orderedTypes + testOrderedSupport ! new OrderedType + + expectMsg(ReceiveMessageCount(1)) + expectMsg(ReceiveMessageCount(2)) + + } + + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/dependencies/DependencyDownloader.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/dependencies/DependencyDownloader.scala b/kernel-api/src/main/scala/com/ibm/spark/dependencies/DependencyDownloader.scala deleted file mode 100644 index 2f31cbe..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/dependencies/DependencyDownloader.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.dependencies - -import java.io.PrintStream -import java.net.URL - -abstract class DependencyDownloader(repositoryUrl: String, baseDir: String) { - - /** - * Retrieves the dependency and all of its dependencies as jars. - * - * @param groupId The group id associated with the main dependency - * @param artifactId The id of the dependency artifact - * @param version The version of the main dependency - * @param transitive If true, downloads all dependencies of the specified - * dependency - * @param excludeBaseDependencies If true, will exclude any dependencies - * included in the build of the kernel - * - * @return The sequence of strings pointing to the retrieved dependency jars - */ - def retrieve( - groupId: String, artifactId: String, version: String, - transitive: Boolean = true, excludeBaseDependencies: Boolean = true - ): Seq[URL] - - /** - * Sets the printstream to log to. - * - * @param printStream The new print stream to use for output logging - */ - def setPrintStream(printStream: PrintStream): Unit - -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/dependencies/IvyDependencyDownloader.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/dependencies/IvyDependencyDownloader.scala b/kernel-api/src/main/scala/com/ibm/spark/dependencies/IvyDependencyDownloader.scala deleted file mode 100644 index 2465b0e..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/dependencies/IvyDependencyDownloader.scala +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.dependencies - -import java.io.{File, PrintStream} -import java.net.URL - -import org.apache.ivy.Ivy -import org.apache.ivy.core.module.descriptor._ -import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId} -import org.apache.ivy.core.resolve.ResolveOptions -import org.apache.ivy.core.retrieve.RetrieveOptions -import org.apache.ivy.core.settings.IvySettings -import org.apache.ivy.plugins.matcher.RegexpPatternMatcher -import org.apache.ivy.plugins.parser.xml.{XmlModuleDescriptorParser, XmlModuleDescriptorWriter} -import org.apache.ivy.plugins.resolver.IBiblioResolver -import org.apache.ivy.util.{DefaultMessageLogger, Message} -import org.springframework.core.io.support._ - - -class IvyDependencyDownloader(repositoryUrl: String, baseDirectory: String) - extends DependencyDownloader(repositoryUrl, baseDirectory) -{ - private val ivySettings = new IvySettings() - private val resolver = new IBiblioResolver - - resolver.setUsepoms(true) - resolver.setM2compatible(true) - resolver.setName("central") - - // Add our resolver as the main resolver (IBiblio goes to Maven Central) - ivySettings.addResolver(resolver) - - // Mark our resolver as the default one to use - ivySettings.setDefaultResolver(resolver.getName) - - // Set the destination - ivySettings.setBaseDir(new File(baseDirectory)) - ivySettings.setDefaultResolutionCacheBasedir(baseDirectory) - ivySettings.setDefaultRepositoryCacheBasedir(baseDirectory) - - //creates an Ivy instance with settings - val ivy = Ivy.newInstance(ivySettings) - - private def getBaseDependencies: Iterable[DependencyDescriptor] = { - val xmlModuleDescriptor = XmlModuleDescriptorParser.getInstance() - val getDependencies = (url: URL) => xmlModuleDescriptor.parseDescriptor( - new IvySettings(), url, false - ).getDependencies - - // Find all of the *ivy.xml files on the classpath. - val ivyFiles = new PathMatchingResourcePatternResolver().getResources( - "classpath*:**/*ivy.xml" - ) - val classpathURLs = ivyFiles.map(_.getURI.toURL) - - // Get all of the dependencies from the *ivy.xml files - val dependencies = classpathURLs.map(getDependencies).flatten - - // Remove duplicates based on artifact name - val distinctDependencies = - dependencies.groupBy(_.getDependencyId.getName).map(_._2.head) - - distinctDependencies - } - - override def retrieve( - groupId: String, artifactId: String, version: String, - transitive: Boolean = true, excludeBaseDependencies: Boolean = true - ): Seq[URL] = { - // Start building the ivy.xml file - val ivyFile = File.createTempFile("ivy-custom", ".xml") - ivyFile.deleteOnExit() - - val md = DefaultModuleDescriptor.newDefaultInstance( - ModuleRevisionId.newInstance("com.ibm.spark", "spark-kernel", "working") - ) - - // Exclude all sources artifacts i.e. artifactId-version-sources.jar - val moduleId = new ModuleId("*", "*") - val sourcesArtifactId = new ArtifactId(moduleId, "*", "source", "*") - val sourcesExclusion = new DefaultExcludeRule( - sourcesArtifactId, new RegexpPatternMatcher(), null - ) - - // Exclude all javadoc artifacts i.e. artifactId-version-javadoc.jar - val javadocArtifactId = new ArtifactId(moduleId, "*", "javadoc", "*") - val javadocExclusion = new DefaultExcludeRule( - javadocArtifactId, new RegexpPatternMatcher(), null - ) - - // TODO: figure out why this is not excluded. It's in our build.sbt file - // TODO: and we exclude all deps there. Need to get rid of this hard-code - val scalaCompilerModuleId = new ModuleId("org.scala-lang", "*") - val scalaCompilerArtifactId = new ArtifactId( - scalaCompilerModuleId, "*", "*", "*" - ) - val scalaCompilerExclusion = new DefaultExcludeRule( - scalaCompilerArtifactId, new RegexpPatternMatcher(), null - ) - - // Create our dependency descriptor - val dependencyDescriptor = new DefaultDependencyDescriptor( - md, ModuleRevisionId.newInstance(groupId, artifactId, version), - false, false, true - ) - - md.addDependency(dependencyDescriptor) - - // Add any and all exclusions - md.addExcludeRule(sourcesExclusion) - md.addExcludeRule(javadocExclusion) - md.addExcludeRule(scalaCompilerExclusion) - - // Exclude our base dependencies if marked to do so - if (excludeBaseDependencies) { - getBaseDependencies.foreach(dep => { - val depRevId = dep.getDependencyRevisionId - val moduleId = new ModuleId(depRevId.getOrganisation, depRevId.getName) - val artifactId = new ArtifactId(moduleId, "*", "*", "*") - val excludeRule = new DefaultExcludeRule( - artifactId, new RegexpPatternMatcher(), null) - md.addExcludeRule(excludeRule) - }) - } - - // Creates our ivy configuration file - XmlModuleDescriptorWriter.write(md, ivyFile) - - // Grab our dependencies (and theirs, etc) recursively - val resolveOptions = new ResolveOptions() - .setTransitive(transitive) - .setDownload(true) - - // Init resolve report (has what was downloaded, etc) - val report = ivy.resolve(ivyFile.toURI.toURL, resolveOptions) - - // Get the jar libraries - val artifactURLs = report.getAllArtifactsReports - .map(report => new URL("file:" + report.getLocalFile.getCanonicalPath)) - - val moduleDescriptor = report.getModuleDescriptor - ivy.retrieve( - moduleDescriptor.getModuleRevisionId, - baseDirectory + "/[artifact](-[classifier]).[ext]", - new RetrieveOptions().setConfs(Seq("default").toArray) - ) - - artifactURLs - } - - /** - * Uses our printstream in Ivy's LoggingEngine - * @param printStream the print stream to use - */ - override def setPrintStream(printStream: PrintStream): Unit = { - ivy.getLoggerEngine.setDefaultLogger( - new DefaultMessageLogger(Message.MSG_INFO) { - override def doEndProgress(msg: String): Unit = - printStream.println(msg) - - override def doProgress(): Unit = - printStream.print(".") - - override def log(msg: String, level: Int): Unit = - if (level <= this.getLevel) - printStream.println(msg) - } - ) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/global/StreamState.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/global/StreamState.scala b/kernel-api/src/main/scala/com/ibm/spark/global/StreamState.scala deleted file mode 100644 index 973c000..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/global/StreamState.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.global - -import java.io.{InputStream, OutputStream, PrintStream} - -/** - * Represents the global state for input and output streams used to communicate - * standard input and output. - */ -object StreamState { - private val _baseInputStream = System.in - private val _baseOutputStream = System.out - private val _baseErrorStream = System.err - - @volatile private var _inputStream = _baseInputStream - @volatile private var _outputStream = _baseOutputStream - @volatile private var _errorStream = _baseErrorStream - - private def init(in: InputStream, out: OutputStream, err: OutputStream) = - synchronized { - System.setIn(in) - Console.setIn(in) - - System.setOut(new PrintStream(out)) - Console.setOut(out) - - System.setErr(new PrintStream(err)) - Console.setErr(err) - } - - private def reset(): Unit = synchronized { - System.setIn(_baseInputStream) - Console.setIn(_baseInputStream) - - System.setOut(_baseOutputStream) - Console.setOut(_baseOutputStream) - - System.setErr(_baseErrorStream) - Console.setErr(_baseErrorStream) - } - - /** - * Sets the internal streams to be used with the stream block. - * - * @param inputStream The input stream to map standard in - * @param outputStream The output stream to map standard out - * @param errorStream The output stream to map standard err - */ - def setStreams( - inputStream: InputStream = _inputStream, - outputStream: OutputStream = _outputStream, - errorStream: OutputStream = _errorStream - ) = { - _inputStream = inputStream - _outputStream = new PrintStream(outputStream) - _errorStream = new PrintStream(errorStream) - } - - /** - * Execute code block, mapping all input and output to the provided streams. - */ - def withStreams[T](thunk: => T): T = { - init(_inputStream, _outputStream, _errorStream) - - val returnValue = thunk - - reset() - - returnValue - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/ExecuteFailure.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/ExecuteFailure.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/ExecuteFailure.scala deleted file mode 100644 index 8103536..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/ExecuteFailure.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter - -/** - * Represents a generic failure in execution. - */ -sealed abstract class ExecuteFailure - -/** - * Represents an error resulting from interpret execution. - * @param name The name of the error - * @param value The message provided from the error - * @param stackTrace The stack trace as a list of strings representing lines - * in the stack trace - */ -case class ExecuteError( - name: String, value: String, stackTrace: List[String] -) extends ExecuteFailure { - override def toString: String = - "Name: " + name + "\n" + - "Message: " + value + "\n" + - "StackTrace: " + stackTrace.mkString("\n") -} - -// TODO: Replace with object? -/** - * Represents an aborted execution. - */ -class ExecuteAborted extends ExecuteFailure http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala deleted file mode 100644 index 6200b9b..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/Interpreter.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter - -import java.net.URL - -import com.ibm.spark.kernel.api.KernelLike -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext - -import scala.tools.nsc.interpreter._ - -trait Interpreter { - - /** - * Initializes the interpreter. - * @param kernel The kernel - * @return The newly initialized interpreter - */ - def init(kernel: KernelLike): Interpreter - - /** - * Starts the interpreter, initializing any internal state. - * @return A reference to the interpreter - */ - def start(): Interpreter - - /** - * Interrupts the current code being interpreted. - * @return A reference to the interpreter - */ - def interrupt(): Interpreter - - /** - * Stops the interpreter, removing any previous internal state. - * @return A reference to the interpreter - */ - def stop(): Interpreter - - /** - * Adds external jars to the internal classpaths of the interpreter. - * @param jars The list of jar locations - */ - def addJars(jars: URL*): Unit - - /** - * Executes the provided code with the option to silence output. - * @param code The code to execute - * @param silent Whether or not to execute the code silently (no output) - * @return The success/failure of the interpretation and the output from the - * execution or the failure - */ - def interpret(code: String, silent: Boolean = false): - (Results.Result, Either[ExecuteOutput, ExecuteFailure]) - - /** - * @return Returns a string to reference the URI of where the interpreted class files are created - */ - def classServerURI: String - - /** - * Executes body and will not print anything to the console during the execution - * @param body The function to execute - * @tparam T The return type of body - * @return The return value of body - */ - def doQuietly[T](body: => T): T - - /** - * Binds the SparkContext instance to the interpreter's namespace. - * - * @param sparkContext The SparkContext to bind - */ - def bindSparkContext(sparkContext: SparkContext): Unit - - /** - * Binds the SQLContext instance to the interpreter's namespace. - * - * @param sqlContext The SQLContext to bind - */ - def bindSqlContext(sqlContext: SQLContext): Unit - - /** - * Binds a variable in the interpreter to a value. - * @param variableName The name to expose the value in the interpreter - * @param typeName The type of the variable, must be the fully qualified class name - * @param value The value of the variable binding - * @param modifiers Any annotation, scoping modifiers, etc on the variable - */ - def bind(variableName: String, typeName: String, value: Any, modifiers: List[String]) - - /** - * Retrieves the contents of the variable with the provided name from the - * interpreter. - * @param variableName The name of the variable whose contents to read - * @return An option containing the variable contents or None if the - * variable does not exist - */ - def read(variableName: String): Option[AnyRef] - - /** - * Mask the Console and System objects with our wrapper implementations - * and dump the Console methods into the public namespace (similar to - * the Predef approach). - * @param in The new input stream - * @param out The new output stream - * @param err The new error stream - */ - def updatePrintStreams(in: InputStream, out: OutputStream, err: OutputStream) - - /** - * Attempts to perform code completion via the <TAB> command. - * @param code The current cell to complete - * @param pos The cursor position - * @return The cursor position and list of possible completions - */ - def completion(code: String, pos: Int): (Int, List[String] ) - - /** - * Returns the name of the variable created from the last execution. - * @return Some String name if a variable was created, otherwise None - */ - def lastExecutionVariableName: Option[String] - - /** - * Returns the class loader used by this interpreter. - * @return The runtime class loader used by this interpreter - */ - def classLoader: ClassLoader -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/InterpreterTypes.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/InterpreterTypes.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/InterpreterTypes.scala deleted file mode 100644 index 7ecee65..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/InterpreterTypes.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter - -/** - * Contains all types associated with the interpreter interface. - */ -object InterpreterTypes { - /** - * Represents the output from an interpret execution. - */ - type ExecuteOutput = String -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/Results.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/Results.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/Results.scala deleted file mode 100644 index 8bd12d0..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/Results.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter - -/** - * Represents interpreter results, mostly taken from the - * tools.nsc.interpreter.Results object. - */ -object Results { - abstract sealed class Result - - /** The line was interpreted successfully. */ - case object Success extends Result { override def toString = "success" } - - /** The line was erroneous in some way. */ - case object Error extends Result { override def toString = "error" } - - /** The input was incomplete. The caller should request more input. */ - case object Incomplete extends Result { override def toString = "incomplete" } - - /** The line was aborted before completed. */ - case object Aborted extends Result { override def toString = "aborted" } -} - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerBridge.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerBridge.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerBridge.scala deleted file mode 100644 index 94b9a24..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerBridge.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -import com.ibm.spark.interpreter.broker.producer.{SQLContextProducerLike, JavaSparkContextProducerLike} -import com.ibm.spark.kernel.api.KernelLike -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.SQLContext -import org.apache.spark.{SparkConf, SparkContext} - -/** - * Represents the API available to the broker to act as the bridge for data - * between the JVM and some external process. - * - * @param _brokerState The container of broker state to expose - * @param _kernel The kernel API to expose through the bridge - */ -class BrokerBridge( - private val _brokerState: BrokerState, - private val _kernel: KernelLike -) extends BrokerName { - /** - * Represents the current state of the broker. - */ - val state: BrokerState = _brokerState - - /** - * Represents the kernel API available. - */ - val kernel: KernelLike = _kernel -} - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerCode.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerCode.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerCode.scala deleted file mode 100644 index e480aa8..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerCode.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -import BrokerTypes._ - -/** - * Represents a block of code to be evaluated. - * - * @param codeId The id to associate with the code to be executed - * @param code The code to evaluate using the broker - */ -case class BrokerCode(codeId: CodeId, code: Code) - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerException.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerException.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerException.scala deleted file mode 100644 index b059552..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerException.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -/** - * Represents a generic broker exception. - * - * @param message The message to associate with the exception - */ -class BrokerException(message: String) extends Throwable(message) - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerName.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerName.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerName.scala deleted file mode 100644 index 1482ade..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerName.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -/** - * Represents the interface that associates a name with a broker. Can be - * overridden to change name of broker in subclassing. - */ -trait BrokerName { - /** The name of the broker. */ - val brokerName: String = "broker" -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerProcess.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerProcess.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerProcess.scala deleted file mode 100644 index 5072b92..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerProcess.scala +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -import java.io.{OutputStream, InputStream, File, FileOutputStream} - -import org.apache.commons.exec._ -import org.apache.commons.exec.environment.EnvironmentUtils -import org.apache.commons.io.{FilenameUtils, IOUtils} -import org.slf4j.LoggerFactory -import scala.collection.JavaConverters._ - -/** - * Represents the process used to evaluate broker code. - * - * @param processName The name of the process to invoke - * @param entryResource The resource to be copied and fed as the first argument - * to the process - * @param otherResources Other resources to be included in the same directory - * as the main resource - * @param brokerBridge The bridge to use to retrieve kernel output streams - * and the Spark version to be verified - * @param brokerProcessHandler The handler to use when the process fails or - * completes - * @param arguments The collection of additional arguments to pass to the - * process after the main entrypoint - */ -class BrokerProcess( - private val processName: String, - private val entryResource: String, - private val otherResources: Seq[String], - private val brokerBridge: BrokerBridge, - private val brokerProcessHandler: BrokerProcessHandler, - private val arguments: Seq[String] = Nil -) extends BrokerName { - require(processName != null && processName.trim.nonEmpty, - "Process name cannot be null or pure whitespace!") - require(entryResource != null && entryResource.trim.nonEmpty, - "Entry resource cannot be null or pure whitespace!") - - private val logger = LoggerFactory.getLogger(this.getClass) - private val classLoader = this.getClass.getClassLoader - private val outputDir = - s"kernel-$brokerName-" + java.util.UUID.randomUUID().toString - - /** Represents the current process being executed. */ - @volatile private[broker] var currentExecutor: Option[Executor] = None - - /** - * Returns the temporary directory to place any files needed for the process. - * - * @return The directory path as a string - */ - protected def getTmpDirectory: String = System.getProperty("java.io.tmpdir") - - /** - * Returns the subdirectory to use to place any files needed for the process. - * - * @return The directory path as a string - */ - protected lazy val getSubDirectory: String = - s"kernel-$brokerName-" + java.util.UUID.randomUUID().toString - - /** - * Copies a resource from an input stream to an output stream. - * - * @param inputStream The input stream to copy from - * @param outputStream The output stream to copy to - * - * @return The result of the copy operation - */ - protected def copy(inputStream: InputStream, outputStream: OutputStream) = - IOUtils.copy(inputStream, outputStream) - - /** - * Copies a file from the kernel resources to the temporary directory. - * - * @param resource The resource to copy - * - * @return The string path pointing to the resource's destination - */ - protected def copyResourceToTmp(resource: String): String = { - val brokerRunnerResourceStream = classLoader.getResourceAsStream(resource) - - val tmpDirectory = Option(getTmpDirectory) - .getOrElse(throw new BrokerException("java.io.tmpdir is not set!")) - val subDirectory = Option(getSubDirectory).getOrElse("") - val outputName = FilenameUtils.getName(resource) - - val outputDir = Seq(tmpDirectory, subDirectory) - .filter(_.trim.nonEmpty).mkString("/") - val outputScript = new File(FilenameUtils.concat(outputDir, outputName)) - - // If our script destination is a directory, we cannot copy the script - if (outputScript.exists() && outputScript.isDirectory) - throw new BrokerException(s"Failed to create script: $outputScript") - - // Ensure that all of the directories leading up to the script exist - val outputDirFile = new File(outputDir) - if (!outputDirFile.exists()) outputDirFile.mkdirs() - - // Copy the script to the specified temporary destination - val outputScriptStream = new FileOutputStream(outputScript) - copy( - brokerRunnerResourceStream, - outputScriptStream - ) - outputScriptStream.close() - - // Return the destination of the script - val destination = outputScript.getPath - logger.debug(s"Successfully copied $resource to $destination") - destination - } - - /** - * Creates a new process environment to be used for environment variable - * retrieval by the new process. - * - * @return The map of environment variables and their respective values - */ - protected def newProcessEnvironment(): Map[String, String] = { - val procEnvironment = EnvironmentUtils.getProcEnvironment - - procEnvironment.asScala.toMap - } - - /** - * Creates a new executor to be used to launch the process. - * - * @return The executor to start and manage the process - */ - protected def newExecutor(): Executor = new DefaultExecutor - - /** - * Starts the Broker process. - */ - def start(): Unit = currentExecutor.synchronized { - assert(currentExecutor.isEmpty, "Process has already been started!") - - val capitalizedBrokerName = brokerName.capitalize - - val script = copyResourceToTmp(entryResource) - logger.debug(s"New $brokerName script created: $script") - - val createdResources = otherResources.map(copyResourceToTmp) - - // Verify that all files were successfully created - val createdResult = (script +: createdResources).map(new File(_)).map(f => { - if (f.exists()) true - else { - val resource = f.getPath - logger.warn(s"Failed to create resource: $resource") - false - } - }).forall(_ == true) - if (!createdResult) throw new BrokerException( - s"Failed to create resources for $capitalizedBrokerName" - ) - - val commandLine = CommandLine - .parse(processName) - .addArgument(script) - arguments.foreach(commandLine.addArgument) - - logger.debug(s"$capitalizedBrokerName command: ${commandLine.toString}") - - val executor = newExecutor() - - // TODO: Figure out how to dynamically update the output stream used - // to use kernel.out, kernel.err, and kernel.in - // NOTE: Currently mapping to standard output/input, which will be caught - // by our system and redirected through the kernel to the client - executor.setStreamHandler(new PumpStreamHandler( - System.out, - System.err, - System.in - )) - - // Marking exit status of 1 as successful exit - executor.setExitValue(1) - - // Prevent the runner from being killed due to run time as it is a - // long-term process - executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)) - - val processEnvironment = newProcessEnvironment().asJava - logger.debug(s"$capitalizedBrokerName environment: $processEnvironment") - - // Start the process using the environment provided to the parent - executor.execute(commandLine, processEnvironment, brokerProcessHandler) - - currentExecutor = Some(executor) - } - - /** - * Stops the Broker process. - */ - def stop(): Unit = currentExecutor.synchronized { - currentExecutor.foreach(executor => { - logger.debug(s"Stopping $brokerName process") - executor.getWatchdog.destroyProcess() - }) - currentExecutor = None - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerProcessHandler.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerProcessHandler.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerProcessHandler.scala deleted file mode 100644 index 704f974..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerProcessHandler.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -import org.apache.commons.exec.{ExecuteException, ExecuteResultHandler} -import org.slf4j.LoggerFactory - -/** - * Represents the handler for events triggered by the broker process. - * - * @param brokerBridge The bridge to reset when the process fails or completes - * @param restartOnFailure If true, restarts the process if it fails - * @param restartOnCompletion If true, restarts the process if it completes - */ -class BrokerProcessHandler( - private val brokerBridge: BrokerBridge, - private val restartOnFailure: Boolean, - private val restartOnCompletion: Boolean -) extends ExecuteResultHandler with BrokerName { - private val logger = LoggerFactory.getLogger(this.getClass) - private val capitalizedBrokerName = brokerName.capitalize - private val resetMessage = s"$capitalizedBrokerName was reset!" - - private var performReset: String => Unit = (_) => {} - private var performRestart: () => Unit = () => {} - - /** - * Sets the reset method used when a reset of the process is asked. - * - * @param resetMethod The method to use for resetting the process - */ - def setResetMethod(resetMethod: String => Unit): Unit = - performReset = resetMethod - - /** - * Sets the restart method used when a restart of the process is asked. - * - * @param restartMethod The method to use for restarting the process - */ - def setRestartMethod(restartMethod: () => Unit): Unit = - performRestart = restartMethod - - override def onProcessFailed(ex: ExecuteException): Unit = { - logger.error(s"$capitalizedBrokerName process failed: $ex") - performReset(resetMessage) - - if (restartOnFailure) performRestart() - } - - override def onProcessComplete(exitValue: Int): Unit = { - logger.error(s"$capitalizedBrokerName process exited: $exitValue") - performReset(resetMessage) - - if (restartOnCompletion) performRestart() - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerPromise.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerPromise.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerPromise.scala deleted file mode 100644 index 3fe96bf..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerPromise.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -import com.ibm.spark.interpreter.broker.BrokerTypes.{CodeResults, CodeId} - -import scala.concurrent.Promise - -/** - * Represents a promise made regarding the completion of broker code execution. - * - * @param codeId The id of the code that was executed - * @param promise The promise to be fulfilled when the code finishes executing - */ -case class BrokerPromise(codeId: CodeId, promise: Promise[CodeResults]) http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerService.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerService.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerService.scala deleted file mode 100644 index 27430af..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerService.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -import com.ibm.spark.interpreter.broker.BrokerTypes.{Code, CodeResults} -import scala.concurrent.Future - -/** - * Represents the service that provides the high-level interface between the - * JVM and another process. - */ -trait BrokerService { - /** Starts the broker service. */ - def start(): Unit - - /** - * Indicates whether or not the service is running. - * - * @return True if running, otherwise false - */ - def isRunning: Boolean - - /** - * Submits code to the broker service to be executed and return a result. - * - * @param code The code to execute - * - * @return The result as a future to eventually return - */ - def submitCode(code: Code): Future[CodeResults] - - /** Stops the running broker service. */ - def stop(): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerState.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerState.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerState.scala deleted file mode 100644 index 409d789..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerState.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -import java.util.concurrent.ConcurrentHashMap - -import com.ibm.spark.interpreter.broker.BrokerTypes._ -import org.slf4j.LoggerFactory - -import scala.concurrent.{Future, promise} - -/** - * Represents the state structure of broker. - * - * @param maxQueuedCode The maximum amount of code to support being queued - * at the same time for broker execution - * - */ -class BrokerState(private val maxQueuedCode: Int) { - private val logger = LoggerFactory.getLogger(this.getClass) - - import scala.collection.JavaConverters._ - - private var _isReady: Boolean = false - protected val codeQueue: java.util.Queue[BrokerCode] = - new java.util.concurrent.ConcurrentLinkedQueue[BrokerCode]() - protected val promiseMap: collection.mutable.Map[CodeId, BrokerPromise] = - new ConcurrentHashMap[CodeId, BrokerPromise]().asScala - - /** - * Adds new code to eventually be executed. - * - * @param code The snippet of code to execute - * - * @return The future containing the results of the execution - */ - def pushCode(code: Code): Future[CodeResults] = synchronized { - // Throw the standard error if our maximum limit has been reached - if (codeQueue.size() >= maxQueuedCode) - throw new IllegalStateException( - s"Code limit of $maxQueuedCode has been reached!") - - // Generate our promise that will be fulfilled when the code is executed - // and the results are sent back - val codeExecutionPromise = promise[CodeResults]() - - // Build the code representation to send to Broker - val uniqueId = java.util.UUID.randomUUID().toString - val brokerCode = BrokerCode(uniqueId, code) - val brokerPromise = BrokerPromise(uniqueId, codeExecutionPromise) - - logger.debug(s"Queueing '$code' with id '$uniqueId' to run with broker") - - // Add the code to be executed to our queue and the promise to our map - codeQueue.add(brokerCode) - promiseMap.put(brokerPromise.codeId, brokerPromise) - - codeExecutionPromise.future - } - - /** - * Returns the total code currently queued to be executed. - * - * @return The total number of code instances queued to be executed - */ - def totalQueuedCode(): Int = codeQueue.size() - - /** - * Retrieves (and removes) the next piece of code to be executed. - * - * @note This should only be invoked by the broker process! - * - * @return The next code to execute if available, otherwise null - */ - def nextCode(): BrokerCode = { - val brokerCode = codeQueue.poll() - - if (brokerCode != null) - logger.trace(s"Sending $brokerCode to Broker runner") - - brokerCode - } - - /** - * Indicates whether or not the broker instance is ready for code. - * - * @return True if it is ready, otherwise false - */ - def isReady: Boolean = _isReady - - /** - * Marks the state of broker as ready. - */ - def markReady(): Unit = _isReady = true - - /** - * Marks the specified code as successfully completed using its id. - * - * @param codeId The id of the code to mark as a success - * @param output The output from the execution to be used as the result - */ - def markSuccess(codeId: CodeId, output: CodeResults): Unit = { - logger.debug(s"Received success for code with id '$codeId': $output") - promiseMap.remove(codeId).foreach(_.promise.success(output)) - } - - /** - * Marks the specified code as successfully completed using its id. Output - * from success is treated as an empty string. - * - * @param codeId The id of the code to mark as a success - */ - def markSuccess(codeId: CodeId): Unit = markSuccess(codeId, "") - - /** - * Marks the specified code as unsuccessful using its id. - * - * @param codeId The id of the code to mark as a failure - * @param output The output from the error to be used as the description - * of the exception - */ - def markFailure(codeId: CodeId, output: CodeResults): Unit = { - logger.debug(s"Received failure for code with id '$codeId': $output") - promiseMap.remove(codeId).foreach( - _.promise.failure(new BrokerException(output))) - } - - /** - * Marks the specified code as unsuccessful using its id. Output from failure - * is treated as an empty string. - * - * @param codeId The id of the code to mark as a failure - */ - def markFailure(codeId: CodeId): Unit = markFailure(codeId, "") - - /** - * Resets the state by clearing any pending code executions and marking all - * pending executions as failures (or success if specified). - * - * @param message The message to present through the interrupted promises - * @param markAllAsFailure If true, marks all pending executions as failures, - * otherwise marks all as success - */ - def reset(message: String, markAllAsFailure: Boolean = true): Unit = { - codeQueue.synchronized { - promiseMap.synchronized { - codeQueue.clear() - - // Use map contents for reset as it should contain non-executing - // code as well as executing code - promiseMap.foreach { case (codeId, codePromise) => - if (markAllAsFailure) - codePromise.promise.failure(new BrokerException(message)) - else - codePromise.promise.success(message) - } - promiseMap.clear() - } - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTransformer.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTransformer.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTransformer.scala deleted file mode 100644 index aa18648..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTransformer.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -import com.ibm.spark.interpreter.InterpreterTypes.ExecuteOutput -import com.ibm.spark.interpreter.Results.Result -import com.ibm.spark.interpreter.broker.BrokerTypes.CodeResults -import com.ibm.spark.interpreter.{ExecuteError, ExecuteFailure, Results} - -import scala.concurrent.Future - -/** - * Represents a utility that can transform raw broker information to - * kernel information. - */ -class BrokerTransformer { - /** - * Transforms a pure result containing output information into a form that - * the interpreter interface expects. - * - * @param futureResult The raw result as a future - * - * @return The transformed result as a future - */ - def transformToInterpreterResult(futureResult: Future[CodeResults]): - Future[(Result, Either[ExecuteOutput, ExecuteFailure])] = - { - import scala.concurrent.ExecutionContext.Implicits.global - - futureResult - .map(results => (Results.Success, Left(results))) - .recover({ case ex: BrokerException => - (Results.Error, Right(ExecuteError( - name = ex.getClass.getName, - value = ex.getLocalizedMessage, - stackTrace = ex.getStackTrace.map(_.toString).toList - ))) - }) - } -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTypes.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTypes.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTypes.scala deleted file mode 100644 index 71e4d3d..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTypes.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -/** - * Represents all types associated with the broker interface. - */ -object BrokerTypes extends BrokerTypesProvider http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTypesProvider.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTypesProvider.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTypesProvider.scala deleted file mode 100644 index 2af47e4..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/BrokerTypesProvider.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker - -/** - * Provides broker types to the class/trait that implements this trait. - */ -trait BrokerTypesProvider { - /** Represents the id used to keep track of executing code. */ - type CodeId = String - - /** Represents the code to execute. */ - type Code = String - - /** Represents the results of code execution or the failure message. */ - type CodeResults = String -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/producer/JavaSparkContextProducerLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/producer/JavaSparkContextProducerLike.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/producer/JavaSparkContextProducerLike.scala deleted file mode 100644 index cda61f3..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/producer/JavaSparkContextProducerLike.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker.producer - -import org.apache.spark.SparkContext -import org.apache.spark.api.java.JavaSparkContext - -/** - * Represents a producer for a JavaSparkContext. - */ -trait JavaSparkContextProducerLike { - /** - * Creates a new JavaSparkContext instance. - * - * @param sparkContext The SparkContext instance to use to create the Java one - * - * @return The new JavaSparkContext - */ - def newJavaSparkContext(sparkContext: SparkContext): JavaSparkContext -} - -/** - * Represents the standard producer for a JavaSparkContext. - */ -trait StandardJavaSparkContextProducer extends JavaSparkContextProducerLike { - def newJavaSparkContext(sparkContext: SparkContext): JavaSparkContext = - new JavaSparkContext(sparkContext) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/producer/SQLContextProducerLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/producer/SQLContextProducerLike.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/producer/SQLContextProducerLike.scala deleted file mode 100644 index fd46268..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/broker/producer/SQLContextProducerLike.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.broker.producer - -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext - -/** - * Represents a producer for a SQLContext. - */ -trait SQLContextProducerLike { - /** - * Creates a new SQLContext instance. - * - * @param sparkContext The SparkContext instance to use to create the SQL one - * - * @return The new SQLContext - */ - def newSQLContext(sparkContext: SparkContext): SQLContext -} - -/** - * Represents the standard producer for a SQLContext. - */ -trait StandardSQLContextProducer extends SQLContextProducerLike { - def newSQLContext(sparkContext: SparkContext): SQLContext = - new SQLContext(sparkContext) -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/imports/printers/WrapperConsole.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/imports/printers/WrapperConsole.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/imports/printers/WrapperConsole.scala deleted file mode 100644 index 42c5616..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/imports/printers/WrapperConsole.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.imports.printers - -import java.io._ - -import com.ibm.spark.utils.DynamicReflectionSupport - -/** - * Represents a wrapper for the scala.Console for Scala 2.10.4 implementation. - * @param in The input stream used for standard in - * @param out The output stream used for standard out - * @param err The output stream used for standard error - */ -class WrapperConsole( - val in: BufferedReader, - val out: PrintStream, - val err: PrintStream -) extends DynamicReflectionSupport(Class.forName("scala.Console$"), scala.Console) { - require(in != null) - require(out != null) - require(err != null) - - // - // SUPPORTED PRINT OPERATIONS - // - - def print(obj: Any): Unit = out.print(obj) - def printf(text: String, args: Any*): Unit = - out.print(text.format(args: _*)) - def println(x: Any): Unit = out.println(x) - def println(): Unit = out.println() -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/imports/printers/WrapperSystem.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/imports/printers/WrapperSystem.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/imports/printers/WrapperSystem.scala deleted file mode 100644 index 4583680..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/imports/printers/WrapperSystem.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.interpreter.imports.printers - -import java.io._ - -import com.ibm.spark.utils.DynamicReflectionSupport - -/** - * Represents a wrapper for java.lang.System. - * @param inStream The input stream used for standard in - * @param outStream The output stream used for standard out - * @param errStream The output stream used for standard error - */ -class WrapperSystem( - private val inStream: InputStream, - private val outStream: OutputStream, - private val errStream: OutputStream -) extends DynamicReflectionSupport(Class.forName("java.lang.System"), null){ - require(inStream != null) - require(outStream != null) - require(errStream != null) - - private val outPrinter = new PrintStream(outStream) - private val errPrinter = new PrintStream(errStream) - - // - // MASKED METHODS - // - - def in = inStream - def out = outPrinter - def err = errPrinter -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/interpreter/package.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/interpreter/package.scala b/kernel-api/src/main/scala/com/ibm/spark/interpreter/package.scala deleted file mode 100644 index 451316d..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/interpreter/package.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark - -// TODO: Deprecate and remove this package object as it is difficult to -// remember where this type comes from -package object interpreter { - /** - * Represents the output from an interpret execution. - */ - type ExecuteOutput = String - -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/kernel/api/FactoryMethodsLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/FactoryMethodsLike.scala b/kernel-api/src/main/scala/com/ibm/spark/kernel/api/FactoryMethodsLike.scala deleted file mode 100644 index 1642e1b..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/FactoryMethodsLike.scala +++ /dev/null @@ -1,34 +0,0 @@ -package com.ibm.spark.kernel.api - -import java.io.{InputStream, OutputStream} - -/** - * Represents the methods available to create objects related to the kernel. - */ -trait FactoryMethodsLike { - /** - * Creates a new kernel output stream. - * - * @param streamType The type of output stream (stdout/stderr) - * @param sendEmptyOutput If true, will send message even if output is empty - * - * @return The new KernelOutputStream instance - */ - def newKernelOutputStream( - streamType: String, - sendEmptyOutput: Boolean - ): OutputStream - - /** - * Creates a new kernel input stream. - * - * @param prompt The text to use as a prompt - * @param password If true, should treat input as a password field - * - * @return The new KernelInputStream instance - */ - def newKernelInputStream( - prompt: String, - password: Boolean - ): InputStream -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala b/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala deleted file mode 100644 index c9442aa..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelLike.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.kernel.api - -import java.io.{PrintStream, InputStream, OutputStream} - -import com.typesafe.config.Config -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.SQLContext - -/** - * Interface for the kernel API. This does not include exposed variables. - */ -trait KernelLike { - - def createSparkContext(conf: SparkConf): SparkContext - - def createSparkContext(master: String, appName: String): SparkContext - - /** - * Executes a block of code represented as a string and returns the result. - * - * @param code The code as an option to execute - * - * @return A tuple containing the result (true/false) and the output as a - * string - */ - def eval(code: Option[String]): (Boolean, String) - - /** - * Returns a collection of methods that can be used to generate objects - * related to the kernel. - * - * @return The collection of factory methods - */ - def factory: FactoryMethodsLike - - /** - * Returns a collection of methods that can be used to stream data from the - * kernel to the client. - * - * @return The collection of stream methods - */ - def stream: StreamMethodsLike - - /** - * Returns a print stream to be used for communication back to clients - * via standard out. - * - * @return The print stream instance or an error if the stream info is - * not found - */ - def out: PrintStream - - /** - * Returns a print stream to be used for communication back to clients - * via standard error. - * - * @return The print stream instance or an error if the stream info is - * not found - */ - def err: PrintStream - - /** - * Returns an input stream to be used to receive information from the client. - * - * @return The input stream instance or an error if the stream info is - * not found - */ - def in: InputStream - - /** - * Represents data to be shared using the kernel as the middleman. - * - * @note Using Java structure to enable other languages to have easy access! - */ - val data: java.util.Map[String, Any] - - - def interpreter(name: String): Option[com.ibm.spark.interpreter.Interpreter] - - def config: Config - - def sparkContext: SparkContext - - def sparkConf: SparkConf - - def javaSparkContext: JavaSparkContext - - def sqlContext: SQLContext -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelOptions.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelOptions.scala b/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelOptions.scala deleted file mode 100644 index 00d00c9..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/KernelOptions.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2015 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.ibm.spark.kernel.api - - -object KernelOptions { - var showTypes: Boolean = false - var noTruncation: Boolean = false -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/kernel/api/StreamInfo.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/StreamInfo.scala b/kernel-api/src/main/scala/com/ibm/spark/kernel/api/StreamInfo.scala deleted file mode 100644 index 24cef4c..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/StreamInfo.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.kernel.api - -/** - * Represents a "wrapper" for information needed to stream stdout/stderr from - * the kernel to a client. - * - * @note This exists because the KernelMessage instance is defined in the - * protocol project, which is not brought into this project. Furthermore, - * it is better practice to provide an explicit wrapper type rather than - * a more common type for implicit use. - */ -trait StreamInfo http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/kernel/api/StreamMethodsLike.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/StreamMethodsLike.scala b/kernel-api/src/main/scala/com/ibm/spark/kernel/api/StreamMethodsLike.scala deleted file mode 100644 index 4e7d9d8..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/kernel/api/StreamMethodsLike.scala +++ /dev/null @@ -1,13 +0,0 @@ -package com.ibm.spark.kernel.api - -/** - * Represents the methods available to stream data from the kernel to the - * client. - */ -trait StreamMethodsLike { - /** - * Sends all text provided as one stream message to the client. - * @param text The text to wrap in a stream message - */ - def sendAll(text: String): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/magic/CellMagic.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/magic/CellMagic.scala b/kernel-api/src/main/scala/com/ibm/spark/magic/CellMagic.scala deleted file mode 100644 index 3da1f04..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/magic/CellMagic.scala +++ /dev/null @@ -1,8 +0,0 @@ -package com.ibm.spark.magic - -/** - * Cell Magics change the output of a cell in IPython - */ -trait CellMagic extends Magic { - override def execute(code: String): CellMagicOutput -} http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/68f7ddd6/kernel-api/src/main/scala/com/ibm/spark/magic/InternalClassLoader.scala ---------------------------------------------------------------------- diff --git a/kernel-api/src/main/scala/com/ibm/spark/magic/InternalClassLoader.scala b/kernel-api/src/main/scala/com/ibm/spark/magic/InternalClassLoader.scala deleted file mode 100644 index 349efa6..0000000 --- a/kernel-api/src/main/scala/com/ibm/spark/magic/InternalClassLoader.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2014 IBM Corp. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.spark.magic - -/** - * Represents a classloader that can load classes from within. - * - * @param classLoader The classloader to use for internal retrieval - * (defaults to self's classloader) - */ -class InternalClassLoader( - classLoader: ClassLoader = classOf[InternalClassLoader].getClassLoader -) extends ClassLoader(classLoader) { - - // TODO: Provides an exposed reference to the super loadClass to be stubbed - // out in tests. - private[magic] def parentLoadClass(name: String, resolve: Boolean) = - super.loadClass(name, resolve) - - /** - * Attempts to load the class using the local package of the builtin loader - * as the base of the name if unable to load normally. - * - * @param name The name of the class to load - * @param resolve If true, then resolve the class - * - * @return The class instance of a ClassNotFoundException - */ - override def loadClass(name: String, resolve: Boolean): Class[_] = - try { - val packageName = this.getClass.getPackage.getName - val className = name.split('.').last - - parentLoadClass(packageName + "." + className, resolve) - } catch { - case ex: ClassNotFoundException => - parentLoadClass(name, resolve) - } -}