This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new 80daade [LIVY-551] Add "doAs" impersonation support 80daade is described below commit 80daadef02ae57b2a5487c6f92e0f7df558d4864 Author: Kevin Risden <kris...@apache.org> AuthorDate: Thu Feb 7 09:54:08 2019 -0800 [LIVY-551] Add "doAs" impersonation support ## What changes were proposed in this pull request? Currently `proxyuser` is used in batches and sessions for impersonation. This should be extended to support impersonation for all endpoints. Adding the query parameter `doAs` matches others in the Hadoop ecosystem and will work across endpoints. https://issues.apache.org/jira/browse/LIVY-551 ## How was this patch tested? Added unit and integration tests for this feature. Manually tested to ensure that `proxyuser` still works. Also checked that if both `proxyuser` and the new `doAs` parameter are specified that `doAs` takes precedence. Author: Kevin Risden <kris...@apache.org> Closes #141 from risdenk/LIVY-551. --- docs/rest-api.md | 7 ++ .../org/apache/livy/server/AccessManager.scala | 5 +- .../org/apache/livy/server/SessionServlet.scala | 26 +++- .../apache/livy/server/batch/BatchSession.scala | 3 +- .../livy/server/batch/BatchSessionServlet.scala | 3 +- .../server/interactive/InteractiveSession.scala | 3 +- .../interactive/InteractiveSessionServlet.scala | 3 +- .../scala/org/apache/livy/sessions/Session.scala | 3 +- .../apache/livy/server/SessionServletSpec.scala | 138 +++++++++++++++++++-- .../livy/server/batch/BatchSessionSpec.scala | 4 +- .../interactive/InteractiveSessionSpec.scala | 3 +- .../livy/server/interactive/JobApiSpec.scala | 73 +++++++++-- .../thriftserver/LivyThriftSessionManager.scala | 1 + 13 files changed, 240 insertions(+), 32 deletions(-) diff --git a/docs/rest-api.md b/docs/rest-api.md index 5949b93..d4be858 100644 --- a/docs/rest-api.md +++ b/docs/rest-api.md @@ -810,3 +810,10 @@ A statement represents the result of an execution statement. <td>string</td> </tr> </table> + +### Proxy User - `doAs` support +If superuser support is configured, Livy supports the `doAs` query parameter +to specify the user to impersonate. The `doAs` query parameter can be used +on any supported REST endpoint described above to perform the action as the +specified user. If both `doAs` and `proxyUser` are specified during session +or batch creation, the `doAs` parameter takes precedence. diff --git a/server/src/main/scala/org/apache/livy/server/AccessManager.scala b/server/src/main/scala/org/apache/livy/server/AccessManager.scala index c86801e..3ccbfa1 100644 --- a/server/src/main/scala/org/apache/livy/server/AccessManager.scala +++ b/server/src/main/scala/org/apache/livy/server/AccessManager.scala @@ -107,9 +107,8 @@ private[livy] class AccessManager(conf: LivyConf) extends Logging { */ def checkImpersonation( target: Option[String], - requestUser: String, - livyConf: LivyConf): Option[String] = { - if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) { + requestUser: String): Option[String] = { + if (conf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) { if (!target.forall(hasSuperAccess(_, requestUser))) { throw new AccessControlException( s"User '$requestUser' not allowed to impersonate '$target'.") diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala index 0c52a1e..412af92 100644 --- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala @@ -161,6 +161,30 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata]( protected def remoteUser(req: HttpServletRequest): String = req.getRemoteUser() /** + * Returns the impersonated user as given by "doAs" as a request parameter. + */ + protected def impersonatedUser(request: HttpServletRequest): Option[String] = { + Option(request.getParameter("doAs")) + } + + /** + * Returns the proxyUser for the given request. + */ + protected def proxyUser( + request: HttpServletRequest, + createRequestProxyUser: Option[String]): Option[String] = { + impersonatedUser(request).orElse(createRequestProxyUser) + } + + /** + * Gets the request user or impersonated user to determine the effective user. + */ + protected def effectiveUser(request: HttpServletRequest): String = { + val requestUser = remoteUser(request) + accessManager.checkImpersonation(impersonatedUser(request), requestUser).getOrElse(requestUser) + } + + /** * Performs an operation on the session, without checking for ownership. Operations executed * via this method must not modify the session in any way, or return potentially sensitive * information. @@ -194,7 +218,7 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata]( } session match { case Some(session) => - if (allowAll || checkFn.map(_(session.owner, remoteUser(request))).getOrElse(false)) { + if (allowAll || checkFn.map(_(session.owner, effectiveUser(request))).getOrElse(false)) { fn(session) } else { Forbidden() diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 3bbd742..4b27058 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -59,10 +59,11 @@ object BatchSession extends Logging { livyConf: LivyConf, accessManager: AccessManager, owner: String, + proxyUser: Option[String], sessionStore: SessionStore, mockApp: Option[SparkApp] = None): BatchSession = { val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}" - val impersonatedUser = accessManager.checkImpersonation(request.proxyUser, owner, livyConf) + val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner) def createSparkApp(s: BatchSession): SparkApp = { val conf = SparkApp.prepareSparkConf( diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala index e48ad8f..1bd52f4 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala @@ -52,6 +52,7 @@ class BatchSessionServlet( livyConf, accessManager, remoteUser(req), + proxyUser(req, createRequest.proxyUser), sessionStore) } @@ -59,7 +60,7 @@ class BatchSessionServlet( session: BatchSession, req: HttpServletRequest): Any = { val logs = - if (accessManager.hasViewAccess(session.owner, remoteUser(req))) { + if (accessManager.hasViewAccess(session.owner, effectiveUser(req))) { val lines = session.logLines() val size = 10 diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 0c3a3a8..6ec2d75 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -69,6 +69,7 @@ object InteractiveSession extends Logging { id: Int, name: Option[String], owner: String, + proxyUser: Option[String], livyConf: LivyConf, accessManager: AccessManager, request: CreateInteractiveRequest, @@ -76,7 +77,7 @@ object InteractiveSession extends Logging { mockApp: Option[SparkApp] = None, mockClient: Option[RSCClient] = None): InteractiveSession = { val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}" - val impersonatedUser = accessManager.checkImpersonation(request.proxyUser, owner, livyConf) + val impersonatedUser = accessManager.checkImpersonation(proxyUser, owner) val client = mockClient.orElse { val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf( diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala index dec88a8..9ad1a24 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala @@ -56,6 +56,7 @@ class InteractiveSessionServlet( sessionManager.nextId(), createRequest.name, remoteUser(req), + proxyUser(req, createRequest.proxyUser), livyConf, accessManager, createRequest, @@ -66,7 +67,7 @@ class InteractiveSessionServlet( session: InteractiveSession, req: HttpServletRequest): Any = { val logs = - if (accessManager.hasViewAccess(session.owner, remoteUser(req))) { + if (accessManager.hasViewAccess(session.owner, effectiveUser(req))) { Option(session.logLines()) .map { lines => val size = 10 diff --git a/server/src/main/scala/org/apache/livy/sessions/Session.scala b/server/src/main/scala/org/apache/livy/sessions/Session.scala index ee14283..67f78d4 100644 --- a/server/src/main/scala/org/apache/livy/sessions/Session.scala +++ b/server/src/main/scala/org/apache/livy/sessions/Session.scala @@ -216,7 +216,8 @@ abstract class Session( protected def stopSession(): Unit - protected val proxyUser: Option[String] + // Visible for testing. + val proxyUser: Option[String] protected def doAsOwner[T](fn: => T): T = { val user = proxyUser.getOrElse(owner) diff --git a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala index 38b79ce..02fffa4 100644 --- a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala @@ -31,13 +31,11 @@ object SessionServletSpec { val PROXY_USER = "proxyUser" - class MockSession(id: Int, owner: String, livyConf: LivyConf) + class MockSession(id: Int, owner: String, val proxyUser: Option[String], livyConf: LivyConf) extends Session(id, None, owner, livyConf) { case class MockRecoveryMetadata(id: Int) extends RecoveryMetadata() - override val proxyUser = None - override def recoveryMetadata: RecoveryMetadata = MockRecoveryMetadata(0) override def state: SessionState = SessionState.Idle @@ -47,10 +45,9 @@ object SessionServletSpec { override protected def stopSession(): Unit = () override def logLines(): IndexedSeq[String] = IndexedSeq("log") - } - case class MockSessionView(id: Int, owner: String, logs: Seq[String]) + case class MockSessionView(id: Int, owner: String, proxyUser: Option[String], logs: Seq[String]) def createServlet(conf: LivyConf): SessionServlet[Session, RecoveryMetadata] = { val sessionManager = new SessionManager[Session, RecoveryMetadata]( @@ -64,27 +61,27 @@ object SessionServletSpec { new SessionServlet(sessionManager, conf, accessManager) with RemoteUserOverride { override protected def createSession(req: HttpServletRequest): Session = { val params = bodyAs[Map[String, String]](req) - accessManager.checkImpersonation(params.get(PROXY_USER), remoteUser(req), livyConf) - new MockSession(sessionManager.nextId(), remoteUser(req), conf) + val owner = remoteUser(req) + val impersonatedUser = accessManager.checkImpersonation( + proxyUser(req, params.get(PROXY_USER)), owner) + new MockSession(sessionManager.nextId(), owner, impersonatedUser, conf) } override protected def clientSessionView( session: Session, req: HttpServletRequest): Any = { - val logs = if (accessManager.hasViewAccess(session.owner, remoteUser(req))) { + val logs = if (accessManager.hasViewAccess(session.owner, effectiveUser(req))) { session.logLines() } else { Nil } - MockSessionView(session.id, session.owner, logs) + MockSessionView(session.id, session.owner, session.proxyUser, logs) } } } - } class SessionServletSpec extends BaseSessionServletSpec[Session, RecoveryMetadata] { - import SessionServletSpec._ override def createServlet(): SessionServlet[Session, RecoveryMetadata] = { @@ -114,17 +111,66 @@ class SessionServletSpec extends BaseSessionServletSpec[Session, RecoveryMetadat } it("should attach owner information to sessions") { + jpost[MockSessionView]("/", Map()) { res => + assert(res.owner === null) + assert(res.proxyUser === None) + assert(res.logs === IndexedSeq("log")) + delete(res.id, adminHeaders, SC_OK) + } + jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res => assert(res.owner === "alice") + assert(res.proxyUser === Some("alice")) + assert(res.logs === IndexedSeq("log")) + delete(res.id, aliceHeaders, SC_OK) + } + + jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { res => + assert(res.owner === ADMIN) + assert(res.proxyUser === Some("alice")) assert(res.logs === IndexedSeq("log")) delete(res.id, aliceHeaders, SC_OK) } } - it("should allow other users to see non-sensitive information") { + it("should allow other users to see all information due to ACLs not enabled") { + jpost[MockSessionView]("/", Map()) { res => + jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res => + assert(res.owner === null) + assert(res.proxyUser === None) + assert(res.logs === IndexedSeq("log")) + } + delete(res.id, adminHeaders, SC_OK) + } + + jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res => + jget[MockSessionView](s"/${res.id}") { res => + assert(res.owner === "alice") + assert(res.proxyUser === Some("alice")) + assert(res.logs === IndexedSeq("log")) + } + delete(res.id, aliceHeaders, SC_OK) + } + jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res => jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res => assert(res.owner === "alice") + assert(res.proxyUser === Some("alice")) + assert(res.logs === IndexedSeq("log")) + } + + jget[MockSessionView](s"/${res.id}?doAs=bob", headers = adminHeaders) { res => + assert(res.owner === "alice") + assert(res.proxyUser === Some("alice")) + assert(res.logs === IndexedSeq("log")) + } + delete(res.id, aliceHeaders, SC_OK) + } + + jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { res => + jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res => + assert(res.owner === ADMIN) + assert(res.proxyUser === Some("alice")) assert(res.logs === IndexedSeq("log")) } delete(res.id, aliceHeaders, SC_OK) @@ -135,17 +181,28 @@ class SessionServletSpec extends BaseSessionServletSpec[Session, RecoveryMetadat jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res => delete(res.id, bobHeaders, SC_OK) } + + jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { res => + delete(res.id, bobHeaders, SC_OK) + } } it("should not allow regular users to impersonate others") { jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = aliceHeaders, expectedStatus = SC_FORBIDDEN) { _ => } + + jpost[MockSessionView]("/?doAs=bob", Map(), headers = aliceHeaders, + expectedStatus = SC_FORBIDDEN) { _ => } } it("should allow admins to impersonate anyone") { jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = adminHeaders) { res => delete(res.id, adminHeaders, SC_OK) } + + jpost[MockSessionView]("/?doAs=bob", Map(), headers = adminHeaders) { res => + delete(res.id, adminHeaders, SC_OK) + } } } } @@ -170,8 +227,16 @@ class AclsEnabledSessionServletSpec extends BaseSessionServletSpec[Session, Reco describe("SessionServlet") { it("should attach owner information to sessions") { + jpost[MockSessionView]("/", Map()) { res => + assert(res.owner === null) + assert(res.proxyUser === None) + assert(res.logs === IndexedSeq("log")) + delete(res.id, adminHeaders, SC_OK) + } + jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res => assert(res.owner === "alice") + assert(res.proxyUser === Some("alice")) assert(res.logs === IndexedSeq("log")) delete(res.id, aliceHeaders, SC_OK) } @@ -181,11 +246,22 @@ class AclsEnabledSessionServletSpec extends BaseSessionServletSpec[Session, Reco jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res => jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res => assert(res.owner === "alice") + assert(res.proxyUser === Some("alice")) + // Other user cannot see the logs + assert(res.logs === Nil) + } + + jget[MockSessionView](s"/${res.id}?doAs=bob", headers = adminHeaders) { res => + assert(res.owner === "alice") + assert(res.proxyUser === Some("alice")) // Other user cannot see the logs assert(res.logs === Nil) } // Users with access permission could see the logs + jget[MockSessionView](s"/${res.id}", headers = aliceHeaders) { res => + assert(res.logs === IndexedSeq("log")) + } jget[MockSessionView](s"/${res.id}", headers = viewUserHeaders) { res => assert(res.logs === IndexedSeq("log")) } @@ -198,6 +274,28 @@ class AclsEnabledSessionServletSpec extends BaseSessionServletSpec[Session, Reco delete(res.id, aliceHeaders, SC_OK) } + + jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { res => + jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res => + assert(res.owner === ADMIN) + assert(res.proxyUser === Some("alice")) + // Other user cannot see the logs + assert(res.logs === Nil) + } + + // Users with access permission could see the logs + jget[MockSessionView](s"/${res.id}", headers = viewUserHeaders) { res => + assert(res.logs === IndexedSeq("log")) + } + jget[MockSessionView](s"/${res.id}", headers = modifyUserHeaders) { res => + assert(res.logs === IndexedSeq("log")) + } + jget[MockSessionView](s"/${res.id}", headers = adminHeaders) { res => + assert(res.logs === IndexedSeq("log")) + } + + delete(res.id, adminHeaders, SC_OK) + } } it("should only allow modify accessible users from modifying sessions") { @@ -206,16 +304,30 @@ class AclsEnabledSessionServletSpec extends BaseSessionServletSpec[Session, Reco delete(res.id, viewUserHeaders, SC_FORBIDDEN) delete(res.id, modifyUserHeaders, SC_OK) } + + jpost[MockSessionView]("/?doAs=alice", Map(), headers = adminHeaders) { res => + delete(res.id, bobHeaders, SC_FORBIDDEN) + delete(res.id, viewUserHeaders, SC_FORBIDDEN) + delete(res.id, modifyUserHeaders, SC_OK) + } } it("should not allow regular users to impersonate others") { jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = aliceHeaders, expectedStatus = SC_FORBIDDEN) { _ => } + + jpost[MockSessionView]("/?doAs=bob", Map(), headers = aliceHeaders, + expectedStatus = SC_FORBIDDEN) { _ => } } it("should allow admins to impersonate anyone") { jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = adminHeaders) { res => - delete(res.id, bobHeaders, SC_FORBIDDEN) + delete(res.id, aliceHeaders, SC_FORBIDDEN) + delete(res.id, adminHeaders, SC_OK) + } + + jpost[MockSessionView]("/?doAs=bob", Map(), headers = adminHeaders) { res => + delete(res.id, aliceHeaders, SC_FORBIDDEN) delete(res.id, adminHeaders, SC_OK) } } diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index d7cac15..417b627 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -70,7 +70,7 @@ class BatchSessionSpec val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) val accessManager = new AccessManager(conf) - val batch = BatchSession.create(0, None, req, conf, accessManager, null, sessionStore) + val batch = BatchSession.create(0, None, req, conf, accessManager, null, None, sessionStore) batch.start() Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS)) @@ -88,7 +88,7 @@ class BatchSessionSpec val mockApp = mock[SparkApp] val accessManager = new AccessManager(conf) val batch = BatchSession.create( - 0, None, req, conf, accessManager, null, sessionStore, Some(mockApp)) + 0, None, req, conf, accessManager, null, None, sessionStore, Some(mockApp)) batch.start() val expectedAppId = "APPID" diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index 95bc08a..2a99abb 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -70,7 +70,8 @@ class InteractiveSessionSpec extends FunSpec SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"), RSCConf.Entry.LIVY_JARS.key() -> "" ) - InteractiveSession.create(0, None, null, livyConf, accessManager, req, sessionStore, mockApp) + InteractiveSession.create(0, None, null, None, livyConf, accessManager, req, + sessionStore, mockApp) } private def executeStatement(code: String, codeType: Option[String] = None): JValue = { diff --git a/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala index 36a46ca..1646492 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala @@ -27,10 +27,11 @@ import scala.concurrent.duration._ import scala.io.Source import scala.language.postfixOps +import org.apache.hadoop.security.UserGroupInformation import org.scalatest.concurrent.Eventually._ import org.scalatest.mock.MockitoSugar.mock -import org.apache.livy.{Job, JobHandle} +import org.apache.livy.{Job, JobHandle, LivyConf} import org.apache.livy.client.common.{BufferUtils, Serializer} import org.apache.livy.client.common.HttpMessages._ import org.apache.livy.server.{AccessManager, RemoteUserOverride} @@ -40,7 +41,7 @@ import org.apache.livy.test.jobs.{Echo, GetCurrentUser} class JobApiSpec extends BaseInteractiveServletSpec { - private val PROXY = "__proxy__" + protected val PROXY = "__proxy__" private var sessionId: Int = -1 @@ -124,6 +125,7 @@ class JobApiSpec extends BaseInteractiveServletSpec { } it("should support user impersonation") { + assume(createConf().getBoolean(LivyConf.IMPERSONATION_ENABLED)) val headers = makeUserHeaders(PROXY) jpost[SessionInfo]("/", createRequest(inProcess = false), headers = headers) { data => try { @@ -139,6 +141,7 @@ class JobApiSpec extends BaseInteractiveServletSpec { } it("should honor impersonation requests") { + assume(createConf().getBoolean(LivyConf.IMPERSONATION_ENABLED)) val request = createRequest(inProcess = false) request.proxyUser = Some(PROXY) jpost[SessionInfo]("/", request, headers = adminHeaders) { data => @@ -166,7 +169,7 @@ class JobApiSpec extends BaseInteractiveServletSpec { } - private def waitForIdle(id: Int): Unit = { + protected def waitForIdle(id: Int): Unit = { eventually(timeout(1 minute), interval(100 millis)) { jget[SessionInfo](s"/$id") { status => status.state should be (SessionState.Idle.toString()) @@ -174,11 +177,11 @@ class JobApiSpec extends BaseInteractiveServletSpec { } } - private def deleteSession(id: Int): Unit = { + protected def deleteSession(id: Int): Unit = { jdelete[Map[String, Any]](s"/$id", headers = adminHeaders) { _ => } } - private def testResourceUpload(cmd: String, sessionId: Int): Unit = { + protected def testResourceUpload(cmd: String, sessionId: Int): Unit = { val f = File.createTempFile("uploadTestFile", cmd) val conf = createConf() @@ -197,12 +200,12 @@ class JobApiSpec extends BaseInteractiveServletSpec { } } - private def testJobSubmission(sid: Int, sync: Boolean): Unit = { + protected def testJobSubmission(sid: Int, sync: Boolean): Unit = { val result = runJob(sid, new Echo(42), sync = sync) result should be (42) } - private def runJob[T]( + protected def runJob[T]( sid: Int, job: Job[T], sync: Boolean = false, @@ -227,3 +230,59 @@ class JobApiSpec extends BaseInteractiveServletSpec { } } + +class JobApiSpecNoImpersonation extends JobApiSpec { + override protected def createConf(): LivyConf = synchronized { + super.createConf().set(LivyConf.IMPERSONATION_ENABLED, false) + } + + it("should not support user impersonation") { + assume(!createConf().getBoolean(LivyConf.IMPERSONATION_ENABLED)) + jpost[SessionInfo]("/", createRequest(inProcess = false)) { data => + try { + waitForIdle(data.id) + data.owner should be (null) + data.proxyUser should be (null) + val user = runJob(data.id, new GetCurrentUser()) + user should be (UserGroupInformation.getCurrentUser.getUserName) + } finally { + deleteSession(data.id) + } + } + + val headers = makeUserHeaders(PROXY) + jpost[SessionInfo]("/", createRequest(inProcess = false), headers = headers) { data => + try { + waitForIdle(data.id) + data.owner should be (PROXY) + data.proxyUser should be (null) + val user = runJob(data.id, new GetCurrentUser(), headers = headers) + user should be (UserGroupInformation.getCurrentUser.getUserName) + } finally { + deleteSession(data.id) + } + } + } + + it("should not honor impersonation requests") { + assume(!createConf().getBoolean(LivyConf.IMPERSONATION_ENABLED)) + val request = createRequest(inProcess = false) + request.proxyUser = Some(PROXY) + jpost[SessionInfo]("/", request, headers = adminHeaders) { data => + try { + waitForIdle(data.id) + data.owner should be (ADMIN) + data.proxyUser should be (null) + val user = runJob(data.id, new GetCurrentUser(), headers = adminHeaders) + user should be (UserGroupInformation.getCurrentUser.getUserName) + + // Test that files are uploaded to a new session directory. + assert(tempDir.listFiles().length === 0) + testResourceUpload("file", data.id) + } finally { + deleteSession(data.id) + assert(tempDir.listFiles().length === 0) + } + } + } +} diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala index 31ea2f0..bc62084 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala @@ -230,6 +230,7 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC server.livySessionManager.nextId(), None, username, + None, server.livyConf, server.accessManager, createInteractiveRequest,