This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 80daade  [LIVY-551] Add "doAs" impersonation support
80daade is described below

commit 80daadef02ae57b2a5487c6f92e0f7df558d4864
Author: Kevin Risden <kris...@apache.org>
AuthorDate: Thu Feb 7 09:54:08 2019 -0800

    [LIVY-551] Add "doAs" impersonation support
    
    ## What changes were proposed in this pull request?
    
    Currently `proxyUser` is used in batches and sessions for impersonation.
    This should be extended to support impersonation for all endpoints. Adding
    the query parameter `doAs` matches other projects in the Hadoop ecosystem
    and will work across endpoints.
    
    https://issues.apache.org/jira/browse/LIVY-551
    
    ## How was this patch tested?
    
    Added unit and integration tests for this feature. Manually tested to
    ensure that `proxyUser` still works. Also verified that when both
    `proxyUser` and the new `doAs` parameter are specified, `doAs` takes
    precedence.
    
    Author: Kevin Risden <kris...@apache.org>
    
    Closes #141 from risdenk/LIVY-551.
---
 docs/rest-api.md                                   |   7 ++
 .../org/apache/livy/server/AccessManager.scala     |   5 +-
 .../org/apache/livy/server/SessionServlet.scala    |  26 +++-
 .../apache/livy/server/batch/BatchSession.scala    |   3 +-
 .../livy/server/batch/BatchSessionServlet.scala    |   3 +-
 .../server/interactive/InteractiveSession.scala    |   3 +-
 .../interactive/InteractiveSessionServlet.scala    |   3 +-
 .../scala/org/apache/livy/sessions/Session.scala   |   3 +-
 .../apache/livy/server/SessionServletSpec.scala    | 138 +++++++++++++++++++--
 .../livy/server/batch/BatchSessionSpec.scala       |   4 +-
 .../interactive/InteractiveSessionSpec.scala       |   3 +-
 .../livy/server/interactive/JobApiSpec.scala       |  73 +++++++++--
 .../thriftserver/LivyThriftSessionManager.scala    |   1 +
 13 files changed, 240 insertions(+), 32 deletions(-)

diff --git a/docs/rest-api.md b/docs/rest-api.md
index 5949b93..d4be858 100644
--- a/docs/rest-api.md
+++ b/docs/rest-api.md
@@ -810,3 +810,10 @@ A statement represents the result of an execution 
statement.
     <td>string</td>
   </tr>
 </table>
+
+### Proxy User - `doAs` support
+If superuser support is configured, Livy supports the `doAs` query parameter
+to specify the user to impersonate. The `doAs` query parameter can be used
+on any supported REST endpoint described above to perform the action as the
+specified user. If both `doAs` and `proxyUser` are specified during session
+or batch creation, the `doAs` parameter takes precedence.
diff --git a/server/src/main/scala/org/apache/livy/server/AccessManager.scala 
b/server/src/main/scala/org/apache/livy/server/AccessManager.scala
index c86801e..3ccbfa1 100644
--- a/server/src/main/scala/org/apache/livy/server/AccessManager.scala
+++ b/server/src/main/scala/org/apache/livy/server/AccessManager.scala
@@ -107,9 +107,8 @@ private[livy] class AccessManager(conf: LivyConf) extends 
Logging {
    */
   def checkImpersonation(
       target: Option[String],
-      requestUser: String,
-      livyConf: LivyConf): Option[String] = {
-    if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
+      requestUser: String): Option[String] = {
+    if (conf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
       if (!target.forall(hasSuperAccess(_, requestUser))) {
         throw new AccessControlException(
           s"User '$requestUser' not allowed to impersonate '$target'.")
diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala 
b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
index 0c52a1e..412af92 100644
--- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
+++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala
@@ -161,6 +161,30 @@ abstract class SessionServlet[S <: Session, R <: 
RecoveryMetadata](
   protected def remoteUser(req: HttpServletRequest): String = 
req.getRemoteUser()
 
   /**
+   * Returns the impersonated user as given by "doAs" as a request parameter.
+   */
+  protected def impersonatedUser(request: HttpServletRequest): Option[String] 
= {
+    Option(request.getParameter("doAs"))
+  }
+
+  /**
+   * Returns the proxyUser for the given request.
+   */
+  protected def proxyUser(
+      request: HttpServletRequest,
+      createRequestProxyUser: Option[String]): Option[String] = {
+    impersonatedUser(request).orElse(createRequestProxyUser)
+  }
+
+  /**
+   * Gets the request user or impersonated user to determine the effective 
user.
+   */
+  protected def effectiveUser(request: HttpServletRequest): String = {
+    val requestUser = remoteUser(request)
+    accessManager.checkImpersonation(impersonatedUser(request), 
requestUser).getOrElse(requestUser)
+  }
+
+  /**
    * Performs an operation on the session, without checking for ownership. 
Operations executed
    * via this method must not modify the session in any way, or return 
potentially sensitive
    * information.
@@ -194,7 +218,7 @@ abstract class SessionServlet[S <: Session, R <: 
RecoveryMetadata](
     }
     session match {
       case Some(session) =>
-        if (allowAll || checkFn.map(_(session.owner, 
remoteUser(request))).getOrElse(false)) {
+        if (allowAll || checkFn.map(_(session.owner, 
effectiveUser(request))).getOrElse(false)) {
           fn(session)
         } else {
           Forbidden()
diff --git 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
index 3bbd742..4b27058 100644
--- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
+++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala
@@ -59,10 +59,11 @@ object BatchSession extends Logging {
       livyConf: LivyConf,
       accessManager: AccessManager,
       owner: String,
+      proxyUser: Option[String],
       sessionStore: SessionStore,
       mockApp: Option[SparkApp] = None): BatchSession = {
     val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}"
-    val impersonatedUser = accessManager.checkImpersonation(request.proxyUser, 
owner, livyConf)
+    val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)
 
     def createSparkApp(s: BatchSession): SparkApp = {
       val conf = SparkApp.prepareSparkConf(
diff --git 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
index e48ad8f..1bd52f4 100644
--- 
a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala
@@ -52,6 +52,7 @@ class BatchSessionServlet(
       livyConf,
       accessManager,
       remoteUser(req),
+      proxyUser(req, createRequest.proxyUser),
       sessionStore)
   }
 
@@ -59,7 +60,7 @@ class BatchSessionServlet(
       session: BatchSession,
       req: HttpServletRequest): Any = {
     val logs =
-      if (accessManager.hasViewAccess(session.owner, remoteUser(req))) {
+      if (accessManager.hasViewAccess(session.owner, effectiveUser(req))) {
         val lines = session.logLines()
 
         val size = 10
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
index 0c3a3a8..6ec2d75 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -69,6 +69,7 @@ object InteractiveSession extends Logging {
       id: Int,
       name: Option[String],
       owner: String,
+      proxyUser: Option[String],
       livyConf: LivyConf,
       accessManager: AccessManager,
       request: CreateInteractiveRequest,
@@ -76,7 +77,7 @@ object InteractiveSession extends Logging {
       mockApp: Option[SparkApp] = None,
       mockClient: Option[RSCClient] = None): InteractiveSession = {
     val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}"
-    val impersonatedUser = accessManager.checkImpersonation(request.proxyUser, 
owner, livyConf)
+    val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner)
 
     val client = mockClient.orElse {
       val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf(
diff --git 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
index dec88a8..9ad1a24 100644
--- 
a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
+++ 
b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
@@ -56,6 +56,7 @@ class InteractiveSessionServlet(
       sessionManager.nextId(),
       createRequest.name,
       remoteUser(req),
+      proxyUser(req, createRequest.proxyUser),
       livyConf,
       accessManager,
       createRequest,
@@ -66,7 +67,7 @@ class InteractiveSessionServlet(
       session: InteractiveSession,
       req: HttpServletRequest): Any = {
     val logs =
-      if (accessManager.hasViewAccess(session.owner, remoteUser(req))) {
+      if (accessManager.hasViewAccess(session.owner, effectiveUser(req))) {
         Option(session.logLines())
           .map { lines =>
             val size = 10
diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala 
b/server/src/main/scala/org/apache/livy/sessions/Session.scala
index ee14283..67f78d4 100644
--- a/server/src/main/scala/org/apache/livy/sessions/Session.scala
+++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala
@@ -216,7 +216,8 @@ abstract class Session(
 
   protected def stopSession(): Unit
 
-  protected val proxyUser: Option[String]
+  // Visible for testing.
+  val proxyUser: Option[String]
 
   protected def doAsOwner[T](fn: => T): T = {
     val user = proxyUser.getOrElse(owner)
diff --git 
a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala 
b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
index 38b79ce..02fffa4 100644
--- a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala
@@ -31,13 +31,11 @@ object SessionServletSpec {
 
   val PROXY_USER = "proxyUser"
 
-  class MockSession(id: Int, owner: String, livyConf: LivyConf)
+  class MockSession(id: Int, owner: String, val proxyUser: Option[String], 
livyConf: LivyConf)
     extends Session(id, None, owner, livyConf) {
 
     case class MockRecoveryMetadata(id: Int) extends RecoveryMetadata()
 
-    override val proxyUser = None
-
     override def recoveryMetadata: RecoveryMetadata = MockRecoveryMetadata(0)
 
     override def state: SessionState = SessionState.Idle
@@ -47,10 +45,9 @@ object SessionServletSpec {
     override protected def stopSession(): Unit = ()
 
     override def logLines(): IndexedSeq[String] = IndexedSeq("log")
-
   }
 
-  case class MockSessionView(id: Int, owner: String, logs: Seq[String])
+  case class MockSessionView(id: Int, owner: String, proxyUser: 
Option[String], logs: Seq[String])
 
   def createServlet(conf: LivyConf): SessionServlet[Session, RecoveryMetadata] 
= {
     val sessionManager = new SessionManager[Session, RecoveryMetadata](
@@ -64,27 +61,27 @@ object SessionServletSpec {
     new SessionServlet(sessionManager, conf, accessManager) with 
RemoteUserOverride {
       override protected def createSession(req: HttpServletRequest): Session = 
{
         val params = bodyAs[Map[String, String]](req)
-        accessManager.checkImpersonation(params.get(PROXY_USER), 
remoteUser(req), livyConf)
-        new MockSession(sessionManager.nextId(), remoteUser(req), conf)
+        val owner = remoteUser(req)
+        val impersonatedUser = accessManager.checkImpersonation(
+          proxyUser(req, params.get(PROXY_USER)), owner)
+        new MockSession(sessionManager.nextId(), owner, impersonatedUser, conf)
       }
 
       override protected def clientSessionView(
           session: Session,
           req: HttpServletRequest): Any = {
-        val logs = if (accessManager.hasViewAccess(session.owner, 
remoteUser(req))) {
+        val logs = if (accessManager.hasViewAccess(session.owner, 
effectiveUser(req))) {
           session.logLines()
         } else {
           Nil
         }
-        MockSessionView(session.id, session.owner, logs)
+        MockSessionView(session.id, session.owner, session.proxyUser, logs)
       }
     }
   }
-
 }
 
 class SessionServletSpec extends BaseSessionServletSpec[Session, 
RecoveryMetadata] {
-
   import SessionServletSpec._
 
   override def createServlet(): SessionServlet[Session, RecoveryMetadata] = {
@@ -114,17 +111,66 @@ class SessionServletSpec extends 
BaseSessionServletSpec[Session, RecoveryMetadat
     }
 
     it("should attach owner information to sessions") {
+      jpost[MockSessionView]("/", Map()) { res =>
+        assert(res.owner === null)
+        assert(res.proxyUser === None)
+        assert(res.logs === IndexedSeq("log"))
+        delete(res.id, adminHeaders, SC_OK)
+      }
+
       jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
         assert(res.owner === "alice")
+        assert(res.proxyUser === Some("alice"))
+        assert(res.logs === IndexedSeq("log"))
+        delete(res.id, aliceHeaders, SC_OK)
+      }
+
+      jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { 
res =>
+        assert(res.owner === ADMIN)
+        assert(res.proxyUser === Some("alice"))
         assert(res.logs === IndexedSeq("log"))
         delete(res.id, aliceHeaders, SC_OK)
       }
     }
 
-    it("should allow other users to see non-sensitive information") {
+    it("should allow other users to see all information due to ACLs not 
enabled") {
+      jpost[MockSessionView]("/", Map()) { res =>
+        jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res =>
+          assert(res.owner === null)
+          assert(res.proxyUser === None)
+          assert(res.logs === IndexedSeq("log"))
+        }
+        delete(res.id, adminHeaders, SC_OK)
+      }
+
+      jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
+        jget[MockSessionView](s"/${res.id}") { res =>
+          assert(res.owner === "alice")
+          assert(res.proxyUser === Some("alice"))
+          assert(res.logs === IndexedSeq("log"))
+        }
+        delete(res.id, aliceHeaders, SC_OK)
+      }
+
       jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
         jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res =>
           assert(res.owner === "alice")
+          assert(res.proxyUser === Some("alice"))
+          assert(res.logs === IndexedSeq("log"))
+        }
+
+        jget[MockSessionView](s"/${res.id}?doAs=bob", headers = adminHeaders) 
{ res =>
+          assert(res.owner === "alice")
+          assert(res.proxyUser === Some("alice"))
+          assert(res.logs === IndexedSeq("log"))
+        }
+        delete(res.id, aliceHeaders, SC_OK)
+      }
+
+      jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { 
res =>
+        jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res =>
+          assert(res.owner === ADMIN)
+          assert(res.proxyUser === Some("alice"))
           assert(res.logs === IndexedSeq("log"))
         }
         delete(res.id, aliceHeaders, SC_OK)
@@ -135,17 +181,28 @@ class SessionServletSpec extends 
BaseSessionServletSpec[Session, RecoveryMetadat
       jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
         delete(res.id, bobHeaders, SC_OK)
       }
+
+      jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { 
res =>
+        delete(res.id, bobHeaders, SC_OK)
+      }
     }
 
     it("should not allow regular users to impersonate others") {
       jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = 
aliceHeaders,
         expectedStatus = SC_FORBIDDEN) { _ => }
+
+      jpost[MockSessionView]("/?doAs=bob", Map(), headers = aliceHeaders,
+        expectedStatus = SC_FORBIDDEN) { _ => }
     }
 
     it("should allow admins to impersonate anyone") {
       jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = 
adminHeaders) { res =>
         delete(res.id, adminHeaders, SC_OK)
       }
+
+      jpost[MockSessionView]("/?doAs=bob", Map(), headers = adminHeaders) { 
res =>
+        delete(res.id, adminHeaders, SC_OK)
+      }
     }
   }
 }
@@ -170,8 +227,16 @@ class AclsEnabledSessionServletSpec extends 
BaseSessionServletSpec[Session, Reco
 
   describe("SessionServlet") {
     it("should attach owner information to sessions") {
+      jpost[MockSessionView]("/", Map()) { res =>
+        assert(res.owner === null)
+        assert(res.proxyUser === None)
+        assert(res.logs === IndexedSeq("log"))
+        delete(res.id, adminHeaders, SC_OK)
+      }
+
       jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
         assert(res.owner === "alice")
+        assert(res.proxyUser === Some("alice"))
         assert(res.logs === IndexedSeq("log"))
         delete(res.id, aliceHeaders, SC_OK)
       }
@@ -181,11 +246,22 @@ class AclsEnabledSessionServletSpec extends 
BaseSessionServletSpec[Session, Reco
       jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
         jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res =>
           assert(res.owner === "alice")
+          assert(res.proxyUser === Some("alice"))
+          // Other user cannot see the logs
+          assert(res.logs === Nil)
+        }
+
+        jget[MockSessionView](s"/${res.id}?doAs=bob", headers = adminHeaders) 
{ res =>
+          assert(res.owner === "alice")
+          assert(res.proxyUser === Some("alice"))
           // Other user cannot see the logs
           assert(res.logs === Nil)
         }
 
         // Users with access permission could see the logs
+        jget[MockSessionView](s"/${res.id}", headers = aliceHeaders) { res =>
+          assert(res.logs === IndexedSeq("log"))
+        }
         jget[MockSessionView](s"/${res.id}", headers = viewUserHeaders) { res 
=>
           assert(res.logs === IndexedSeq("log"))
         }
@@ -198,6 +274,28 @@ class AclsEnabledSessionServletSpec extends 
BaseSessionServletSpec[Session, Reco
 
         delete(res.id, aliceHeaders, SC_OK)
       }
+
+      jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { 
res =>
+        jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res =>
+          assert(res.owner === ADMIN)
+          assert(res.proxyUser === Some("alice"))
+          // Other user cannot see the logs
+          assert(res.logs === Nil)
+        }
+
+        // Users with access permission could see the logs
+        jget[MockSessionView](s"/${res.id}", headers = viewUserHeaders) { res 
=>
+          assert(res.logs === IndexedSeq("log"))
+        }
+        jget[MockSessionView](s"/${res.id}", headers = modifyUserHeaders) { 
res =>
+          assert(res.logs === IndexedSeq("log"))
+        }
+        jget[MockSessionView](s"/${res.id}", headers = adminHeaders) { res =>
+          assert(res.logs === IndexedSeq("log"))
+        }
+
+        delete(res.id, adminHeaders, SC_OK)
+      }
     }
 
     it("should only allow modify accessible users from modifying sessions") {
@@ -206,16 +304,30 @@ class AclsEnabledSessionServletSpec extends 
BaseSessionServletSpec[Session, Reco
         delete(res.id, viewUserHeaders, SC_FORBIDDEN)
         delete(res.id, modifyUserHeaders, SC_OK)
       }
+
+      jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { 
res =>
+        delete(res.id, bobHeaders, SC_FORBIDDEN)
+        delete(res.id, viewUserHeaders, SC_FORBIDDEN)
+        delete(res.id, modifyUserHeaders, SC_OK)
+      }
     }
 
     it("should not allow regular users to impersonate others") {
       jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = 
aliceHeaders,
         expectedStatus = SC_FORBIDDEN) { _ => }
+
+      jpost[MockSessionView]("/?doAs=bob", Map(), headers = aliceHeaders,
+        expectedStatus = SC_FORBIDDEN) { _ => }
     }
 
     it("should allow admins to impersonate anyone") {
       jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = 
adminHeaders) { res =>
-        delete(res.id, bobHeaders, SC_FORBIDDEN)
+        delete(res.id, aliceHeaders, SC_FORBIDDEN)
+        delete(res.id, adminHeaders, SC_OK)
+      }
+
+      jpost[MockSessionView]("/?doAs=bob", Map(), headers = adminHeaders) { 
res =>
+        delete(res.id, aliceHeaders, SC_FORBIDDEN)
         delete(res.id, adminHeaders, SC_OK)
       }
     }
diff --git 
a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala 
b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
index d7cac15..417b627 100644
--- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala
@@ -70,7 +70,7 @@ class BatchSessionSpec
 
       val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, 
sys.props("java.io.tmpdir"))
       val accessManager = new AccessManager(conf)
-      val batch = BatchSession.create(0, None, req, conf, accessManager, null, 
sessionStore)
+      val batch = BatchSession.create(0, None, req, conf, accessManager, null, 
None, sessionStore)
       batch.start()
 
       Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, 
TimeUnit.SECONDS))
@@ -88,7 +88,7 @@ class BatchSessionSpec
       val mockApp = mock[SparkApp]
       val accessManager = new AccessManager(conf)
       val batch = BatchSession.create(
-        0, None, req, conf, accessManager, null, sessionStore, Some(mockApp))
+        0, None, req, conf, accessManager, null, None, sessionStore, 
Some(mockApp))
       batch.start()
 
       val expectedAppId = "APPID"
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
index 95bc08a..2a99abb 100644
--- 
a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
+++ 
b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala
@@ -70,7 +70,8 @@ class InteractiveSessionSpec extends FunSpec
       SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"),
       RSCConf.Entry.LIVY_JARS.key() -> ""
     )
-    InteractiveSession.create(0, None, null, livyConf, accessManager, req, 
sessionStore, mockApp)
+    InteractiveSession.create(0, None, null, None, livyConf, accessManager, 
req,
+      sessionStore, mockApp)
   }
 
   private def executeStatement(code: String, codeType: Option[String] = None): 
JValue = {
diff --git 
a/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala 
b/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala
index 36a46ca..1646492 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala
@@ -27,10 +27,11 @@ import scala.concurrent.duration._
 import scala.io.Source
 import scala.language.postfixOps
 
+import org.apache.hadoop.security.UserGroupInformation
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.mock.MockitoSugar.mock
 
-import org.apache.livy.{Job, JobHandle}
+import org.apache.livy.{Job, JobHandle, LivyConf}
 import org.apache.livy.client.common.{BufferUtils, Serializer}
 import org.apache.livy.client.common.HttpMessages._
 import org.apache.livy.server.{AccessManager, RemoteUserOverride}
@@ -40,7 +41,7 @@ import org.apache.livy.test.jobs.{Echo, GetCurrentUser}
 
 class JobApiSpec extends BaseInteractiveServletSpec {
 
-  private val PROXY = "__proxy__"
+  protected val PROXY = "__proxy__"
 
   private var sessionId: Int = -1
 
@@ -124,6 +125,7 @@ class JobApiSpec extends BaseInteractiveServletSpec {
     }
 
     it("should support user impersonation") {
+      assume(createConf().getBoolean(LivyConf.IMPERSONATION_ENABLED))
       val headers = makeUserHeaders(PROXY)
       jpost[SessionInfo]("/", createRequest(inProcess = false), headers = 
headers) { data =>
         try {
@@ -139,6 +141,7 @@ class JobApiSpec extends BaseInteractiveServletSpec {
     }
 
     it("should honor impersonation requests") {
+      assume(createConf().getBoolean(LivyConf.IMPERSONATION_ENABLED))
       val request = createRequest(inProcess = false)
       request.proxyUser = Some(PROXY)
       jpost[SessionInfo]("/", request, headers = adminHeaders) { data =>
@@ -166,7 +169,7 @@ class JobApiSpec extends BaseInteractiveServletSpec {
 
   }
 
-  private def waitForIdle(id: Int): Unit = {
+  protected def waitForIdle(id: Int): Unit = {
     eventually(timeout(1 minute), interval(100 millis)) {
       jget[SessionInfo](s"/$id") { status =>
         status.state should be (SessionState.Idle.toString())
@@ -174,11 +177,11 @@ class JobApiSpec extends BaseInteractiveServletSpec {
     }
   }
 
-  private def deleteSession(id: Int): Unit = {
+  protected def deleteSession(id: Int): Unit = {
     jdelete[Map[String, Any]](s"/$id", headers = adminHeaders) { _ => }
   }
 
-  private def testResourceUpload(cmd: String, sessionId: Int): Unit = {
+  protected def testResourceUpload(cmd: String, sessionId: Int): Unit = {
     val f = File.createTempFile("uploadTestFile", cmd)
     val conf = createConf()
 
@@ -197,12 +200,12 @@ class JobApiSpec extends BaseInteractiveServletSpec {
     }
   }
 
-  private def testJobSubmission(sid: Int, sync: Boolean): Unit = {
+  protected def testJobSubmission(sid: Int, sync: Boolean): Unit = {
     val result = runJob(sid, new Echo(42), sync = sync)
     result should be (42)
   }
 
-  private def runJob[T](
+  protected def runJob[T](
       sid: Int,
       job: Job[T],
       sync: Boolean = false,
@@ -227,3 +230,59 @@ class JobApiSpec extends BaseInteractiveServletSpec {
   }
 
 }
+
+class JobApiSpecNoImpersonation extends JobApiSpec {
+  override protected def createConf(): LivyConf = synchronized {
+    super.createConf().set(LivyConf.IMPERSONATION_ENABLED, false)
+  }
+
+  it("should not support user impersonation") {
+    assume(!createConf().getBoolean(LivyConf.IMPERSONATION_ENABLED))
+    jpost[SessionInfo]("/", createRequest(inProcess = false)) { data =>
+      try {
+        waitForIdle(data.id)
+        data.owner should be (null)
+        data.proxyUser should be (null)
+        val user = runJob(data.id, new GetCurrentUser())
+        user should be (UserGroupInformation.getCurrentUser.getUserName)
+      } finally {
+        deleteSession(data.id)
+      }
+    }
+
+    val headers = makeUserHeaders(PROXY)
+    jpost[SessionInfo]("/", createRequest(inProcess = false), headers = 
headers) { data =>
+      try {
+        waitForIdle(data.id)
+        data.owner should be (PROXY)
+        data.proxyUser should be (null)
+        val user = runJob(data.id, new GetCurrentUser(), headers = headers)
+        user should be (UserGroupInformation.getCurrentUser.getUserName)
+      } finally {
+        deleteSession(data.id)
+      }
+    }
+  }
+
+  it("should not honor impersonation requests") {
+    assume(!createConf().getBoolean(LivyConf.IMPERSONATION_ENABLED))
+    val request = createRequest(inProcess = false)
+    request.proxyUser = Some(PROXY)
+    jpost[SessionInfo]("/", request, headers = adminHeaders) { data =>
+      try {
+        waitForIdle(data.id)
+        data.owner should be (ADMIN)
+        data.proxyUser should be (null)
+        val user = runJob(data.id, new GetCurrentUser(), headers = 
adminHeaders)
+        user should be (UserGroupInformation.getCurrentUser.getUserName)
+
+        // Test that files are uploaded to a new session directory.
+        assert(tempDir.listFiles().length === 0)
+        testResourceUpload("file", data.id)
+      } finally {
+        deleteSession(data.id)
+        assert(tempDir.listFiles().length === 0)
+      }
+    }
+  }
+}
diff --git 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
index 31ea2f0..bc62084 100644
--- 
a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
+++ 
b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
@@ -230,6 +230,7 @@ class LivyThriftSessionManager(val server: 
LivyThriftServer, val livyConf: LivyC
         server.livySessionManager.nextId(),
         None,
         username,
+        None,
         server.livyConf,
         server.accessManager,
         createInteractiveRequest,

Reply via email to