http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequestSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequestSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequestSpec.scala deleted file mode 100644 index 168d487..0000000 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/CreateInteractiveRequestSpec.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy.server.interactive - -import com.fasterxml.jackson.databind.ObjectMapper -import org.scalatest.FunSpec - -import com.cloudera.livy.LivyBaseUnitTestSuite -import com.cloudera.livy.sessions.{PySpark, SessionKindModule} - -class CreateInteractiveRequestSpec extends FunSpec with LivyBaseUnitTestSuite { - - private val mapper = new ObjectMapper() - .registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule) - .registerModule(new SessionKindModule()) - - describe("CreateInteractiveRequest") { - - it("should have default values for fields after deserialization") { - val json = """{ "kind" : "pyspark" }""" - val req = mapper.readValue(json, classOf[CreateInteractiveRequest]) - assert(req.kind === PySpark()) - assert(req.proxyUser === None) - assert(req.jars === List()) - assert(req.pyFiles === List()) - assert(req.files === List()) - assert(req.driverMemory === None) - assert(req.driverCores === None) - assert(req.executorMemory === None) - assert(req.executorCores === None) - assert(req.numExecutors === None) - assert(req.archives === List()) - assert(req.queue === None) - assert(req.name === None) - assert(req.conf === Map()) - } - - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala deleted file mode 100644 index c546718..0000000 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy.server.interactive - -import java.util.concurrent.atomic.AtomicInteger -import javax.servlet.http.{HttpServletRequest, HttpServletResponse} - -import scala.collection.JavaConverters._ -import scala.concurrent.Future - -import org.json4s.jackson.Json4sScalaModule -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.scalatest.Entry -import org.scalatest.mock.MockitoSugar.mock - -import com.cloudera.livy.{ExecuteRequest, LivyConf} -import com.cloudera.livy.client.common.HttpMessages.SessionInfo -import com.cloudera.livy.rsc.driver.{Statement, StatementState} -import com.cloudera.livy.server.recovery.SessionStore -import com.cloudera.livy.sessions._ -import com.cloudera.livy.utils.AppInfo - -class InteractiveSessionServletSpec extends BaseInteractiveServletSpec { - - mapper.registerModule(new Json4sScalaModule()) - - class MockInteractiveSessionServlet( - sessionManager: InteractiveSessionManager, - conf: LivyConf) - extends InteractiveSessionServlet(sessionManager, mock[SessionStore], conf) { - - private var statements = IndexedSeq[Statement]() - - override protected def createSession(req: HttpServletRequest): InteractiveSession = { - val statementCounter = new AtomicInteger() - - val session = mock[InteractiveSession] - when(session.kind).thenReturn(Spark()) - when(session.appId).thenReturn(None) - when(session.appInfo).thenReturn(AppInfo()) - when(session.logLines()).thenReturn(IndexedSeq()) - when(session.state).thenReturn(SessionState.Idle()) - when(session.stop()).thenReturn(Future.successful(())) - when(session.proxyUser).thenReturn(None) - when(session.statements).thenAnswer( - new Answer[IndexedSeq[Statement]]() { - override def answer(args: InvocationOnMock): IndexedSeq[Statement] = statements - }) - when(session.executeStatement(any(classOf[ExecuteRequest]))).thenAnswer( - new Answer[Statement]() { - override def 
answer(args: InvocationOnMock): Statement = { - val id = statementCounter.getAndIncrement - val statement = new Statement(id, "1+1", StatementState.Available, "1") - - statements :+= statement - statement - } - }) - when(session.cancelStatement(anyInt())).thenAnswer( - new Answer[Unit] { - override def answer(args: InvocationOnMock): Unit = { - statements = IndexedSeq( - new Statement(statementCounter.get(), null, StatementState.Cancelled, null)) - } - } - ) - - session - } - - } - - override def createServlet(): InteractiveSessionServlet = { - val conf = createConf() - val sessionManager = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq.empty)) - new MockInteractiveSessionServlet(sessionManager, conf) - } - - it("should setup and tear down an interactive session") { - jget[Map[String, Any]]("/") { data => - data("sessions") should equal(Seq()) - } - - jpost[Map[String, Any]]("/", createRequest()) { data => - header("Location") should equal("/0") - data("id") should equal (0) - - val session = servlet.sessionManager.get(0) - session should be (defined) - } - - jget[Map[String, Any]]("/0") { data => - data("id") should equal (0) - data("state") should equal ("idle") - - val batch = servlet.sessionManager.get(0) - batch should be (defined) - } - - jpost[Map[String, Any]]("/0/statements", ExecuteRequest("foo")) { data => - data("id") should be (0) - data("code") shouldBe "1+1" - data("progress") should be (0.0) - data("output") shouldBe 1 - } - - jget[Map[String, Any]]("/0/statements") { data => - data("total_statements") should be (1) - data("statements").asInstanceOf[Seq[Map[String, Any]]](0)("id") should be (0) - } - - jpost[Map[String, Any]]("/0/statements/0/cancel", null, HttpServletResponse.SC_OK) { data => - data should equal(Map("msg" -> "canceled")) - } - - jget[Map[String, Any]]("/0/statements") { data => - data("total_statements") should be (1) - data("statements").asInstanceOf[Seq[Map[String, Any]]](0)("state") should be ("cancelled") - } 
- - jdelete[Map[String, Any]]("/0") { data => - data should equal (Map("msg" -> "deleted")) - - val session = servlet.sessionManager.get(0) - session should not be defined - } - } - - it("should show session properties") { - val id = 0 - val appId = "appid" - val owner = "owner" - val proxyUser = "proxyUser" - val state = SessionState.Running() - val kind = Spark() - val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL")) - val log = IndexedSeq[String]("log1", "log2") - - val session = mock[InteractiveSession] - when(session.id).thenReturn(id) - when(session.appId).thenReturn(Some(appId)) - when(session.owner).thenReturn(owner) - when(session.proxyUser).thenReturn(Some(proxyUser)) - when(session.state).thenReturn(state) - when(session.kind).thenReturn(kind) - when(session.appInfo).thenReturn(appInfo) - when(session.logLines()).thenReturn(log) - - val req = mock[HttpServletRequest] - - val view = servlet.asInstanceOf[InteractiveSessionServlet].clientSessionView(session, req) - .asInstanceOf[SessionInfo] - - view.id shouldEqual id - view.appId shouldEqual appId - view.owner shouldEqual owner - view.proxyUser shouldEqual proxyUser - view.state shouldEqual state.toString - view.kind shouldEqual kind.toString - view.appInfo should contain (Entry(AppInfo.DRIVER_LOG_URL_NAME, appInfo.driverLogUrl.get)) - view.appInfo should contain (Entry(AppInfo.SPARK_UI_URL_NAME, appInfo.sparkUiUrl.get)) - view.log shouldEqual log.asJava - } - -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala deleted file mode 100644 index 7d92a43..0000000 --- 
a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server.interactive - -import java.net.URI - -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.language.postfixOps - -import org.apache.spark.launcher.SparkLauncher -import org.json4s.{DefaultFormats, Extraction, JValue} -import org.json4s.jackson.JsonMethods.parse -import org.mockito.{Matchers => MockitoMatchers} -import org.mockito.Matchers._ -import org.mockito.Mockito.{atLeastOnce, verify, when} -import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers} -import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar.mock - -import com.cloudera.livy.{ExecuteRequest, JobHandle, LivyBaseUnitTestSuite, LivyConf} -import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf} -import com.cloudera.livy.rsc.driver.StatementState -import com.cloudera.livy.server.recovery.SessionStore -import com.cloudera.livy.sessions.{PySpark, SessionState, Spark} -import com.cloudera.livy.utils.{AppInfo, SparkApp} - -class InteractiveSessionSpec extends FunSpec - with Matchers with 
BeforeAndAfterAll with LivyBaseUnitTestSuite { - - private val livyConf = new LivyConf() - livyConf.set(LivyConf.REPL_JARS, "dummy.jar") - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.0") - .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5") - - implicit val formats = DefaultFormats - - private var session: InteractiveSession = null - - private def createSession( - sessionStore: SessionStore = mock[SessionStore], - mockApp: Option[SparkApp] = None): InteractiveSession = { - assume(sys.env.get("SPARK_HOME").isDefined, "SPARK_HOME is not set.") - - val req = new CreateInteractiveRequest() - req.kind = PySpark() - req.driverMemory = Some("512m") - req.driverCores = Some(1) - req.executorMemory = Some("512m") - req.executorCores = Some(1) - req.name = Some("InteractiveSessionSpec") - req.conf = Map( - SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"), - RSCConf.Entry.LIVY_JARS.key() -> "" - ) - InteractiveSession.create(0, null, None, livyConf, req, sessionStore, mockApp) - } - - private def executeStatement(code: String): JValue = { - val id = session.executeStatement(ExecuteRequest(code)).id - eventually(timeout(30 seconds), interval(100 millis)) { - val s = session.getStatement(id).get - s.state.get() shouldBe StatementState.Available - parse(s.output) - } - } - - override def afterAll(): Unit = { - if (session != null) { - Await.ready(session.stop(), 30 seconds) - session = null - } - super.afterAll() - } - - private def withSession(desc: String)(fn: (InteractiveSession) => Unit): Unit = { - it(desc) { - assume(session != null, "No active session.") - eventually(timeout(30 seconds), interval(100 millis)) { - session.state shouldBe a[SessionState.Idle] - } - fn(session) - } - } - - describe("A spark session") { - - it("should get scala version matched jars with livy.repl.jars") { - val testedJars = Seq( - "test_2.10-0.1.jar", - "local://dummy-path/test/test1_2.10-1.0.jar", - "file:///dummy-path/test/test2_2.11-1.0-SNAPSHOT.jar", - 
"hdfs:///dummy-path/test/test3.jar", - "non-jar", - "dummy.jar" - ) - val livyConf = new LivyConf(false) - .set(LivyConf.REPL_JARS, testedJars.mkString(",")) - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2") - .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10") - val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf) - assert(properties(LivyConf.SPARK_JARS).split(",").toSet === Set("test_2.10-0.1.jar", - "local://dummy-path/test/test1_2.10-1.0.jar", - "hdfs:///dummy-path/test/test3.jar", - "dummy.jar")) - - livyConf.set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.11") - val properties1 = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf) - assert(properties1(LivyConf.SPARK_JARS).split(",").toSet === Set( - "file:///dummy-path/test/test2_2.11-1.0-SNAPSHOT.jar", - "hdfs:///dummy-path/test/test3.jar", - "dummy.jar")) - } - - - it("should set rsc jars through livy conf") { - val rscJars = Set( - "dummy.jar", - "local:///dummy-path/dummy1.jar", - "file:///dummy-path/dummy2.jar", - "hdfs:///dummy-path/dummy3.jar") - val livyConf = new LivyConf(false) - .set(LivyConf.REPL_JARS, "dummy.jar") - .set(LivyConf.RSC_JARS, rscJars.mkString(",")) - .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2") - .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10") - val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf) - // if livy.rsc.jars is configured in LivyConf, it should be passed to RSCConf. - properties(RSCConf.Entry.LIVY_JARS.key()).split(",").toSet === rscJars - - val rscJars1 = Set( - "foo.jar", - "local:///dummy-path/foo1.jar", - "file:///dummy-path/foo2.jar", - "hdfs:///dummy-path/foo3.jar") - val properties1 = InteractiveSession.prepareBuilderProp( - Map(RSCConf.Entry.LIVY_JARS.key() -> rscJars1.mkString(",")), Spark(), livyConf) - // if rsc jars are configured both in LivyConf and RSCConf, RSCConf should take precedence. 
- properties1(RSCConf.Entry.LIVY_JARS.key()).split(",").toSet === rscJars1 - } - - it("should start in the idle state") { - session = createSession() - session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle])) - } - - it("should update appId and appInfo and session store") { - val mockApp = mock[SparkApp] - val sessionStore = mock[SessionStore] - val session = createSession(sessionStore, Some(mockApp)) - - val expectedAppId = "APPID" - session.appIdKnown(expectedAppId) - session.appId shouldEqual Some(expectedAppId) - - val expectedAppInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL")) - session.infoChanged(expectedAppInfo) - session.appInfo shouldEqual expectedAppInfo - - verify(sessionStore, atLeastOnce()).save( - MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), anyObject()) - } - - withSession("should execute `1 + 2` == 3") { session => - val result = executeStatement("1 + 2") - val expectedResult = Extraction.decompose(Map( - "status" -> "ok", - "execution_count" -> 0, - "data" -> Map( - "text/plain" -> "3" - ) - )) - - result should equal (expectedResult) - } - - withSession("should report an error if accessing an unknown variable") { session => - val result = executeStatement("x") - val expectedResult = Extraction.decompose(Map( - "status" -> "error", - "execution_count" -> 1, - "ename" -> "NameError", - "evalue" -> "name 'x' is not defined", - "traceback" -> List( - "Traceback (most recent call last):\n", - "NameError: name 'x' is not defined\n" - ) - )) - - result should equal (expectedResult) - eventually(timeout(10 seconds), interval(30 millis)) { - session.state shouldBe a[SessionState.Idle] - } - } - - withSession("should get statement progress along with statement result") { session => - val code = - """ - |from time import sleep - |sleep(3) - """.stripMargin - val statement = session.executeStatement(ExecuteRequest(code)) - statement.progress should be (0.0) - - eventually(timeout(10 seconds), interval(100 
millis)) { - val s = session.getStatement(statement.id).get - s.state.get() shouldBe StatementState.Available - s.progress should be (1.0) - } - } - - withSession("should error out the session if the interpreter dies") { session => - session.executeStatement(ExecuteRequest("import os; os._exit(666)")) - eventually(timeout(30 seconds), interval(100 millis)) { - session.state shouldBe a[SessionState.Error] - } - } - } - - describe("recovery") { - it("should recover session") { - val conf = new LivyConf() - val sessionStore = mock[SessionStore] - val mockClient = mock[RSCClient] - when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]]) - val m = - InteractiveRecoveryMetadata( - 78, None, "appTag", Spark(), 0, null, None, Some(URI.create(""))) - val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient)) - - s.state shouldBe a[SessionState.Recovering] - - s.appIdKnown("appId") - verify(sessionStore, atLeastOnce()).save( - MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), anyObject()) - } - - it("should recover session to dead state if rscDriverUri is unknown") { - val conf = new LivyConf() - val sessionStore = mock[SessionStore] - val m = InteractiveRecoveryMetadata( - 78, Some("appId"), "appTag", Spark(), 0, null, None, None) - val s = InteractiveSession.recover(m, conf, sessionStore, None) - - s.state shouldBe a[SessionState.Dead] - s.logLines().mkString should include("RSCDriver URI is unknown") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala deleted file mode 100644 index f8c7c3f..0000000 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/JobApiSpec.scala 
+++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server.interactive - -import java.io.File -import java.net.URI -import java.nio.ByteBuffer -import java.nio.file.{Files, Paths} -import javax.servlet.http.HttpServletResponse._ - -import scala.concurrent.duration._ -import scala.io.Source -import scala.language.postfixOps - -import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar.mock - -import com.cloudera.livy.{Job, JobHandle} -import com.cloudera.livy.client.common.{BufferUtils, Serializer} -import com.cloudera.livy.client.common.HttpMessages._ -import com.cloudera.livy.server.RemoteUserOverride -import com.cloudera.livy.server.recovery.SessionStore -import com.cloudera.livy.sessions.{InteractiveSessionManager, SessionState} -import com.cloudera.livy.test.jobs.{Echo, GetCurrentUser} - -class JobApiSpec extends BaseInteractiveServletSpec { - - private val PROXY = "__proxy__" - - private var sessionId: Int = -1 - - override def createServlet(): InteractiveSessionServlet = { - val conf = createConf() - val sessionStore = mock[SessionStore] - val sessionManager = new InteractiveSessionManager(conf, sessionStore, Some(Seq.empty)) - 
new InteractiveSessionServlet(sessionManager, sessionStore, conf) with RemoteUserOverride - } - - def withSessionId(desc: String)(fn: (Int) => Unit): Unit = { - it(desc) { - assume(sessionId != -1, "No active session.") - fn(sessionId) - } - } - - describe("Interactive Servlet") { - - it("should create sessions") { - jpost[SessionInfo]("/", createRequest()) { data => - waitForIdle(data.id) - header("Location") should equal("/0") - data.id should equal (0) - sessionId = data.id - } - } - - withSessionId("should handle asynchronous jobs") { testJobSubmission(_, false) } - - withSessionId("should handle synchronous jobs") { testJobSubmission(_, true) } - - // Test that the file does get copied over to the live home dir on HDFS - does not test end - // to end that the RSCClient class copies it over to the app. - withSessionId("should support file uploads") { id => - testResourceUpload("file", id) - } - - withSessionId("should support jar uploads") { id => - testResourceUpload("jar", id) - } - - withSessionId("should monitor async Spark jobs") { sid => - val ser = new Serializer() - val job = BufferUtils.toByteArray(ser.serialize(new Echo("hello"))) - var jobId: Long = -1L - jpost[JobStatus](s"/$sid/submit-job", new SerializedJob(job)) { status => - jobId = status.id - } - - eventually(timeout(1 minute), interval(100 millis)) { - jget[JobStatus](s"/$sid/jobs/$jobId") { status => - status.state should be (JobHandle.State.SUCCEEDED) - } - } - } - - withSessionId("should update last activity on connect") { sid => - val currentActivity = servlet.sessionManager.get(sid).get.lastActivity - jpost[SessionInfo](s"/$sid/connect", null, expectedStatus = SC_OK) { info => - val newActivity = servlet.sessionManager.get(sid).get.lastActivity - assert(newActivity > currentActivity) - } - } - - withSessionId("should tear down sessions") { id => - jdelete[Map[String, Any]](s"/$id") { data => - data should equal (Map("msg" -> "deleted")) - } - jget[Map[String, Any]]("/") { data => - 
data("sessions") match { - case contents: Seq[_] => contents.size should equal (0) - case _ => fail("Response is not an array.") - } - } - - // Make sure the session's staging directory was cleaned up. - assert(tempDir.listFiles().length === 0) - } - - it("should support user impersonation") { - val headers = makeUserHeaders(PROXY) - jpost[SessionInfo]("/", createRequest(inProcess = false), headers = headers) { data => - try { - waitForIdle(data.id) - data.owner should be (PROXY) - data.proxyUser should be (PROXY) - val user = runJob(data.id, new GetCurrentUser(), headers = headers) - user should be (PROXY) - } finally { - deleteSession(data.id) - } - } - } - - it("should honor impersonation requests") { - val request = createRequest(inProcess = false) - request.proxyUser = Some(PROXY) - jpost[SessionInfo]("/", request, headers = adminHeaders) { data => - try { - waitForIdle(data.id) - data.owner should be (ADMIN) - data.proxyUser should be (PROXY) - val user = runJob(data.id, new GetCurrentUser(), headers = adminHeaders) - user should be (PROXY) - - // Test that files are uploaded to a new session directory. 
- assert(tempDir.listFiles().length === 0) - testResourceUpload("file", data.id) - } finally { - deleteSession(data.id) - assert(tempDir.listFiles().length === 0) - } - } - } - - it("should respect config black list") { - jpost[SessionInfo]("/", createRequest(extraConf = BLACKLISTED_CONFIG), - expectedStatus = SC_BAD_REQUEST) { _ => } - } - - } - - private def waitForIdle(id: Int): Unit = { - eventually(timeout(1 minute), interval(100 millis)) { - jget[SessionInfo](s"/$id") { status => - status.state should be (SessionState.Idle().toString()) - } - } - } - - private def deleteSession(id: Int): Unit = { - jdelete[Map[String, Any]](s"/$id", headers = adminHeaders) { _ => } - } - - private def testResourceUpload(cmd: String, sessionId: Int): Unit = { - val f = File.createTempFile("uploadTestFile", cmd) - val conf = createConf() - - Files.write(Paths.get(f.getAbsolutePath), "Test data".getBytes()) - - jupload[Unit](s"/$sessionId/upload-$cmd", Map(cmd -> f), expectedStatus = SC_OK) { _ => - // There should be a single directory under the staging dir. 
- val subdirs = tempDir.listFiles() - assert(subdirs.length === 1) - val stagingDir = subdirs(0).toURI().toString() - - val resultFile = new File(new URI(s"$stagingDir/${f.getName}")) - resultFile.deleteOnExit() - resultFile.exists() should be(true) - Source.fromFile(resultFile).mkString should be("Test data") - } - } - - private def testJobSubmission(sid: Int, sync: Boolean): Unit = { - val result = runJob(sid, new Echo(42), sync = sync) - result should be (42) - } - - private def runJob[T]( - sid: Int, - job: Job[T], - sync: Boolean = false, - headers: Map[String, String] = defaultHeaders): T = { - val ser = new Serializer() - val jobData = BufferUtils.toByteArray(ser.serialize(job)) - val route = if (sync) s"/$sid/submit-job" else s"/$sid/run-job" - var jobId: Long = -1L - jpost[JobStatus](route, new SerializedJob(jobData), headers = headers) { data => - jobId = data.id - } - - var result: Option[T] = None - eventually(timeout(1 minute), interval(100 millis)) { - jget[JobStatus](s"/$sid/jobs/$jobId") { status => - status.id should be (jobId) - status.state should be (JobHandle.State.SUCCEEDED) - result = Some(ser.deserialize(ByteBuffer.wrap(status.result)).asInstanceOf[T]) - } - } - result.getOrElse(throw new IllegalStateException()) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala deleted file mode 100644 index 36eb7ef..0000000 --- a/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server.interactive - -import scala.concurrent.duration._ -import scala.language.postfixOps - -import org.mockito.Mockito.{never, verify, when} -import org.scalatest.{FunSpec, Matchers} -import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar.mock - -import com.cloudera.livy.LivyConf -import com.cloudera.livy.server.recovery.SessionStore -import com.cloudera.livy.sessions.{Session, SessionManager} -import com.cloudera.livy.sessions.Session.RecoveryMetadata - -class SessionHeartbeatSpec extends FunSpec with Matchers { - describe("SessionHeartbeat") { - class TestHeartbeat(override val heartbeatTimeout: FiniteDuration) extends SessionHeartbeat {} - - it("should not expire if heartbeat was never called.") { - val t = new TestHeartbeat(Duration.Zero) - t.heartbeatExpired shouldBe false - } - - it("should expire if time has elapsed.") { - val t = new TestHeartbeat(Duration.fromNanos(1)) - t.heartbeat() - eventually(timeout(2 nano), interval(1 nano)) { - t.heartbeatExpired shouldBe true - } - } - - it("should not expire if time hasn't elapsed.") { - val t = new TestHeartbeat(Duration.create(1, DAYS)) - t.heartbeat() - t.heartbeatExpired shouldBe false - } - } - - describe("SessionHeartbeatWatchdog") { - abstract class 
TestSession extends Session(0, null, null) with SessionHeartbeat {} - class TestWatchdog(conf: LivyConf) - extends SessionManager[TestSession, RecoveryMetadata]( - conf, - { _ => assert(false).asInstanceOf[TestSession] }, - mock[SessionStore], - "test", - Some(Seq.empty)) - with SessionHeartbeatWatchdog[TestSession, RecoveryMetadata] {} - - it("should delete only expired sessions") { - val expiredSession: TestSession = mock[TestSession] - when(expiredSession.id).thenReturn(0) - when(expiredSession.heartbeatExpired).thenReturn(true) - - val nonExpiredSession: TestSession = mock[TestSession] - when(nonExpiredSession.id).thenReturn(1) - when(nonExpiredSession.heartbeatExpired).thenReturn(false) - - val n = new TestWatchdog(new LivyConf()) - - n.register(expiredSession) - n.register(nonExpiredSession) - n.deleteExpiredSessions() - - verify(expiredSession).stop() - verify(nonExpiredSession, never).stop() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/BlackholeStateStoreSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/BlackholeStateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/BlackholeStateStoreSpec.scala deleted file mode 100644 index c11feff..0000000 --- a/server/src/test/scala/com/cloudera/livy/server/recovery/BlackholeStateStoreSpec.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server.recovery - -import org.scalatest.FunSpec -import org.scalatest.Matchers._ - -import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf} - -class BlackholeStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { - describe("BlackholeStateStore") { - val stateStore = new BlackholeStateStore(new LivyConf()) - - it("set should not throw") { - stateStore.set("", 1.asInstanceOf[Object]) - } - - it("get should return None") { - val v = stateStore.get[Object]("") - v shouldBe None - } - - it("getChildren should return empty list") { - val c = stateStore.getChildren("") - c shouldBe empty - } - - it("remove should not throw") { - stateStore.remove("") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala deleted file mode 100644 index 9b7b0f3..0000000 --- a/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server.recovery - -import java.io.{FileNotFoundException, InputStream, IOException} -import java.util - -import org.apache.hadoop.fs._ -import org.apache.hadoop.fs.Options.{CreateOpts, Rename} -import org.apache.hadoop.fs.permission.FsPermission -import org.hamcrest.Description -import org.mockito.ArgumentMatcher -import org.mockito.Matchers.{any, anyInt, argThat, eq => equal} -import org.mockito.Mockito.{atLeastOnce, verify, when} -import org.mockito.internal.matchers.Equals -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.scalatest.FunSpec -import org.scalatest.Matchers._ -import org.scalatest.mock.MockitoSugar.mock - -import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf} - -class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { - describe("FileSystemStateStore") { - def pathEq(wantedPath: String): Path = argThat(new ArgumentMatcher[Path] { - private val matcher = new Equals(wantedPath) - - override def matches(path: Any): Boolean = matcher.matches(path.toString) - - override def describeTo(d: Description): Unit = { matcher.describeTo(d) } - }) - - def makeConf(): LivyConf = { - val conf = new LivyConf() - conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/") - - conf - } - - def mockFileContext(rootDirPermission: String): FileContext = { - val fileContext = mock[FileContext] - 
val rootDirStatus = mock[FileStatus] - when(fileContext.getFileStatus(any())).thenReturn(rootDirStatus) - when(rootDirStatus.getPermission).thenReturn(new FsPermission(rootDirPermission)) - - fileContext - } - - it("should throw if url is not configured") { - intercept[IllegalArgumentException](new FileSystemStateStore(new LivyConf())) - } - - it("should set and verify file permission") { - val fileContext = mockFileContext("700") - new FileSystemStateStore(makeConf(), Some(fileContext)) - - verify(fileContext).setUMask(new FsPermission("077")) - } - - it("should reject insecure permission") { - def test(permission: String): Unit = { - val fileContext = mockFileContext(permission) - - intercept[IllegalArgumentException](new FileSystemStateStore(makeConf(), Some(fileContext))) - } - test("600") - test("400") - test("677") - test("670") - test("607") - } - - it("set should write with an intermediate file") { - val fileContext = mockFileContext("700") - val outputStream = mock[FSDataOutputStream] - when(fileContext.create(pathEq("/key.tmp"), any[util.EnumSet[CreateFlag]], any[CreateOpts])) - .thenReturn(outputStream) - - val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - - stateStore.set("key", "value") - - verify(outputStream).write(""""value"""".getBytes) - verify(outputStream, atLeastOnce).close() - - - verify(fileContext).rename(pathEq("/key.tmp"), pathEq("/key"), equal(Rename.OVERWRITE)) - verify(fileContext).delete(pathEq("/.key.tmp.crc"), equal(false)) - } - - it("get should read file") { - val fileContext = mockFileContext("700") - abstract class MockInputStream extends InputStream with Seekable with PositionedReadable {} - val inputStream: InputStream = mock[MockInputStream] - when(inputStream.read(any[Array[Byte]](), anyInt(), anyInt())).thenAnswer(new Answer[Int] { - private var firstCall = true - override def answer(invocation: InvocationOnMock): Int = { - if (firstCall) { - firstCall = false - val buf = 
invocation.getArguments()(0).asInstanceOf[Array[Byte]] - val b = """"value"""".getBytes() - b.copyToArray(buf) - b.length - } else { - -1 - } - } - }) - - when(fileContext.open(pathEq("/key"))).thenReturn(new FSDataInputStream(inputStream)) - - val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - - stateStore.get[String]("key") shouldBe Some("value") - - verify(inputStream, atLeastOnce).close() - } - - it("get non-existent key should return None") { - val fileContext = mockFileContext("700") - when(fileContext.open(any())).thenThrow(new FileNotFoundException("Unit test")) - - val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - - stateStore.get[String]("key") shouldBe None - } - - it("getChildren should list file") { - val parentPath = "path" - def makeFileStatus(name: String): FileStatus = { - val fs = new FileStatus() - fs.setPath(new Path(parentPath, name)) - fs - } - val children = Seq("c1", "c2") - - val fileContext = mockFileContext("700") - val util = mock[FileContext#Util] - when(util.listStatus(pathEq(s"/$parentPath"))) - .thenReturn(children.map(makeFileStatus).toArray) - when(fileContext.util()).thenReturn(util) - - val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - stateStore.getChildren(parentPath) should contain theSameElementsAs children - } - - def getChildrenErrorTest(error: Exception): Unit = { - val parentPath = "path" - - val fileContext = mockFileContext("700") - val util = mock[FileContext#Util] - when(util.listStatus(pathEq(s"/$parentPath"))).thenThrow(error) - when(fileContext.util()).thenReturn(util) - - val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - stateStore.getChildren(parentPath) shouldBe empty - } - - it("getChildren should return empty list if the key doesn't exist") { - getChildrenErrorTest(new IOException("Unit test")) - } - - it("getChildren should return empty list if key doesn't exist") { - getChildrenErrorTest(new 
FileNotFoundException("Unit test")) - } - - it("remove should delete file") { - val fileContext = mockFileContext("700") - - val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) - stateStore.remove("key") - - verify(fileContext).delete(pathEq("/key"), equal(false)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/SessionStoreSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/SessionStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/SessionStoreSpec.scala deleted file mode 100644 index 3435c2e..0000000 --- a/server/src/test/scala/com/cloudera/livy/server/recovery/SessionStoreSpec.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy.server.recovery - -import scala.util.Success - -import org.mockito.Mockito._ -import org.scalatest.FunSpec -import org.scalatest.Matchers._ -import org.scalatest.mock.MockitoSugar.mock - -import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf} -import com.cloudera.livy.sessions.Session.RecoveryMetadata - -class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite { - describe("SessionStore") { - case class TestRecoveryMetadata(id: Int) extends RecoveryMetadata - - val sessionType = "test" - val sessionPath = s"v1/$sessionType" - val sessionManagerPath = s"v1/$sessionType/state" - - val conf = new LivyConf() - it("should set session state and session counter when saving a session.") { - val stateStore = mock[StateStore] - val sessionStore = new SessionStore(conf, stateStore) - - val m = TestRecoveryMetadata(99) - sessionStore.save(sessionType, m) - verify(stateStore).set(s"$sessionPath/99", m) - } - - it("should return existing sessions") { - val validMetadata = Map( - "0" -> Some(TestRecoveryMetadata(0)), - "5" -> None, - "77" -> Some(TestRecoveryMetadata(77))) - val corruptedMetadata = Map( - "7" -> new RuntimeException("Test"), - "11212" -> new RuntimeException("Test") - ) - val stateStore = mock[StateStore] - val sessionStore = new SessionStore(conf, stateStore) - when(stateStore.getChildren(sessionPath)) - .thenReturn((validMetadata ++ corruptedMetadata).keys.toList) - - validMetadata.foreach { case (id, m) => - when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenReturn(m) - } - - corruptedMetadata.foreach { case (id, ex) => - when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenThrow(ex) - } - - val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType) - // Verify normal metadata are retrieved. 
- s.filter(_.isSuccess) should contain theSameElementsAs - validMetadata.values.filter(_.isDefined).map(m => Success(m.get)) - // Verify exceptions are wrapped as in Try and are returned. - s.filter(_.isFailure) should have size corruptedMetadata.size - } - - it("should not throw if the state store is empty") { - val stateStore = mock[StateStore] - val sessionStore = new SessionStore(conf, stateStore) - when(stateStore.getChildren(sessionPath)).thenReturn(Seq.empty) - - val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType) - s.filter(_.isSuccess) shouldBe empty - } - - it("should return correct next session id") { - val stateStore = mock[StateStore] - val sessionStore = new SessionStore(conf, stateStore) - - when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(None) - sessionStore.getNextSessionId(sessionType) shouldBe 0 - - val sms = SessionManagerState(100) - when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(Some(sms)) - sessionStore.getNextSessionId(sessionType) shouldBe sms.nextSessionId - } - - it("should remove session") { - val stateStore = mock[StateStore] - val sessionStore = new SessionStore(conf, stateStore) - val id = 1 - - sessionStore.remove(sessionType, 1) - verify(stateStore).remove(s"$sessionPath/$id") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/StateStoreSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/StateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/StateStoreSpec.scala deleted file mode 100644 index c0c4918..0000000 --- a/server/src/test/scala/com/cloudera/livy/server/recovery/StateStoreSpec.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.server.recovery - -import scala.reflect.classTag - -import org.scalatest.{BeforeAndAfter, FunSpec} -import org.scalatest.Matchers._ - -import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf} -import com.cloudera.livy.sessions.SessionManager - -class StateStoreSpec extends FunSpec with BeforeAndAfter with LivyBaseUnitTestSuite { - describe("StateStore") { - after { - StateStore.cleanup() - } - - def createConf(stateStore: String): LivyConf = { - val conf = new LivyConf() - conf.set(LivyConf.RECOVERY_MODE.key, SessionManager.SESSION_RECOVERY_MODE_RECOVERY) - conf.set(LivyConf.RECOVERY_STATE_STORE.key, stateStore) - conf - } - - it("should throw an error on get if it's not initialized") { - intercept[AssertionError] { StateStore.get } - } - - it("should initialize blackhole state store if recovery is disabled") { - StateStore.init(new LivyConf()) - StateStore.get shouldBe a[BlackholeStateStore] - } - - it("should pick the correct store according to state store config") { - StateStore.pickStateStore(createConf("filesystem")) shouldBe classOf[FileSystemStateStore] - StateStore.pickStateStore(createConf("zookeeper")) shouldBe classOf[ZooKeeperStateStore] - } - - it("should return error if an unknown recovery mode is set") { - val conf = new LivyConf() 
- conf.set(LivyConf.RECOVERY_MODE.key, "unknown") - intercept[IllegalArgumentException] { StateStore.init(conf) } - } - - it("should return error if an unknown state store is set") { - intercept[IllegalArgumentException] { StateStore.init(createConf("unknown")) } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala deleted file mode 100644 index 860568f..0000000 --- a/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy.server.recovery - -import scala.collection.JavaConverters._ - -import org.apache.curator.framework.CuratorFramework -import org.apache.curator.framework.api._ -import org.apache.curator.framework.listen.Listenable -import org.apache.zookeeper.data.Stat -import org.mockito.Mockito._ -import org.scalatest.FunSpec -import org.scalatest.Matchers._ -import org.scalatest.mock.MockitoSugar.mock - -import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf} - -class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { - describe("ZooKeeperStateStore") { - case class TestFixture(stateStore: ZooKeeperStateStore, curatorClient: CuratorFramework) - val conf = new LivyConf() - conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host") - val key = "key" - val prefixedKey = s"/livy/$key" - - def withMock[R](testBody: TestFixture => R): R = { - val curatorClient = mock[CuratorFramework] - when(curatorClient.getUnhandledErrorListenable()) - .thenReturn(mock[Listenable[UnhandledErrorListener]]) - val stateStore = new ZooKeeperStateStore(conf, Some(curatorClient)) - testBody(TestFixture(stateStore, curatorClient)) - } - - def mockExistsBuilder(curatorClient: CuratorFramework, exists: Boolean): Unit = { - val existsBuilder = mock[ExistsBuilder] - when(curatorClient.checkExists()).thenReturn(existsBuilder) - if (exists) { - when(existsBuilder.forPath(prefixedKey)).thenReturn(mock[Stat]) - } - } - - it("should throw on bad config") { - withMock { f => - val conf = new LivyConf() - intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) } - - conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host") - conf.set(ZooKeeperStateStore.ZK_RETRY_CONF, "bad") - intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) } - } - } - - it("set should use curatorClient") { - withMock { f => - mockExistsBuilder(f.curatorClient, true) - - val setDataBuilder = mock[SetDataBuilder] - when(f.curatorClient.setData()).thenReturn(setDataBuilder) - - 
f.stateStore.set("key", 1.asInstanceOf[Object]) - - verify(f.curatorClient).start() - verify(setDataBuilder).forPath(prefixedKey, Array[Byte](49)) - } - } - - it("set should create parents if they don't exist") { - withMock { f => - mockExistsBuilder(f.curatorClient, false) - - val createBuilder = mock[CreateBuilder] - when(f.curatorClient.create()).thenReturn(createBuilder) - val p = mock[ProtectACLCreateModePathAndBytesable[String]] - when(createBuilder.creatingParentsIfNeeded()).thenReturn(p) - - f.stateStore.set("key", 1.asInstanceOf[Object]) - - verify(f.curatorClient).start() - verify(p).forPath(prefixedKey, Array[Byte](49)) - } - } - - it("get should retrieve retry policy configs") { - conf.set(com.cloudera.livy.server.recovery.ZooKeeperStateStore.ZK_RETRY_CONF, "11,77") - withMock { f => - mockExistsBuilder(f.curatorClient, true) - - f.stateStore.retryPolicy should not be null - f.stateStore.retryPolicy.getN shouldBe 11 - } - } - - it("get should retrieve data from curatorClient") { - withMock { f => - mockExistsBuilder(f.curatorClient, true) - - val getDataBuilder = mock[GetDataBuilder] - when(f.curatorClient.getData()).thenReturn(getDataBuilder) - when(getDataBuilder.forPath(prefixedKey)).thenReturn(Array[Byte](50)) - - val v = f.stateStore.get[Int]("key") - - verify(f.curatorClient).start() - v shouldBe Some(2) - } - } - - it("get should return None if key doesn't exist") { - withMock { f => - mockExistsBuilder(f.curatorClient, false) - - val v = f.stateStore.get[Int]("key") - - verify(f.curatorClient).start() - v shouldBe None - } - } - - it("getChildren should use curatorClient") { - withMock { f => - mockExistsBuilder(f.curatorClient, true) - - val getChildrenBuilder = mock[GetChildrenBuilder] - when(f.curatorClient.getChildren()).thenReturn(getChildrenBuilder) - val children = List("abc", "def") - when(getChildrenBuilder.forPath(prefixedKey)).thenReturn(children.asJava) - - val c = f.stateStore.getChildren("key") - - verify(f.curatorClient).start() - 
c shouldBe children - } - } - - it("getChildren should return empty list if key doesn't exist") { - withMock { f => - mockExistsBuilder(f.curatorClient, false) - - val c = f.stateStore.getChildren("key") - - verify(f.curatorClient).start() - c shouldBe empty - } - } - - it("remove should use curatorClient") { - withMock { f => - val deleteBuilder = mock[DeleteBuilder] - when(f.curatorClient.delete()).thenReturn(deleteBuilder) - val g = mock[ChildrenDeletable] - when(deleteBuilder.guaranteed()).thenReturn(g) - - f.stateStore.remove(key) - - verify(g).forPath(prefixedKey) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala b/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala deleted file mode 100644 index 9d129bc..0000000 --- a/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy.sessions - -import com.cloudera.livy.LivyConf - -class MockSession(id: Int, owner: String, conf: LivyConf) extends Session(id, owner, conf) { - case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata() - - override val proxyUser = None - - override protected def stopSession(): Unit = () - - override def logLines(): IndexedSeq[String] = IndexedSeq() - - override def state: SessionState = SessionState.Idle() - - override def recoveryMetadata: RecoveryMetadata = RecoveryMetadata(0) -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala deleted file mode 100644 index ef9b8c1..0000000 --- a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy.sessions - -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.concurrent.duration._ -import scala.language.postfixOps -import scala.util.{Failure, Try} - -import org.mockito.Mockito.{doReturn, never, verify, when} -import org.scalatest.{FunSpec, Matchers} -import org.scalatest.concurrent.Eventually._ -import org.scalatest.mock.MockitoSugar.mock - -import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf} -import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession} -import com.cloudera.livy.server.interactive.InteractiveSession -import com.cloudera.livy.server.recovery.SessionStore -import com.cloudera.livy.sessions.Session.RecoveryMetadata - -class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite { - implicit def executor: ExecutionContext = ExecutionContext.global - - describe("SessionManager") { - it("should garbage collect old sessions") { - val livyConf = new LivyConf() - livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms") - val manager = new SessionManager[MockSession, RecoveryMetadata]( - livyConf, - { _ => assert(false).asInstanceOf[MockSession] }, - mock[SessionStore], - "test", - Some(Seq.empty)) - val session = manager.register(new MockSession(manager.nextId(), null, livyConf)) - manager.get(session.id).isDefined should be(true) - eventually(timeout(5 seconds), interval(100 millis)) { - Await.result(manager.collectGarbage(), Duration.Inf) - manager.get(session.id) should be(None) - } - } - - it("batch session should not be gc-ed until application is finished") { - val sessionId = 24 - val session = mock[BatchSession] - when(session.id).thenReturn(sessionId) - when(session.stop()).thenReturn(Future {}) - when(session.lastActivity).thenReturn(System.nanoTime()) - - val conf = new LivyConf().set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s") - val sm = new BatchSessionManager(conf, mock[SessionStore], Some(Seq(session))) - testSessionGC(session, sm) - } - - 
it("interactive session should not gc-ed if session timeout check is off") { - val sessionId = 24 - val session = mock[InteractiveSession] - when(session.id).thenReturn(sessionId) - when(session.stop()).thenReturn(Future {}) - when(session.lastActivity).thenReturn(System.nanoTime()) - - val conf = new LivyConf().set(LivyConf.SESSION_TIMEOUT_CHECK, false) - .set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s") - val sm = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq(session))) - testSessionGC(session, sm) - } - - def testSessionGC(session: Session, sm: SessionManager[_, _]): Unit = { - - def changeStateAndCheck(s: SessionState)(fn: SessionManager[_, _] => Unit): Unit = { - doReturn(s).when(session).state - Await.result(sm.collectGarbage(), Duration.Inf) - fn(sm) - } - - // Batch session should not be gc-ed when alive - for (s <- Seq(SessionState.Running(), - SessionState.Idle(), - SessionState.Recovering(), - SessionState.NotStarted(), - SessionState.Busy(), - SessionState.ShuttingDown())) { - changeStateAndCheck(s) { sm => sm.get(session.id) should be (Some(session)) } - } - - // Stopped session should be gc-ed after retained timeout - for (s <- Seq(SessionState.Error(), - SessionState.Success(), - SessionState.Dead())) { - eventually(timeout(30 seconds), interval(100 millis)) { - changeStateAndCheck(s) { sm => sm.get(session.id) should be (None) } - } - } - } - } - - describe("BatchSessionManager") { - implicit def executor: ExecutionContext = ExecutionContext.global - - def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = { - BatchRecoveryMetadata(id, None, appTag, null, None) - } - - def mockSession(id: Int): BatchSession = { - val session = mock[BatchSession] - when(session.id).thenReturn(id) - when(session.stop()).thenReturn(Future {}) - when(session.lastActivity).thenReturn(System.nanoTime()) - - session - } - - it("should not fail if state store is empty") { - val conf = new LivyConf() - - val sessionStore = mock[SessionStore] - 
when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch")) - .thenReturn(Seq.empty) - - val sm = new BatchSessionManager(conf, sessionStore) - sm.nextId() shouldBe 0 - } - - it("should recover sessions from state store") { - val conf = new LivyConf() - conf.set(LivyConf.LIVY_SPARK_MASTER.key, "yarn-cluster") - - val sessionType = "batch" - val nextId = 99 - - val validMetadata = List(makeMetadata(0, "t1"), makeMetadata(77, "t2")).map(Try(_)) - val invalidMetadata = List(Failure(new Exception("Fake invalid metadata"))) - val sessionStore = mock[SessionStore] - when(sessionStore.getNextSessionId(sessionType)).thenReturn(nextId) - when(sessionStore.getAllSessions[BatchRecoveryMetadata](sessionType)) - .thenReturn(validMetadata ++ invalidMetadata) - - val sm = new BatchSessionManager(conf, sessionStore) - sm.nextId() shouldBe nextId - validMetadata.foreach { m => - sm.get(m.get.id) shouldBe defined - } - sm.size shouldBe validMetadata.size - } - - it("should delete sessions from state store") { - val conf = new LivyConf() - - val sessionType = "batch" - val sessionId = 24 - val sessionStore = mock[SessionStore] - val session = mockSession(sessionId) - - val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) - sm.get(sessionId) shouldBe defined - - Await.ready(sm.delete(sessionId).get, 30 seconds) - - verify(sessionStore).remove(sessionType, sessionId) - sm.get(sessionId) shouldBe None - } - - it("should delete sessions on shutdown when recovery is off") { - val conf = new LivyConf() - val sessionId = 24 - val sessionStore = mock[SessionStore] - val session = mockSession(sessionId) - - val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) - sm.get(sessionId) shouldBe defined - sm.shutdown() - - verify(session).stop() - } - - it("should not delete sessions on shutdown with recovery is on") { - val conf = new LivyConf() - conf.set(LivyConf.RECOVERY_MODE, SessionManager.SESSION_RECOVERY_MODE_RECOVERY) - - val sessionId = 24 - val 
sessionStore = mock[SessionStore] - val session = mockSession(sessionId) - - val sm = new BatchSessionManager(conf, sessionStore, Some(Seq(session))) - sm.get(sessionId) shouldBe defined - sm.shutdown() - - verify(session, never).stop() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala b/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala deleted file mode 100644 index b45a0ed..0000000 --- a/server/src/test/scala/com/cloudera/livy/sessions/SessionSpec.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.cloudera.livy.sessions - -import java.net.URI - -import org.scalatest.FunSuite - -import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf} - -class SessionSpec extends FunSuite with LivyBaseUnitTestSuite { - - test("use default fs in paths") { - val conf = new LivyConf(false) - conf.hadoopConf.set("fs.defaultFS", "dummy:///") - - val uris = Seq("http://example.com/foo", "hdfs:/bar", "/baz") - val expected = Seq(uris(0), uris(1), "dummy:///baz") - assert(Session.resolveURIs(uris, conf) === expected) - - intercept[IllegalArgumentException] { - Session.resolveURI(new URI("relative_path"), conf) - } - } - - test("local fs whitelist") { - val conf = new LivyConf(false) - conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed/,/also_allowed") - - Seq("/allowed/file", "/also_allowed/file").foreach { path => - assert(Session.resolveURI(new URI(path), conf) === new URI("file://" + path)) - } - - Seq("/not_allowed", "/allowed_not_really").foreach { path => - intercept[IllegalArgumentException] { - Session.resolveURI(new URI(path), conf) - } - } - } - - test("conf validation and preparation") { - val conf = new LivyConf(false) - conf.hadoopConf.set("fs.defaultFS", "dummy:///") - conf.set(LivyConf.LOCAL_FS_WHITELIST, "/allowed") - - // Test baseline. - assert(Session.prepareConf(Map(), Nil, Nil, Nil, Nil, conf) === Map("spark.master" -> "local")) - - // Test validations. 
- intercept[IllegalArgumentException] { - Session.prepareConf(Map("spark.do_not_set" -> "1"), Nil, Nil, Nil, Nil, conf) - } - conf.sparkFileLists.foreach { key => - intercept[IllegalArgumentException] { - Session.prepareConf(Map(key -> "file:/not_allowed"), Nil, Nil, Nil, Nil, conf) - } - } - intercept[IllegalArgumentException] { - Session.prepareConf(Map(), Seq("file:/not_allowed"), Nil, Nil, Nil, conf) - } - intercept[IllegalArgumentException] { - Session.prepareConf(Map(), Nil, Seq("file:/not_allowed"), Nil, Nil, conf) - } - intercept[IllegalArgumentException] { - Session.prepareConf(Map(), Nil, Nil, Seq("file:/not_allowed"), Nil, conf) - } - intercept[IllegalArgumentException] { - Session.prepareConf(Map(), Nil, Nil, Nil, Seq("file:/not_allowed"), conf) - } - - // Test that file lists are merged and resolved. - val base = "/file1.txt" - val other = Seq("/file2.txt") - val expected = Some(Seq("dummy://" + other(0), "dummy://" + base).mkString(",")) - - val userLists = Seq(LivyConf.SPARK_JARS, LivyConf.SPARK_FILES, LivyConf.SPARK_ARCHIVES, - LivyConf.SPARK_PY_FILES) - val baseConf = userLists.map { key => (key -> base) }.toMap - val result = Session.prepareConf(baseConf, other, other, other, other, conf) - userLists.foreach { key => assert(result.get(key) === expected) } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala deleted file mode 100644 index b01194a..0000000 --- a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.livy.utils

import org.scalatest.FunSuite
import org.scalatest.Matchers

import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
import com.cloudera.livy.LivyConf._
import com.cloudera.livy.server.LivyServer

/**
 * Tests for the version helpers imported from `LivySparkUtils` (Spark version checks, Scala
 * version formatting and selection), plus one check of `LivyServer.testRecovery`.
 */
class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSuite {

  import LivySparkUtils._

  // Conf with no Scala version set explicitly by this suite.
  private val livyConf = new LivyConf()

  // Confs that pin the Scala version to a 2.10.x and a 2.11.x release respectively.
  private val livyConf210 = new LivyConf()
  livyConf210.set(LIVY_SPARK_SCALA_VERSION, "2.10.6")

  private val livyConf211 = new LivyConf()
  livyConf211.set(LIVY_SPARK_SCALA_VERSION, "2.11.1")

  test("check for SPARK_HOME") {
    testSparkHome(livyConf)
  }

  test("check spark-submit version") {
    testSparkSubmit(livyConf)
  }

  test("should support Spark 1.6") {
    testSparkVersion("1.6.0")
    testSparkVersion("1.6.1")
    testSparkVersion("1.6.1-SNAPSHOT")
    testSparkVersion("1.6.2")
    testSparkVersion("1.6")
    testSparkVersion("1.6.3.2.5.0-12")
  }

  test("should support Spark 2.0.x") {
    testSparkVersion("2.0.0")
    testSparkVersion("2.0.1")
    testSparkVersion("2.0.2")
    testSparkVersion("2.0.3-SNAPSHOT")
    testSparkVersion("2.0.0.2.5.1.0-56") // LIVY-229
    testSparkVersion("2.0")
  }

  // Kept separate from the 2.0.x test so that a failure here is reported under a title that
  // matches the versions actually being checked.
  test("should support Spark 2.1.x") {
    testSparkVersion("2.1.0")
    testSparkVersion("2.1.1")
  }

  test("should not support Spark older than 1.6") {
    intercept[IllegalArgumentException] { testSparkVersion("1.4.0") }
    intercept[IllegalArgumentException] { testSparkVersion("1.5.0") }
    intercept[IllegalArgumentException] { testSparkVersion("1.5.1") }
    intercept[IllegalArgumentException] { testSparkVersion("1.5.2") }
    intercept[IllegalArgumentException] { testSparkVersion("1.5.0-cdh5.6.1") }
  }

  test("should fail on bad version") {
    intercept[IllegalArgumentException] { testSparkVersion("not a version") }
  }

  test("should error out if recovery is turned on but master isn't yarn") {
    // Named differently from the suite-level `livyConf` so the field is not shadowed.
    val recoveryConf = new LivyConf()
    recoveryConf.set(LivyConf.LIVY_SPARK_MASTER, "local")
    recoveryConf.set(LivyConf.RECOVERY_MODE, "recovery")
    val s = new LivyServer()
    intercept[IllegalArgumentException] { s.testRecovery(recoveryConf) }
  }

  test("formatScalaVersion() should format Scala version") {
    // Only the first two version components are expected to survive formatting.
    formatScalaVersion("2.10.8") shouldBe "2.10"
    formatScalaVersion("2.11.4") shouldBe "2.11"
    formatScalaVersion("2.10") shouldBe "2.10"
    formatScalaVersion("2.10.x.x.x.x") shouldBe "2.10"

    // Throw exception for bad Scala version.
    intercept[IllegalArgumentException] { formatScalaVersion("") }
    intercept[IllegalArgumentException] { formatScalaVersion("xxx") }
  }

  test("defaultSparkScalaVersion() should return default Scala version") {
    defaultSparkScalaVersion(formatSparkVersion("1.6.0")) shouldBe "2.10"
    defaultSparkScalaVersion(formatSparkVersion("1.6.1")) shouldBe "2.10"
    defaultSparkScalaVersion(formatSparkVersion("1.6.2")) shouldBe "2.10"
    defaultSparkScalaVersion(formatSparkVersion("2.0.0")) shouldBe "2.11"
    defaultSparkScalaVersion(formatSparkVersion("2.0.1")) shouldBe "2.11"

    // Throw exception for unsupported Spark version.
    intercept[IllegalArgumentException] { defaultSparkScalaVersion(formatSparkVersion("1.5.0")) }
  }

  test("sparkScalaVersion() should use spark-submit detected Scala version.") {
    // The detected version is expected even where it differs from the default for that Spark
    // release asserted in the defaultSparkScalaVersion() test.
    sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.10"), livyConf) shouldBe "2.10"
    sparkScalaVersion(formatSparkVersion("1.6.0"), Some("2.11"), livyConf) shouldBe "2.11"
  }

  test("sparkScalaVersion() should throw if configured and detected Scala version mismatch.") {
    intercept[IllegalArgumentException] {
      sparkScalaVersion(formatSparkVersion("2.0.1"), Some("2.11"), livyConf210)
    }
    intercept[IllegalArgumentException] {
      sparkScalaVersion(formatSparkVersion("1.6.1"), Some("2.10"), livyConf211)
    }
  }

  test("sparkScalaVersion() should use configured Scala version if spark-submit doesn't tell.") {
    sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf210) shouldBe "2.10"
    sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf210) shouldBe "2.10"
    sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf210) shouldBe "2.10"
    sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf210) shouldBe "2.10"
    sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf211) shouldBe "2.11"
    sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf211) shouldBe "2.11"
    sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf211) shouldBe "2.11"
    sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf211) shouldBe "2.11"
  }

  test("sparkScalaVersion() should use default Spark Scala version.") {
    sparkScalaVersion(formatSparkVersion("1.6.0"), None, livyConf) shouldBe "2.10"
    sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf) shouldBe "2.10"
    sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf) shouldBe "2.11"
    sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf) shouldBe "2.11"
    sparkScalaVersion(formatSparkVersion("2.1.0"), None, livyConf) shouldBe "2.11"
  }
}