This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new def6318  [LIVY-41] Let users access sessions by session name
def6318 is described below

commit def6318c84f32a09f219065691857f11ca74e7cb
Author: Fathi Salmi, Meisam(mfathisalmi) <mfathisa...@paypal.com>
AuthorDate: Tue Feb 5 11:25:06 2019 -0800

    [LIVY-41] Let users access sessions by session name
    
    This commit enables Livy users to access sessions either by name or by
    auto-generated session ID.
    
    It also prevents users from creating sessions that have the same name.
    
    This commit keeps API changes minimal. These are the places where an API
    change is needed:
    - `Session` and its sub-classes add a new field, `name`.
    - `RecoveryMetadata` and its sub-classes add a new field, `name`.
    - `SessionManager` adds a new method `get(sessionName: String)` which looks
    sessions up by name.
    
    Task-url: https://issues.apache.org/jira/browse/LIVY-41
    
    Author: Fathi Salmi, Meisam(mfathisalmi) <mfathisa...@paypal.com>
    Author: Meisam Fathi <meisam.fa...@gmail.com>
    Author: Fathi Salmi, Meisam <meisam.fa...@gmail.com>
    Author: Fathi, Meisam <meisam.fa...@gmail.com>
    
    Closes #48 from meisam/LIVY-41-rebased.
---
 .../apache/livy/client/common/HttpMessages.java    |   8 +-
 .../apache/livy/client/http/HttpClientSpec.scala   |   1 +
 .../livy/test/framework/LivyRestClient.scala       |   4 +
 .../test/scala/org/apache/livy/test/BatchIT.scala  |   4 +-
 .../scala/org/apache/livy/test/InteractiveIT.scala |   2 +-
 .../org/apache/livy/server/SessionServlet.scala    |  13 ++-
 .../apache/livy/server/batch/BatchSession.scala    |  20 +++-
 .../livy/server/batch/BatchSessionServlet.scala    |   9 +-
 .../server/interactive/InteractiveSession.scala    | 104 +++++++++++----------
 .../interactive/InteractiveSessionServlet.scala    |   6 +-
 .../scala/org/apache/livy/sessions/Session.scala   |  16 +++-
 .../org/apache/livy/sessions/SessionManager.scala  |  13 +++
 .../apache/livy/server/SessionServletSpec.scala    |   4 +-
 .../livy/server/batch/BatchServletSpec.scala       |  60 +++++++-----
 .../livy/server/batch/BatchSessionSpec.scala       |  19 +++-
 .../interactive/BaseInteractiveServletSpec.scala   |   1 +
 .../InteractiveSessionServletSpec.scala            |  12 ++-
 .../interactive/InteractiveSessionSpec.scala       |  32 +++++--
 .../server/interactive/SessionHeartbeatSpec.scala  |   5 +-
 .../server/recovery/BlackholeStateStoreSpec.scala  |  20 ++++
 .../org/apache/livy/sessions/MockSession.scala     |   5 +-
 .../apache/livy/sessions/SessionManagerSpec.scala  |  46 +++++++--
 .../thriftserver/LivyThriftSessionManager.scala    |   1 +
 23 files changed, 289 insertions(+), 116 deletions(-)

diff --git 
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java 
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
index b1e253f..2245eb9 100644
--- 
a/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
+++ 
b/client-common/src/main/java/org/apache/livy/client/common/HttpMessages.java
@@ -53,6 +53,7 @@ public class HttpMessages {
   public static class SessionInfo implements ClientMessage {
 
     public final int id;
+    public final String name;
     public final String appId;
     public final String owner;
     public final String proxyUser;
@@ -61,9 +62,10 @@ public class HttpMessages {
     public final Map<String, String> appInfo;
     public final List<String> log;
 
-    public SessionInfo(int id, String appId, String owner, String proxyUser, 
String state,
-        String kind, Map<String, String> appInfo, List<String> log) {
+    public SessionInfo(int id, String name, String appId, String owner, String 
proxyUser,
+        String state, String kind, Map<String, String> appInfo, List<String> 
log) {
       this.id = id;
+      this.name = name;
       this.appId = appId;
       this.owner = owner;
       this.proxyUser = proxyUser;
@@ -74,7 +76,7 @@ public class HttpMessages {
     }
 
     private SessionInfo() {
-      this(-1, null, null, null, null, null, null, null);
+      this(-1, null, null, null, null, null, null, null, null);
     }
 
   }
diff --git 
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala 
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
index 837c24c..f53d9f5 100644
--- 
a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
+++ 
b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala
@@ -273,6 +273,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
         val session = mock(classOf[InteractiveSession])
         val id = sessionManager.nextId()
         when(session.id).thenReturn(id)
+        when(session.name).thenReturn(None)
         when(session.appId).thenReturn(None)
         when(session.appInfo).thenReturn(AppInfo())
         when(session.state).thenReturn(SessionState.Idle)
diff --git 
a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
 
b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
index 1087559..cf68f77 100644
--- 
a/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
+++ 
b/integration-test/src/main/scala/org/apache/livy/test/framework/LivyRestClient.scala
@@ -249,12 +249,14 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val 
livyEndpoint: String)
   }
 
   def startBatch(
+      name: Option[String],
       file: String,
       className: Option[String],
       args: List[String],
       sparkConf: Map[String, String]): BatchSession = {
     val r = new CreateBatchRequest()
     r.file = file
+    r.name = name
     r.className = className
     r.args = args
     r.conf = Map("spark.yarn.maxAppAttempts" -> "1") ++ sparkConf
@@ -264,12 +266,14 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val 
livyEndpoint: String)
   }
 
   def startSession(
+      name: Option[String],
       kind: Kind,
       sparkConf: Map[String, String],
       heartbeatTimeoutInSecond: Int): InteractiveSession = {
     val r = new CreateInteractiveRequest()
     r.kind = kind
     r.conf = sparkConf
+    r.name = name
     r.heartbeatTimeoutInSecond = heartbeatTimeoutInSecond
 
     val id = start(INTERACTIVE_TYPE, mapper.writeValueAsString(r))
diff --git a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala 
b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
index 7c433fb..a6f4e73 100644
--- a/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
+++ b/integration-test/src/test/scala/org/apache/livy/test/BatchIT.scala
@@ -159,14 +159,14 @@ class BatchIT extends BaseIntegrationTestSuite with 
BeforeAndAfterAll {
   private def withScript[R]
     (scriptPath: String, args: List[String], sparkConf: Map[String, String] = 
Map.empty)
     (f: (LivyRestClient#BatchSession) => R): R = {
-    val s = livyClient.startBatch(scriptPath, None, args, sparkConf)
+    val s = livyClient.startBatch(None, scriptPath, None, args, sparkConf)
     withSession(s)(f)
   }
 
   private def withTestLib[R]
     (testClass: Class[_], args: List[String], sparkConf: Map[String, String] = 
Map.empty)
     (f: (LivyRestClient#BatchSession) => R): R = {
-    val s = livyClient.startBatch(testLibPath, Some(testClass.getName()), 
args, sparkConf)
+    val s = livyClient.startBatch(None, testLibPath, 
Some(testClass.getName()), args, sparkConf)
     withSession(s)(f)
   }
 }
diff --git 
a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala 
b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
index 689195c..0613bf3 100644
--- a/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
+++ b/integration-test/src/test/scala/org/apache/livy/test/InteractiveIT.scala
@@ -206,7 +206,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
       waitForIdle: Boolean = true,
       heartbeatTimeoutInSecond: Int = 0)
     (f: (LivyRestClient#InteractiveSession) => R): R = {
-    withSession(livyClient.startSession(kind, sparkConf, 
heartbeatTimeoutInSecond)) { s =>
+    withSession(livyClient.startSession(None, kind, sparkConf, 
heartbeatTimeoutInSecond)) { s =>
       if (waitForIdle) {
         s.verifySessionIdle()
       }
diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala 
b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
index d62a96e..0c52a1e 100644
--- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
@@ -184,8 +184,15 @@ abstract class SessionServlet[S <: Session, R <: 
RecoveryMetadata](
   private def doWithSession(fn: (S => Any),
       allowAll: Boolean,
       checkFn: Option[(String, String) => Boolean]): Any = {
-    val sessionId = params("id").toInt
-    sessionManager.get(sessionId) match {
+    val idOrNameParam: String = params("id")
+    val session = if (idOrNameParam.forall(_.isDigit)) {
+      val sessionId = idOrNameParam.toInt
+      sessionManager.get(sessionId)
+    } else {
+      val sessionName = idOrNameParam
+      sessionManager.get(sessionName)
+    }
+    session match {
       case Some(session) =>
         if (allowAll || checkFn.map(_(session.owner, 
remoteUser(request))).getOrElse(false)) {
           fn(session)
@@ -193,7 +200,7 @@ abstract class SessionServlet[S <: Session, R <: 
RecoveryMetadata](
           Forbidden()
         }
       case None =>
-        NotFound(ResponseMessage(s"Session '$sessionId' not found."))
+        NotFound(ResponseMessage(s"Session '$idOrNameParam' not found."))
     }
   }
 
diff --git 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index c15057b..3bbd742 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -35,6 +35,7 @@ import org.apache.livy.utils.{AppInfo, SparkApp, 
SparkAppListener, SparkProcessB
 @JsonIgnoreProperties(ignoreUnknown = true)
 case class BatchRecoveryMetadata(
     id: Int,
+    name: Option[String],
     appId: Option[String],
     appTag: String,
     owner: String,
@@ -53,6 +54,7 @@ object BatchSession extends Logging {
 
   def create(
       id: Int,
+      name: Option[String],
       request: CreateBatchRequest,
       livyConf: LivyConf,
       accessManager: AccessManager,
@@ -110,6 +112,7 @@ object BatchSession extends Logging {
 
     new BatchSession(
       id,
+      name,
       appTag,
       SessionState.Starting,
       livyConf,
@@ -126,6 +129,7 @@ object BatchSession extends Logging {
       mockApp: Option[SparkApp] = None): BatchSession = {
     new BatchSession(
       m.id,
+      m.name,
       m.appTag,
       SessionState.Recovering,
       livyConf,
@@ -140,6 +144,7 @@ object BatchSession extends Logging {
 
 class BatchSession(
     id: Int,
+    name: Option[String],
     appTag: String,
     initialState: SessionState,
     livyConf: LivyConf,
@@ -147,20 +152,25 @@ class BatchSession(
     override val proxyUser: Option[String],
     sessionStore: SessionStore,
     sparkApp: BatchSession => SparkApp)
-  extends Session(id, owner, livyConf) with SparkAppListener {
+  extends Session(id, name, owner, livyConf) with SparkAppListener {
   import BatchSession._
 
   protected implicit def executor: ExecutionContextExecutor = 
ExecutionContext.global
 
   private[this] var _state: SessionState = initialState
-  private val app = sparkApp(this)
+
+  private var app: Option[SparkApp] = None
 
   override def state: SessionState = _state
 
-  override def logLines(): IndexedSeq[String] = app.log()
+  override def logLines(): IndexedSeq[String] = 
app.map(_.log()).getOrElse(IndexedSeq.empty[String])
+
+  override def start(): Unit = {
+    app = Option(sparkApp(this))
+  }
 
   override def stopSession(): Unit = {
-    app.kill()
+    app.foreach(_.kill())
   }
 
   override def appIdKnown(appId: String): Unit = {
@@ -187,5 +197,5 @@ class BatchSession(
   override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }
 
   override def recoveryMetadata: RecoveryMetadata =
-    BatchRecoveryMetadata(id, appId, appTag, owner, proxyUser)
+    BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser)
 }
diff --git 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
index a069a50..e48ad8f 100644
--- 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
@@ -27,6 +27,7 @@ import org.apache.livy.utils.AppInfo
 
 case class BatchSessionView(
   id: Long,
+  name: Option[String],
   state: String,
   appId: Option[String],
   appInfo: AppInfo,
@@ -42,8 +43,11 @@ class BatchSessionServlet(
 
   override protected def createSession(req: HttpServletRequest): BatchSession 
= {
     val createRequest = bodyAs[CreateBatchRequest](req)
+    val sessionId = sessionManager.nextId()
+    val sessionName = createRequest.name
     BatchSession.create(
-      sessionManager.nextId(),
+      sessionId,
+      sessionName,
       createRequest,
       livyConf,
       accessManager,
@@ -66,7 +70,8 @@ class BatchSessionServlet(
       } else {
         Nil
       }
-    BatchSessionView(session.id, session.state.toString, session.appId, 
session.appInfo, logs)
+    BatchSessionView(session.id, session.name, session.state.toString, 
session.appId,
+      session.appInfo, logs)
   }
 
 }
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 3b3095f..0c3a3a8 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.Future
+import scala.concurrent.{Future, Promise}
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.util.{Random, Try}
 
@@ -49,6 +49,7 @@ import org.apache.livy.utils._
 @JsonIgnoreProperties(ignoreUnknown = true)
 case class InteractiveRecoveryMetadata(
     id: Int,
+    name: Option[String],
     appId: Option[String],
     appTag: String,
     kind: Kind,
@@ -66,6 +67,7 @@ object InteractiveSession extends Logging {
 
   def create(
       id: Int,
+      name: Option[String],
       owner: String,
       livyConf: LivyConf,
       accessManager: AccessManager,
@@ -111,6 +113,7 @@ object InteractiveSession extends Logging {
 
     new InteractiveSession(
       id,
+      name,
       None,
       appTag,
       client,
@@ -137,6 +140,7 @@ object InteractiveSession extends Logging {
 
     new InteractiveSession(
       metadata.id,
+      metadata.name,
       metadata.appId,
       metadata.appTag,
       client,
@@ -347,6 +351,7 @@ object InteractiveSession extends Logging {
 
 class InteractiveSession(
     id: Int,
+    name: Option[String],
     appIdHint: Option[String],
     appTag: String,
     val client: Option[RSCClient],
@@ -358,7 +363,7 @@ class InteractiveSession(
     override val proxyUser: Option[String],
     sessionStore: SessionStore,
     mockApp: Option[SparkApp]) // For unit test.
-  extends Session(id, owner, livyConf)
+  extends Session(id, name, owner, livyConf)
   with SessionHeartbeat
   with SparkAppListener {
 
@@ -377,69 +382,74 @@ class InteractiveSession(
   private val sessionSaveLock = new Object()
 
   _appId = appIdHint
-  sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
-  heartbeat()
 
-  private val app = mockApp.orElse {
-    val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
+  private var app: Option[SparkApp] = None
+
+  override def start(): Unit = {
+    sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
+    heartbeat()
+    app = mockApp.orElse {
+      val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
         .map(new LineBufferedProcess(_, 
livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
-    driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, 
livyConf, Some(this)) }
-  }
+      driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, 
livyConf, Some(this)) }
+    }
 
-  if (client.isEmpty) {
-    transition(Dead())
-    val msg = s"Cannot recover interactive session $id because its RSCDriver 
URI is unknown."
-    info(msg)
-    sessionLog = IndexedSeq(msg)
-  } else {
-    val uriFuture = Future { client.get.getServerUri.get() }
-
-    uriFuture onSuccess { case url =>
-      rscDriverUri = Option(url)
-      sessionSaveLock.synchronized {
-        sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
+    if (client.isEmpty) {
+      transition(Dead())
+      val msg = s"Cannot recover interactive session $id because its RSCDriver 
URI is unknown."
+      info(msg)
+      sessionLog = IndexedSeq(msg)
+    } else {
+      val uriFuture = Future { client.get.getServerUri.get() }
+
+      uriFuture.onSuccess { case url =>
+        rscDriverUri = Option(url)
+        sessionSaveLock.synchronized {
+          sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
+        }
       }
-    }
-    uriFuture onFailure { case e => warn("Fail to get rsc uri", e) }
+      uriFuture.onFailure { case e => warn("Fail to get rsc uri", e) }
 
-    // Send a dummy job that will return once the client is ready to be used, 
and set the
-    // state to "idle" at that point.
-    client.get.submit(new PingJob()).addListener(new 
JobHandle.Listener[Void]() {
+      // Send a dummy job that will return once the client is ready to be 
used, and set the
+      // state to "idle" at that point.
+      client.get.submit(new PingJob()).addListener(new 
JobHandle.Listener[Void]() {
       override def onJobQueued(job: JobHandle[Void]): Unit = { }
       override def onJobStarted(job: JobHandle[Void]): Unit = { }
 
-      override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()
+        override def onJobCancelled(job: JobHandle[Void]): Unit = errorOut()
 
-      override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = 
errorOut()
+        override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit 
= errorOut()
 
-      override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
-        transition(SessionState.Running)
-        info(s"Interactive session $id created [appid: ${appId.orNull}, owner: 
$owner, proxyUser:" +
-          s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " +
-          s"info: ${appInfo.asJavaMap}]")
-      }
+        override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit 
= {
+          transition(SessionState.Running)
+          info(s"Interactive session $id created [appid: ${appId.orNull}, " +
+            s"owner: $owner, proxyUser:" +
+            s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " 
+
+            s"info: ${appInfo.asJavaMap}]")
+        }
 
-      private def errorOut(): Unit = {
-        // Other code might call stop() to close the RPC channel. When RPC 
channel is closing,
-        // this callback might be triggered. Check and don't call stop() to 
avoid nested called
-        // if the session is already shutting down.
-        if (serverSideState != SessionState.ShuttingDown) {
-          transition(SessionState.Error())
-          stop()
-          app.foreach { a =>
-            info(s"Failed to ping RSC driver for session $id. Killing 
application.")
-            a.kill()
+        private def errorOut(): Unit = {
+          // Other code might call stop() to close the RPC channel. When RPC 
channel is closing,
+          // this callback might be triggered. Check and don't call stop() to 
avoid nested called
+          // if the session is already shutting down.
+          if (serverSideState != SessionState.ShuttingDown) {
+            transition(SessionState.Error())
+            stop()
+            app.foreach { a =>
+              info(s"Failed to ping RSC driver for session $id. Killing 
application.")
+              a.kill()
+            }
           }
         }
-      }
-    })
+      })
+    }
   }
 
   override def logLines(): IndexedSeq[String] = 
app.map(_.log()).getOrElse(sessionLog)
 
   override def recoveryMetadata: RecoveryMetadata =
-    InteractiveRecoveryMetadata(
-      id, appId, appTag, kind, heartbeatTimeout.toSeconds.toInt, owner, 
proxyUser, rscDriverUri)
+    InteractiveRecoveryMetadata(id, name, appId, appTag, kind,
+      heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)
 
   override def state: SessionState = {
     if (serverSideState == SessionState.Running) {
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index 7450cd7..dec88a8 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -54,6 +54,7 @@ class InteractiveSessionServlet(
     val createRequest = bodyAs[CreateInteractiveRequest](req)
     InteractiveSession.create(
       sessionManager.nextId(),
+      createRequest.name,
       remoteUser(req),
       livyConf,
       accessManager,
@@ -79,8 +80,9 @@ class InteractiveSessionServlet(
         Nil
       }
 
-    new SessionInfo(session.id, session.appId.orNull, session.owner, 
session.proxyUser.orNull,
-      session.state.toString, session.kind.toString, 
session.appInfo.asJavaMap, logs.asJava)
+    new SessionInfo(session.id, session.name.orNull, session.appId.orNull, 
session.owner,
+      session.proxyUser.orNull, session.state.toString, session.kind.toString,
+      session.appInfo.asJavaMap, logs.asJava)
   }
 
   post("/:id/stop") {
diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala 
b/server/src/main/scala/org/apache/livy/sessions/Session.scala
index 0b54779..ee14283 100644
--- a/server/src/main/scala/org/apache/livy/sessions/Session.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala
@@ -21,7 +21,6 @@ import java.io.InputStream
 import java.net.{URI, URISyntaxException}
 import java.security.PrivilegedExceptionAction
 import java.util.UUID
-import java.util.concurrent.TimeUnit
 
 import scala.concurrent.{ExecutionContext, Future}
 
@@ -135,13 +134,24 @@ object Session {
   }
 }
 
-abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf)
+abstract class Session(
+    val id: Int,
+    val name: Option[String],
+    val owner: String,
+    val livyConf: LivyConf)
   extends Logging {
 
   import Session._
 
   protected implicit val executionContext = ExecutionContext.global
 
+  // validate session name. The name should not be a number
+  name.foreach { sessionName =>
+    if (sessionName.forall(_.isDigit)) {
+      throw new IllegalArgumentException(s"Invalid session name: $sessionName")
+    }
+  }
+
   protected var _appId: Option[String] = None
 
   private var _lastActivity = System.nanoTime()
@@ -171,6 +181,8 @@ abstract class Session(val id: Int, val owner: String, val 
livyConf: LivyConf)
 
   def state: SessionState
 
+  def start(): Unit
+
   def stop(): Future[Unit] = Future {
     try {
       info(s"Stopping $this...")
diff --git 
a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala 
b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
index 5926071..a63cab3 100644
--- a/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/SessionManager.scala
@@ -73,6 +73,8 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : 
ClassTag](
 
   protected[this] final val idCounter = new AtomicInteger(0)
   protected[this] final val sessions = mutable.LinkedHashMap[Int, S]()
+  private[this] final val sessionsByName = mutable.HashMap[String, S]()
+
 
   private[this] final val sessionTimeoutCheck = 
livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK)
   private[this] final val sessionTimeout =
@@ -92,13 +94,23 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : 
ClassTag](
   def register(session: S): S = {
     info(s"Registering new session ${session.id}")
     synchronized {
+      session.name.foreach { sessionName =>
+        if (sessionsByName.contains(sessionName)) {
+          throw new IllegalArgumentException(s"Duplicate session name: 
${session.name}")
+        } else {
+          sessionsByName.put(sessionName, session)
+        }
+      }
       sessions.put(session.id, session)
+      session.start()
     }
     session
   }
 
   def get(id: Int): Option[S] = sessions.get(id)
 
+  def get(sessionName: String): Option[S] = sessionsByName.get(sessionName)
+
   def size(): Int = sessions.size
 
   def all(): Iterable[S] = sessions.values
@@ -113,6 +125,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : 
ClassTag](
         sessionStore.remove(sessionType, session.id)
         synchronized {
           sessions.remove(session.id)
+          session.name.foreach(sessionsByName.remove)
         }
       } catch {
         case NonFatal(e) =>
diff --git 
a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala 
b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
index e0ebd9a..38b79ce 100644
--- a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
@@ -32,7 +32,7 @@ object SessionServletSpec {
   val PROXY_USER = "proxyUser"
 
   class MockSession(id: Int, owner: String, livyConf: LivyConf)
-    extends Session(id, owner, livyConf) {
+    extends Session(id, None, owner, livyConf) {
 
     case class MockRecoveryMetadata(id: Int) extends RecoveryMetadata()
 
@@ -42,6 +42,8 @@ object SessionServletSpec {
 
     override def state: SessionState = SessionState.Idle
 
+    override def start(): Unit = ()
+
     override protected def stopSession(): Unit = ()
 
     override def logLines(): IndexedSeq[String] = IndexedSeq("log")
diff --git 
a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala 
b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
index 2c37c19..ed29800 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala
@@ -62,6 +62,34 @@ class BatchServletSpec extends 
BaseSessionServletSpec[BatchSession, BatchRecover
       accessManager)
   }
 
+  def testShowSessionProperties(name: Option[String]): Unit = {
+    val id = 0
+    val state = SessionState.Running
+    val appId = "appid"
+    val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
+    val log = IndexedSeq[String]("log1", "log2")
+
+    val session = mock[BatchSession]
+    when(session.id).thenReturn(id)
+    when(session.name).thenReturn(name)
+    when(session.state).thenReturn(state)
+    when(session.appId).thenReturn(Some(appId))
+    when(session.appInfo).thenReturn(appInfo)
+    when(session.logLines()).thenReturn(log)
+
+    val req = mock[HttpServletRequest]
+
+    val view = 
servlet.asInstanceOf[BatchSessionServlet].clientSessionView(session, req)
+      .asInstanceOf[BatchSessionView]
+
+    view.id shouldEqual id
+    view.name shouldEqual name
+    view.state shouldEqual state.toString
+    view.appId shouldEqual Some(appId)
+    view.appInfo shouldEqual appInfo
+    view.log shouldEqual log
+  }
+
   describe("Batch Servlet") {
     it("should create and tear down a batch") {
       jget[Map[String, Any]]("/") { data =>
@@ -116,36 +144,18 @@ class BatchServletSpec extends 
BaseSessionServletSpec[BatchSession, BatchRecover
 
     it("should respect config black list") {
       val createRequest = new CreateBatchRequest()
+      createRequest.name = Some("TEST-BatchServletSpec-Session-0")
       createRequest.file = script.toString
       createRequest.conf = BLACKLISTED_CONFIG
       jpost[Map[String, Any]]("/", createRequest, expectedStatus = 
SC_BAD_REQUEST) { _ => }
     }
 
-    it("should show session properties") {
-      val id = 0
-      val state = SessionState.Running
-      val appId = "appid"
-      val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
-      val log = IndexedSeq[String]("log1", "log2")
-
-      val session = mock[BatchSession]
-      when(session.id).thenReturn(id)
-      when(session.state).thenReturn(state)
-      when(session.appId).thenReturn(Some(appId))
-      when(session.appInfo).thenReturn(appInfo)
-      when(session.logLines()).thenReturn(log)
-
-      val req = mock[HttpServletRequest]
-
-      val view = 
servlet.asInstanceOf[BatchSessionServlet].clientSessionView(session, req)
-        .asInstanceOf[BatchSessionView]
-
-      view.id shouldEqual id
-      view.state shouldEqual state.toString
-      view.appId shouldEqual Some(appId)
-      view.appInfo shouldEqual appInfo
-      view.log shouldEqual log
-    }
+    Seq(None, Some("TEST-batch-session"))
+      .foreach { name =>
+        it(s"should show session properties (name = $name)") {
+          testShowSessionProperties(name)
+        }
+      }
 
     it("should fail session creation when max session creation is hit") {
       val createRequest = new CreateBatchRequest()
diff --git 
a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala 
b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
index 8381c95..d7cac15 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
@@ -70,7 +70,8 @@ class BatchSessionSpec
 
       val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, 
sys.props("java.io.tmpdir"))
       val accessManager = new AccessManager(conf)
-      val batch = BatchSession.create(0, req, conf, accessManager, null, 
sessionStore)
+      val batch = BatchSession.create(0, None, req, conf, accessManager, null, 
sessionStore)
+      batch.start()
 
       Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, 
TimeUnit.SECONDS))
       (batch.state match {
@@ -87,7 +88,8 @@ class BatchSessionSpec
       val mockApp = mock[SparkApp]
       val accessManager = new AccessManager(conf)
       val batch = BatchSession.create(
-        0, req, conf, accessManager, null, sessionStore, Some(mockApp))
+        0, None, req, conf, accessManager, null, sessionStore, Some(mockApp))
+      batch.start()
 
       val expectedAppId = "APPID"
       batch.appIdKnown(expectedAppId)
@@ -100,18 +102,27 @@ class BatchSessionSpec
       batch.appInfo shouldEqual expectedAppInfo
     }
 
-    it("should recover session") {
+    // Recovers a batch session whose metadata carries `name` and checks the name is preserved.
+    def testRecoverSession(name: Option[String]): Unit = {
       val conf = new LivyConf()
       val req = new CreateBatchRequest()
       val mockApp = mock[SparkApp]
-      val m = BatchRecoveryMetadata(99, None, "appTag", null, None)
+      val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None)
       val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp))
 
       batch.state shouldBe (SessionState.Recovering)
+      batch.name shouldBe (name)
 
       batch.appIdKnown("appId")
       verify(sessionStore, atLeastOnce()).save(
         Matchers.eq(BatchSession.RECOVERY_SESSION_TYPE), anyObject())
     }
+
+    Seq[Option[String]](None, Some("Test Batch Session"))
+      .foreach { name =>
+        it(s"should recover session (name = $name)") {
+          testRecoverSession(name)
+        }
+      }
   }
 }
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala
index b16e74f..7f8bbfe 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/BaseInteractiveServletSpec.scala
@@ -61,6 +61,7 @@ abstract class BaseInteractiveServletSpec
     val classpath = sys.props("java.class.path")
     val request = new CreateInteractiveRequest()
     request.kind = kind
+    request.name = None
     request.conf = extraConf ++ Map(
       RSCConf.Entry.LIVY_JARS.key() -> "",
       RSCConf.Entry.CLIENT_IN_PROCESS.key() -> inProcess.toString,
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
index e946fc0..0b061fa 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -60,6 +60,7 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
 
       val session = mock[InteractiveSession]
       when(session.kind).thenReturn(Spark)
+      when(session.name).thenReturn(None)
       when(session.appId).thenReturn(None)
       when(session.appInfo).thenReturn(AppInfo())
       when(session.logLines()).thenReturn(IndexedSeq())
@@ -151,7 +152,14 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
     }
   }
 
-  it("should show session properties") {
+  // Run the property check both for a named and for an unnamed session.
+  Seq(Some("TEST-interactive-session"), None).foreach { name =>
+    it(s"should show session properties (name = $name)") {
+      testShowSessionProperties(name)
+    }
+  }
+
+  def testShowSessionProperties(name: Option[String]): Unit = {
     val id = 0
     val appId = "appid"
     val owner = "owner"
@@ -163,6 +171,7 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
 
     val session = mock[InteractiveSession]
     when(session.id).thenReturn(id)
+    when(session.name).thenReturn(name)
     when(session.appId).thenReturn(Some(appId))
     when(session.owner).thenReturn(owner)
     when(session.proxyUser).thenReturn(Some(proxyUser))
@@ -177,6 +186,7 @@ class InteractiveSessionServletSpec extends 
BaseInteractiveServletSpec {
       .asInstanceOf[SessionInfo]
 
     view.id shouldEqual id
+    Option(view.name) shouldEqual name
     view.appId shouldEqual appId
     view.owner shouldEqual owner
     view.proxyUser shouldEqual proxyUser
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index f07e61f..95bc08a 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -70,7 +70,7 @@ class InteractiveSessionSpec extends FunSpec
       SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"),
       RSCConf.Entry.LIVY_JARS.key() -> ""
     )
-    InteractiveSession.create(0, null, livyConf, accessManager, req, 
sessionStore, mockApp)
+    InteractiveSession.create(0, None, null, livyConf, accessManager, req, 
sessionStore, mockApp)
   }
 
   private def executeStatement(code: String, codeType: Option[String] = None): 
JValue = {
@@ -160,6 +160,7 @@ class InteractiveSessionSpec extends FunSpec
       val mockApp = mock[SparkApp]
       val sessionStore = mock[SessionStore]
       session = createSession(sessionStore, Some(mockApp))
+      session.start()
 
       val expectedAppId = "APPID"
       session.appIdKnown(expectedAppId)
@@ -242,15 +243,32 @@ class InteractiveSessionSpec extends FunSpec
   }
 
   describe("recovery") {
-    it("should recover session") {
+    it("should recover named sessions") {
       val conf = new LivyConf()
       val sessionStore = mock[SessionStore]
       val mockClient = mock[RSCClient]
       
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
-      val m =
-        InteractiveRecoveryMetadata(
-          78, None, "appTag", Spark, 0, null, None, Some(URI.create("")))
+      val m = InteractiveRecoveryMetadata(
+          78, Some("Test session"), None, "appTag", Spark, 0, null, None, 
Some(URI.create("")))
+      val s = InteractiveSession.recover(m, conf, sessionStore, None, 
Some(mockClient))
+      s.start()
+
+      s.state shouldBe (SessionState.Recovering)
+
+      s.appIdKnown("appId")
+      verify(sessionStore, atLeastOnce()).save(
+        MockitoMatchers.eq(InteractiveSession.RECOVERY_SESSION_TYPE), 
anyObject())
+    }
+
+    it("should recover sessions with no name") {
+      val conf = new LivyConf()
+      val sessionStore = mock[SessionStore]
+      val mockClient = mock[RSCClient]
+      
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
+      val m = InteractiveRecoveryMetadata(
+          78, None, None, "appTag", Spark, 0, null, None, Some(URI.create("")))
       val s = InteractiveSession.recover(m, conf, sessionStore, None, 
Some(mockClient))
+      s.start()
 
       s.state shouldBe (SessionState.Recovering)
 
@@ -263,9 +281,9 @@ class InteractiveSessionSpec extends FunSpec
       val conf = new LivyConf()
       val sessionStore = mock[SessionStore]
       val m = InteractiveRecoveryMetadata(
-        78, Some("appId"), "appTag", Spark, 0, null, None, None)
+        78, None, Some("appId"), "appTag", Spark, 0, null, None, None)
       val s = InteractiveSession.recover(m, conf, sessionStore, None)
-
+      s.start()
       s.state shouldBe a[SessionState.Dead]
       s.logLines().mkString should include("RSCDriver URI is unknown")
     }
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
index 12c8bbb..6d2cff8 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
@@ -55,7 +55,8 @@ class SessionHeartbeatSpec extends FunSpec with Matchers {
   }
 
   describe("SessionHeartbeatWatchdog") {
-    abstract class TestSession extends Session(0, null, null) with 
SessionHeartbeat {}
+    abstract class TestSession
+      extends Session(0, None, null, null) with SessionHeartbeat {}
     class TestWatchdog(conf: LivyConf)
       extends SessionManager[TestSession, RecoveryMetadata](
         conf,
@@ -68,10 +69,12 @@ class SessionHeartbeatSpec extends FunSpec with Matchers {
     it("should delete only expired sessions") {
       val expiredSession: TestSession = mock[TestSession]
       when(expiredSession.id).thenReturn(0)
+      when(expiredSession.name).thenReturn(None)
       when(expiredSession.heartbeatExpired).thenReturn(true)
 
       val nonExpiredSession: TestSession = mock[TestSession]
       when(nonExpiredSession.id).thenReturn(1)
+      when(nonExpiredSession.name).thenReturn(None)
       when(nonExpiredSession.heartbeatExpired).thenReturn(false)
 
       val n = new TestWatchdog(new LivyConf())
diff --git 
a/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
index e40bb1c..8ee448f 100644
--- 
a/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/recovery/BlackholeStateStoreSpec.scala
@@ -21,6 +21,7 @@ import org.scalatest.FunSpec
 import org.scalatest.Matchers._
 
 import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+import org.apache.livy.server.batch.BatchRecoveryMetadata
 
 class BlackholeStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
   describe("BlackholeStateStore") {
@@ -43,5 +44,24 @@ class BlackholeStateStoreSpec extends FunSpec with 
LivyBaseUnitTestSuite {
     it("remove should not throw") {
       stateStore.remove("")
     }
+
+    it("should deserialize sessions without name") {
+      val jsonbytes =
+        """
+          |{
+          |  "id": 408107,
+          |  "appId": "application_1541532370353_1465148",
+          |  "state": "running",
+          |  "appTag": "livy-batch-408107-2jAOFzDy",
+          |  "owner": "batch_admin",
+          |  "proxyUser": "batch_opts",
+          |  "version": 1
+          |}
+        """.stripMargin.getBytes("UTF-8")
+      val batchRecoveryMetadata = 
stateStore.deserialize[BatchRecoveryMetadata](jsonbytes)
+      batchRecoveryMetadata.id shouldBe 408107
+      batchRecoveryMetadata.appId shouldBe 
Some("application_1541532370353_1465148")
+      batchRecoveryMetadata.name shouldBe None
+    }
   }
 }
diff --git a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala 
b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
index 1604f4d..3d0cc26 100644
--- a/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/MockSession.scala
@@ -19,11 +19,14 @@ package org.apache.livy.sessions
 
 import org.apache.livy.LivyConf
 
-class MockSession(id: Int, owner: String, conf: LivyConf) extends Session(id, 
owner, conf) {
+class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] 
= None)
+  extends Session(id, name, owner, conf) {
   case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata()
 
   override val proxyUser = None
 
+  override def start(): Unit = ()
+
   override protected def stopSession(): Unit = ()
 
   override def logLines(): IndexedSeq[String] = IndexedSeq()
diff --git 
a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala 
b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
index 547af8b..523e1d7 100644
--- a/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/org/apache/livy/sessions/SessionManagerSpec.scala
@@ -36,16 +36,21 @@ import org.apache.livy.sessions.Session.RecoveryMetadata
 class SessionManagerSpec extends FunSpec with Matchers with 
LivyBaseUnitTestSuite {
   implicit def executor: ExecutionContext = ExecutionContext.global
 
+  private def createSessionManager(): (LivyConf, SessionManager[MockSession, 
RecoveryMetadata]) = {
+    val livyConf = new LivyConf()
+    livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
+    val manager = new SessionManager[MockSession, RecoveryMetadata](
+      livyConf,
+      { _ => assert(false).asInstanceOf[MockSession] },
+      mock[SessionStore],
+      "test",
+      Some(Seq.empty))
+    (livyConf, manager)
+  }
+
   describe("SessionManager") {
     it("should garbage collect old sessions") {
-      val livyConf = new LivyConf()
-      livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
-      val manager = new SessionManager[MockSession, RecoveryMetadata](
-        livyConf,
-        { _ => assert(false).asInstanceOf[MockSession] },
-        mock[SessionStore],
-        "test",
-        Some(Seq.empty))
+      val (livyConf, manager) = createSessionManager()
       val session = manager.register(new MockSession(manager.nextId(), null, 
livyConf))
       manager.get(session.id).isDefined should be(true)
       eventually(timeout(5 seconds), interval(100 millis)) {
@@ -54,10 +59,31 @@ class SessionManagerSpec extends FunSpec with Matchers with 
LivyBaseUnitTestSuit
       }
     }
 
+    it("should create sessions with names") {
+      val (livyConf, manager) = createSessionManager()
+      val name = "Mock-session"
+      val session = manager.register(new MockSession(manager.nextId(), null, 
livyConf, Some(name)))
+      manager.get(session.id).isDefined should be(true)
+      manager.get(name).isDefined should be(true)
+    }
+
+    it("should not create sessions with duplicate names") {
+      val (livyConf, manager) = createSessionManager()
+      val name = "Mock-session"
+      val session1 = new MockSession(manager.nextId(), null, livyConf, 
Some(name))
+      val session2 = new MockSession(manager.nextId(), null, livyConf, 
Some(name))
+      manager.register(session1)
+      an[IllegalArgumentException] should be thrownBy 
manager.register(session2)
+      manager.get(session1.id).isDefined should be(true)
+      manager.get(session2.id).isDefined should be(false)
+      manager.shutdown()
+    }
+
     it("batch session should not be gc-ed until application is finished") {
       val sessionId = 24
       val session = mock[BatchSession]
       when(session.id).thenReturn(sessionId)
+      when(session.name).thenReturn(None)
       when(session.stop()).thenReturn(Future {})
       when(session.lastActivity).thenReturn(System.nanoTime())
 
@@ -70,6 +96,7 @@ class SessionManagerSpec extends FunSpec with Matchers with 
LivyBaseUnitTestSuit
       val sessionId = 24
       val session = mock[InteractiveSession]
       when(session.id).thenReturn(sessionId)
+      when(session.name).thenReturn(None)
       when(session.stop()).thenReturn(Future {})
       when(session.lastActivity).thenReturn(System.nanoTime())
 
@@ -112,12 +139,13 @@ class SessionManagerSpec extends FunSpec with Matchers 
with LivyBaseUnitTestSuit
     implicit def executor: ExecutionContext = ExecutionContext.global
 
     def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = {
-      BatchRecoveryMetadata(id, None, appTag, null, None)
+      BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, 
None)
     }
 
     def mockSession(id: Int): BatchSession = {
       val session = mock[BatchSession]
       when(session.id).thenReturn(id)
+      when(session.name).thenReturn(None)
       when(session.stop()).thenReturn(Future {})
       when(session.lastActivity).thenReturn(System.nanoTime())
 
diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 5be5536..31ea2f0 100644
--- 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -228,6 +228,7 @@ class LivyThriftSessionManager(val server: 
LivyThriftServer, val livyConf: LivyC
       createInteractiveRequest.kind = Spark
       val newSession = InteractiveSession.create(
         server.livySessionManager.nextId(),
+        None,
         username,
         server.livyConf,
         server.accessManager,

Reply via email to