http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala
----------------------------------------------------------------------
diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala
deleted file mode 100644
index 3d26c7a..0000000
--- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/LivyScalaClient.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.scalaapi
-
-import java.io.File
-import java.net.URI
-import java.util.concurrent.{Executors, Future => JFuture, ScheduledFuture, ThreadFactory, TimeUnit}
-
-import scala.concurrent._
-import scala.util.Try
-
-import com.cloudera.livy._
-
-/**
- * A client for submitting Spark-based jobs to a Livy backend.
- * @constructor  Creates a Scala client.
- * @param  livyJavaClient  the Java client of Livy.
- */
-class LivyScalaClient(livyJavaClient: LivyClient) {
-
-  private val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
-    override def newThread(r: Runnable): Thread = {
-      val thread = new Thread(r, "LivyScalaClient-PollingContainer")
-      thread.setDaemon(true)
-      thread
-    }
-  })
-
-  /**
-   * Submits a job for asynchronous execution.
-   *
-   * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and
-   * returns the result of the execution of the job with that context.
-   * @return A handle that can be used to monitor the job.
-   */
-  def submit[T](fn: ScalaJobContext => T): ScalaJobHandle[T] = {
-    val job = new Job[T] {
-      @throws(classOf[Exception])
-      override def call(jobContext: JobContext): T = fn(new ScalaJobContext(jobContext))
-    }
-    new ScalaJobHandle(livyJavaClient.submit(job))
-  }
-
-  /**
-   * Asks the remote context to run a job immediately.
-   *
-   * Normally, the remote context will queue jobs and execute them based on how many worker
-   * threads have been configured. This method will run the submitted job in the same thread
-   * processing the RPC message, so that queueing does not apply.
-   *
-   * It's recommended that this method only be used to run code that finishes quickly. This
-   * avoids interfering with the normal operation of the context.
-   *
-   * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and
-   * returns the result of the execution of the job with that context.
-   * @return A handle that can be used to monitor the job.
-   */
-  def run[T](fn: ScalaJobContext => T): Future[T] = {
-    val job = new Job[T] {
-      @throws(classOf[Exception])
-      override def call(jobContext: JobContext): T = {
-        val scalaJobContext = new ScalaJobContext(jobContext)
-        fn(scalaJobContext)
-      }
-    }
-    new PollingContainer(livyJavaClient.run(job)).poll()
-  }
-
-  /**
-   * Stops the remote context.
-   *
-   * Any pending jobs will be cancelled, and the remote context will be torn down.
-   *
-   * @param  shutdownContext  Whether to shutdown the underlying Spark context. If false, the
-   *                          context will keep running and it's still possible to send commands
-   *                          to it, if the backend being used supports it.
-   */
-  def stop(shutdownContext: Boolean): Unit = {
-    executor.shutdown()
-    livyJavaClient.stop(shutdownContext)
-  }
-
-  /**
-   * Upload a jar to be added to the Spark application classpath.
-   *
-   * @param jar The local file to be uploaded.
-   * @return A future that can be used to monitor this operation.
-   */
-  def uploadJar(jar: File): Future[_] = new PollingContainer(livyJavaClient.uploadJar(jar)).poll()
-
-  /**
-   * Adds a jar file to the running remote context.
-   *
-   * Note that the URL should be reachable by the Spark driver process. If running the driver
-   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
-   * on that node (and not on the client machine).
-   *
-   * If the provided URI has no scheme, it's considered to be relative to the default file system
-   * configured in the Livy server.
-   *
-   * @param uri The location of the jar file.
-   * @return A future that can be used to monitor the operation.
-   */
-  def addJar(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addJar(uri)).poll()
-
-  /**
-   * Upload a file to be passed to the Spark application.
-   *
-   * @param file The local file to be uploaded.
-   * @return A future that can be used to monitor this operation.
-   */
-  def uploadFile(file: File): Future[_] =
-    new PollingContainer(livyJavaClient.uploadFile(file)).poll()
-
-  /**
-   * Adds a file to the running remote context.
-   *
-   * Note that the URL should be reachable by the Spark driver process. If running the driver
-   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
-   * on that node (and not on the client machine).
-   *
-   * If the provided URI has no scheme, it's considered to be relative to the default file system
-   * configured in the Livy server.
-   *
-   * @param uri The location of the file.
-   * @return A future that can be used to monitor the operation.
-   */
-  def addFile(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addFile(uri)).poll()
-
-  private class PollingContainer[T] private[livy] (jFuture: JFuture[T]) extends Runnable {
-
-    private val initialDelay = 1
-    private val longDelay = 1
-    private var scheduledFuture: ScheduledFuture[_] = _
-    private val promise = Promise[T]
-
-    def poll(): Future[T] = {
-      scheduledFuture =
-        executor.scheduleWithFixedDelay(this, initialDelay, longDelay, TimeUnit.SECONDS)
-      promise.future
-    }
-
-    override def run(): Unit = {
-      if (jFuture.isDone) {
-        promise.complete(Try(getJavaFutureResult(jFuture)))
-        scheduledFuture.cancel(false)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala
----------------------------------------------------------------------
diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala
deleted file mode 100644
index 57a3ab4..0000000
--- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobContext.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.scalaapi
-
-import java.io.File
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.streaming.StreamingContext
-
-import com.cloudera.livy.JobContext
-
-/**
- *  Holds runtime information about the job execution context.
- *
- *  @constructor Creates a ScalaJobContext.
- *  @param context the Java JobContext of Livy.
- */
-class ScalaJobContext private[livy] (context: JobContext) {
-
-  /** The shared SparkContext instance. */
-  def sc: SparkContext = context.sc().sc
-
-  /** The shared SQLContext instance. */
-  def sqlctx: SQLContext = context.sqlctx()
-
-  /** The shared HiveContext instance. */
-  def hivectx: HiveContext = context.hivectx()
-
-  /** Returns the StreamingContext which has already been created. */
-  def streamingctx: StreamingContext = context.streamingctx().ssc
-
-  def sparkSession[E]: E = context.sparkSession()
-
-  /**
-   * Creates the SparkStreaming context.
-   *
-   * @param batchDuration  Time interval at which streaming data will be divided into batches,
-   *                       in milliseconds.
-   */
-  def createStreamingContext(batchDuration: Long): Unit =
-    context.createStreamingContext(batchDuration)
-
-  /** Stops the SparkStreaming context. */
-  def stopStreamingContext(): Unit = context.stopStreamingCtx()
-
-  /**
-   * Returns a local tmp dir specific to the context.
-   */
-  def localTmpDir: File = context.getLocalTmpDir
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala
----------------------------------------------------------------------
diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala
deleted file mode 100644
index 7bd2edc..0000000
--- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/ScalaJobHandle.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.scalaapi
-
-import scala.concurrent.{CanAwait, ExecutionContext, Future, TimeoutException}
-import scala.concurrent.duration.Duration
-import scala.util.Try
-
-import com.cloudera.livy.JobHandle
-import com.cloudera.livy.JobHandle.{Listener, State}
-
-/**
- *  A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
- *
- *  @constructor Creates a ScalaJobHandle.
- *  @param jobHandle the Java JobHandle of Livy.
- *
- *  @define multipleCallbacks
- *  Multiple callbacks may be registered; there is no guarantee that they will be
- *  executed in a particular order.
- *
- *  @define nonDeterministic
- *  Note: using this method yields nondeterministic dataflow programs.
- *
- *  @define callbackInContext
- *  The provided callback always runs in the provided implicit
- *` ExecutionContext`, though there is no guarantee that the
- *  `execute()` method on the `ExecutionContext` will be called once
- *  per callback or that `execute()` will be called in the current
- *  thread. That is, the implementation may run multiple callbacks
- *  in a batch within a single `execute()` and it may run
- *  `execute()` either immediately or asynchronously.
- */
-class ScalaJobHandle[T] private[livy] (jobHandle: JobHandle[T]) extends Future[T] {
-
-  /**
-   * Return the current state of the job.
-   */
-  def state: State = jobHandle.getState()
-
-  /**
-   *  When the job is completed, either through an exception, or a value,
-   *  apply the provided function.
-   *
-   *  If the job has already been completed,
-   *  this will either be applied immediately or be scheduled asynchronously.
-   *
-   *  $multipleCallbacks
-   *  $callbackInContext
-   */
-  override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
-    jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
-      override def onJobSucceeded(job: JobHandle[T], result: T): Unit = {
-        val onJobSucceededTask = new Runnable {
-          override def run(): Unit = func(Try(result))
-        }
-        executor.execute(onJobSucceededTask)
-      }
-
-      override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = {
-        val onJobFailedTask = new Runnable {
-          override def run(): Unit = func(Try(getJavaFutureResult(job)))
-        }
-        executor.execute(onJobFailedTask)
-      }
-    })
-  }
-
-  /**
-   *  When this job is queued, apply the provided function.
-   *
-   *  $multipleCallbacks
-   *  $callbackInContext
-   */
-  def onJobQueued[U](func: => Unit)(implicit executor: ExecutionContext): Unit = {
-    jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
-      override def onJobQueued(job: JobHandle[T]): Unit = {
-        val onJobQueuedTask = new Runnable {
-          override def run(): Unit = func
-        }
-        executor.execute(onJobQueuedTask)
-      }
-    })
-  }
-
-  /**
-   *  When this job has started, apply the provided function.
-   *
-   *  $multipleCallbacks
-   *  $callbackInContext
-   */
-  def onJobStarted[U](func: => Unit)(implicit executor: ExecutionContext): Unit = {
-    jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
-      override def onJobStarted(job: JobHandle[T]): Unit = {
-        val onJobStartedTask = new Runnable {
-          override def run(): Unit = func
-        }
-        executor.execute(onJobStartedTask)
-      }
-    })
-  }
-
-  /**
-   *  When this job is cancelled, apply the provided function.
-   *
-   *  $multipleCallbacks
-   *  $callbackInContext
-   */
-  def onJobCancelled[U](func: Boolean => Unit)(implicit executor: ExecutionContext): Unit = {
-    jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
-      override def onJobCancelled(job: JobHandle[T]): Unit = {
-        val onJobCancelledTask = new Runnable {
-          override def run(): Unit = func(job.cancel(false))
-        }
-        executor.execute(onJobCancelledTask)
-      }
-    })
-  }
-
-  /**
-   *  Returns whether the job has already been completed with
-   *  a value or an exception.
-   *
-   *  $nonDeterministic
-   *
-   *  @return    `true` if the job is already completed, `false` otherwise.
-   */
-  override def isCompleted: Boolean = jobHandle.isDone
-
-  /**
-   *  The result value of the job.
-   *
-   *  If the job is not completed the returned value will be `None`.
-   *  If the job is completed the value will be `Some(Success(t))`.
-   *  if it contains a valid result, or `Some(Failure(error))` if it contains
-   *  an exception.
-   */
-  override def value: Option[Try[T]] = {
-    if (isCompleted) {
-      Some(Try(getJavaFutureResult(jobHandle)))
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Supports Scala's Await.result(atmost) which awaits the completion of the job and returns the
-   * result (of type `T`).
-   *
-   * @param  atMost
-   *         maximum wait time, which may be negative (no waiting is done),
-   *         [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting,
-   *         or a finite positive duration.
-   * @return the result value if job is completed within the specific maximum wait time.
-   * @throws Exception     the underlying exception on the execution of the job.
-   */
-  @throws(classOf[Exception])
-  override def result(atMost: Duration)(implicit permit: CanAwait): T =
-    getJavaFutureResult(jobHandle, atMost)
-
-  /**
-   * Supports Scala's Await.ready(atmost) which awaits the completion of the job.
-   *
-   * @param  atMost
-   *         maximum wait time, which may be negative (no waiting is done),
-   *         [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting,
-   *         or a finite positive duration.
-   * @return ScalaJobHandle
-   * @throws InterruptedException     if the current thread is interrupted while waiting.
-   * @throws TimeoutException         if after waiting for the specified time the job
-   *                                  is still not ready.
-   */
-  @throws(classOf[InterruptedException])
-  @throws(classOf[TimeoutException])
-  override def ready(atMost: Duration)(implicit permit: CanAwait): ScalaJobHandle.this.type = {
-    getJavaFutureResult(jobHandle, atMost)
-    this
-  }
-}
-
-private abstract class AbstractScalaJobHandleListener[T] extends Listener[T] {
-  override def onJobQueued(job: JobHandle[T]): Unit = {}
-
-  override def onJobCancelled(job: JobHandle[T]): Unit = {}
-
-  override def onJobSucceeded(job: JobHandle[T], result: T): Unit = {}
-
-  override def onJobStarted(job: JobHandle[T]): Unit = {}
-
-  override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala
----------------------------------------------------------------------
diff --git a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala b/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala
deleted file mode 100644
index 71c8a7c..0000000
--- a/scala-api/src/main/scala/com/cloudera/livy/scalaapi/package.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy
-
-import java.util.concurrent.{ExecutionException, Future => JFuture, TimeUnit}
-
-import scala.concurrent.duration.Duration
-
-package object scalaapi {
-
-  /**
-   *  A Scala Client for Livy which is a wrapper over the Java client.
-   *  @constructor Creates a Scala client.
-   *  @param livyJavaClient  the Java client of Livy.
-   *  {{{
-   *     import com.cloudera.livy._
-   *     import com.cloudera.livy.scalaapi._
-   *     val url = "http://example.com";
-   *     val livyJavaClient = new LivyClientBuilder(false).setURI(new URI(url))).build()
-   *     val livyScalaClient = livyJavaClient.asScalaClient
-   *  }}}
-   */
-  implicit class ScalaWrapper(livyJavaClient: LivyClient) {
-    def asScalaClient: LivyScalaClient = new LivyScalaClient(livyJavaClient)
-  }
-
-  private[livy] def getJavaFutureResult[T](jFuture: JFuture[T],
-                                           atMost: Duration = Duration.Undefined): T = {
-    try {
-      if (!atMost.isFinite()) jFuture.get else jFuture.get(atMost.toMillis, TimeUnit.MILLISECONDS)
-    } catch {
-      case executionException: ExecutionException => throw executionException.getCause
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala
----------------------------------------------------------------------
diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala
new file mode 100644
index 0000000..ca7199a
--- /dev/null
+++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.scalaapi
+
+import java.io.File
+import java.net.URI
+import java.util.concurrent.{Executors, Future => JFuture, ScheduledFuture, ThreadFactory, TimeUnit}
+
+import scala.concurrent._
+import scala.util.Try
+
+import org.apache.livy._
+
+/**
+ * A client for submitting Spark-based jobs to a Livy backend.
+ * @constructor  Creates a Scala client.
+ * @param  livyJavaClient  the Java client of Livy.
+ */
+class LivyScalaClient(livyJavaClient: LivyClient) {
+
+  private val executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
+    override def newThread(r: Runnable): Thread = {
+      val thread = new Thread(r, "LivyScalaClient-PollingContainer")
+      thread.setDaemon(true)
+      thread
+    }
+  })
+
+  /**
+   * Submits a job for asynchronous execution.
+   *
+   * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and
+   * returns the result of the execution of the job with that context.
+   * @return A handle that can be used to monitor the job.
+   */
+  def submit[T](fn: ScalaJobContext => T): ScalaJobHandle[T] = {
+    val job = new Job[T] {
+      @throws(classOf[Exception])
+      override def call(jobContext: JobContext): T = fn(new ScalaJobContext(jobContext))
+    }
+    new ScalaJobHandle(livyJavaClient.submit(job))
+  }
+
+  /**
+   * Asks the remote context to run a job immediately.
+   *
+   * Normally, the remote context will queue jobs and execute them based on how many worker
+   * threads have been configured. This method will run the submitted job in the same thread
+   * processing the RPC message, so that queueing does not apply.
+   *
+   * It's recommended that this method only be used to run code that finishes quickly. This
+   * avoids interfering with the normal operation of the context.
+   *
+   * @param fn The job to be executed. It is a function that takes in a ScalaJobContext and
+   * returns the result of the execution of the job with that context.
+   * @return A future that can be used to monitor the job.
+   */
+  def run[T](fn: ScalaJobContext => T): Future[T] = {
+    val job = new Job[T] {
+      @throws(classOf[Exception])
+      override def call(jobContext: JobContext): T = {
+        val scalaJobContext = new ScalaJobContext(jobContext)
+        fn(scalaJobContext)
+      }
+    }
+    new PollingContainer(livyJavaClient.run(job)).poll()
+  }
+
+  /**
+   * Stops the remote context.
+   *
+   * Any pending jobs will be cancelled, and the remote context will be torn down.
+   *
+   * @param  shutdownContext  Whether to shutdown the underlying Spark context. If false, the
+   *                          context will keep running and it's still possible to send commands
+   *                          to it, if the backend being used supports it.
+   */
+  def stop(shutdownContext: Boolean): Unit = {
+    executor.shutdown()
+    livyJavaClient.stop(shutdownContext)
+  }
+
+  /**
+   * Upload a jar to be added to the Spark application classpath.
+   *
+   * @param jar The local file to be uploaded.
+   * @return A future that can be used to monitor this operation.
+   */
+  def uploadJar(jar: File): Future[_] = new PollingContainer(livyJavaClient.uploadJar(jar)).poll()
+
+  /**
+   * Adds a jar file to the running remote context.
+   *
+   * Note that the URL should be reachable by the Spark driver process. If running the driver
+   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
+   * on that node (and not on the client machine).
+   *
+   * If the provided URI has no scheme, it's considered to be relative to the default file system
+   * configured in the Livy server.
+   *
+   * @param uri The location of the jar file.
+   * @return A future that can be used to monitor the operation.
+   */
+  def addJar(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addJar(uri)).poll()
+
+  /**
+   * Upload a file to be passed to the Spark application.
+   *
+   * @param file The local file to be uploaded.
+   * @return A future that can be used to monitor this operation.
+   */
+  def uploadFile(file: File): Future[_] =
+    new PollingContainer(livyJavaClient.uploadFile(file)).poll()
+
+  /**
+   * Adds a file to the running remote context.
+   *
+   * Note that the URL should be reachable by the Spark driver process. If running the driver
+   * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
+   * on that node (and not on the client machine).
+   *
+   * If the provided URI has no scheme, it's considered to be relative to the default file system
+   * configured in the Livy server.
+   *
+   * @param uri The location of the file.
+   * @return A future that can be used to monitor the operation.
+   */
+  def addFile(uri: URI): Future[_] = new PollingContainer(livyJavaClient.addFile(uri)).poll()
+
+  private class PollingContainer[T] private[livy] (jFuture: JFuture[T]) extends Runnable {
+
+    private val initialDelay = 1
+    private val longDelay = 1
+    private var scheduledFuture: ScheduledFuture[_] = _
+    private val promise = Promise[T]
+
+    def poll(): Future[T] = {
+      scheduledFuture =
+        executor.scheduleWithFixedDelay(this, initialDelay, longDelay, TimeUnit.SECONDS)
+      promise.future
+    }
+
+    override def run(): Unit = {
+      if (jFuture.isDone) {
+        promise.complete(Try(getJavaFutureResult(jFuture)))
+        scheduledFuture.cancel(false)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobContext.scala
----------------------------------------------------------------------
diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobContext.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobContext.scala
new file mode 100644
index 0000000..ee082ec
--- /dev/null
+++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobContext.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.scalaapi
+
+import java.io.File
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.streaming.StreamingContext
+
+import org.apache.livy.JobContext
+
+/**
+ *  Holds runtime information about the job execution context.
+ *
+ *  @constructor Creates a ScalaJobContext.
+ *  @param context the Java JobContext of Livy.
+ */
+class ScalaJobContext private[livy] (context: JobContext) {
+
+  /** The shared SparkContext instance. */
+  def sc: SparkContext = context.sc().sc
+
+  /** The shared SQLContext instance. */
+  def sqlctx: SQLContext = context.sqlctx()
+
+  /** The shared HiveContext instance. */
+  def hivectx: HiveContext = context.hivectx()
+
+  /** Returns the StreamingContext which has already been created. */
+  def streamingctx: StreamingContext = context.streamingctx().ssc
+
+  def sparkSession[E]: E = context.sparkSession()
+
+  /**
+   * Creates the SparkStreaming context.
+   *
+   * @param batchDuration  Time interval at which streaming data will be divided into batches,
+   *                       in milliseconds.
+   */
+  def createStreamingContext(batchDuration: Long): Unit =
+    context.createStreamingContext(batchDuration)
+
+  /** Stops the SparkStreaming context. */
+  def stopStreamingContext(): Unit = context.stopStreamingCtx()
+
+  /**
+   * Returns a local tmp dir specific to the context.
+   */
+  def localTmpDir: File = context.getLocalTmpDir
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala
----------------------------------------------------------------------
diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala
new file mode 100644
index 0000000..d1cf29d
--- /dev/null
+++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.scalaapi
+
+import scala.concurrent.{CanAwait, ExecutionContext, Future, TimeoutException}
+import scala.concurrent.duration.Duration
+import scala.util.Try
+
+import org.apache.livy.JobHandle
+import org.apache.livy.JobHandle.{Listener, State}
+
+/**
+ *  A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
+ *
+ *  @constructor Creates a ScalaJobHandle.
+ *  @param jobHandle the Java JobHandle of Livy.
+ *  @tparam T the type of the job's result.
+ *
+ *  @define multipleCallbacks
+ *  Multiple callbacks may be registered; there is no guarantee that they will be
+ *  executed in a particular order.
+ *
+ *  @define nonDeterministic
+ *  Note: using this method yields nondeterministic dataflow programs.
+ *
+ *  @define callbackInContext
+ *  The provided callback always runs in the provided implicit
+ *  `ExecutionContext`, though there is no guarantee that the
+ *  `execute()` method on the `ExecutionContext` will be called once
+ *  per callback or that `execute()` will be called in the current
+ *  thread. That is, the implementation may run multiple callbacks
+ *  in a batch within a single `execute()` and it may run
+ *  `execute()` either immediately or asynchronously.
+ */
+class ScalaJobHandle[T] private[livy] (jobHandle: JobHandle[T]) extends Future[T] {
+
+  /**
+   * Return the current state of the job.
+   */
+  def state: State = jobHandle.getState()
+
+  /**
+   *  When the job is completed, either through an exception, or a value,
+   *  apply the provided function.
+   *
+   *  If the job has already been completed,
+   *  this will either be applied immediately or be scheduled asynchronously.
+   *
+   *  Note: the callback is wired to the listener's `onJobSucceeded` and `onJobFailed`
+   *  events only; a cancellation event does not invoke it.
+   *
+   *  $multipleCallbacks
+   *  $callbackInContext
+   */
+  override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
+    jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
+      override def onJobSucceeded(job: JobHandle[T], result: T): Unit = {
+        val onJobSucceededTask = new Runnable {
+          override def run(): Unit = func(Try(result))
+        }
+        executor.execute(onJobSucceededTask)
+      }
+
+      override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = {
+        val onJobFailedTask = new Runnable {
+          // The outcome is re-read through the Java future so that `Try` captures whatever
+          // it throws; the `cause` argument itself is not used.
+          override def run(): Unit = func(Try(getJavaFutureResult(job)))
+        }
+        executor.execute(onJobFailedTask)
+      }
+    })
+  }
+
+  /**
+   *  When this job is queued, apply the provided function.
+   *
+   *  $multipleCallbacks
+   *  $callbackInContext
+   */
+  def onJobQueued[U](func: => Unit)(implicit executor: ExecutionContext): Unit = {
+    jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
+      override def onJobQueued(job: JobHandle[T]): Unit = {
+        val onJobQueuedTask = new Runnable {
+          override def run(): Unit = func
+        }
+        executor.execute(onJobQueuedTask)
+      }
+    })
+  }
+
+  /**
+   *  When this job has started, apply the provided function.
+   *
+   *  $multipleCallbacks
+   *  $callbackInContext
+   */
+  def onJobStarted[U](func: => Unit)(implicit executor: ExecutionContext): Unit = {
+    jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
+      override def onJobStarted(job: JobHandle[T]): Unit = {
+        val onJobStartedTask = new Runnable {
+          override def run(): Unit = func
+        }
+        executor.execute(onJobStartedTask)
+      }
+    })
+  }
+
+  /**
+   *  When this job is cancelled, apply the provided function.
+   *
+   *  The `Boolean` handed to `func` is the return value of a `cancel(false)` call that is
+   *  made on the Java handle from inside the scheduled task.
+   *
+   *  $multipleCallbacks
+   *  $callbackInContext
+   */
+  def onJobCancelled[U](func: Boolean => Unit)(implicit executor: ExecutionContext): Unit = {
+    jobHandle.addListener(new AbstractScalaJobHandleListener[T] {
+      override def onJobCancelled(job: JobHandle[T]): Unit = {
+        val onJobCancelledTask = new Runnable {
+          override def run(): Unit = func(job.cancel(false))
+        }
+        executor.execute(onJobCancelledTask)
+      }
+    })
+  }
+
+  /**
+   *  Returns whether the job has already been completed with
+   *  a value or an exception.
+   *
+   *  $nonDeterministic
+   *
+   *  @return    `true` if the job is already completed, `false` otherwise.
+   */
+  override def isCompleted: Boolean = jobHandle.isDone
+
+  /**
+   *  The result value of the job.
+   *
+   *  If the job is not completed the returned value will be `None`.
+   *  If the job is completed the value will be `Some(Success(t))`
+   *  if it contains a valid result, or `Some(Failure(error))` if it contains
+   *  an exception.
+   */
+  override def value: Option[Try[T]] = {
+    if (isCompleted) {
+      Some(Try(getJavaFutureResult(jobHandle)))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Supports Scala's Await.result(atmost) which awaits the completion of the job and returns the
+   * result (of type `T`).
+   *
+   * @param  atMost
+   *         maximum wait time, which may be negative (no waiting is done),
+   *         [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting,
+   *         or a finite positive duration.
+   * @return the result value if job is completed within the specific maximum wait time.
+   * @throws Exception     the underlying exception on the execution of the job.
+   */
+  @throws(classOf[Exception])
+  override def result(atMost: Duration)(implicit permit: CanAwait): T =
+    getJavaFutureResult(jobHandle, atMost)
+
+  /**
+   * Supports Scala's Await.ready(atmost) which awaits the completion of the job.
+   *
+   * Only completion is awaited: a job that finished with a failure, or that was cancelled,
+   * counts as ready and its exception is NOT rethrown here. Use `result` or `value` to
+   * observe the outcome.
+   *
+   * @param  atMost
+   *         maximum wait time, which may be negative (no waiting is done),
+   *         [[scala.concurrent.duration.Duration.Inf Duration.Inf]] for unbounded waiting,
+   *         or a finite positive duration.
+   * @return ScalaJobHandle
+   * @throws InterruptedException     if the current thread is interrupted while waiting.
+   * @throws TimeoutException         if after waiting for the specified time the job
+   *                                  is still not ready.
+   */
+  @throws(classOf[InterruptedException])
+  @throws(classOf[TimeoutException])
+  override def ready(atMost: Duration)(implicit permit: CanAwait): ScalaJobHandle.this.type = {
+    // Wait on the Java future directly instead of going through getJavaFutureResult: that
+    // helper unwraps and rethrows the job's own failure, which would make it impossible to
+    // tell "the wait timed out" apart from "the job failed with a TimeoutException".
+    try {
+      if (atMost.isFinite()) {
+        jobHandle.get(atMost.toMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
+      } else {
+        jobHandle.get()
+      }
+    } catch {
+      // The job has completed, albeit unsuccessfully, so the handle is ready.
+      case _: java.util.concurrent.ExecutionException =>
+      case _: java.util.concurrent.CancellationException =>
+    }
+    this
+  }
+}
+
+/**
+ * Listener base in which every callback is a no-op, so that an anonymous subclass only
+ * has to override the events it is interested in.
+ */
+private abstract class AbstractScalaJobHandleListener[T] extends Listener[T] {
+  override def onJobQueued(job: JobHandle[T]): Unit = ()
+
+  override def onJobStarted(job: JobHandle[T]): Unit = ()
+
+  override def onJobCancelled(job: JobHandle[T]): Unit = ()
+
+  override def onJobFailed(job: JobHandle[T], cause: Throwable): Unit = ()
+
+  override def onJobSucceeded(job: JobHandle[T], result: T): Unit = ()
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala
----------------------------------------------------------------------
diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala 
b/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala
new file mode 100644
index 0000000..6e53a37
--- /dev/null
+++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy
+
+import java.util.concurrent.{ExecutionException, Future => JFuture, TimeUnit}
+
+import scala.concurrent.duration.Duration
+
+package object scalaapi {
+
+  /**
+   *  A Scala Client for Livy which is a wrapper over the Java client.
+   *  @constructor Creates a Scala client.
+   *  @param livyJavaClient  the Java client of Livy.
+   *  {{{
+   *     import java.net.URI
+   *     import org.apache.livy._
+   *     import org.apache.livy.scalaapi._
+   *     val url = "http://example.com"
+   *     val livyJavaClient = new LivyClientBuilder(false).setURI(new URI(url)).build()
+   *     val livyScalaClient = livyJavaClient.asScalaClient
+   *  }}}
+   */
+  implicit class ScalaWrapper(livyJavaClient: LivyClient) {
+    def asScalaClient: LivyScalaClient = new LivyScalaClient(livyJavaClient)
+  }
+
+  /**
+   * Blocks on a Java future and returns its value, surfacing the job's own exception
+   * rather than the `ExecutionException` wrapper.
+   *
+   * @param jFuture the Java future to wait on.
+   * @param atMost  how long to wait; any non-finite duration (including the default
+   *                `Duration.Undefined`) waits without a timeout, a finite one is passed
+   *                to the timed `get` in milliseconds.
+   * @return the value produced by the future.
+   * @throws Throwable the cause of an `ExecutionException` raised by `get`, or the
+   *                   `ExecutionException` itself when it carries no cause; any other
+   *                   exception thrown by `get` propagates unchanged.
+   */
+  private[livy] def getJavaFutureResult[T](jFuture: JFuture[T],
+                                           atMost: Duration = Duration.Undefined): T = {
+    try {
+      if (!atMost.isFinite()) jFuture.get else jFuture.get(atMost.toMillis, TimeUnit.MILLISECONDS)
+    } catch {
+      case executionException: ExecutionException =>
+        // getCause may be null; `throw null` would surface as an uninformative
+        // NullPointerException, so fall back to the wrapper itself.
+        throw Option(executionException.getCause).getOrElse(executionException)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala
----------------------------------------------------------------------
diff --git 
a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala 
b/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala
deleted file mode 100644
index 1bf879c..0000000
--- a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTest.scala
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.scalaapi
-
-import java.io._
-import java.net.URI
-import java.nio.charset.StandardCharsets._
-import java.util._
-import java.util.concurrent.CountDownLatch
-import java.util.jar.JarOutputStream
-import java.util.zip.ZipEntry
-
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.{Failure, Success}
-
-import org.apache.spark.SparkFiles
-import org.apache.spark.launcher.SparkLauncher
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.concurrent.ScalaFutures
-
-import com.cloudera.livy.LivyBaseUnitTestSuite
-import com.cloudera.livy.rsc.RSCConf.Entry._
-
-class ScalaClientTest extends FunSuite
-  with ScalaFutures
-  with BeforeAndAfter
-  with LivyBaseUnitTestSuite {
-
-  import com.cloudera.livy._
-
-  private var client: LivyScalaClient = _
-
-  after {
-    if (client != null) {
-      client.stop(true)
-      client = null
-    }
-  }
-
-  test("test Job Submission") {
-    configureClient(true)
-    val jobHandle = client.submit(ScalaClientTestUtils.helloJob)
-    ScalaClientTestUtils.assertTestPassed(jobHandle, "hello")
-  }
-
-  test("test Simple Spark Job") {
-    configureClient(true)
-    val sFuture = client.submit(ScalaClientTestUtils.simpleSparkJob)
-    ScalaClientTestUtils.assertTestPassed(sFuture, 5)
-  }
-
-  test("test Job Failure") {
-    configureClient(true)
-    val sFuture = client.submit(ScalaClientTestUtils.throwExceptionJob)
-    val lock = new CountDownLatch(1)
-    var testFailure : Option[String] = None
-    sFuture onComplete {
-      case Success(t) => {
-        testFailure = Some("Test should have thrown CustomFailureException")
-        lock.countDown()
-      }
-      case Failure(e) => {
-        if (!e.getMessage.contains("CustomTestFailureException")) {
-          testFailure = Some("Test did not throw expected exception - 
CustomFailureException")
-        }
-        lock.countDown()
-      }
-    }
-    ScalaClientTestUtils.assertAwait(lock)
-    testFailure.foreach(fail(_))
-  }
-
-  test("test Sync Rpc") {
-    configureClient(true)
-    val future = client.run(ScalaClientTestUtils.helloJob)
-    ScalaClientTestUtils.assertTestPassed(future, "hello")
-  }
-
-  test("test Remote client") {
-    configureClient(false)
-    val sFuture = client.submit(ScalaClientTestUtils.simpleSparkJob)
-    ScalaClientTestUtils.assertTestPassed(sFuture, 5)
-  }
-
-  test("test add file") {
-    configureClient(true)
-    val file = File.createTempFile("test", ".file")
-    val fileStream = new FileOutputStream(file)
-    fileStream.write("test file".getBytes("UTF-8"))
-    fileStream.close
-    val addFileFuture = client.addFile(new URI("file:" + 
file.getAbsolutePath()))
-    Await.ready(addFileFuture, ScalaClientTestUtils.Timeout second)
-    val sFuture = client.submit { context =>
-      ScalaClientTest.fileOperation(false, file.getName, context)
-    }
-    ScalaClientTestUtils.assertTestPassed(sFuture, "test file")
-  }
-
-  test("test add jar") {
-    configureClient(true)
-    val jar = File.createTempFile("test", ".resource")
-    val jarFile = new JarOutputStream(new FileOutputStream(jar))
-    jarFile.putNextEntry(new ZipEntry("test.resource"))
-    jarFile.write("test resource".getBytes("UTF-8"))
-    jarFile.closeEntry()
-    jarFile.close()
-    val addJarFuture = client.addJar(new URI("file:" + jar.getAbsolutePath()))
-    Await.ready(addJarFuture, ScalaClientTestUtils.Timeout second)
-    val sFuture = client.submit { context =>
-      ScalaClientTest.fileOperation(true, "test.resource", context)
-    }
-    ScalaClientTestUtils.assertTestPassed(sFuture, "test resource")
-  }
-
-  test("Successive onComplete callbacks") {
-    var testFailure: Option[String] = None
-    configureClient(true)
-    val future = client.run(ScalaClientTestUtils.helloJob)
-    val lock = new CountDownLatch(3)
-    for (i <- 0 to 2) {
-      future onComplete {
-        case Success(t) => {
-          if (!t.equals("hello")) testFailure = Some("Expected message not 
returned")
-          lock.countDown()
-        }
-        case Failure(e) => {
-          testFailure = Some("onComplete should not have triggered Failure 
callback")
-          lock.countDown()
-        }
-      }
-    }
-    ScalaClientTestUtils.assertAwait(lock)
-    testFailure.foreach(fail(_))
-  }
-
-  private def configureClient(local: Boolean) = {
-    val conf = ScalaClientTest.createConf(local)
-    val javaClient = new LivyClientBuilder(false).setURI(new 
URI("rsc:/")).setAll(conf).build()
-    client = javaClient.asScalaClient
-    pingJob()
-  }
-
-  private def pingJob() = {
-    val future = client.submit { context =>
-      null
-    }
-    ScalaClientTestUtils.assertTestPassed(future, null)
-  }
-}
-
-class CustomTestFailureException extends RuntimeException {}
-
-object ScalaClientTest {
-
-  def createConf(local: Boolean): Properties = {
-    val conf = new Properties
-    if (local) {
-      conf.put(CLIENT_IN_PROCESS.key, "true")
-      conf.put(SparkLauncher.SPARK_MASTER, "local")
-      conf.put("spark.app.name", "SparkClientSuite Local App")
-    } else {
-      val classpath: String = System.getProperty("java.class.path")
-      conf.put("spark.app.name", "SparkClientSuite Remote App")
-      conf.put(SparkLauncher.DRIVER_MEMORY, "512m")
-      conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, classpath)
-      conf.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, classpath)
-    }
-    conf.put(LIVY_JARS.key, "")
-    conf
-  }
-
-  def fileOperation(isResource: Boolean, fileName: String, context: 
ScalaJobContext): String = {
-    val arr = Seq(1)
-    val rdd = context.sc.parallelize(arr).map { value =>
-      var inputStream: InputStream = null
-      if (isResource) {
-        val ccl = Thread.currentThread.getContextClassLoader
-        inputStream = ccl.getResourceAsStream(fileName)
-      } else {
-        inputStream = new FileInputStream(SparkFiles.get(fileName))
-      }
-      try {
-        val out = new ByteArrayOutputStream()
-        val buffer = new Array[Byte](1024)
-        var read = inputStream.read(buffer)
-        while (read >= 0) {
-          out.write(buffer, 0, read)
-          read = inputStream.read(buffer)
-        }
-        val bytes = out.toByteArray
-        new String(bytes, 0, bytes.length, UTF_8)
-      } finally {
-        inputStream.close()
-      }
-    }
-    rdd.collect().head
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTestUtils.scala
 
b/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTestUtils.scala
deleted file mode 100644
index 8727db5..0000000
--- 
a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaClientTestUtils.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.scalaapi
-
-import java.util.Random
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
-
-import org.scalatest.FunSuite
-
-import com.cloudera.livy.LivyBaseUnitTestSuite
-
-object ScalaClientTestUtils extends FunSuite with LivyBaseUnitTestSuite {
-
-  val Timeout = 40
-
-  def helloJob(context: ScalaJobContext): String = "hello"
-
-  def throwExceptionJob(context: ScalaJobContext): Unit = throw new 
CustomTestFailureException
-
-  def simpleSparkJob(context: ScalaJobContext): Long = {
-    val r = new Random
-    val count = 5
-    val partitions = Math.min(r.nextInt(10) + 1, count)
-    val buffer = new ArrayBuffer[Int]()
-    for (a <- 1 to count) {
-      buffer += r.nextInt()
-    }
-    context.sc.parallelize(buffer, partitions).count()
-  }
-
-  def assertAwait(lock: CountDownLatch): Unit = {
-    assert(lock.await(Timeout, TimeUnit.SECONDS) == true)
-  }
-
-  def assertTestPassed[T](future: Future[T], expectedValue: T): Unit = {
-    val result = Await.result(future, Timeout second)
-    assert(result === expectedValue)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaJobHandleTest.scala
----------------------------------------------------------------------
diff --git 
a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaJobHandleTest.scala 
b/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaJobHandleTest.scala
deleted file mode 100644
index 7dd9444..0000000
--- 
a/scala-api/src/test/scala/com/cloudera/livy/scalaapi/ScalaJobHandleTest.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.cloudera.livy.scalaapi
-
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.language.postfixOps
-import scala.util.{Failure, Success}
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.concurrent.ScalaFutures
-
-import com.cloudera.livy.{JobHandle, LivyBaseUnitTestSuite}
-import com.cloudera.livy.JobHandle.{Listener, State}
-
-class ScalaJobHandleTest extends FunSuite
-  with ScalaFutures
-  with BeforeAndAfter
-  with LivyBaseUnitTestSuite {
-
-  private var mockJobHandle: JobHandle[String] = null
-  private var scalaJobHandle: ScalaJobHandle[String] = null
-  private val timeoutInMilliseconds = 5000
-  private var listener: JobHandle.Listener[String] = null
-
-  before {
-    listener = mock(classOf[JobHandle.Listener[String]])
-    mockJobHandle = mock(classOf[JobHandle[String]])
-    scalaJobHandle = new ScalaJobHandle(mockJobHandle)
-  }
-
-  test("get result when job is already complete") {
-    when(mockJobHandle.get(timeoutInMilliseconds, 
TimeUnit.MILLISECONDS)).thenReturn("hello")
-    val result = Await.result(scalaJobHandle, 5 seconds)
-    assert(result == "hello")
-    verify(mockJobHandle, times(1)).get(timeoutInMilliseconds, 
TimeUnit.MILLISECONDS)
-  }
-
-  test("ready when the thread waits for the mentioned duration for job to 
complete") {
-    when(mockJobHandle.get(timeoutInMilliseconds, 
TimeUnit.MILLISECONDS)).thenReturn("hello")
-    val result = Await.ready(scalaJobHandle, 5 seconds)
-    assert(result == scalaJobHandle)
-    verify(mockJobHandle, times(1)).get(timeoutInMilliseconds, 
TimeUnit.MILLISECONDS)
-  }
-
-  test("ready with Infinite Duration") {
-    when(mockJobHandle.isDone).thenReturn(true)
-    when(mockJobHandle.get()).thenReturn("hello")
-    val result = Await.ready(scalaJobHandle, Duration.Undefined)
-    assert(result == scalaJobHandle)
-    verify(mockJobHandle, times(1)).get()
-  }
-
-  test("verify addListener call of java jobHandle for onComplete") {
-    doNothing().when(mockJobHandle).addListener(listener)
-    scalaJobHandle onComplete {
-      case Success(t) => {}
-    }
-    verify(mockJobHandle).addListener(isA(classOf[Listener[String]]))
-    verify(mockJobHandle, times(1)).addListener(any())
-  }
-
-  test("onComplete Success") {
-    val jobHandleStub = new AbstractJobHandleStub[String] {
-      override def addListener(l: Listener[String]): Unit = 
l.onJobSucceeded(this, "hello")
-    }
-    val lock = new CountDownLatch(1)
-    var testFailure: Option[String] = None
-    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
-    testScalaHandle onComplete {
-      case Success(t) => {
-        if (!t.equals("hello")) {
-          testFailure = Some("onComplete has not returned the expected 
message")
-        }
-        lock.countDown()
-      }
-      case Failure(e) => {
-        testFailure = Some("onComplete should not have triggered Failure 
callback")
-        lock.countDown()
-      }
-    }
-    ScalaClientTestUtils.assertAwait(lock)
-    testFailure.foreach(fail(_))
-  }
-
-  test("onComplete Failure") {
-    val jobHandleStub = new AbstractJobHandleStub[String] {
-      override def addListener(l: Listener[String]): Unit =
-        l.onJobFailed(this, new CustomTestFailureException)
-
-      override def get(): String = throw new CustomTestFailureException()
-    }
-    val lock = new CountDownLatch(1)
-    var testFailure: Option[String] = None
-    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
-    testScalaHandle onComplete {
-      case Success(t) => {
-        testFailure = Some("Test should have thrown CustomFailureException")
-        lock.countDown()
-      }
-      case Failure(e) => {
-        if (!e.isInstanceOf[CustomTestFailureException]) {
-          testFailure = Some("Test did not throw expected exception - 
CustomFailureException")
-        }
-        lock.countDown()
-      }
-    }
-    ScalaClientTestUtils.assertAwait(lock)
-    testFailure.foreach(fail(_))
-  }
-
-  test("onJobCancelled") {
-    val jobHandleStub = new AbstractJobHandleStub[String] {
-      override def addListener(l: Listener[String]): Unit = 
l.onJobCancelled(this)
-      override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
-    }
-    var testFailure: Option[String] = None
-    val lock = new CountDownLatch(1)
-    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
-    testScalaHandle onJobCancelled  {
-      case true => lock.countDown()
-      case false => {
-        testFailure = Some("False callback should not have been triggered")
-        lock.countDown()
-      }
-    }
-    ScalaClientTestUtils.assertAwait(lock)
-    testFailure.foreach(fail(_))
-  }
-
-  test("onJobQueued") {
-    val jobHandleStub = new AbstractJobHandleStub[String] {
-      override def addListener(l: Listener[String]): Unit = l.onJobQueued(this)
-    }
-    val lock = new CountDownLatch(1)
-    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
-    testScalaHandle onJobQueued {
-      lock.countDown()
-    }
-    ScalaClientTestUtils.assertAwait(lock)
-  }
-
-  test("onJobStarted") {
-    val jobHandleStub = new AbstractJobHandleStub[String] {
-      override def addListener(l: Listener[String]): Unit = 
l.onJobStarted(this)
-    }
-    val lock = new CountDownLatch(1)
-    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
-    testScalaHandle onJobStarted {
-      lock.countDown()
-    }
-    ScalaClientTestUtils.assertAwait(lock)
-  }
-}
-
-private abstract class AbstractJobHandleStub[T] private[livy] extends 
JobHandle[T] {
-
-  override def getState: State = null
-
-  override def addListener(l: Listener[T]): Unit = {}
-
-  override def isCancelled: Boolean = false
-
-  override def get(): T = null.asInstanceOf[T]
-
-  override def get(timeout: Long, unit: TimeUnit): T = null.asInstanceOf[T]
-
-  override def cancel(mayInterruptIfRunning: Boolean): Boolean = false
-
-  override def isDone: Boolean = true
-}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala
----------------------------------------------------------------------
diff --git 
a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala 
b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala
new file mode 100644
index 0000000..79760b7
--- /dev/null
+++ b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.scalaapi
+
+import java.io._
+import java.net.URI
+import java.nio.charset.StandardCharsets._
+import java.util._
+import java.util.concurrent.CountDownLatch
+import java.util.jar.JarOutputStream
+import java.util.zip.ZipEntry
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
+
+import org.apache.spark.SparkFiles
+import org.apache.spark.launcher.SparkLauncher
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.ScalaFutures
+
+import org.apache.livy.LivyBaseUnitTestSuite
+import org.apache.livy.rsc.RSCConf.Entry._
+
+/**
+ * Tests for [[LivyScalaClient]] that run real jobs through an `rsc:/` client.
+ *
+ * Every test creates its client via `configureClient`; the `after` hook stops it again.
+ */
+class ScalaClientTest extends FunSuite
+  with ScalaFutures
+  with BeforeAndAfter
+  with LivyBaseUnitTestSuite {
+
+  import org.apache.livy._
+
+  // Client under test: assigned by configureClient, stopped and cleared after each test.
+  private var client: LivyScalaClient = _
+
+  after {
+    if (client != null) {
+      client.stop(true)
+      client = null
+    }
+  }
+
+  test("test Job Submission") {
+    configureClient(true)
+    val jobHandle = client.submit(ScalaClientTestUtils.helloJob)
+    ScalaClientTestUtils.assertTestPassed(jobHandle, "hello")
+  }
+
+  test("test Simple Spark Job") {
+    configureClient(true)
+    val sFuture = client.submit(ScalaClientTestUtils.simpleSparkJob)
+    ScalaClientTestUtils.assertTestPassed(sFuture, 5)
+  }
+
+  test("test Job Failure") {
+    configureClient(true)
+    val sFuture = client.submit(ScalaClientTestUtils.throwExceptionJob)
+    val lock = new CountDownLatch(1)
+    var testFailure: Option[String] = None
+    sFuture onComplete {
+      case Success(_) =>
+        testFailure = Some("Test should have thrown CustomTestFailureException")
+        lock.countDown()
+      case Failure(e) =>
+        // getMessage may be null. Dereferencing it directly would throw inside the callback
+        // before countDown(), and the test would then only fail with a latch timeout.
+        if (!Option(e.getMessage).exists(_.contains("CustomTestFailureException"))) {
+          testFailure =
+            Some("Test did not throw expected exception - CustomTestFailureException")
+        }
+        lock.countDown()
+    }
+    ScalaClientTestUtils.assertAwait(lock)
+    testFailure.foreach(fail(_))
+  }
+
+  test("test Sync Rpc") {
+    configureClient(true)
+    val future = client.run(ScalaClientTestUtils.helloJob)
+    ScalaClientTestUtils.assertTestPassed(future, "hello")
+  }
+
+  test("test Remote client") {
+    configureClient(false)
+    val sFuture = client.submit(ScalaClientTestUtils.simpleSparkJob)
+    ScalaClientTestUtils.assertTestPassed(sFuture, 5)
+  }
+
+  test("test add file") {
+    configureClient(true)
+    val file = File.createTempFile("test", ".file")
+    file.deleteOnExit()
+    val fileStream = new FileOutputStream(file)
+    try {
+      fileStream.write("test file".getBytes(UTF_8))
+    } finally {
+      fileStream.close()
+    }
+    // File.toURI escapes characters (for example spaces) that a hand-built
+    // "file:" + path string would pass to the URI constructor unescaped.
+    val addFileFuture = client.addFile(file.toURI)
+    Await.ready(addFileFuture, ScalaClientTestUtils.Timeout.seconds)
+    val sFuture = client.submit { context =>
+      ScalaClientTest.fileOperation(false, file.getName, context)
+    }
+    ScalaClientTestUtils.assertTestPassed(sFuture, "test file")
+  }
+
+  test("test add jar") {
+    configureClient(true)
+    val jar = File.createTempFile("test", ".resource")
+    jar.deleteOnExit()
+    val jarStream = new JarOutputStream(new FileOutputStream(jar))
+    try {
+      jarStream.putNextEntry(new ZipEntry("test.resource"))
+      jarStream.write("test resource".getBytes(UTF_8))
+      jarStream.closeEntry()
+    } finally {
+      jarStream.close()
+    }
+    val addJarFuture = client.addJar(jar.toURI)
+    Await.ready(addJarFuture, ScalaClientTestUtils.Timeout.seconds)
+    val sFuture = client.submit { context =>
+      ScalaClientTest.fileOperation(true, "test.resource", context)
+    }
+    ScalaClientTestUtils.assertTestPassed(sFuture, "test resource")
+  }
+
+  test("Successive onComplete callbacks") {
+    var testFailure: Option[String] = None
+    configureClient(true)
+    val future = client.run(ScalaClientTestUtils.helloJob)
+    // Three callbacks are registered on the same future; the latch only opens once all
+    // three have been invoked.
+    val callbackCount = 3
+    val lock = new CountDownLatch(callbackCount)
+    (1 to callbackCount).foreach { _ =>
+      future onComplete {
+        case Success(t) =>
+          if (t != "hello") testFailure = Some("Expected message not returned")
+          lock.countDown()
+        case Failure(_) =>
+          testFailure = Some("onComplete should not have triggered Failure callback")
+          lock.countDown()
+      }
+    }
+    ScalaClientTestUtils.assertAwait(lock)
+    testFailure.foreach(fail(_))
+  }
+
+  /**
+   * Builds a client for the `rsc:/` URI from `ScalaClientTest.createConf(local)`, stores it
+   * in `client`, and runs a trivial job to check that it responds.
+   */
+  private def configureClient(local: Boolean): Unit = {
+    val conf = ScalaClientTest.createConf(local)
+    val javaClient = new LivyClientBuilder(false)
+      .setURI(new URI("rsc:/"))
+      .setAll(conf)
+      .build()
+    client = javaClient.asScalaClient
+    pingJob()
+  }
+
+  /** Submits a job that returns null and asserts that null comes back. */
+  private def pingJob(): Unit = {
+    val future = client.submit { context =>
+      null
+    }
+    ScalaClientTestUtils.assertTestPassed(future, null)
+  }
+}
+
+/** Exception type thrown by the deliberately failing test job and checked for by the tests. */
+class CustomTestFailureException extends RuntimeException
+
+object ScalaClientTest {
+
+  /**
+   * Builds the properties used to create the client under test.
+   *
+   * @param local when true, enables the in-process client and a `local` Spark master;
+   *              otherwise sets the driver memory and passes the JVM class path as the
+   *              driver and executor extra class path.
+   * @return the populated properties, always including an empty `LIVY_JARS` entry.
+   */
+  def createConf(local: Boolean): Properties = {
+    val conf = new Properties
+    if (local) {
+      conf.put(CLIENT_IN_PROCESS.key, "true")
+      conf.put(SparkLauncher.SPARK_MASTER, "local")
+      conf.put("spark.app.name", "SparkClientSuite Local App")
+    } else {
+      val classpath = System.getProperty("java.class.path")
+      conf.put("spark.app.name", "SparkClientSuite Remote App")
+      conf.put(SparkLauncher.DRIVER_MEMORY, "512m")
+      conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, classpath)
+      conf.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, classpath)
+    }
+    conf.put(LIVY_JARS.key, "")
+    conf
+  }
+
+  /**
+   * Reads a file from inside a Spark task and returns its content decoded as UTF-8.
+   *
+   * @param isResource when true, `fileName` is looked up on the task thread's context class
+   *                   loader; otherwise it is resolved through `SparkFiles.get`.
+   * @param fileName name of the resource or file to read.
+   * @param context job context whose `sc` runs the single-element RDD that does the read.
+   * @return the content read by the task.
+   */
+  def fileOperation(isResource: Boolean, fileName: String, context: ScalaJobContext): String = {
+    val rdd = context.sc.parallelize(Seq(1)).map { _ =>
+      val inputStream: InputStream =
+        if (isResource) {
+          Thread.currentThread.getContextClassLoader.getResourceAsStream(fileName)
+        } else {
+          new FileInputStream(SparkFiles.get(fileName))
+        }
+      // getResourceAsStream returns null for a missing resource; fail with a clear message
+      // instead of a NullPointerException from read() and again from close().
+      if (inputStream == null) {
+        throw new FileNotFoundException(s"Resource not found on context class loader: $fileName")
+      }
+      try {
+        val out = new ByteArrayOutputStream()
+        val buffer = new Array[Byte](1024)
+        var read = inputStream.read(buffer)
+        while (read >= 0) {
+          out.write(buffer, 0, read)
+          read = inputStream.read(buffer)
+        }
+        new String(out.toByteArray, UTF_8)
+      } finally {
+        inputStream.close()
+      }
+    }
+    rdd.collect().head
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTestUtils.scala 
b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTestUtils.scala
new file mode 100644
index 0000000..458ff3b
--- /dev/null
+++ 
b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTestUtils.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.scalaapi
+
+import java.util.Random
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+import org.scalatest.FunSuite
+
+import org.apache.livy.LivyBaseUnitTestSuite
+
+/** Jobs and assertion helpers shared by the Scala client tests. */
+object ScalaClientTestUtils extends FunSuite with LivyBaseUnitTestSuite {
+
+  /** Upper bound, in seconds, used when waiting on latches and futures. */
+  val Timeout = 40
+
+  /** Job that ignores its context and returns "hello". */
+  def helloJob(context: ScalaJobContext): String = "hello"
+
+  /** Job that always fails by throwing a [[CustomTestFailureException]]. */
+  def throwExceptionJob(context: ScalaJobContext): Unit = throw new CustomTestFailureException
+
+  /**
+   * Job that counts an RDD of five random integers.
+   *
+   * The number of partitions is chosen at random between 1 and the element count.
+   *
+   * @return the number of elements in the RDD.
+   */
+  def simpleSparkJob(context: ScalaJobContext): Long = {
+    val random = new Random
+    val count = 5
+    val partitions = Math.min(random.nextInt(10) + 1, count)
+    val values = ArrayBuffer.fill(count)(random.nextInt())
+    context.sc.parallelize(values, partitions).count()
+  }
+
+  /** Fails the calling test unless `lock` reaches zero within `Timeout` seconds. */
+  def assertAwait(lock: CountDownLatch): Unit = {
+    assert(
+      lock.await(Timeout, TimeUnit.SECONDS),
+      s"Latch was not released within $Timeout seconds")
+  }
+
+  /**
+   * Waits up to `Timeout` seconds for `future` and asserts that it produced `expectedValue`.
+   * A failed future rethrows its exception from `Await.result`.
+   */
+  def assertTestPassed[T](future: Future[T], expectedValue: T): Unit = {
+    val result = Await.result(future, Timeout.seconds)
+    assert(result === expectedValue)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala
----------------------------------------------------------------------
diff --git 
a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala 
b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala
new file mode 100644
index 0000000..3f5bdc6
--- /dev/null
+++ b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.livy.scalaapi
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
+
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.ScalaFutures
+
+import org.apache.livy.{JobHandle, LivyBaseUnitTestSuite}
+import org.apache.livy.JobHandle.{Listener, State}
+
+/**
+ * Unit tests for [[ScalaJobHandle]], using either a Mockito mock or a small stub as the
+ * wrapped Java [[JobHandle]].
+ */
+class ScalaJobHandleTest extends FunSuite
+  with ScalaFutures
+  with BeforeAndAfter
+  with LivyBaseUnitTestSuite {
+
+  private var mockJobHandle: JobHandle[String] = _
+  private var scalaJobHandle: ScalaJobHandle[String] = _
+  // Millisecond equivalent of the 5 second wait passed to Await in the tests below.
+  private val timeoutInMilliseconds = 5000
+  private var listener: JobHandle.Listener[String] = _
+
+  // Fresh mocks for every test so stubbing and verification never carry over.
+  before {
+    listener = mock(classOf[JobHandle.Listener[String]])
+    mockJobHandle = mock(classOf[JobHandle[String]])
+    scalaJobHandle = new ScalaJobHandle(mockJobHandle)
+  }
+
+  test("get result when job is already complete") {
+    when(mockJobHandle.get(timeoutInMilliseconds, TimeUnit.MILLISECONDS)).thenReturn("hello")
+    val result = Await.result(scalaJobHandle, 5.seconds)
+    assert(result == "hello")
+    verify(mockJobHandle, times(1)).get(timeoutInMilliseconds, TimeUnit.MILLISECONDS)
+  }
+
+  test("ready when the thread waits for the mentioned duration for job to complete") {
+    when(mockJobHandle.get(timeoutInMilliseconds, TimeUnit.MILLISECONDS)).thenReturn("hello")
+    val result = Await.ready(scalaJobHandle, 5.seconds)
+    assert(result == scalaJobHandle)
+    verify(mockJobHandle, times(1)).get(timeoutInMilliseconds, TimeUnit.MILLISECONDS)
+  }
+
+  test("ready with Infinite Duration") {
+    when(mockJobHandle.isDone).thenReturn(true)
+    when(mockJobHandle.get()).thenReturn("hello")
+    val result = Await.ready(scalaJobHandle, Duration.Undefined)
+    assert(result == scalaJobHandle)
+    verify(mockJobHandle, times(1)).get()
+  }
+
+  test("verify addListener call of java jobHandle for onComplete") {
+    doNothing().when(mockJobHandle).addListener(listener)
+    // Only the listener registration is verified here, so both branches are empty; listing
+    // both keeps the match on Try exhaustive.
+    scalaJobHandle onComplete {
+      case Success(_) =>
+      case Failure(_) =>
+    }
+    verify(mockJobHandle).addListener(isA(classOf[Listener[String]]))
+    verify(mockJobHandle, times(1)).addListener(any())
+  }
+
+  test("onComplete Success") {
+    // The stub reports success to the listener as soon as it is registered.
+    val jobHandleStub = new AbstractJobHandleStub[String] {
+      override def addListener(l: Listener[String]): Unit = l.onJobSucceeded(this, "hello")
+    }
+    val lock = new CountDownLatch(1)
+    var testFailure: Option[String] = None
+    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
+    testScalaHandle onComplete {
+      case Success(t) =>
+        if (t != "hello") {
+          testFailure = Some("onComplete has not returned the expected message")
+        }
+        lock.countDown()
+      case Failure(_) =>
+        testFailure = Some("onComplete should not have triggered Failure callback")
+        lock.countDown()
+    }
+    ScalaClientTestUtils.assertAwait(lock)
+    testFailure.foreach(fail(_))
+  }
+
+  test("onComplete Failure") {
+    // The stub reports failure to the listener as soon as it is registered, and its
+    // blocking get() throws the same exception type.
+    val jobHandleStub = new AbstractJobHandleStub[String] {
+      override def addListener(l: Listener[String]): Unit =
+        l.onJobFailed(this, new CustomTestFailureException)
+
+      override def get(): String = throw new CustomTestFailureException()
+    }
+    val lock = new CountDownLatch(1)
+    var testFailure: Option[String] = None
+    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
+    testScalaHandle onComplete {
+      case Success(_) =>
+        testFailure = Some("Test should have thrown CustomTestFailureException")
+        lock.countDown()
+      case Failure(e) =>
+        if (!e.isInstanceOf[CustomTestFailureException]) {
+          testFailure =
+            Some("Test did not throw expected exception - CustomTestFailureException")
+        }
+        lock.countDown()
+    }
+    ScalaClientTestUtils.assertAwait(lock)
+    testFailure.foreach(fail(_))
+  }
+
+  test("onJobCancelled") {
+    val jobHandleStub = new AbstractJobHandleStub[String] {
+      override def addListener(l: Listener[String]): Unit = l.onJobCancelled(this)
+      override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+    }
+    var testFailure: Option[String] = None
+    val lock = new CountDownLatch(1)
+    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
+    testScalaHandle onJobCancelled {
+      case true => lock.countDown()
+      case false =>
+        testFailure = Some("False callback should not have been triggered")
+        lock.countDown()
+    }
+    ScalaClientTestUtils.assertAwait(lock)
+    testFailure.foreach(fail(_))
+  }
+
+  test("onJobQueued") {
+    val jobHandleStub = new AbstractJobHandleStub[String] {
+      override def addListener(l: Listener[String]): Unit = l.onJobQueued(this)
+    }
+    val lock = new CountDownLatch(1)
+    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
+    testScalaHandle onJobQueued {
+      lock.countDown()
+    }
+    ScalaClientTestUtils.assertAwait(lock)
+  }
+
+  test("onJobStarted") {
+    val jobHandleStub = new AbstractJobHandleStub[String] {
+      override def addListener(l: Listener[String]): Unit = l.onJobStarted(this)
+    }
+    val lock = new CountDownLatch(1)
+    val testScalaHandle = new ScalaJobHandle(jobHandleStub)
+    testScalaHandle onJobStarted {
+      lock.countDown()
+    }
+    ScalaClientTestUtils.assertAwait(lock)
+  }
+}
+
+/**
+ * Minimal [[JobHandle]] for tests: it reports itself as done and not cancelled, returns null
+ * results, refuses cancellation and ignores listeners. Tests override individual members.
+ */
+private abstract class AbstractJobHandleStub[T] private[livy] extends JobHandle[T] {
+
+  // State queries.
+  override def getState: State = null
+
+  override def isDone: Boolean = true
+
+  override def isCancelled: Boolean = false
+
+  // Result access: both variants return null cast to T.
+  override def get(): T = null.asInstanceOf[T]
+
+  override def get(timeout: Long, unit: TimeUnit): T = null.asInstanceOf[T]
+
+  // Cancellation is always reported as unsuccessful.
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = false
+
+  // Listeners are dropped without being invoked.
+  override def addListener(l: Listener[T]): Unit = ()
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scala/pom.xml
----------------------------------------------------------------------
diff --git a/scala/pom.xml b/scala/pom.xml
index 8a2d6ee..d304bb3 100644
--- a/scala/pom.xml
+++ b/scala/pom.xml
@@ -19,15 +19,15 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>com.cloudera.livy</groupId>
+  <groupId>org.apache.livy</groupId>
   <artifactId>multi-scala-project-root</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-main</artifactId>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/scalastyle.xml
----------------------------------------------------------------------
diff --git a/scalastyle.xml b/scalastyle.xml
index 2b06267..28609fb 100644
--- a/scalastyle.xml
+++ b/scalastyle.xml
@@ -122,8 +122,8 @@
       <parameter name="groups">java,scala,3rdParty,livy</parameter>
       <parameter name="group.java">javax?\..*</parameter>
       <parameter name="group.scala">scala\..*</parameter>
-      <parameter name="group.3rdParty">(?!com\.cloudera.livy\.).*</parameter>
-      <parameter name="group.livy">com\.cloudera\.livy\..*</parameter>
+      <parameter name="group.3rdParty">(?!org\.apache\.livy\.).*</parameter>
+      <parameter name="group.livy">org\.apache\.livy\..*</parameter>
     </parameters>
   </check>
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 280f7c5..1ad6bfa 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -20,14 +20,14 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>com.cloudera.livy</groupId>
+    <groupId>org.apache.livy</groupId>
     <artifactId>livy-main</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>0.4.0-SNAPSHOT</version>
+    <version>0.4.0-incubating-SNAPSHOT</version>
   </parent>
 
   <artifactId>livy-server</artifactId>
-  <version>0.4.0-SNAPSHOT</version>
+  <version>0.4.0-incubating-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
@@ -53,7 +53,7 @@
     </dependency>
 
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-test-lib</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
@@ -171,7 +171,7 @@
     </dependency>
 
     <dependency>
-      <groupId>com.cloudera.livy</groupId>
+      <groupId>org.apache.livy</groupId>
       <artifactId>livy-repl_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
@@ -234,7 +234,7 @@
         <configuration>
           <archive>
             <manifest>
-              <mainClass>com.cloudera.livy.server.Main</mainClass>
+              <mainClass>org.apache.livy.server.Main</mainClass>
             </manifest>
           </archive>
           <outputDirectory>${project.build.directory}/jars</outputDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/resources/com/cloudera/livy/server/ui/static/all-sessions.js
----------------------------------------------------------------------
diff --git 
a/server/src/main/resources/com/cloudera/livy/server/ui/static/all-sessions.js 
b/server/src/main/resources/com/cloudera/livy/server/ui/static/all-sessions.js
deleted file mode 100644
index 4fe3f8f..0000000
--- 
a/server/src/main/resources/com/cloudera/livy/server/ui/static/all-sessions.js
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-function appIdLink(session) {
-  var appUiUrl = session.appInfo.sparkUiUrl;
-  if (appUiUrl != null) {
-    return '<a href="' + appUiUrl + '">' + session.appId + "</a>";
-  } else {
-    return session.appId;
-  }
-}
-
-function tdWrap(str) {
-  return "<td>" + str + "</td>";
-}
-
-function loadSessionsTable(sessions) {
-  $.each(sessions, function(index, session) {
-    $("#interactive-sessions .sessions-table-body").append(
-      "<tr>" +
-        tdWrap(session.id) +
-        tdWrap(appIdLink(session)) +
-        tdWrap(session.owner) +
-        tdWrap(session.proxyUser) +
-        tdWrap(session.kind) +
-        tdWrap(session.state) +
-       "</tr>"
-    );
-  });
-}
-
-function loadBatchesTable(sessions) {
-  $.each(sessions, function(index, session) {
-    $("#batches .sessions-table-body").append(
-      "<tr>" +
-        tdWrap(session.id) +
-        tdWrap(appIdLink(session)) +
-        tdWrap(session.state) +
-       "</tr>"
-    );
-  });
-}
-
-var numSessions = 0;
-var numBatches = 0;
-
-$(document).ready(function () {
-  $.extend( $.fn.dataTable.defaults, {
-    stateSave: true,
-  });
-
-  var sessionsReq = $.getJSON(location.origin + "/sessions", 
function(response) {
-    if (response && response.total > 0) {
-      $("#interactive-sessions").load("/static/sessions-table.html 
.sessions-template", function() {
-        loadSessionsTable(response.sessions);
-        $("#interactive-sessions-table").DataTable();
-        $('#interactive-sessions [data-toggle="tooltip"]').tooltip();
-      });
-    }
-    numSessions = response.total;
-  });
-
-  var batchesReq = $.getJSON(location.origin + "/batches", function(response) {
-    if (response && response.total > 0) {
-      $("#batches").load("/static/batches-table.html .sessions-template", 
function() {
-        loadBatchesTable(response.sessions);
-        $("#batches-table").DataTable();
-        $('#batches [data-toggle="tooltip"]').tooltip();
-      });
-    }
-    numBatches = response.total;
-  });
-
-  $.when(sessionsReq, batchesReq).done(function () {
-    if (numSessions + numBatches == 0) {
-      $("#all-sessions").append('<h4>No Sessions or Batches have been created 
yet.</h4>');
-    }
-  });
-});
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/412ccc8f/server/src/main/resources/com/cloudera/livy/server/ui/static/batches-table.html
----------------------------------------------------------------------
diff --git 
a/server/src/main/resources/com/cloudera/livy/server/ui/static/batches-table.html
 
b/server/src/main/resources/com/cloudera/livy/server/ui/static/batches-table.html
deleted file mode 100644
index e0b3213..0000000
--- 
a/server/src/main/resources/com/cloudera/livy/server/ui/static/batches-table.html
+++ /dev/null
@@ -1,42 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<h4 id="batches-header" class="sessions-template">Batch Sessions</h4>
-
-<table id="batches-table" class="table table-striped sessions-table 
sessions-template">
-  <thead class="sessions-table-head">
-  <tr>
-    <th>Batch Id</th>
-    <th>
-      <span data-toggle="tooltip"
-            title="Spark Application Id for this session.
-            If available, links to Spark Application Web UI">
-        Application Id
-      </span>
-    </th>
-    <th>
-      <span data-toggle="tooltip"
-            title="Session State (not_started, starting, idle, busy,
-            shutting_down, error, dead, success)">
-        State
-      </span>
-    </th>
-  </tr>
-  </thead>
-  <tbody class="sessions-table-body">
-  </tbody>
-</table>
\ No newline at end of file

Reply via email to