diff --git 
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java 
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
index b1e253fb..2245eb98 100644
--- 
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
+++ 
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
@@ -53,6 +53,7 @@ private CreateClientRequest() {
   public static class SessionInfo implements ClientMessage {
 
     public final int id;
+    public final String name;
     public final String appId;
     public final String owner;
     public final String proxyUser;
@@ -61,9 +62,10 @@ private CreateClientRequest() {
     public final Map<String, String> appInfo;
     public final List<String> log;
 
-    public SessionInfo(int id, String appId, String owner, String proxyUser, 
String state,
-        String kind, Map<String, String> appInfo, List<String> log) {
+    public SessionInfo(int id, String name, String appId, String owner, String 
proxyUser,
+        String state, String kind, Map<String, String> appInfo, List<String> 
log) {
       this.id = id;
+      this.name = name;
       this.appId = appId;
       this.owner = owner;
       this.proxyUser = proxyUser;
@@ -74,7 +76,7 @@ public SessionInfo(int id, String appId, String owner, String 
proxyUser, String
     }
 
     private SessionInfo() {
-      this(-1, null, null, null, null, null, null, null);
+      this(-1, null, null, null, null, null, null, null, null);
     }
 
   }
diff --git 
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala 
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
index 837c24c6..f53d9f5b 100644
--- 
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
+++ 
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
@@ -273,6 +273,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
         val session = mock(classOf[InteractiveSession])
         val id = sessionManager.nextId()
         when(session.id).thenReturn(id)
+        when(session.name).thenReturn(None)
         when(session.appId).thenReturn(None)
         when(session.appInfo).thenReturn(AppInfo())
         when(session.state).thenReturn(SessionState.Idle)
diff --git 
a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
 
b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
index 10875594..cf68f770 100644
--- 
a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
+++ 
b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
@@ -249,12 +249,14 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val 
livyEndpoint: String)
   }
 
   def startBatch(
+      name: Option[String],
       file: String,
       className: Option[String],
       args: List[String],
       sparkConf: Map[String, String]): BatchSession = {
     val r = new CreateBatchRequest()
     r.file = file
+    r.name = name
     r.className = className
     r.args = args
     r.conf = Map("spark.yarn.maxAppAttempts" -> "1") ++ sparkConf
@@ -264,12 +266,14 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val 
livyEndpoint: String)
   }
 
   def startSession(
+      name: Option[String],
       kind: Kind,
       sparkConf: Map[String, String],
       heartbeatTimeoutInSecond: Int): InteractiveSession = {
     val r = new CreateInteractiveRequest()
     r.kind = kind
     r.conf = sparkConf
+    r.name = name
     r.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond
 
     val id = start(INTERACTIVE_TYPE, mapper.writeValueAsString(r))
diff --git a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala 
b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
index 7c433fb8..a6f4e73e 100644
--- a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
+++ b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
@@ -159,14 +159,14 @@ class BatchIT extends BaseIntegrationTestSuite with 
BeforeAndAfterAll {
   private def withScript[R]
     (scriptPath: String, args: List[String], sparkConf: Map[String, String] = 
Map.empty)
     (f: (LivyRestClient#BatchSession) => R): R = {
-    val s = livyClient.startBatch(scriptPath, None, args, sparkConf)
+    val s = livyClient.startBatch(None, scriptPath, None, args, sparkConf)
     withSession(s)(f)
   }
 
   private def withTestLib[R]
     (testClass: Class[_], args: List[String], sparkConf: Map[String, String] = 
Map.empty)
     (f: (LivyRestClient#BatchSession) => R): R = {
-    val s = livyClient.startBatch(testLibPath, Some(testClass.getName()), 
args, sparkConf)
+    val s = livyClient.startBatch(None, testLibPath, 
Some(testClass.getName()), args, sparkConf)
     withSession(s)(f)
   }
 }
diff --git 
a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala 
b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
index 689195cb..0613bf39 100644
--- a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
+++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
@@ -206,7 +206,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
       waitForIdle: Boolean = true,
       heartbeatTimeoutInSecond: Int = 0)
     (f: (LivyRestClient#InteractiveSession) => R): R = {
-    withSession(livyClient.startSession(kind, sparkConf, 
heartbeatTimeoutInSecond)) { s =>
+    withSession(livyClient.startSession(None, kind, sparkConf, 
heartbeatTimeoutInSecond)) { s =>
       if (waitForIdle) {
         s.verifySessionIdle()
       }
diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala 
b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
index d62a96ec..0c52a1e4 100644
--- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
@@ -184,8 +184,15 @@ abstract class SessionServlet[S <: Session, R <: 
RecoveryMetadata](
   private def doWithSession(fn: (S => Any),
       allowAll: Boolean,
       checkFn: Option[(String, String) => Boolean]): Any = {
-    val sessionId = params("id").toInt
-    sessionManager.get(sessionId) match {
+    val idOrNameParam: String = params("id")
+    val session = if (idOrNameParam.forall(_.isDigit)) {
+      val sessionId = idOrNameParam.toInt
+      sessionManager.get(sessionId)
+    } else {
+      val sessionName = idOrNameParam
+      sessionManager.get(sessionName)
+    }
+    session match {
       case Some(session) =>
         if (allowAll || checkFn.map(_(session.owner, 
remoteUser(request))).getOrElse(false)) {
           fn(session)
@@ -193,7 +200,7 @@ abstract class SessionServlet[S <: Session, R <: 
RecoveryMetadata](
           Forbidden()
         }
       case None =>
-        NotFound(ResponseMessage(s"Session '$sessionId' not found."))
+        NotFound(ResponseMessage(s"Session '$idOrNameParam' not found."))
     }
   }
 
diff --git 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index c15057bc..a63746c0 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -35,6 +35,7 @@ import org.apache.livy.utils.{AppInfo, SparkApp, 
SparkAppListener, SparkProcessB
 @JsonIgnoreProperties(ignoreUnknown = true)
 case class BatchRecoveryMetadata(
     id: Int,
+    name: Option[String],
     appId: Option[String],
     appTag: String,
     owner: String,
@@ -53,6 +54,7 @@ object BatchSession extends Logging {
 
   def create(
       id: Int,
+      name: Option[String],
       request: CreateBatchRequest,
       livyConf: LivyConf,
       accessManager: AccessManager,
@@ -110,6 +112,7 @@ object BatchSession extends Logging {
 
     new BatchSession(
       id,
+      name,
       appTag,
       SessionState.Starting,
       livyConf,
@@ -126,6 +129,7 @@ object BatchSession extends Logging {
       mockApp: Option[SparkApp] = None): BatchSession = {
     new BatchSession(
       m.id,
+      m.name,
       m.appTag,
       SessionState.Recovering,
       livyConf,
@@ -140,6 +144,7 @@ object BatchSession extends Logging {
 
 class BatchSession(
     id: Int,
+    name: Option[String],
     appTag: String,
     initialState: SessionState,
     livyConf: LivyConf,
@@ -147,24 +152,30 @@ class BatchSession(
     override val proxyUser: Option[String],
     sessionStore: SessionStore,
     sparkApp: BatchSession => SparkApp)
-  extends Session(id, owner, livyConf) with SparkAppListener {
+  extends Session(id, name, owner, livyConf) with SparkAppListener {
   import BatchSession._
 
   protected implicit def executor: ExecutionContextExecutor = 
ExecutionContext.global
 
   private[this] var _state: SessionState = initialState
-  private val app = sparkApp(this)
+
+  private var app: Option[SparkApp] = None
 
   override def state: SessionState = _state
 
-  override def logLines(): IndexedSeq[String] = app.log()
+  override def logLines(): IndexedSeq[String] = 
app.map(_.log()).getOrElse(IndexedSeq.empty[String])
+
+  override def start(): Unit = {
+    app = Option(sparkApp(this))
+  }
 
   override def stopSession(): Unit = {
-    app.kill()
+    app.foreach(_.kill())
   }
 
   override def appIdKnown(appId: String): Unit = {
-    _appId = Option(appId)
+    _appId = Option(
+      appId)
     sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
   }
 
@@ -187,5 +198,5 @@ class BatchSession(
   override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
 
   override def recoveryMetadata: RecoveryMetadata =
-    BatchRecoveryMetadata(id, appId, appTag, owner, proxyUser)
+    BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser)
 }
diff --git 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
index a069a50b..e48ad8f9 100644
--- 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
@@ -27,6 +27,7 @@ import org.apache.livy.utils.AppInfo
 
 case class BatchSessionView(
   id: Long,
+  name: Option[String],
   state: String,
   appId: Option[String],
   appInfo: AppInfo,
@@ -42,8 +43,11 @@ class BatchSessionServlet(
 
   override protected def createSession(req: HttpServletRequest): BatchSession 
= {
     val createRequest = bodyAs[CreateBatchRequest](req)
+    val sessionId = sessionManager.nextId()
+    val sessionName = createRequest.name
     BatchSession.create(
-      sessionManager.nextId(),
+      sessionId,
+      sessionName,
       createRequest,
       livyConf,
       accessManager,
@@ -66,7 +70,8 @@ class BatchSessionServlet(
       } else {
         Nil
       }
-    BatchSessionView(session.id, session.state.toString, session.appId, 
session.appInfo, logs)
+    BatchSessionView(session.id, session.name, session.state.toString, 
session.appId,
+      session.appInfo, logs)
   }
 
 }
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 3b3095f3..5e80b58e 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.Future
+import scala.concurrent.{Future, Promise}
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.util.{Random, Try}
 
@@ -49,6 +49,7 @@ import org.apache.livy.utils._
 @JsonIgnoreProperties(ignoreUnknown = true)
 case class InteractiveRecoveryMetadata(
     id: Int,
+    name: Option[String],
     appId: Option[String],
     appTag: String,
     kind: Kind,
@@ -66,6 +67,7 @@ object InteractiveSession extends Logging {
 
   def create(
       id: Int,
+      name: Option[String],
       owner: String,
       livyConf: LivyConf,
       accessManager: AccessManager,
@@ -111,6 +113,7 @@ object InteractiveSession extends Logging {
 
     new InteractiveSession(
       id,
+      name,
       None,
       appTag,
       client,
@@ -137,6 +140,7 @@ object InteractiveSession extends Logging {
 
     new InteractiveSession(
       metadata.id,
+      metadata.name,
       metadata.appId,
       metadata.appTag,
       client,
@@ -347,6 +351,7 @@ object InteractiveSession extends Logging {
 
 class InteractiveSession(
     id: Int,
+    name: Option[String],
     appIdHint: Option[String],
     appTag: String,
     val client: Option[RSCClient],
@@ -358,7 +363,7 @@ class InteractiveSession(
     override val proxyUser: Option[String],
     sessionStore: SessionStore,
     mockApp: Option[SparkApp]) // For unit test.
-  extends Session(id, owner, livyConf)
+  extends Session(id, name, owner, livyConf)
   with SessionHeartbeat
   with SparkAppListener {
 
@@ -380,66 +385,71 @@ class InteractiveSession(
   sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
   heartbeat()
 
-  private val app = mockApp.orElse {
-    val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
+  private var app: Option[SparkApp] = None
+
+  override def start(): Unit = {
+    app = mockApp.orElse {
+      val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
         .map(new LineBufferedProcess(_, 
livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
-    driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, 
livyConf, Some(this)) }
-  }
+      driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, 
livyConf, Some(this)) }
+    }
 
-  if (client.isEmpty) {
-    transition(Dead())
-    val msg = s"Cannot recover interactive session $id because its RSCDriver 
URI is unknown."
-    info(msg)
-    sessionLog = IndexedSeq(msg)
-  } else {
-    val uriFuture = Future { client.get.getServerUri.get() }
-
-    uriFuture onSuccess { case url =>
-      rscDriverUri = Option(url)
-      sessionSaveLock.synchronized {
-        sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
+    if (client.isEmpty) {
+      transition(Dead())
+      val msg = s"Cannot recover interactive session $id because its RSCDriver 
URI is unknown."
+      info(msg)
+      sessionLog = IndexedSeq(msg)
+    } else {
+      val uriFuture = Future { client.get.getServerUri.get() }
+
+      uriFuture.onSuccess { case url =>
+        rscDriverUri = Option(url)
+        sessionSaveLock.synchronized {
+          sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
+        }
       }
-    }
-    uriFuture onFailure { case e => warn("Fail to get rsc uri", e) }
+      uriFuture.onFailure { case e => warn("Fail to get rsc uri", e) }
 
-    // Send a dummy job that will return once the client is ready to be used, 
and set the
-    // state to "idle" at that point.
-    client.get.submit(new PingJob()).addListener(new 
JobHandle.Listener[Void]() {
+      // Send a dummy job that will return once the client is ready to be 
used, and set the
+      // state to "idle" at that point.
+      client.get.submit(new PingJob()).addListener(new 
JobHandle.Listener[Void]() {
       override def onJobQueued(job: JobHandle[Void]): Unit = { }
       override def onJobStarted(job: JobHandle[Void]): Unit = { }
 
-      override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()
+        override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()
 
-      override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = 
errorOut()
+        override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit 
= errorOut()
 
-      override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
-        transition(SessionState.Running)
-        info(s"Interactive session $id created [appid: ${appId.orNull}, owner: 
$owner, proxyUser:" +
-          s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " +
-          s"info: ${appInfo.asJavaMap}]")
-      }
+        override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit 
= {
+          transition(SessionState.Running)
+          info(s"Interactive session $id created [appid: ${appId.orNull}, " +
+            s"owner: $owner, proxyUser:" +
+            s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " 
+
+            s"info: ${appInfo.asJavaMap}]")
+        }
 
-      private def errorOut(): Unit = {
-        // Other code might call stop() to close the RPC channel. When RPC 
channel is closing,
-        // this callback might be triggered. Check and don't call stop() to 
avoid nested called
-        // if the session is already shutting down.
-        if (serverSideState != SessionState.ShuttingDown) {
-          transition(SessionState.Error())
-          stop()
-          app.foreach { a =>
-            info(s"Failed to ping RSC driver for session $id. Killing 
application.")
-            a.kill()
+        private def errorOut(): Unit = {
+          // Other code might call stop() to close the RPC channel. When RPC 
channel is closing,
+          // this callback might be triggered. Check and don't call stop() to 
avoid nested called
+          // if the session is already shutting down.
+          if (serverSideState != SessionState.ShuttingDown) {
+            transition(SessionState.Error())
+            stop()
+            app.foreach { a =>
+              info(s"Failed to ping RSC driver for session $id. Killing 
application.")
+              a.kill()
+            }
           }
         }
-      }
-    })
+      })
+    }
   }
 
   override def logLines(): IndexedSeq[String] = 
app.map(_.log()).getOrElse(sessionLog)
 
   override def recoveryMetadata: RecoveryMetadata =
-    InteractiveRecoveryMetadata(
-      id, appId, appTag, kind, heartbeatTimeout.toSeconds.toInt, owner, 
proxyUser, rscDriverUri)
+    InteractiveRecoveryMetadata(id, name, appId, appTag, kind,
+      heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)
 
   override def state: SessionState = {
     if (serverSideState == SessionState.Running) {
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index 7450cd7c..dec88a84 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -54,6 +54,7 @@ class InteractiveSessionServlet(
     val createRequest = bodyAs[CreateInteractiveRequest](req)
     InteractiveSession.create(
       sessionManager.nextId(),
+      createRequest.name,
       remoteUser(req),
       livyConf,
       accessManager,
@@ -79,8 +80,9 @@ class InteractiveSessionServlet(
         Nil
       }
 
-    new SessionInfo(session.id, session.appId.orNull, session.owner, 
session.proxyUser.orNull,
-      session.state.toString, session.kind.toString, 
session.appInfo.asJavaMap, logs.asJava)
+    new SessionInfo(session.id, session.name.orNull, session.appId.orNull, 
session.owner,
+      session.proxyUser.orNull, session.state.toString, session.kind.toString,
+      session.appInfo.asJavaMap, logs.asJava)
   }
 
   post("/:id/stop") {
diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala 
b/server/src/main/scala/org/apache/livy/sessions/Session.scala
index 0b547798..749d2436 100644
--- a/server/src/main/scala/org/apache/livy/sessions/Session.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala
@@ -21,7 +21,6 @@ import java.io.InputStream
 import java.net.{URI, URISyntaxException}
 import java.security.PrivilegedExceptionAction
 import java.util.UUID
-import java.util.concurrent.TimeUnit
 
 import scala.concurrent.{ExecutionContext, Future}
 
@@ -135,13 +134,24 @@ object Session {
   }
 }
 
-abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf)
+abstract class Session(val id: Int, val name: Option[String], val owner: 
String,
+                       val livyConf: LivyConf)
   extends Logging {
 
   import Session._
 
   protected implicit val executionContext = ExecutionContext.global
 
+  private def isSessionId(str: String): Boolean = {
+    str.nonEmpty && str.forall(_.isDigit)
+  }
+
+  name.filter(isSessionId).foreach { idStr =>
+    val errMsg = s"Session name cannot be a number: $idStr"
+    error(errMsg)
+    throw new IllegalArgumentException(errMsg)
+  }
+
   protected var _appId: Option[String] = None
 
   private var _lastActivity = System.nanoTime()
@@ -171,6 +181,8 @@ abstract class Session(val id: Int, val owner: String, val 
livyConf: LivyConf)
 
   def state: SessionState
 
+  def start(): Unit
+
   def stop(): Future[Unit] = Future {
     try {
       info(s"Stopping $this...")
diff --git 
a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala 
b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
index 5926071e..13a0a32b 100644
--- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
@@ -73,6 +73,8 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : 
ClassTag](
 
   protected[this] final val idCounter = new AtomicInteger(0)
   protected[this] final val sessions = mutable.LinkedHashMap[Int, S]()
+  private[this] final val sessionsByName = mutable.HashMap[String, S]()
+
 
   private[this] final val sessionTimeoutCheck = 
livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK)
   private[this] final val sessionTimeout =
@@ -92,13 +94,27 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : 
ClassTag](
   def register(session: S): S = {
     info(s"Registering new session ${session.id}")
     synchronized {
+      session.name.foreach { sessionName =>
+        if (sessionsByName.contains(sessionName)) {
+          val msg = s"Session ${session.name} already exists!"
+          error(msg)
+          delete(session)
+          throw new IllegalArgumentException(msg)
+        } else {
+          // info(s"sessionNames = ${sessionsByName.keys.mkString}")
+          sessionsByName.put(sessionName, session)
+        }
+      }
       sessions.put(session.id, session)
+      session.start()
     }
     session
   }
 
   def get(id: Int): Option[S] = sessions.get(id)
 
+  def get(sessionName: String): Option[S] = sessionsByName.get(sessionName)
+
   def size(): Int = sessions.size
 
   def all(): Iterable[S] = sessions.values
@@ -113,6 +129,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : 
ClassTag](
         sessionStore.remove(sessionType, session.id)
         synchronized {
           sessions.remove(session.id)
+          session.name.foreach(sessionsByName.remove)
         }
       } catch {
         case NonFatal(e) =>
diff --git 
a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala 
b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
index e0ebd9a0..38b79ce9 100644
--- a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
@@ -32,7 +32,7 @@ object SessionServletSpec {
   val PROXY_USER = "proxyUser"
 
   class MockSession(id: Int, owner: String, livyConf: LivyConf)
-    extends Session(id, owner, livyConf) {
+    extends Session(id, None, owner, livyConf) {
 
     case class MockRecoveryMetadata(id: Int) extends RecoveryMetadata()
 
@@ -42,6 +42,8 @@ object SessionServletSpec {
 
     override def state: SessionState = SessionState.Idle
 
+    override def start(): Unit = ()
+
     override protected def stopSession(): Unit = ()
 
     override def logLines(): IndexedSeq[String] = IndexedSeq("log")
diff --git 
a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala 
b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
index 2c37c19d..6738220a 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
@@ -116,13 +116,15 @@ class BatchServletSpec extends 
BaseSessionServletSpec[BatchSession, BatchRecover
 
     it("should respect config black list") {
       val createRequest = new CreateBatchRequest()
+      createRequest.name = Some("TEST-BatchServletSpec-Session-0")
       createRequest.file = script.toString
       createRequest.conf = BLACKLISTED_CONFIG
       jpost[Map[String, Any]]("/", createRequest, expectedStatus = 
SC_BAD_REQUEST) { _ => }
     }
 
-    it("should show session properties") {
+    it("should show session properties for sessions with a name") {
       val id = 0
+      val name = "TEST-batch-session"
       val state = SessionState.Running
       val appId = "appid"
       val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
@@ -130,6 +132,7 @@ class BatchServletSpec extends 
BaseSessionServletSpec[BatchSession, BatchRecover
 
       val session = mock[BatchSession]
       when(session.id).thenReturn(id)
+      when(session.name).thenReturn(Some(name))
       when(session.state).thenReturn(state)
       when(session.appId).thenReturn(Some(appId))
       when(session.appInfo).thenReturn(appInfo)
@@ -141,6 +144,36 @@ class BatchServletSpec extends 
BaseSessionServletSpec[BatchSession, BatchRecover
         .asInstanceOf[BatchSessionView]
 
       view.id shouldEqual id
+      view.name.isDefined shouldBe true
+      view.name shouldEqual Some(name)
+      view.state shouldEqual state.toString
+      view.appId shouldEqual Some(appId)
+      view.appInfo shouldEqual appInfo
+      view.log shouldEqual log
+    }
+
+    it("should show session properties for sessions without a name") {
+      val id = 0
+      val state = SessionState.Running
+      val appId = "appid"
+      val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
+      val log = IndexedSeq[String]("log1", "log2")
+
+      val session = mock[BatchSession]
+      when(session.id).thenReturn(id)
+      when(session.name).thenReturn(None)
+      when(session.state).thenReturn(state)
+      when(session.appId).thenReturn(Some(appId))
+      when(session.appInfo).thenReturn(appInfo)
+      when(session.logLines()).thenReturn(log)
+
+      val req = mock[HttpServletRequest]
+
+      val view = 
servlet.asInstanceOf[BatchSessionServlet].clientSessionView(session, req)
+        .asInstanceOf[BatchSessionView]
+
+      view.id shouldEqual id
+      view.name.isDefined shouldBe false
       view.state shouldEqual state.toString
       view.appId shouldEqual Some(appId)
       view.appInfo shouldEqual appInfo
diff --git 
a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala 
b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
index 8381c953..aa6e0fca 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
@@ -70,7 +70,8 @@ class BatchSessionSpec
 
       val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, 
sys.props("java.io.tmpdir"))
       val accessManager = new AccessManager(conf)
-      val batch = BatchSession.create(0, req, conf, accessManager, null, 
sessionStore)
+      val batch = BatchSession.create(0, None, req, conf, accessManager, null, 
sessionStore)
+      batch.start()
 
       Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, 
TimeUnit.SECONDS))
       (batch.state match {
@@ -87,7 +88,8 @@ class BatchSessionSpec
       val mockApp = mock[SparkApp]
       val accessManager = new AccessManager(conf)
       val batch = BatchSession.create(
-        0, req, conf, accessManager, null, sessionStore, Some(mockApp))
+        0, None, req, conf, accessManager, null, sessionStore, Some(mockApp))
+      batch.start()
 
       val expectedAppId = "APPID"
       batch.appIdKnown(expectedAppId)
@@ -100,11 +102,25 @@ class BatchSessionSpec
       batch.appInfo shouldEqual expectedAppInfo
     }
 
-    it("should recover session") {
+    it("should recover named session") {
       val conf = new LivyConf()
       val req = new CreateBatchRequest()
       val mockApp = mock[SparkApp]
-      val m = BatchRecoveryMetadata(99, None, "appTag", null, None)
+      val m = BatchRecoveryMetadata(99, Some("Test Batch Session"), None, 
"appTag", null, None)
+      val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp))
+
+      batch.state shouldBe (SessionState.Recovering)
+
+      batch.appIdKnown("appId")
+      verify(sessionStore, atLeastOnce()).save(
+        Matchers.eq(BatchSession.RECOVERY_SESSION_TYPE), anyObject())
+    }
+
+    it("should recover session with no name ") {
+      val conf = new LivyConf()
+      val req = new CreateBatchRequest()
+      val mockApp = mock[SparkApp]
+      val m = BatchRecoveryMetadata(999, None, None, "appTag", null, None)
       val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp))
 
       batch.state shouldBe (SessionState.Recovering)
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala
index b16e74ff..7f8bbfe0 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala
@@ -61,6 +61,7 @@ abstract class BaseInteractiveServletSpec
     val classpath = sys.props("java.class.path")
     val request = new CreateInteractiveRequest()
     request.kind = kind
+    request.name = None
     request.conf = extraConf ++ Map(
       RSCConf.Entry.LIVY_JARS.key() -> "",
       RSCConf.Entry.CLIENT_IN_PROCESS.key() -> inProcess.toString,
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
index e946fc0c..2c296de8 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -60,6 +60,7 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
 
       val session = mock[InteractiveSession]
       when(session.kind).thenReturn(Spark)
+      when(session.name).thenReturn(None)
       when(session.appId).thenReturn(None)
       when(session.appInfo).thenReturn(AppInfo())
       when(session.logLines()).thenReturn(IndexedSeq())
@@ -153,6 +154,7 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
 
   it("should show session properties") {
     val id = 0
+    val name = "TEST-interactive-session"
     val appId = "appid"
     val owner = "owner"
     val proxyUser = "proxyUser"
@@ -163,6 +165,7 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
 
     val session = mock[InteractiveSession]
     when(session.id).thenReturn(id)
+    when(session.name).thenReturn(Some(name))
     when(session.appId).thenReturn(Some(appId))
     when(session.owner).thenReturn(owner)
     when(session.proxyUser).thenReturn(Some(proxyUser))
@@ -177,6 +180,45 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
       .asInstanceOf[SessionInfo]
 
     view.id shouldEqual id
+    view.name shouldEqual name
+    view.appId shouldEqual appId
+    view.owner shouldEqual owner
+    view.proxyUser shouldEqual proxyUser
+    view.state shouldEqual state.toString
+    view.kind shouldEqual kind.toString
+    view.appInfo should contain (Entry(AppInfo.DRIVER_LOG_URL_NAME, 
appInfo.driverLogUrl.get))
+    view.appInfo should contain (Entry(AppInfo.SPARK_UI_URL_NAME, 
appInfo.sparkUiUrl.get))
+    view.log shouldEqual log.asJava
+  }
+
+  it("should show session properties for sessions without a name") {
+    val id = 0
+    val appId = "appid"
+    val owner = "owner"
+    val proxyUser = "proxyUser"
+    val state = SessionState.Running
+    val kind = Spark
+    val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
+    val log = IndexedSeq[String]("log1", "log2")
+
+    val session = mock[InteractiveSession]
+    when(session.id).thenReturn(id)
+    when(session.name).thenReturn(None)
+    when(session.appId).thenReturn(Some(appId))
+    when(session.owner).thenReturn(owner)
+    when(session.proxyUser).thenReturn(Some(proxyUser))
+    when(session.state).thenReturn(state)
+    when(session.kind).thenReturn(kind)
+    when(session.appInfo).thenReturn(appInfo)
+    when(session.logLines()).thenReturn(log)
+
+    val req = mock[HttpServletRequest]
+
+    val view = 
servlet.asInstanceOf[InteractiveSessionServlet].clientSessionView(session, req)
+      .asInstanceOf[SessionInfo]
+
+    view.id shouldEqual id
+    view.name shouldEqual null
     view.appId shouldEqual appId
     view.owner shouldEqual owner
     view.proxyUser shouldEqual proxyUser
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index f07e61f5..95bc08aa 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -70,7 +70,7 @@ class InteractiveSessionSpec extends FunSpec
       SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"),
       RSCConf.Entry.LIVY_JARS.key() -> ""
     )
-    InteractiveSession.create(0, null, livyConf, accessManager, req, 
sessionStore, mockApp)
+    InteractiveSession.create(0, None, null, livyConf, accessManager, req, 
sessionStore, mockApp)
   }
 
   private def executeStatement(code: String, codeType: Option[String] = None): 
JValue = {
@@ -160,6 +160,7 @@ class InteractiveSessionSpec extends FunSpec
       val mockApp = mock[SparkApp]
       val sessionStore = mock[SessionStore]
       session = createSession(sessionStore, Some(mockApp))
+      session.start()
 
       val expectedAppId = "APPID"
       session.appIdKnown(expectedAppId)
@@ -242,15 +243,32 @@ class InteractiveSessionSpec extends FunSpec
   }
 
   describe("recovery") {
-    it("should recover session") {
+    it("should recover named sessions") {
       val conf = new LivyConf()
       val sessionStore = mock[SessionStore]
       val mockClient = mock[RSCClient]
       
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
-      val m =
-        InteractiveRecoveryMetadata(
-          78, None, "appTag", Spark, 0, null, None, Some(URI.create("")))
+      val m = InteractiveRecoveryMetadata(
+          78, Some("Test session"), None, "appTag", Spark, 0, null, None, 
Some(URI.create("")))
+      val s = InteractiveSession.recover(m, conf, sessionStore, None, 
Some(mockClient))
+      s.start()
+
+      s.state shouldBe (SessionState.Recovering)
+
+      s.appIdKnown("appId")
+      verify(sessionStore, atLeastOnce()).save(
+        MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), 
anyObject())
+    }
+
+    it("should recover sessions with no name") {
+      val conf = new LivyConf()
+      val sessionStore = mock[SessionStore]
+      val mockClient = mock[RSCClient]
+      
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
+      val m = InteractiveRecoveryMetadata(
+          78, None, None, "appTag", Spark, 0, null, None, Some(URI.create("")))
       val s = InteractiveSession.recover(m, conf, sessionStore, None, 
Some(mockClient))
+      s.start()
 
       s.state shouldBe (SessionState.Recovering)
 
@@ -263,9 +281,9 @@ class InteractiveSessionSpec extends FunSpec
       val conf = new LivyConf()
       val sessionStore = mock[SessionStore]
       val m = InteractiveRecoveryMetadata(
-        78, Some("appId"), "appTag", Spark, 0, null, None, None)
+        78, None, Some("appId"), "appTag", Spark, 0, null, None, None)
       val s = InteractiveSession.recover(m, conf, sessionStore, None)
-
+      s.start()
       s.state shouldBe a[SessionState.Dead]
       s.logLines().mkString should include("RSCDriver URI is unknown")
     }
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
index 12c8bbb2..6d2cff8a 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
@@ -55,7 +55,8 @@ class SessionHeartbeatSpec extends FunSpec with Matchers {
   }
 
   describe("SessionHeartbeatWatchdog") {
-    abstract class TestSession extends Session(0, null, null) with 
SessionHeartbeat {}
+    abstract class TestSession
+      extends Session(0, None, null, null) with SessionHeartbeat {}
     class TestWatchdog(conf: LivyConf)
       extends SessionManager[TestSession, RecoveryMetadata](
         conf,
@@ -68,10 +69,12 @@ class SessionHeartbeatSpec extends FunSpec with Matchers {
     it("should delete only expired sessions") {
       val expiredSession: TestSession = mock[TestSession]
       when(expiredSession.id).thenReturn(0)
+      when(expiredSession.name).thenReturn(None)
       when(expiredSession.heartbeatExpired).thenReturn(true)
 
       val nonExpiredSession: TestSession = mock[TestSession]
       when(nonExpiredSession.id).thenReturn(1)
+      when(nonExpiredSession.name).thenReturn(None)
       when(nonExpiredSession.heartbeatExpired).thenReturn(false)
 
       val n = new TestWatchdog(new LivyConf())
diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala 
b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
index 1604f4d9..3d0cc26f 100644
--- a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
@@ -19,11 +19,14 @@ package org.apache.livy.sessions
 
 import org.apache.livy.LivyConf
 
-class MockSession(id: Int, owner: String, conf: LivyConf) extends Session(id, 
owner, conf) {
+class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] 
= None)
+  extends Session(id, name, owner, conf) {
   case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata()
 
   override val proxyUser = None
 
+  override def start(): Unit = ()
+
   override protected def stopSession(): Unit = ()
 
   override def logLines(): IndexedSeq[String] = IndexedSeq()
diff --git 
a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala 
b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
index 547af8b5..a5ffb084 100644
--- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
@@ -54,10 +54,48 @@ class SessionManagerSpec extends FunSpec with Matchers with 
LivyBaseUnitTestSuit
       }
     }
 
+    it("should create sessions with names") {
+      val livyConf = new LivyConf()
+      val name = "Mock-session"
+      livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
+      val manager = new SessionManager[MockSession, RecoveryMetadata](
+        livyConf,
+        { _ => assert(false).asInstanceOf[MockSession] },
+        mock[SessionStore],
+        "test",
+        Some(Seq.empty))
+      val session = manager.register(new MockSession(manager.nextId(), null, 
livyConf, Some(name)))
+      manager.get(session.id).isDefined should be(true)
+      manager.get(name).isDefined should be(true)
+    }
+
+    it("should not create sessions with the same name") {
+      val livyConf = new LivyConf()
+      val name = "Mock-session"
+      livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
+      val manager = new SessionManager[MockSession, RecoveryMetadata](
+        livyConf,
+        { _ => assert(false).asInstanceOf[MockSession] },
+        mock[SessionStore],
+        "test",
+        Some(Seq.empty))
+      val session1 = new MockSession(manager.nextId(), null, livyConf, 
Some(name))
+      val session2 = new MockSession(manager.nextId(), null, livyConf, 
Some(name))
+      manager.register(session1)
+      an[IllegalArgumentException] should be thrownBy 
manager.register(session2)
+      manager.get(session1.id).isDefined should be(true)
+      manager.get(session2.id).isDefined should be(false)
+      eventually(timeout(5 seconds), interval(100 millis)) {
+        Await.result(manager.collectGarbage(), Duration.Inf)
+        manager.get(session1.id) should be(None)
+      }
+    }
+
     it("batch session should not be gc-ed until application is finished") {
       val sessionId = 24
       val session = mock[BatchSession]
       when(session.id).thenReturn(sessionId)
+      when(session.name).thenReturn(None)
       when(session.stop()).thenReturn(Future {})
       when(session.lastActivity).thenReturn(System.nanoTime())
 
@@ -70,6 +108,7 @@ class SessionManagerSpec extends FunSpec with Matchers with 
LivyBaseUnitTestSuit
       val sessionId = 24
       val session = mock[InteractiveSession]
       when(session.id).thenReturn(sessionId)
+      when(session.name).thenReturn(None)
       when(session.stop()).thenReturn(Future {})
       when(session.lastActivity).thenReturn(System.nanoTime())
 
@@ -112,12 +151,13 @@ class SessionManagerSpec extends FunSpec with Matchers 
with LivyBaseUnitTestSuit
     implicit def executor: ExecutionContext = ExecutionContext.global
 
     def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = {
-      BatchRecoveryMetadata(id, None, appTag, null, None)
+      BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, 
None)
     }
 
     def mockSession(id: Int): BatchSession = {
       val session = mock[BatchSession]
       when(session.id).thenReturn(id)
+      when(session.name).thenReturn(None)
       when(session.stop()).thenReturn(Future {})
       when(session.lastActivity).thenReturn(System.nanoTime())
 
diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 5be5536d..31ea2f0a 100644
--- 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -228,6 +228,7 @@ class LivyThriftSessionManager(val server: 
LivyThriftServer, val livyConf: LivyC
       createInteractiveRequest.kind = Spark
       val newSession = InteractiveSession.create(
         server.livySessionManager.nextId(),
+        None,
         username,
         server.livyConf,
         server.accessManager,


With regards,
Apache Git Services

Reply via email to