http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala deleted file mode 100644 index 3d26c7a..0000000 --- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.scalaapi - -import java.io.File -import java.net.URI -import java.util.concurrent.{Executors, Future => JFuture, ScheduledFuture, ThreadFactory, TimeUnit} - -import scala.concurrent._ -import scala.util.Try - -import com.cloudera.livy._ - -/** - * A client for submitting Spark-based jobs to a Livy backend. - * @constructor Creates a Scala client. - * @param livyJavaClient the Java client of Livy. - */ -class LivyScalaClient(livyJavaClient: LivyClient) { - - private val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory { - override def newThread(r: Runnable): Thread = { - val thread = new Thread(r, "LivyScalaClient-PollingContainer") - thread.setDaemon(true) - thread - } - }) - - /** - * Submits a job for asynchronous execution. - * - * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and - * returns the result of the execution of the job with that context. - * @return A handle that can be used to monitor the job. - */ - def submit[T](fn: ScalaJobContext => T): ScalaJobHandle[T] = { - val job = new Job[T] { - @throws(classOf[Exception]) - override def call(jobContext: JobContext): T = fn(new ScalaJobContext(jobContext)) - } - new ScalaJobHandle(livyJavaClient.submit(job)) - } - - /** - * Asks the remote context to run a job immediately. - * - * Normally, the remote context will queue jobs and execute them based on how many worker - * threads have been configured. This method will run the submitted job in the same thread - * processing the RPC message, so that queueing does not apply. - * - * It's recommended that this method only be used to run code that finishes quickly. This - * avoids interfering with the normal operation of the context. - * - * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and - * returns the result of the execution of the job with that context. - * @return A handle that can be used to monitor the job. - */ - def run[T](fn: ScalaJobContext => T): Future[T] = { - val job = new Job[T] { - @throws(classOf[Exception]) - override def call(jobContext: JobContext): T = { - val scalaJobContext = new ScalaJobContext(jobContext) - fn(scalaJobContext) - } - } - new PollingContainer(livyJavaClient.run(job)).poll() - } - - /** - * Stops the remote context. - * - * Any pending jobs will be cancelled, and the remote context will be torn down. - * - * @param shutdownContext Whether to shutdown the underlying Spark context. If false, the - * context will keep running and it's still possible to send commands - * to it, if the backend being used supports it. - */ - def stop(shutdownContext: Boolean): Unit = { - executor.shutdown() - livyJavaClient.stop(shutdownContext) - } - - /** - * Upload a jar to be added to the Spark application classpath. - * - * @param jar The local file to be uploaded. - * @return A future that can be used to monitor this operation. - */ - def uploadJar(jar: File): Future[_] = new PollingContainer(livyJavaClient.uploadJar(jar)).poll() - - /** - * Adds a jar file to the running remote context. - * - * Note that the URL should be reachable by the Spark driver process. If running the driver - * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist - * on that node (and not on the client machine). - * - * If the provided URI has no scheme, it's considered to be relative to the default file system - * configured in the Livy server. - * - * @param uri The location of the jar file. - * @return A future that can be used to monitor the operation. - */ - def addJar(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addJar(uri)).poll() - - /** - * Upload a file to be passed to the Spark application. - * - * @param file The local file to be uploaded. - * @return A future that can be used to monitor this operation. - */ - def uploadFile(file: File): Future[_] = - new PollingContainer(livyJavaClient.uploadFile(file)).poll() - - /** - * Adds a file to the running remote context. - * - * Note that the URL should be reachable by the Spark driver process. If running the driver - * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist - * on that node (and not on the client machine). - * - * If the provided URI has no scheme, it's considered to be relative to the default file system - * configured in the Livy server. - * - * @param uri The location of the file. - * @return A future that can be used to monitor the operation. - */ - def addFile(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addFile(uri)).poll() - - private class PollingContainer[T] private[livy] (jFuture: JFuture[T]) extends Runnable { - - private val initialDelay = 1 - private val longDelay = 1 - private var scheduledFuture: ScheduledFuture[_] = _ - private val promise = Promise[T] - - def poll(): Future[T] = { - scheduledFuture = - executor.scheduleWithFixedDelay(this, initialDelay, longDelay, TimeUnit.SECONDS) - promise.future - } - - override def run(): Unit = { - if (jFuture.isDone) { - promise.complete(Try(getJavaFutureResult(jFuture))) - scheduledFuture.cancel(false) - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala deleted file mode 100644 index 57a3ab4..0000000 --- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.cloudera.livy.scalaapi - -import java.io.File - -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.streaming.StreamingContext - -import com.cloudera.livy.JobContext - -/** - * Holds runtime information about the job execution context. - * - * @constructor Creates a ScalaJobContext. - * @param context the Java JobContext of Livy. - */ -class ScalaJobContext private[livy] (context: JobContext) { - - /** The shared SparkContext instance. */ - def sc: SparkContext = context.sc().sc - - /** The shared SQLContext instance. */ - def sqlctx: SQLContext = context.sqlctx() - - /** The shared HiveContext instance. */ - def hivectx: HiveContext = context.hivectx() - - /** Returns the StreamingContext which has already been created. */ - def streamingctx: StreamingContext = context.streamingctx().ssc - - def sparkSession[E]: E = context.sparkSession() - - /** - * Creates the SparkStreaming context. - * - * @param batchDuration Time interval at which streaming data will be divided into batches, - * in milliseconds. - */ - def createStreamingContext(batchDuration: Long): Unit = - context.createStreamingContext(batchDuration) - - /** Stops the SparkStreaming context. */ - def stopStreamingContext(): Unit = context.stopStreamingCtx() - - /** - * Returns a local tmp dir specific to the context. - */ - def localTmpDir: File = context.getLocalTmpDir - -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala deleted file mode 100644 index 7bd2edc..0000000 --- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.cloudera.livy.scalaapi - -import scala.concurrent.{CanAwait, ExecutionContext, Future, TimeoutException} -import scala.concurrent.duration.Duration -import scala.util.Try - -import com.cloudera.livy.JobHandle -import com.cloudera.livy.JobHandle.{Listener, State} - -/** - * A handle to a submitted job. Allows for monitoring and controlling of the running remote job. - * - * @constructor Creates a ScalaJobHandle. - * @param jobHandle the Java JobHandle of Livy. - * - * @define multipleCallbacks - * Multiple callbacks may be registered; there is no guarantee that they will be - * executed in a particular order. - * - * @define nonDeterministic - * Note: using this method yields nondeterministic dataflow programs. - * - * @define callbackInContext - * The provided callback always runs in the provided implicit - *` ExecutionContext`, though there is no guarantee that the - * `execute()` method on the `ExecutionContext` will be called once - * per callback or that `execute()` will be called in the current - * thread. That is, the implementation may run multiple callbacks - * in a batch within a single `execute()` and it may run - * `execute()` either immediately or asynchronously. - */ -class ScalaJobHandle[T] private[livy] (jobHandle: JobHandle[T]) extends Future[T] { - - /** - * Return the current state of the job. - */ - def state: State = jobHandle.getState() - - /** - * When the job is completed, either through an exception, or a value, - * apply the provided function. - * - * If the job has already been completed, - * this will either be applied immediately or be scheduled asynchronously. - * - * $multipleCallbacks - * $callbackInContext - */ - override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = { - jobHandle.addListener(new AbstractScalaJobHandleListener[T] { - override def onJobSucceeded(job: JobHandle[T], result: T): Unit = { - val onJobSucceededTask = new Runnable { - override def run(): Unit = func(Try(result)) - } - executor.execute(onJobSucceededTask) - } - - override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = { - val onJobFailedTask = new Runnable { - override def run(): Unit = func(Try(getJavaFutureResult(job))) - } - executor.execute(onJobFailedTask) - } - }) - } - - /** - * When this job is queued, apply the provided function. - * - * $multipleCallbacks - * $callbackInContext - */ - def onJobQueued[U](func: => Unit)(implicit executor: ExecutionContext): Unit = { - jobHandle.addListener(new AbstractScalaJobHandleListener[T] { - override def onJobQueued(job: JobHandle[T]): Unit = { - val onJobQueuedTask = new Runnable { - override def run(): Unit = func - } - executor.execute(onJobQueuedTask) - } - }) - } - - /** - * When this job has started, apply the provided function. - * - * $multipleCallbacks - * $callbackInContext - */ - def onJobStarted[U](func: => Unit)(implicit executor: ExecutionContext): Unit = { - jobHandle.addListener(new AbstractScalaJobHandleListener[T] { - override def onJobStarted(job: JobHandle[T]): Unit = { - val onJobStartedTask = new Runnable { - override def run(): Unit = func - } - executor.execute(onJobStartedTask) - } - }) - } - - /** - * When this job is cancelled, apply the provided function. - * - * $multipleCallbacks - * $callbackInContext - */ - def onJobCancelled[U](func: Boolean => Unit)(implicit executor: ExecutionContext): Unit = { - jobHandle.addListener(new AbstractScalaJobHandleListener[T] { - override def onJobCancelled(job: JobHandle[T]): Unit = { - val onJobCancelledTask = new Runnable { - override def run(): Unit = func(job.cancel(false)) - } - executor.execute(onJobCancelledTask) - } - }) - } - - /** - * Returns whether the job has already been completed with - * a value or an exception. - * - * $nonDeterministic - * - * @return `true` if the job is already completed, `false` otherwise. - */ - override def isCompleted: Boolean = jobHandle.isDone - - /** - * The result value of the job. - * - * If the job is not completed the returned value will be `None`. - * If the job is completed the value will be `Some(Success(t))`. - * if it contains a valid result, or `Some(Failure(error))` if it contains - * an exception. - */ - override def value: Option[Try[T]] = { - if (isCompleted) { - Some(Try(getJavaFutureResult(jobHandle))) - } else { - None - } - } - - /** - * Supports Scala's Await.result(atmost) which awaits the completion of the job and returns the - * result (of type `T`). - * - * @param atMost - * maximum wait time, which may be negative (no waiting is done), - * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, - * or a finite positive duration. - * @return the result value if job is completed within the specific maximum wait time. - * @throws Exception the underlying exception on the execution of the job. - */ - @throws(classOf[Exception]) - override def result(atMost: Duration)(implicit permit: CanAwait): T = - getJavaFutureResult(jobHandle, atMost) - - /** - * Supports Scala's Await.ready(atmost) which awaits the completion of the job. - * - * @param atMost - * maximum wait time, which may be negative (no waiting is done), - * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, - * or a finite positive duration. - * @return ScalaJobHandle - * @throws InterruptedException if the current thread is interrupted while waiting. - * @throws TimeoutException if after waiting for the specified time the job - * is still not ready. - */ - @throws(classOf[InterruptedException]) - @throws(classOf[TimeoutException]) - override def ready(atMost: Duration)(implicit permit: CanAwait): ScalaJobHandle.this.type = { - getJavaFutureResult(jobHandle, atMost) - this - } -} - -private abstract class AbstractScalaJobHandleListener[T] extends Listener[T] { - override def onJobQueued(job: JobHandle[T]): Unit = {} - - override def onJobCancelled(job: JobHandle[T]): Unit = {} - - override def onJobSucceeded(job: JobHandle[T], result: T): Unit = {} - - override def onJobStarted(job: JobHandle[T]): Unit = {} - - override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = {} -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala deleted file mode 100644 index 71c8a7c..0000000 --- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy - -import java.util.concurrent.{ExecutionException, Future => JFuture, TimeUnit} - -import scala.concurrent.duration.Duration - -package object scalaapi { - - /** - * A Scala Client for Livy which is a wrapper over the Java client. - * @constructor Creates a Scala client. - * @param livyJavaClient the Java client of Livy. - * {{{ - * import com.cloudera.livy._ - * import com.cloudera.livy.scalaapi._ - * val url = "http://example.com" - * val livyJavaClient = new LivyClientBuilder(false).setURI(new URI(url))).build() - * val livyScalaClient = livyJavaClient.asScalaClient - * }}} - */ - implicit class ScalaWrapper(livyJavaClient: LivyClient) { - def asScalaClient: LivyScalaClient = new LivyScalaClient(livyJavaClient) - } - - private[livy] def getJavaFutureResult[T](jFuture: JFuture[T], - atMost: Duration = Duration.Undefined): T = { - try { - if (!atMost.isFinite()) jFuture.get else jFuture.get(atMost.toMillis, TimeUnit.MILLISECONDS) - } catch { - case executionException: ExecutionException => throw executionException.getCause - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala new file mode 100644 index 0000000..ca7199a --- /dev/null +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.scalaapi + +import java.io.File +import java.net.URI +import java.util.concurrent.{Executors, Future => JFuture, ScheduledFuture, ThreadFactory, TimeUnit} + +import scala.concurrent._ +import scala.util.Try + +import org.apache.livy._ + +/** + * A client for submitting Spark-based jobs to a Livy backend. + * @constructor Creates a Scala client. + * @param livyJavaClient the Java client of Livy. + */ +class LivyScalaClient(livyJavaClient: LivyClient) { + + private val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory { + override def newThread(r: Runnable): Thread = { + val thread = new Thread(r, "LivyScalaClient-PollingContainer") + thread.setDaemon(true) + thread + } + }) + + /** + * Submits a job for asynchronous execution. + * + * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and + * returns the result of the execution of the job with that context. + * @return A handle that can be used to monitor the job. + */ + def submit[T](fn: ScalaJobContext => T): ScalaJobHandle[T] = { + val job = new Job[T] { + @throws(classOf[Exception]) + override def call(jobContext: JobContext): T = fn(new ScalaJobContext(jobContext)) + } + new ScalaJobHandle(livyJavaClient.submit(job)) + } + + /** + * Asks the remote context to run a job immediately. + * + * Normally, the remote context will queue jobs and execute them based on how many worker + * threads have been configured. This method will run the submitted job in the same thread + * processing the RPC message, so that queueing does not apply. + * + * It's recommended that this method only be used to run code that finishes quickly. This + * avoids interfering with the normal operation of the context. + * + * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and + * returns the result of the execution of the job with that context. + * @return A handle that can be used to monitor the job. + */ + def run[T](fn: ScalaJobContext => T): Future[T] = { + val job = new Job[T] { + @throws(classOf[Exception]) + override def call(jobContext: JobContext): T = { + val scalaJobContext = new ScalaJobContext(jobContext) + fn(scalaJobContext) + } + } + new PollingContainer(livyJavaClient.run(job)).poll() + } + + /** + * Stops the remote context. + * + * Any pending jobs will be cancelled, and the remote context will be torn down. + * + * @param shutdownContext Whether to shutdown the underlying Spark context. If false, the + * context will keep running and it's still possible to send commands + * to it, if the backend being used supports it. + */ + def stop(shutdownContext: Boolean): Unit = { + executor.shutdown() + livyJavaClient.stop(shutdownContext) + } + + /** + * Upload a jar to be added to the Spark application classpath. + * + * @param jar The local file to be uploaded. + * @return A future that can be used to monitor this operation. + */ + def uploadJar(jar: File): Future[_] = new PollingContainer(livyJavaClient.uploadJar(jar)).poll() + + /** + * Adds a jar file to the running remote context. + * + * Note that the URL should be reachable by the Spark driver process. If running the driver + * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist + * on that node (and not on the client machine). + * + * If the provided URI has no scheme, it's considered to be relative to the default file system + * configured in the Livy server. + * + * @param uri The location of the jar file. + * @return A future that can be used to monitor the operation. + */ + def addJar(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addJar(uri)).poll() + + /** + * Upload a file to be passed to the Spark application. + * + * @param file The local file to be uploaded. + * @return A future that can be used to monitor this operation. + */ + def uploadFile(file: File): Future[_] = + new PollingContainer(livyJavaClient.uploadFile(file)).poll() + + /** + * Adds a file to the running remote context. + * + * Note that the URL should be reachable by the Spark driver process. If running the driver + * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist + * on that node (and not on the client machine). + * + * If the provided URI has no scheme, it's considered to be relative to the default file system + * configured in the Livy server. + * + * @param uri The location of the file. + * @return A future that can be used to monitor the operation. + */ + def addFile(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addFile(uri)).poll() + + private class PollingContainer[T] private[livy] (jFuture: JFuture[T]) extends Runnable { + + private val initialDelay = 1 + private val longDelay = 1 + private var scheduledFuture: ScheduledFuture[_] = _ + private val promise = Promise[T] + + def poll(): Future[T] = { + scheduledFuture = + executor.scheduleWithFixedDelay(this, initialDelay, longDelay, TimeUnit.SECONDS) + promise.future + } + + override def run(): Unit = { + if (jFuture.isDone) { + promise.complete(Try(getJavaFutureResult(jFuture))) + scheduledFuture.cancel(false) + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobContext.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobContext.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobContext.scala new file mode 100644 index 0000000..ee082ec --- /dev/null +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobContext.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.scalaapi + +import java.io.File + +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.streaming.StreamingContext + +import org.apache.livy.JobContext + +/** + * Holds runtime information about the job execution context. + * + * @constructor Creates a ScalaJobContext. + * @param context the Java JobContext of Livy. + */ +class ScalaJobContext private[livy] (context: JobContext) { + + /** The shared SparkContext instance. */ + def sc: SparkContext = context.sc().sc + + /** The shared SQLContext instance. */ + def sqlctx: SQLContext = context.sqlctx() + + /** The shared HiveContext instance. */ + def hivectx: HiveContext = context.hivectx() + + /** Returns the StreamingContext which has already been created. */ + def streamingctx: StreamingContext = context.streamingctx().ssc + + def sparkSession[E]: E = context.sparkSession() + + /** + * Creates the SparkStreaming context. + * + * @param batchDuration Time interval at which streaming data will be divided into batches, + * in milliseconds. + */ + def createStreamingContext(batchDuration: Long): Unit = + context.createStreamingContext(batchDuration) + + /** Stops the SparkStreaming context. */ + def stopStreamingContext(): Unit = context.stopStreamingCtx() + + /** + * Returns a local tmp dir specific to the context. + */ + def localTmpDir: File = context.getLocalTmpDir + +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala new file mode 100644 index 0000000..d1cf29d --- /dev/null +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.scalaapi + +import scala.concurrent.{CanAwait, ExecutionContext, Future, TimeoutException} +import scala.concurrent.duration.Duration +import scala.util.Try + +import org.apache.livy.JobHandle +import org.apache.livy.JobHandle.{Listener, State} + +/** + * A handle to a submitted job. Allows for monitoring and controlling of the running remote job. + * + * @constructor Creates a ScalaJobHandle. + * @param jobHandle the Java JobHandle of Livy. + * + * @define multipleCallbacks + * Multiple callbacks may be registered; there is no guarantee that they will be + * executed in a particular order. + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. + * + * @define callbackInContext + * The provided callback always runs in the provided implicit + *` ExecutionContext`, though there is no guarantee that the + * `execute()` method on the `ExecutionContext` will be called once + * per callback or that `execute()` will be called in the current + * thread. That is, the implementation may run multiple callbacks + * in a batch within a single `execute()` and it may run + * `execute()` either immediately or asynchronously. + */ +class ScalaJobHandle[T] private[livy] (jobHandle: JobHandle[T]) extends Future[T] { + + /** + * Return the current state of the job. + */ + def state: State = jobHandle.getState() + + /** + * When the job is completed, either through an exception, or a value, + * apply the provided function. + * + * If the job has already been completed, + * this will either be applied immediately or be scheduled asynchronously. + * + * $multipleCallbacks + * $callbackInContext + */ + override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = { + jobHandle.addListener(new AbstractScalaJobHandleListener[T] { + override def onJobSucceeded(job: JobHandle[T], result: T): Unit = { + val onJobSucceededTask = new Runnable { + override def run(): Unit = func(Try(result)) + } + executor.execute(onJobSucceededTask) + } + + override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = { + val onJobFailedTask = new Runnable { + override def run(): Unit = func(Try(getJavaFutureResult(job))) + } + executor.execute(onJobFailedTask) + } + }) + } + + /** + * When this job is queued, apply the provided function. + * + * $multipleCallbacks + * $callbackInContext + */ + def onJobQueued[U](func: => Unit)(implicit executor: ExecutionContext): Unit = { + jobHandle.addListener(new AbstractScalaJobHandleListener[T] { + override def onJobQueued(job: JobHandle[T]): Unit = { + val onJobQueuedTask = new Runnable { + override def run(): Unit = func + } + executor.execute(onJobQueuedTask) + } + }) + } + + /** + * When this job has started, apply the provided function. + * + * $multipleCallbacks + * $callbackInContext + */ + def onJobStarted[U](func: => Unit)(implicit executor: ExecutionContext): Unit = { + jobHandle.addListener(new AbstractScalaJobHandleListener[T] { + override def onJobStarted(job: JobHandle[T]): Unit = { + val onJobStartedTask = new Runnable { + override def run(): Unit = func + } + executor.execute(onJobStartedTask) + } + }) + } + + /** + * When this job is cancelled, apply the provided function. + * + * $multipleCallbacks + * $callbackInContext + */ + def onJobCancelled[U](func: Boolean => Unit)(implicit executor: ExecutionContext): Unit = { + jobHandle.addListener(new AbstractScalaJobHandleListener[T] { + override def onJobCancelled(job: JobHandle[T]): Unit = { + val onJobCancelledTask = new Runnable { + override def run(): Unit = func(job.cancel(false)) + } + executor.execute(onJobCancelledTask) + } + }) + } + + /** + * Returns whether the job has already been completed with + * a value or an exception. + * + * $nonDeterministic + * + * @return `true` if the job is already completed, `false` otherwise. + */ + override def isCompleted: Boolean = jobHandle.isDone + + /** + * The result value of the job. + * + * If the job is not completed the returned value will be `None`. + * If the job is completed the value will be `Some(Success(t))`. + * if it contains a valid result, or `Some(Failure(error))` if it contains + * an exception. + */ + override def value: Option[Try[T]] = { + if (isCompleted) { + Some(Try(getJavaFutureResult(jobHandle))) + } else { + None + } + } + + /** + * Supports Scala's Await.result(atmost) which awaits the completion of the job and returns the + * result (of type `T`). + * + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, + * or a finite positive duration. + * @return the result value if job is completed within the specific maximum wait time. + * @throws Exception the underlying exception on the execution of the job. + */ + @throws(classOf[Exception]) + override def result(atMost: Duration)(implicit permit: CanAwait): T = + getJavaFutureResult(jobHandle, atMost) + + /** + * Supports Scala's Await.ready(atmost) which awaits the completion of the job. + * + * @param atMost + * maximum wait time, which may be negative (no waiting is done), + * [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting, + * or a finite positive duration. + * @return ScalaJobHandle + * @throws InterruptedException if the current thread is interrupted while waiting. + * @throws TimeoutException if after waiting for the specified time the job + * is still not ready. + */ + @throws(classOf[InterruptedException]) + @throws(classOf[TimeoutException]) + override def ready(atMost: Duration)(implicit permit: CanAwait): ScalaJobHandle.this.type = { + getJavaFutureResult(jobHandle, atMost) + this + } +} + +private abstract class AbstractScalaJobHandleListener[T] extends Listener[T] { + override def onJobQueued(job: JobHandle[T]): Unit = {} + + override def onJobCancelled(job: JobHandle[T]): Unit = {} + + override def onJobSucceeded(job: JobHandle[T], result: T): Unit = {} + + override def onJobStarted(job: JobHandle[T]): Unit = {} + + override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala new file mode 100644 index 0000000..6e53a37 --- /dev/null +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy + +import java.util.concurrent.{ExecutionException, Future => JFuture, TimeUnit} + +import scala.concurrent.duration.Duration + +package object scalaapi { + + /** + * A Scala Client for Livy which is a wrapper over the Java client. + * @constructor Creates a Scala client. + * @param livyJavaClient the Java client of Livy. + * {{{ + * import com.cloudera.livy._ + * import com.cloudera.livy.scalaapi._ + * val url = "http://example.com" + * val livyJavaClient = new LivyClientBuilder(false).setURI(new URI(url))).build() + * val livyScalaClient = livyJavaClient.asScalaClient + * }}} + */ + implicit class ScalaWrapper(livyJavaClient: LivyClient) { + def asScalaClient: LivyScalaClient = new LivyScalaClient(livyJavaClient) + } + + private[livy] def getJavaFutureResult[T](jFuture: JFuture[T], + atMost: Duration = Duration.Undefined): T = { + try { + if (!atMost.isFinite()) jFuture.get else jFuture.get(atMost.toMillis, TimeUnit.MILLISECONDS) + } catch { + case executionException: ExecutionException => throw executionException.getCause + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala b/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala deleted file mode 100644 index 1bf879c..0000000 --- a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.cloudera.livy.scalaapi - -import java.io._ -import java.net.URI -import java.nio.charset.StandardCharsets._ -import java.util._ -import java.util.concurrent.CountDownLatch -import java.util.jar.JarOutputStream -import java.util.zip.ZipEntry - -import scala.concurrent.Await -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import scala.language.postfixOps -import scala.util.{Failure, Success} - -import org.apache.spark.SparkFiles -import org.apache.spark.launcher.SparkLauncher -import org.scalatest.{BeforeAndAfter, FunSuite} -import org.scalatest.concurrent.ScalaFutures - -import com.cloudera.livy.LivyBaseUnitTestSuite -import com.cloudera.livy.rsc.RSCConf.Entry._ - -class ScalaClientTest extends FunSuite - with ScalaFutures - with BeforeAndAfter - with LivyBaseUnitTestSuite { - - import com.cloudera.livy._ - - private var client: LivyScalaClient = _ - - after { - if (client != null) { - client.stop(true) - client = null - } - } - - test("test Job Submission") { - configureClient(true) - val jobHandle = client.submit(ScalaClientTestUtils.helloJob) - ScalaClientTestUtils.assertTestPassed(jobHandle, "hello") - } - - test("test Simple Spark Job") { - configureClient(true) - val sFuture = client.submit(ScalaClientTestUtils.simpleSparkJob) - ScalaClientTestUtils.assertTestPassed(sFuture, 5) - } - - test("test Job Failure") { - configureClient(true) - val sFuture = client.submit(ScalaClientTestUtils.throwExceptionJob) - val lock = new CountDownLatch(1) - var testFailure : Option[String] = None - sFuture onComplete { - case Success(t) => { - testFailure = Some("Test should have thrown CustomFailureException") - lock.countDown() - } - case Failure(e) => { - if (!e.getMessage.contains("CustomTestFailureException")) { - testFailure = Some("Test did not throw expected exception - CustomFailureException") - } - lock.countDown() - } - } - ScalaClientTestUtils.assertAwait(lock) - testFailure.foreach(fail(_)) - } - - test("test Sync Rpc") { - configureClient(true) - val future = client.run(ScalaClientTestUtils.helloJob) - ScalaClientTestUtils.assertTestPassed(future, "hello") - } - - test("test Remote client") { - configureClient(false) - val sFuture = client.submit(ScalaClientTestUtils.simpleSparkJob) - ScalaClientTestUtils.assertTestPassed(sFuture, 5) - } - - test("test add file") { - configureClient(true) - val file = File.createTempFile("test", ".file") - val fileStream = new FileOutputStream(file) - fileStream.write("test file".getBytes("UTF-8")) - fileStream.close - val addFileFuture = client.addFile(new URI("file:" + file.getAbsolutePath())) - Await.ready(addFileFuture, ScalaClientTestUtils.Timeout second) - val sFuture = client.submit { context => - ScalaClientTest.fileOperation(false, file.getName, context) - } - ScalaClientTestUtils.assertTestPassed(sFuture, "test file") - } - - test("test add jar") { - configureClient(true) - val jar = File.createTempFile("test", ".resource") - val jarFile = new JarOutputStream(new FileOutputStream(jar)) - jarFile.putNextEntry(new ZipEntry("test.resource")) - jarFile.write("test resource".getBytes("UTF-8")) - jarFile.closeEntry() - jarFile.close() - val addJarFuture = client.addJar(new URI("file:" + jar.getAbsolutePath())) - Await.ready(addJarFuture, ScalaClientTestUtils.Timeout second) - val sFuture = client.submit { context => - ScalaClientTest.fileOperation(true, "test.resource", context) - } - ScalaClientTestUtils.assertTestPassed(sFuture, "test resource") - } - - test("Successive onComplete callbacks") { - var testFailure: Option[String] = None - configureClient(true) - val future = client.run(ScalaClientTestUtils.helloJob) - val lock = new CountDownLatch(3) - for (i <- 0 to 2) { - future onComplete { - case Success(t) => { - if (!t.equals("hello")) testFailure = Some("Expected message not returned") - lock.countDown() - } - case Failure(e) => { - testFailure = Some("onComplete should not have triggered Failure callback") - lock.countDown() - } - } - } - ScalaClientTestUtils.assertAwait(lock) - testFailure.foreach(fail(_)) - } - - private def configureClient(local: Boolean) = { - val conf = ScalaClientTest.createConf(local) - val javaClient = new LivyClientBuilder(false).setURI(new URI("rsc:/")).setAll(conf).build() - client = javaClient.asScalaClient - pingJob() - } - - private def pingJob() = { - val future = client.submit { context => - null - } - ScalaClientTestUtils.assertTestPassed(future, null) - } -} - -class CustomTestFailureException extends RuntimeException {} - -object ScalaClientTest { - - def createConf(local: Boolean): Properties = { - val conf = new Properties - if (local) { - conf.put(CLIENT_IN_PROCESS.key, "true") - conf.put(SparkLauncher.SPARK_MASTER, "local") - conf.put("spark.app.name", "SparkClientSuite Local App") - } else { - val classpath: String = System.getProperty("java.class.path") - conf.put("spark.app.name", "SparkClientSuite Remote App") - conf.put(SparkLauncher.DRIVER_MEMORY, "512m") - conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, classpath) - conf.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, classpath) - } - conf.put(LIVY_JARS.key, "") - conf - } - - def fileOperation(isResource: Boolean, fileName: String, context: ScalaJobContext): String = { - val arr = Seq(1) - val rdd = context.sc.parallelize(arr).map { value => - var inputStream: InputStream = null - if (isResource) { - val ccl = Thread.currentThread.getContextClassLoader - inputStream = ccl.getResourceAsStream(fileName) - } else { - inputStream = new FileInputStream(SparkFiles.get(fileName)) - } - try { - val out = new ByteArrayOutputStream() - val buffer = new Array[Byte](1024) - var read = inputStream.read(buffer) - while (read >= 0) { - out.write(buffer, 0, read) - read = inputStream.read(buffer) - } - val bytes = out.toByteArray - new String(bytes, 0, bytes.length, UTF_8) - } finally { - inputStream.close() - } - } - rdd.collect().head - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTestUtils.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTestUtils.scala b/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTestUtils.scala deleted file mode 100644 index 8727db5..0000000 --- a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTestUtils.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.cloudera.livy.scalaapi - -import java.util.Random -import java.util.concurrent.{CountDownLatch, TimeUnit} - -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{Await, Future} -import scala.concurrent.duration._ - -import org.scalatest.FunSuite - -import com.cloudera.livy.LivyBaseUnitTestSuite - -object ScalaClientTestUtils extends FunSuite with LivyBaseUnitTestSuite { - - val Timeout = 40 - - def helloJob(context: ScalaJobContext): String = "hello" - - def throwExceptionJob(context: ScalaJobContext): Unit = throw new CustomTestFailureException - - def simpleSparkJob(context: ScalaJobContext): Long = { - val r = new Random - val count = 5 - val partitions = Math.min(r.nextInt(10) + 1, count) - val buffer = new ArrayBuffer[Int]() - for (a <- 1 to count) { - buffer += r.nextInt() - } - context.sc.parallelize(buffer, partitions).count() - } - - def assertAwait(lock: CountDownLatch): Unit = { - assert(lock.await(Timeout, TimeUnit.SECONDS) == true) - } - - def assertTestPassed[T](future: Future[T], expectedValue: T): Unit = { - val result = Await.result(future, Timeout second) - assert(result === expectedValue) - } -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaJobHandleTest.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaJobHandleTest.scala b/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaJobHandleTest.scala deleted file mode 100644 index 7dd9444..0000000 --- a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaJobHandleTest.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.cloudera.livy.scalaapi - -import java.util.concurrent.{CountDownLatch, TimeUnit} - -import scala.concurrent.Await -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.language.postfixOps -import scala.util.{Failure, Success} - -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfter, FunSuite} -import org.scalatest.concurrent.ScalaFutures - -import com.cloudera.livy.{JobHandle, LivyBaseUnitTestSuite} -import com.cloudera.livy.JobHandle.{Listener, State} - -class ScalaJobHandleTest extends FunSuite - with ScalaFutures - with BeforeAndAfter - with LivyBaseUnitTestSuite { - - private var mockJobHandle: JobHandle[String] = null - private var scalaJobHandle: ScalaJobHandle[String] = null - private val timeoutInMilliseconds = 5000 - private var listener: JobHandle.Listener[String] = null - - before { - listener = mock(classOf[JobHandle.Listener[String]]) - mockJobHandle = mock(classOf[JobHandle[String]]) - scalaJobHandle = new ScalaJobHandle(mockJobHandle) - } - - test("get result when job is already complete") { - when(mockJobHandle.get(timeoutInMilliseconds, TimeUnit.MILLISECONDS)).thenReturn("hello") - val result = Await.result(scalaJobHandle, 5 seconds) - assert(result == "hello") - verify(mockJobHandle, times(1)).get(timeoutInMilliseconds, TimeUnit.MILLISECONDS) - } - - test("ready when the thread waits for the mentioned duration for job to complete") { - when(mockJobHandle.get(timeoutInMilliseconds, TimeUnit.MILLISECONDS)).thenReturn("hello") - val result = Await.ready(scalaJobHandle, 5 seconds) - assert(result == scalaJobHandle) - verify(mockJobHandle, times(1)).get(timeoutInMilliseconds, TimeUnit.MILLISECONDS) - } - - test("ready with Infinite Duration") { - when(mockJobHandle.isDone).thenReturn(true) - when(mockJobHandle.get()).thenReturn("hello") - val result = Await.ready(scalaJobHandle, Duration.Undefined) - assert(result == scalaJobHandle) - verify(mockJobHandle, times(1)).get() - } - - test("verify addListener call of java jobHandle for onComplete") { - doNothing().when(mockJobHandle).addListener(listener) - scalaJobHandle onComplete { - case Success(t) => {} - } - verify(mockJobHandle).addListener(isA(classOf[Listener[String]])) - verify(mockJobHandle, times(1)).addListener(any()) - } - - test("onComplete Success") { - val jobHandleStub = new AbstractJobHandleStub[String] { - override def addListener(l: Listener[String]): Unit = l.onJobSucceeded(this, "hello") - } - val lock = new CountDownLatch(1) - var testFailure: Option[String] = None - val testScalaHandle = new ScalaJobHandle(jobHandleStub) - testScalaHandle onComplete { - case Success(t) => { - if (!t.equals("hello")) { - testFailure = Some("onComplete has not returned the expected message") - } - lock.countDown() - } - case Failure(e) => { - testFailure = Some("onComplete should not have triggered Failure callback") - lock.countDown() - } - } - ScalaClientTestUtils.assertAwait(lock) - testFailure.foreach(fail(_)) - } - - test("onComplete Failure") { - val jobHandleStub = new AbstractJobHandleStub[String] { - override def addListener(l: Listener[String]): Unit = - l.onJobFailed(this, new CustomTestFailureException) - - override def get(): String = throw new CustomTestFailureException() - } - val lock = new CountDownLatch(1) - var testFailure: Option[String] = None - val testScalaHandle = new ScalaJobHandle(jobHandleStub) - testScalaHandle onComplete { - case Success(t) => { - testFailure = Some("Test should have thrown CustomFailureException") - lock.countDown() - } - case Failure(e) => { - if (!e.isInstanceOf[CustomTestFailureException]) { - testFailure = Some("Test did not throw expected exception - CustomFailureException") - } - lock.countDown() - } - } - ScalaClientTestUtils.assertAwait(lock) - testFailure.foreach(fail(_)) - } - - test("onJobCancelled") { - val jobHandleStub = new AbstractJobHandleStub[String] { - override def addListener(l: Listener[String]): Unit = l.onJobCancelled(this) - override def cancel(mayInterruptIfRunning: Boolean): Boolean = true - } - var testFailure: Option[String] = None - val lock = new CountDownLatch(1) - val testScalaHandle = new ScalaJobHandle(jobHandleStub) - testScalaHandle onJobCancelled { - case true => lock.countDown() - case false => { - testFailure = Some("False callback should not have been triggered") - lock.countDown() - } - } - ScalaClientTestUtils.assertAwait(lock) - testFailure.foreach(fail(_)) - } - - test("onJobQueued") { - val jobHandleStub = new AbstractJobHandleStub[String] { - override def addListener(l: Listener[String]): Unit = l.onJobQueued(this) - } - val lock = new CountDownLatch(1) - val testScalaHandle = new ScalaJobHandle(jobHandleStub) - testScalaHandle onJobQueued { - lock.countDown() - } - ScalaClientTestUtils.assertAwait(lock) - } - - test("onJobStarted") { - val jobHandleStub = new AbstractJobHandleStub[String] { - override def addListener(l: Listener[String]): Unit = l.onJobStarted(this) - } - val lock = new CountDownLatch(1) - val testScalaHandle = new ScalaJobHandle(jobHandleStub) - testScalaHandle onJobStarted { - lock.countDown() - } - ScalaClientTestUtils.assertAwait(lock) - } -} - -private abstract class AbstractJobHandleStub[T] private[livy] extends JobHandle[T] { - - override def getState: State = null - - override def addListener(l: Listener[T]): Unit = {} - - override def isCancelled: Boolean = false - - override def get(): T = null.asInstanceOf[T] - - override def get(timeout: Long, unit: TimeUnit): T = null.asInstanceOf[T] - - override def cancel(mayInterruptIfRunning: Boolean): Boolean = false - - override def isDone: Boolean = true -} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala new file mode 100644 index 0000000..79760b7 --- /dev/null +++ b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.scalaapi + +import java.io._ +import java.net.URI +import java.nio.charset.StandardCharsets._ +import java.util._ +import java.util.concurrent.CountDownLatch +import java.util.jar.JarOutputStream +import java.util.zip.ZipEntry + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.util.{Failure, Success} + +import org.apache.spark.SparkFiles +import org.apache.spark.launcher.SparkLauncher +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.ScalaFutures + +import org.apache.livy.LivyBaseUnitTestSuite +import org.apache.livy.rsc.RSCConf.Entry._ + +class ScalaClientTest extends FunSuite + with ScalaFutures + with BeforeAndAfter + with LivyBaseUnitTestSuite { + + import org.apache.livy._ + + private var client: LivyScalaClient = _ + + after { + if (client != null) { + client.stop(true) + client = null + } + } + + test("test Job Submission") { + configureClient(true) + val jobHandle = client.submit(ScalaClientTestUtils.helloJob) + ScalaClientTestUtils.assertTestPassed(jobHandle, "hello") + } + + test("test Simple Spark Job") { + configureClient(true) + val sFuture = client.submit(ScalaClientTestUtils.simpleSparkJob) + ScalaClientTestUtils.assertTestPassed(sFuture, 5) + } + + test("test Job Failure") { + configureClient(true) + val sFuture = client.submit(ScalaClientTestUtils.throwExceptionJob) + val lock = new CountDownLatch(1) + var testFailure : Option[String] = None + sFuture onComplete { + case Success(t) => { + testFailure = Some("Test should have thrown CustomFailureException") + lock.countDown() + } + case Failure(e) => { + if (!e.getMessage.contains("CustomTestFailureException")) { + testFailure = Some("Test did not throw expected exception - CustomFailureException") + } + lock.countDown() + } + } + ScalaClientTestUtils.assertAwait(lock) + testFailure.foreach(fail(_)) + } + + test("test Sync Rpc") { + configureClient(true) + val future = client.run(ScalaClientTestUtils.helloJob) + ScalaClientTestUtils.assertTestPassed(future, "hello") + } + + test("test Remote client") { + configureClient(false) + val sFuture = client.submit(ScalaClientTestUtils.simpleSparkJob) + ScalaClientTestUtils.assertTestPassed(sFuture, 5) + } + + test("test add file") { + configureClient(true) + val file = File.createTempFile("test", ".file") + val fileStream = new FileOutputStream(file) + fileStream.write("test file".getBytes("UTF-8")) + fileStream.close + val addFileFuture = client.addFile(new URI("file:" + file.getAbsolutePath())) + Await.ready(addFileFuture, ScalaClientTestUtils.Timeout second) + val sFuture = client.submit { context => + ScalaClientTest.fileOperation(false, file.getName, context) + } + ScalaClientTestUtils.assertTestPassed(sFuture, "test file") + } + + test("test add jar") { + configureClient(true) + val jar = File.createTempFile("test", ".resource") + val jarFile = new JarOutputStream(new FileOutputStream(jar)) + jarFile.putNextEntry(new ZipEntry("test.resource")) + jarFile.write("test resource".getBytes("UTF-8")) + jarFile.closeEntry() + jarFile.close() + val addJarFuture = client.addJar(new URI("file:" + jar.getAbsolutePath())) + Await.ready(addJarFuture, ScalaClientTestUtils.Timeout second) + val sFuture = client.submit { context => + ScalaClientTest.fileOperation(true, "test.resource", context) + } + ScalaClientTestUtils.assertTestPassed(sFuture, "test resource") + } + + test("Successive onComplete callbacks") { + var testFailure: Option[String] = None + configureClient(true) + val future = client.run(ScalaClientTestUtils.helloJob) + val lock = new CountDownLatch(3) + for (i <- 0 to 2) { + future onComplete { + case Success(t) => { + if (!t.equals("hello")) testFailure = Some("Expected message not returned") + lock.countDown() + } + case Failure(e) => { + testFailure = Some("onComplete should not have triggered Failure callback") + lock.countDown() + } + } + } + ScalaClientTestUtils.assertAwait(lock) + testFailure.foreach(fail(_)) + } + + private def configureClient(local: Boolean) = { + val conf = ScalaClientTest.createConf(local) + val javaClient = new LivyClientBuilder(false).setURI(new URI("rsc:/")).setAll(conf).build() + client = javaClient.asScalaClient + pingJob() + } + + private def pingJob() = { + val future = client.submit { context => + null + } + ScalaClientTestUtils.assertTestPassed(future, null) + } +} + +class CustomTestFailureException extends RuntimeException {} + +object ScalaClientTest { + + def createConf(local: Boolean): Properties = { + val conf = new Properties + if (local) { + conf.put(CLIENT_IN_PROCESS.key, "true") + conf.put(SparkLauncher.SPARK_MASTER, "local") + conf.put("spark.app.name", "SparkClientSuite Local App") + } else { + val classpath: String = System.getProperty("java.class.path") + conf.put("spark.app.name", "SparkClientSuite Remote App") + conf.put(SparkLauncher.DRIVER_MEMORY, "512m") + conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, classpath) + conf.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, classpath) + } + conf.put(LIVY_JARS.key, "") + conf + } + + def fileOperation(isResource: Boolean, fileName: String, context: ScalaJobContext): String = { + val arr = Seq(1) + val rdd = context.sc.parallelize(arr).map { value => + var inputStream: InputStream = null + if (isResource) { + val ccl = Thread.currentThread.getContextClassLoader + inputStream = ccl.getResourceAsStream(fileName) + } else { + inputStream = new FileInputStream(SparkFiles.get(fileName)) + } + try { + val out = new ByteArrayOutputStream() + val buffer = new Array[Byte](1024) + var read = inputStream.read(buffer) + while (read >= 0) { + out.write(buffer, 0, read) + read = inputStream.read(buffer) + } + val bytes = out.toByteArray + new String(bytes, 0, bytes.length, UTF_8) + } finally { + inputStream.close() + } + } + rdd.collect().head + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTestUtils.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTestUtils.scala b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTestUtils.scala new file mode 100644 index 0000000..458ff3b --- /dev/null +++ b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTestUtils.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.scalaapi + +import java.util.Random +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +import org.scalatest.FunSuite + +import org.apache.livy.LivyBaseUnitTestSuite + +object ScalaClientTestUtils extends FunSuite with LivyBaseUnitTestSuite { + + val Timeout = 40 + + def helloJob(context: ScalaJobContext): String = "hello" + + def throwExceptionJob(context: ScalaJobContext): Unit = throw new CustomTestFailureException + + def simpleSparkJob(context: ScalaJobContext): Long = { + val r = new Random + val count = 5 + val partitions = Math.min(r.nextInt(10) + 1, count) + val buffer = new ArrayBuffer[Int]() + for (a <- 1 to count) { + buffer += r.nextInt() + } + context.sc.parallelize(buffer, partitions).count() + } + + def assertAwait(lock: CountDownLatch): Unit = { + assert(lock.await(Timeout, TimeUnit.SECONDS) == true) + } + + def assertTestPassed[T](future: Future[T], expectedValue: T): Unit = { + val result = Await.result(future, Timeout second) + assert(result === expectedValue) + } +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala ---------------------------------------------------------------------- diff --git a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala new file mode 100644 index 0000000..3f5bdc6 --- /dev/null +++ b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.livy.scalaapi + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.language.postfixOps +import scala.util.{Failure, Success} + +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.ScalaFutures + +import org.apache.livy.{JobHandle, LivyBaseUnitTestSuite} +import org.apache.livy.JobHandle.{Listener, State} + +class ScalaJobHandleTest extends FunSuite + with ScalaFutures + with BeforeAndAfter + with LivyBaseUnitTestSuite { + + private var mockJobHandle: JobHandle[String] = null + private var scalaJobHandle: ScalaJobHandle[String] = null + private val timeoutInMilliseconds = 5000 + private var listener: JobHandle.Listener[String] = null + + before { + listener = mock(classOf[JobHandle.Listener[String]]) + mockJobHandle = mock(classOf[JobHandle[String]]) + scalaJobHandle = new ScalaJobHandle(mockJobHandle) + } + + test("get result when job is already complete") { + when(mockJobHandle.get(timeoutInMilliseconds, TimeUnit.MILLISECONDS)).thenReturn("hello") + val result = Await.result(scalaJobHandle, 5 seconds) + assert(result == "hello") + verify(mockJobHandle, times(1)).get(timeoutInMilliseconds, TimeUnit.MILLISECONDS) + } + + test("ready when the thread waits for the mentioned duration for job to complete") { + when(mockJobHandle.get(timeoutInMilliseconds, TimeUnit.MILLISECONDS)).thenReturn("hello") + val result = Await.ready(scalaJobHandle, 5 seconds) + assert(result == scalaJobHandle) + verify(mockJobHandle, times(1)).get(timeoutInMilliseconds, TimeUnit.MILLISECONDS) + } + + test("ready with Infinite Duration") { + when(mockJobHandle.isDone).thenReturn(true) + when(mockJobHandle.get()).thenReturn("hello") + val result = Await.ready(scalaJobHandle, Duration.Undefined) + assert(result == scalaJobHandle) + verify(mockJobHandle, times(1)).get() + } + + test("verify addListener call of java jobHandle for onComplete") { + doNothing().when(mockJobHandle).addListener(listener) + scalaJobHandle onComplete { + case Success(t) => {} + } + verify(mockJobHandle).addListener(isA(classOf[Listener[String]])) + verify(mockJobHandle, times(1)).addListener(any()) + } + + test("onComplete Success") { + val jobHandleStub = new AbstractJobHandleStub[String] { + override def addListener(l: Listener[String]): Unit = l.onJobSucceeded(this, "hello") + } + val lock = new CountDownLatch(1) + var testFailure: Option[String] = None + val testScalaHandle = new ScalaJobHandle(jobHandleStub) + testScalaHandle onComplete { + case Success(t) => { + if (!t.equals("hello")) { + testFailure = Some("onComplete has not returned the expected message") + } + lock.countDown() + } + case Failure(e) => { + testFailure = Some("onComplete should not have triggered Failure callback") + lock.countDown() + } + } + ScalaClientTestUtils.assertAwait(lock) + testFailure.foreach(fail(_)) + } + + test("onComplete Failure") { + val jobHandleStub = new AbstractJobHandleStub[String] { + override def addListener(l: Listener[String]): Unit = + l.onJobFailed(this, new CustomTestFailureException) + + override def get(): String = throw new CustomTestFailureException() + } + val lock = new CountDownLatch(1) + var testFailure: Option[String] = None + val testScalaHandle = new ScalaJobHandle(jobHandleStub) + testScalaHandle onComplete { + case Success(t) => { + testFailure = Some("Test should have thrown CustomFailureException") + lock.countDown() + } + case Failure(e) => { + if (!e.isInstanceOf[CustomTestFailureException]) { + testFailure = Some("Test did not throw expected exception - CustomFailureException") + } + lock.countDown() + } + } + ScalaClientTestUtils.assertAwait(lock) + testFailure.foreach(fail(_)) + } + + test("onJobCancelled") { + val jobHandleStub = new AbstractJobHandleStub[String] { + override def addListener(l: Listener[String]): Unit = l.onJobCancelled(this) + override def cancel(mayInterruptIfRunning: Boolean): Boolean = true + } + var testFailure: Option[String] = None + val lock = new CountDownLatch(1) + val testScalaHandle = new ScalaJobHandle(jobHandleStub) + testScalaHandle onJobCancelled { + case true => lock.countDown() + case false => { + testFailure = Some("False callback should not have been triggered") + lock.countDown() + } + } + ScalaClientTestUtils.assertAwait(lock) + testFailure.foreach(fail(_)) + } + + test("onJobQueued") { + val jobHandleStub = new AbstractJobHandleStub[String] { + override def addListener(l: Listener[String]): Unit = l.onJobQueued(this) + } + val lock = new CountDownLatch(1) + val testScalaHandle = new ScalaJobHandle(jobHandleStub) + testScalaHandle onJobQueued { + lock.countDown() + } + ScalaClientTestUtils.assertAwait(lock) + } + + test("onJobStarted") { + val jobHandleStub = new AbstractJobHandleStub[String] { + override def addListener(l: Listener[String]): Unit = l.onJobStarted(this) + } + val lock = new CountDownLatch(1) + val testScalaHandle = new ScalaJobHandle(jobHandleStub) + testScalaHandle onJobStarted { + lock.countDown() + } + ScalaClientTestUtils.assertAwait(lock) + } +} + +private abstract class AbstractJobHandleStub[T] private[livy] extends JobHandle[T] { + + override def getState: State = null + + override def addListener(l: Listener[T]): Unit = {} + + override def isCancelled: Boolean = false + + override def get(): T = null.asInstanceOf[T] + + override def get(timeout: Long, unit: TimeUnit): T = null.asInstanceOf[T] + + override def cancel(mayInterruptIfRunning: Boolean): Boolean = false + + override def isDone: Boolean = true +} http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala/pom.xml ---------------------------------------------------------------------- diff --git a/scala/pom.xml b/scala/pom.xml index 8a2d6ee..d304bb3 100644 --- a/scala/pom.xml +++ b/scala/pom.xml @@ -19,15 +19,15 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>multi-scala-project-root</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>pom</packaging> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-main</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scalastyle.xml ---------------------------------------------------------------------- diff --git a/scalastyle.xml b/scalastyle.xml index 2b06267..28609fb 100644 --- a/scalastyle.xml +++ b/scalastyle.xml @@ -122,8 +122,8 @@ <parameter name="groups">java,scala,3rdParty,livy</parameter> <parameter name="group.java">javax?\..*</parameter> <parameter name="group.scala">scala\..*</parameter> - <parameter name="group.3rdParty">(?!com\.cloudera.livy\.).*</parameter> - <parameter name="group.livy">com\.cloudera\.livy\..*</parameter> + <parameter name="group.3rdParty">(?!org\.apache.livy\.).*</parameter> + <parameter name="group.livy">org\.apache\.livy\..*</parameter> </parameters> </check> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index 280f7c5..1ad6bfa 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -20,14 +20,14 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-main</artifactId> <relativePath>../pom.xml</relativePath> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> </parent> <artifactId>livy-server</artifactId> - <version>0.4.0-SNAPSHOT</version> + <version>0.4.0-incubating-SNAPSHOT</version> <packaging>jar</packaging> <dependencies> @@ -53,7 +53,7 @@ </dependency> <dependency> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-test-lib</artifactId> <version>${project.version}</version> <scope>test</scope> @@ -171,7 +171,7 @@ </dependency> <dependency> - <groupId>com.cloudera.livy</groupId> + <groupId>org.apache.livy</groupId> <artifactId>livy-repl_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> @@ -234,7 +234,7 @@ <configuration> <archive> <manifest> - <mainClass>com.cloudera.livy.server.Main</mainClass> + <mainClass>org.apache.livy.server.Main</mainClass> </manifest> </archive> <outputDirectory>${project.build.directory}/jars</outputDirectory> http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/resources/com/cloudera/livy/server/ui/static/all-sessions.js ---------------------------------------------------------------------- diff --git a/server/src/main/resources/com/cloudera/livy/server/ui/static/all-sessions.js b/server/src/main/resources/com/cloudera/livy/server/ui/static/all-sessions.js deleted file mode 100644 index 4fe3f8f..0000000 --- a/server/src/main/resources/com/cloudera/livy/server/ui/static/all-sessions.js +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -function appIdLink(session) { - var appUiUrl = session.appInfo.sparkUiUrl; - if (appUiUrl != null) { - return '<a href="' + appUiUrl + '">' + session.appId + "</a>"; - } else { - return session.appId; - } -} - -function tdWrap(str) { - return "<td>" + str + "</td>"; -} - -function loadSessionsTable(sessions) { - $.each(sessions, function(index, session) { - $("#interactive-sessions .sessions-table-body").append( - "<tr>" + - tdWrap(session.id) + - tdWrap(appIdLink(session)) + - tdWrap(session.owner) + - tdWrap(session.proxyUser) + - tdWrap(session.kind) + - tdWrap(session.state) + - "</tr>" - ); - }); -} - -function loadBatchesTable(sessions) { - $.each(sessions, function(index, session) { - $("#batches .sessions-table-body").append( - "<tr>" + - tdWrap(session.id) + - tdWrap(appIdLink(session)) + - tdWrap(session.state) + - "</tr>" - ); - }); -} - -var numSessions = 0; -var numBatches = 0; - -$(document).ready(function () { - $.extend( $.fn.dataTable.defaults, { - stateSave: true, - }); - - var sessionsReq = $.getJSON(location.origin + "/sessions", function(response) { - if (response && response.total > 0) { - $("#interactive-sessions").load("/static/sessions-table.html .sessions-template", function() { - loadSessionsTable(response.sessions); - $("#interactive-sessions-table").DataTable(); - $('#interactive-sessions [data-toggle="tooltip"]').tooltip(); - }); - } - numSessions = response.total; - }); - - var batchesReq = $.getJSON(location.origin + "/batches", function(response) { - if (response && response.total > 0) { - $("#batches").load("/static/batches-table.html .sessions-template", function() { - loadBatchesTable(response.sessions); - $("#batches-table").DataTable(); - $('#batches [data-toggle="tooltip"]').tooltip(); - }); - } - numBatches = response.total; - }); - - $.when(sessionsReq, batchesReq).done(function () { - if (numSessions + numBatches == 0) { - $("#all-sessions").append('<h4>No Sessions or Batches have been created yet.</h4>'); - } - }); -}); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/resources/com/cloudera/livy/server/ui/static/batches-table.html ---------------------------------------------------------------------- diff --git a/server/src/main/resources/com/cloudera/livy/server/ui/static/batches-table.html b/server/src/main/resources/com/cloudera/livy/server/ui/static/batches-table.html deleted file mode 100644 index e0b3213..0000000 --- a/server/src/main/resources/com/cloudera/livy/server/ui/static/batches-table.html +++ /dev/null @@ -1,42 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<h4 id="batches-header" class="sessions-template">Batch Sessions</h4> - -<table id="batches-table" class="table table-striped sessions-table sessions-template"> - <thead class="sessions-table-head"> - <tr> - <th>Batch Id</th> - <th> - <span data-toggle="tooltip" - title="Spark Application Id for this session. - If available, links to Spark Application Web UI"> - Application Id - </span> - </th> - <th> - <span data-toggle="tooltip" - title="Session State (not_started, starting, idle, busy, - shutting_down, error, dead, success)"> - State - </span> - </th> - </tr> - </thead> - <tbody class="sessions-table-body"> - </tbody> -</table> \ No newline at end of file