Ver Fonte

[LIVY-348] Improve the ACLs in Livy

This PR propose to improve current Livy's access control mechanism with fine-grained control:

1. If `livy.server.access-control.enabled` is disabled, which means ACLs is disabled, any user could send any request to Livy.
2. If `livy.server.access-control.enabled` is enabled, then this ACL mechanism divides users into 3 groups:
    1. view accessible users: users could get session, statement data, but cannot POST any queries.
    2. modify accessible users: users could submit new statements, kill sessions.
    3. super users: this is the same as previous, super user could impersonate any user.

    In the meanwhile, modify accessible users automatically have the view accessibility, and super user has all the permissions.

Also add new configuration `livy.server.access-control.allowed-users`, this is the same as previous `livy.server.access-control.users`, when ACLs is enabled only users in the allowed list could issue REST queries to Livy server, other users will get 403.

Please review and comment.

Author: jerryshao <sshao@hortonworks.com>

Closes #15 from jerryshao/LIVY-348.
jerryshao há 7 anos atrás
pai
commit
75902ebf16

+ 3 - 2
client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala

@@ -37,7 +37,7 @@ import org.scalatra.servlet.ScalatraListener
 import org.apache.livy._
 import org.apache.livy.client.common.{BufferUtils, Serializer}
 import org.apache.livy.client.common.HttpMessages._
-import org.apache.livy.server.WebServer
+import org.apache.livy.server.{AccessManager, WebServer}
 import org.apache.livy.server.interactive.{InteractiveSession, InteractiveSessionServlet}
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions.{InteractiveSessionManager, SessionState, Spark}
@@ -267,7 +267,8 @@ private class HttpClientTestBootstrap extends LifeCycle {
     val conf = new LivyConf()
     val stateStore = mock(classOf[SessionStore])
     val sessionManager = new InteractiveSessionManager(conf, stateStore, Some(Seq.empty))
-    val servlet = new InteractiveSessionServlet(sessionManager, stateStore, conf) {
+    val accessManager = new AccessManager(conf)
+    val servlet = new InteractiveSessionServlet(sessionManager, stateStore, conf, accessManager) {
       override protected def createSession(req: HttpServletRequest): InteractiveSession = {
         val session = mock(classOf[InteractiveSession])
         val id = sessionManager.nextId()

+ 16 - 0
conf/livy.conf.template

@@ -107,3 +107,19 @@
 
 # If the Livy Web UI should be included in the Livy Server. Enabled by default.
 # livy.ui.enabled = true
+
+# Whether to enable Livy server access control, if it is true then all the income requests will
+# be checked if the requested user has permission.
+# livy.server.access-control.enabled = false
+
+# Allowed users to access Livy, by default any user is allowed to access Livy. If user want to
+# limit who could access Livy, user should list all the permitted users with comma separated.
+# livy.server.access-control.allowed-users = *
+
+# A list of users with comma separated has the permission to change other user's submitted
+# session, like submitting statements, deleting session.
+# livy.server.access-control.modify-users =
+
+# A list of users with comma separated has the permission to view other user's infomation, like
+# submitted session state, statement results.
+# livy.server.access-control.view-users =

+ 8 - 13
server/src/main/scala/org/apache/livy/LivyConf.scala

@@ -73,7 +73,12 @@ object LivyConf {
   val SUPERUSERS = Entry("livy.superusers", null)
 
   val ACCESS_CONTROL_ENABLED = Entry("livy.server.access-control.enabled", false)
-  val ACCESS_CONTROL_USERS = Entry("livy.server.access-control.users", null)
+  // Allowed users to access Livy, by default any user is allowed to access Livy. If user want to
+  // limit who could access Livy, user should list all the permitted users with comma
+  // separated.
+  val ACCESS_CONTROL_ALLOWED_USERS = Entry("livy.server.access-control.allowed-users", "*")
+  val ACCESS_CONTROL_MODIFY_USERS = Entry("livy.server.access-control.modify-users", null)
+  val ACCESS_CONTROL_VIEW_USERS = Entry("livy.server.access-control.view-users", null)
 
   val SSL_KEYSTORE = Entry("livy.keystore", null)
   val SSL_KEYSTORE_PASSWORD = Entry("livy.keystore.password", null)
@@ -186,7 +191,6 @@ object LivyConf {
     ENABLE_HIVE_CONTEXT.key -> DepConf("livy.repl.enableHiveContext", "0.4"),
     CSRF_PROTECTION.key -> DepConf("livy.server.csrf_protection.enabled", "0.4"),
     ACCESS_CONTROL_ENABLED.key -> DepConf("livy.server.access_control.enabled", "0.4"),
-    ACCESS_CONTROL_USERS.key -> DepConf("livy.server.access_control.users", "0.4"),
     AUTH_KERBEROS_NAME_RULES.key -> DepConf("livy.server.auth.kerberos.name_rules", "0.4"),
     LAUNCH_KERBEROS_REFRESH_INTERVAL.key ->
       DepConf("livy.server.launch.kerberos.refresh_interval", "0.4"),
@@ -199,7 +203,7 @@ object LivyConf {
 
   private val deprecatedConfigs: Map[String, DeprecatedConf] = {
     val configs: Seq[DepConf] = Seq(
-      // There are no deprecated configs without alternatives currently.
+      DepConf("livy.server.access_control.users", "0.4")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
@@ -215,9 +219,6 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
 
   import LivyConf._
 
-  private lazy val _superusers = configToSeq(SUPERUSERS)
-  private lazy val _allowedUsers = configToSeq(ACCESS_CONTROL_USERS).toSet
-
   lazy val hadoopConf = new Configuration()
   lazy val localFsWhitelist = configToSeq(LOCAL_FS_WHITELIST).map { path =>
     // Make sure the path ends with a single separator.
@@ -260,12 +261,6 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
     sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get
   }
 
-  /** Return the list of superusers. */
-  def superusers(): Seq[String] = _superusers
-
-  /** Return the set of users allowed to use Livy via SPNEGO. */
-  def allowedUsers(): Set[String] = _allowedUsers
-
   private val configDir: Option[File] = {
     sys.env.get("LIVY_CONF_DIR")
       .orElse(sys.env.get("LIVY_HOME").map(path => s"$path${File.separator}conf"))
@@ -285,7 +280,7 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
     }
   }
 
-  private def configToSeq(entry: LivyConf.Entry): Seq[String] = {
+  def configToSeq(entry: LivyConf.Entry): Seq[String] = {
     Option(get(entry)).map(_.split("[, ]+").toSeq).getOrElse(Nil)
   }
 

+ 3 - 5
server/src/main/scala/org/apache/livy/server/AccessFilter.scala

@@ -20,9 +20,7 @@ package org.apache.livy.server
 import javax.servlet._
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
-import org.apache.livy.LivyConf
-
-class AccessFilter(livyConf: LivyConf) extends Filter {
+private[livy] class AccessFilter(accessManager: AccessManager) extends Filter {
 
   override def init(filterConfig: FilterConfig): Unit = {}
 
@@ -31,11 +29,11 @@ class AccessFilter(livyConf: LivyConf) extends Filter {
                         chain: FilterChain): Unit = {
     val httpRequest = request.asInstanceOf[HttpServletRequest]
     val remoteUser = httpRequest.getRemoteUser
-    if (livyConf.allowedUsers.contains(remoteUser)) {
+    if (accessManager.isUserAllowed(remoteUser)) {
       chain.doFilter(request, response)
     } else {
       val httpServletResponse = response.asInstanceOf[HttpServletResponse]
-      httpServletResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED,
+      httpServletResponse.sendError(HttpServletResponse.SC_FORBIDDEN,
         "User not authorised to use Livy.")
     }
   }

+ 97 - 0
server/src/main/scala/org/apache/livy/server/AccessManager.scala

@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server
+
+import org.apache.livy.{LivyConf, Logging}
+
+private[livy] class AccessManager(conf: LivyConf) extends Logging {
+  private val aclsOn = conf.getBoolean(LivyConf.ACCESS_CONTROL_ENABLED)
+
+  private val WILDCARD_ACL = "*"
+
+  private val superUsers = conf.configToSeq(LivyConf.SUPERUSERS)
+  private val modifyUsers = conf.configToSeq(LivyConf.ACCESS_CONTROL_MODIFY_USERS)
+  private val viewUsers = conf.configToSeq(LivyConf.ACCESS_CONTROL_VIEW_USERS)
+  private val allowedUsers = conf.configToSeq(LivyConf.ACCESS_CONTROL_ALLOWED_USERS).toSet
+
+  private val viewAcls = (superUsers ++ modifyUsers ++ viewUsers).toSet
+  private val modifyAcls = (superUsers ++ modifyUsers).toSet
+  private val superAcls = superUsers.toSet
+  private val allowedAcls = (superUsers ++ modifyUsers ++ viewUsers ++ allowedUsers).toSet
+
+  info(s"AccessControlManager acls ${if (aclsOn) "enabled" else "disabled"};" +
+    s"users with view permission: ${viewUsers.mkString(", ")};" +
+    s"users with modify permission: ${modifyUsers.mkString(", ")};" +
+    s"users with super permission: ${superUsers.mkString(", ")};" +
+    s"other allowed users: ${allowedUsers.mkString(", ")}")
+
+  /**
+   * Check whether the given user has view access to the REST APIs.
+   */
+  def checkViewPermissions(user: String): Boolean = {
+    debug(s"user=$user aclsOn=$aclsOn viewAcls=${viewAcls.mkString(", ")}")
+    if (!aclsOn || user == null || viewAcls.contains(WILDCARD_ACL) || viewAcls.contains(user)) {
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Check whether the give user has modification access to the REST APIs.
+   */
+  def checkModifyPermissions(user: String): Boolean = {
+    debug(s"user=$user aclsOn=$aclsOn modifyAcls=${modifyAcls.mkString(", ")}")
+    if (!aclsOn || user == null || modifyAcls.contains(WILDCARD_ACL) || modifyAcls.contains(user)) {
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Check whether the give user has super access to the REST APIs. This will always be checked
+   * no matter acls is on or off.
+   */
+  def checkSuperUser(user: String): Boolean = {
+    debug(s"user=$user aclsOn=$aclsOn superAcls=${superAcls.mkString(", ")}")
+    if (user == null || superUsers.contains(WILDCARD_ACL) || superUsers.contains(user)) {
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Check whether the given user has the permission to access REST APIs.
+   */
+  def isUserAllowed(user: String): Boolean = {
+    debug(s"user=$user aclsOn=$aclsOn, allowedAcls=${allowedAcls.mkString(", ")}")
+    if (!aclsOn || user == null || allowedAcls.contains(WILDCARD_ACL) ||
+      allowedAcls.contains(user)) {
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Check whether access control is enabled or not.
+   */
+  def isAccessControlOn: Boolean = aclsOn
+}

+ 10 - 12
server/src/main/scala/org/apache/livy/server/LivyServer.scala

@@ -54,9 +54,11 @@ class LivyServer extends Logging {
 
   private var kinitFailCount: Int = 0
   private var executor: ScheduledExecutorService = _
+  private var accessManager: AccessManager = _
 
   def start(): Unit = {
     livyConf = new LivyConf().loadFromFile("livy.conf")
+    accessManager = new AccessManager(livyConf)
 
     val host = livyConf.get(SERVER_HOST)
     val port = livyConf.getInt(SERVER_PORT)
@@ -187,11 +189,12 @@ class LivyServer extends Logging {
             val context = sce.getServletContext()
             context.initParameters(org.scalatra.EnvironmentKey) = livyConf.get(ENVIRONMENT)
 
-            val interactiveServlet =
-              new InteractiveSessionServlet(interactiveSessionManager, sessionStore, livyConf)
+            val interactiveServlet = new InteractiveSessionServlet(
+              interactiveSessionManager, sessionStore, livyConf, accessManager)
             mount(context, interactiveServlet, "/sessions/*")
 
-            val batchServlet = new BatchSessionServlet(batchSessionManager, sessionStore, livyConf)
+            val batchServlet =
+              new BatchSessionServlet(batchSessionManager, sessionStore, livyConf, accessManager)
             mount(context, batchServlet, "/batches/*")
 
             if (livyConf.getBoolean(UI_ENABLED)) {
@@ -247,15 +250,10 @@ class LivyServer extends Logging {
       server.context.addFilter(csrfHolder, "/*", EnumSet.allOf(classOf[DispatcherType]))
     }
 
-    if (livyConf.getBoolean(ACCESS_CONTROL_ENABLED)) {
-      if (livyConf.get(AUTH_TYPE) != null) {
-        info("Access control is enabled.")
-        val accessHolder = new FilterHolder(new AccessFilter(livyConf))
-        server.context.addFilter(accessHolder, "/*", EnumSet.allOf(classOf[DispatcherType]))
-      } else {
-        throw new IllegalArgumentException("Access control was requested but could " +
-          "not be enabled, since authentication is disabled.")
-      }
+    if (accessManager.isAccessControlOn) {
+      info("Access control is enabled")
+      val accessHolder = new FilterHolder(new AccessFilter(accessManager))
+      server.context.addFilter(accessHolder, "/*", EnumSet.allOf(classOf[DispatcherType]))
     }
 
     server.start()

+ 39 - 12
server/src/main/scala/org/apache/livy/server/SessionServlet.scala

@@ -38,7 +38,8 @@ object SessionServlet extends Logging
  */
 abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
     private[livy] val sessionManager: SessionManager[S, R],
-    livyConf: LivyConf)
+    livyConf: LivyConf,
+    accessManager: AccessManager)
   extends JsonServlet
   with ApiVersioningSupport
   with MethodOverride
@@ -90,7 +91,7 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
   }
 
   get("/:id/log") {
-    withSession { session =>
+    withViewAccessSession { session =>
       val from = params.get("from").map(_.toInt)
       val size = params.get("size").map(_.toInt)
       val (from_, total, logLines) = serializeLogs(session, from, size)
@@ -104,7 +105,7 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
   }
 
   delete("/:id") {
-    withSession { session =>
+    withModifyAccessSession { session =>
       sessionManager.delete(session.id) match {
         case Some(future) =>
           Await.ready(future, Duration.Inf)
@@ -156,7 +157,7 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
       target: Option[String],
       req: HttpServletRequest): Option[String] = {
     if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
-      if (!target.map(hasAccess(_, req)).getOrElse(true)) {
+      if (!target.map(hasSuperAccess(_, req)).getOrElse(true)) {
         halt(Forbidden(s"User '${remoteUser(req)}' not allowed to impersonate '$target'."))
       }
       target.orElse(Option(remoteUser(req)))
@@ -166,11 +167,27 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
   }
 
   /**
-   * Check that the request's user has access to resources owned by the given target user.
+   * Check that the request's user has view access to resources owned by the given target user.
    */
-  protected def hasAccess(target: String, req: HttpServletRequest): Boolean = {
+  protected def hasViewAccess(target: String, req: HttpServletRequest): Boolean = {
     val user = remoteUser(req)
-    user == null || user == target || livyConf.superusers().contains(user)
+    user == target || accessManager.checkViewPermissions(user)
+  }
+
+  /**
+   * Check that the request's user has modify access to resources owned by the given target user.
+   */
+  protected def hasModifyAccess(target: String, req: HttpServletRequest): Boolean = {
+    val user = remoteUser(req)
+    user == target || accessManager.checkModifyPermissions(user)
+  }
+
+  /**
+   * Check that the request's user has admin access to resources owned by the given target user.
+   */
+  protected def hasSuperAccess(target: String, req: HttpServletRequest): Boolean = {
+    val user = remoteUser(req)
+    user == target || accessManager.checkSuperUser(user)
   }
 
   /**
@@ -178,19 +195,29 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
    * via this method must not modify the session in any way, or return potentially sensitive
    * information.
    */
-  protected def withUnprotectedSession(fn: (S => Any)): Any = doWithSession(fn, true)
+  protected def withUnprotectedSession(fn: (S => Any)): Any = doWithSession(fn, true, None)
+
+  /**
+   * Performs an operation on the session, verifying whether the caller has view access of the
+   * session.
+   */
+  protected def withViewAccessSession(fn: (S => Any)): Any =
+    doWithSession(fn, false, Some(hasViewAccess))
 
   /**
-   * Performs an operation on the session, verifying whether the caller is the owner of the
+   * Performs an operation on the session, verifying whether the caller has view access of the
    * session.
    */
-  protected def withSession(fn: (S => Any)): Any = doWithSession(fn, false)
+  protected def withModifyAccessSession(fn: (S => Any)): Any =
+    doWithSession(fn, false, Some(hasModifyAccess))
 
-  private def doWithSession(fn: (S => Any), allowAll: Boolean): Any = {
+  private def doWithSession(fn: (S => Any),
+      allowAll: Boolean,
+      checkFn: Option[(String, HttpServletRequest) => Boolean]): Any = {
     val sessionId = params("id").toInt
     sessionManager.get(sessionId) match {
       case Some(session) =>
-        if (allowAll || hasAccess(session.owner, request)) {
+        if (allowAll || checkFn.map(_(session.owner, request)).getOrElse(false)) {
           fn(session)
         } else {
           Forbidden()

+ 5 - 4
server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala

@@ -20,7 +20,7 @@ package org.apache.livy.server.batch
 import javax.servlet.http.HttpServletRequest
 
 import org.apache.livy.LivyConf
-import org.apache.livy.server.SessionServlet
+import org.apache.livy.server.{AccessManager, SessionServlet}
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions.BatchSessionManager
 import org.apache.livy.utils.AppInfo
@@ -35,8 +35,9 @@ case class BatchSessionView(
 class BatchSessionServlet(
     sessionManager: BatchSessionManager,
     sessionStore: SessionStore,
-    livyConf: LivyConf)
-  extends SessionServlet(sessionManager, livyConf)
+    livyConf: LivyConf,
+    accessManager: AccessManager)
+  extends SessionServlet(sessionManager, livyConf, accessManager)
 {
 
   override protected def createSession(req: HttpServletRequest): BatchSession = {
@@ -50,7 +51,7 @@ class BatchSessionServlet(
       session: BatchSession,
       req: HttpServletRequest): Any = {
     val logs =
-      if (hasAccess(session.owner, req)) {
+      if (hasViewAccess(session.owner, req)) {
         val lines = session.logLines()
 
         val size = 10

+ 22 - 22
server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala

@@ -31,8 +31,7 @@ import org.scalatra.servlet.FileUploadSupport
 import org.apache.livy.{ExecuteRequest, JobHandle, LivyConf, Logging}
 import org.apache.livy.client.common.HttpMessages
 import org.apache.livy.client.common.HttpMessages._
-import org.apache.livy.rsc.driver.Statement
-import org.apache.livy.server.SessionServlet
+import org.apache.livy.server.{AccessManager, SessionServlet}
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions._
 
@@ -41,8 +40,9 @@ object InteractiveSessionServlet extends Logging
 class InteractiveSessionServlet(
     sessionManager: InteractiveSessionManager,
     sessionStore: SessionStore,
-    livyConf: LivyConf)
-  extends SessionServlet(sessionManager, livyConf)
+    livyConf: LivyConf,
+    accessManager: AccessManager)
+  extends SessionServlet(sessionManager, livyConf, accessManager)
   with SessionHeartbeatNotifier[InteractiveSession, InteractiveRecoveryMetadata]
   with FileUploadSupport
 {
@@ -66,7 +66,7 @@ class InteractiveSessionServlet(
       session: InteractiveSession,
       req: HttpServletRequest): Any = {
     val logs =
-      if (hasAccess(session.owner, req)) {
+      if (hasViewAccess(session.owner, req)) {
         Option(session.logLines())
           .map { lines =>
             val size = 10
@@ -85,21 +85,21 @@ class InteractiveSessionServlet(
   }
 
   post("/:id/stop") {
-    withSession { session =>
+    withModifyAccessSession { session =>
       Await.ready(session.stop(), Duration.Inf)
       NoContent()
     }
   }
 
   post("/:id/interrupt") {
-    withSession { session =>
+    withModifyAccessSession { session =>
       Await.ready(session.interrupt(), Duration.Inf)
       Ok(Map("msg" -> "interrupted"))
     }
   }
 
   get("/:id/statements") {
-    withSession { session =>
+    withViewAccessSession { session =>
       val statements = session.statements
       val from = params.get("from").map(_.toInt).getOrElse(0)
       val size = params.get("size").map(_.toInt).getOrElse(statements.length)
@@ -112,7 +112,7 @@ class InteractiveSessionServlet(
   }
 
   val getStatement = get("/:id/statements/:statementId") {
-    withSession { session =>
+    withViewAccessSession { session =>
       val statementId = params("statementId").toInt
 
       session.getStatement(statementId).getOrElse(NotFound("Statement not found"))
@@ -120,7 +120,7 @@ class InteractiveSessionServlet(
   }
 
   jpost[ExecuteRequest]("/:id/statements") { req =>
-    withSession { session =>
+    withModifyAccessSession { session =>
       val statement = session.executeStatement(req)
 
       Created(statement,
@@ -132,7 +132,7 @@ class InteractiveSessionServlet(
   }
 
   post("/:id/statements/:statementId/cancel") {
-    withSession { session =>
+    withModifyAccessSession { session =>
       val statementId = params("statementId")
       session.cancelStatement(statementId.toInt)
       Ok(Map("msg" -> "canceled"))
@@ -143,14 +143,14 @@ class InteractiveSessionServlet(
   // has access to the session, so even though it returns the same data, it behaves differently
   // from get("/:id").
   post("/:id/connect") {
-    withSession { session =>
+    withModifyAccessSession { session =>
       session.recordActivity()
       Ok(clientSessionView(session, request))
     }
   }
 
   jpost[SerializedJob]("/:id/submit-job") { req =>
-    withSession { session =>
+    withModifyAccessSession { session =>
       try {
       require(req.job != null && req.job.length > 0, "no job provided.")
       val jobId = session.submitJob(req.job)
@@ -164,7 +164,7 @@ class InteractiveSessionServlet(
   }
 
   jpost[SerializedJob]("/:id/run-job") { req =>
-    withSession { session =>
+    withModifyAccessSession { session =>
       require(req.job != null && req.job.length > 0, "no job provided.")
       val jobId = session.runJob(req.job)
       Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
@@ -172,7 +172,7 @@ class InteractiveSessionServlet(
   }
 
   post("/:id/upload-jar") {
-    withSession { lsession =>
+    withModifyAccessSession { lsession =>
       fileParams.get("jar") match {
         case Some(file) =>
           lsession.addJar(file.getInputStream, file.name)
@@ -183,7 +183,7 @@ class InteractiveSessionServlet(
   }
 
   post("/:id/upload-pyfile") {
-    withSession { lsession =>
+    withModifyAccessSession { lsession =>
       fileParams.get("file") match {
         case Some(file) =>
           lsession.addJar(file.getInputStream, file.name)
@@ -194,7 +194,7 @@ class InteractiveSessionServlet(
   }
 
   post("/:id/upload-file") {
-    withSession { lsession =>
+    withModifyAccessSession { lsession =>
       fileParams.get("file") match {
         case Some(file) =>
           lsession.addFile(file.getInputStream, file.name)
@@ -205,13 +205,13 @@ class InteractiveSessionServlet(
   }
 
   jpost[AddResource]("/:id/add-jar") { req =>
-    withSession { lsession =>
+    withModifyAccessSession { lsession =>
       addJarOrPyFile(req, lsession)
     }
   }
 
   jpost[AddResource]("/:id/add-pyfile") { req =>
-    withSession { lsession =>
+    withModifyAccessSession { lsession =>
       lsession.kind match {
         case PySpark() | PySpark3() => addJarOrPyFile(req, lsession)
         case _ => BadRequest("Only supported for pyspark sessions.")
@@ -220,21 +220,21 @@ class InteractiveSessionServlet(
   }
 
   jpost[AddResource]("/:id/add-file") { req =>
-    withSession { lsession =>
+    withModifyAccessSession { lsession =>
       val uri = new URI(req.uri)
       lsession.addFile(uri)
     }
   }
 
   get("/:id/jobs/:jobid") {
-    withSession { lsession =>
+    withViewAccessSession { lsession =>
       val jobId = params("jobid").toLong
       Ok(lsession.jobStatus(jobId))
     }
   }
 
   post("/:id/jobs/:jobid/cancel") {
-    withSession { lsession =>
+    withModifyAccessSession { lsession =>
       val jobId = params("jobid").toLong
       lsession.cancelJob(jobId)
     }

+ 9 - 2
server/src/main/scala/org/apache/livy/server/interactive/SessionHeartbeat.scala

@@ -63,8 +63,15 @@ trait SessionHeartbeatNotifier[S <: Session with SessionHeartbeat, R <: Recovery
     }
   }
 
-  abstract override protected def withSession(fn: (S => Any)): Any = {
-    super.withSession { s =>
+  abstract override protected def withViewAccessSession(fn: (S => Any)): Any = {
+    super.withViewAccessSession { s =>
+      s.heartbeat()
+      fn(s)
+    }
+  }
+
+  abstract override protected def withModifyAccessSession(fn: (S) => Any): Any = {
+    super.withModifyAccessSession { s =>
       s.heartbeat()
       fn(s)
     }

+ 119 - 0
server/src/test/scala/org/apache/livy/server/AccessManagerSuite.scala

@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server
+
+import org.scalatest.{FunSuite, Matchers}
+
+import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
+
+class AccessManagerSuite extends FunSuite with Matchers with LivyBaseUnitTestSuite {
+  import LivyConf._
+
+  private val viewUsers = Seq("user1", "user2", "user3")
+  private val modifyUsers = Seq("user4", "user5")
+  private val superUsers = Seq("user6", "user7")
+  private val allowedUsers = Seq("user8", "user9")
+
+  test("access permission") {
+    val conf = new LivyConf()
+      .set(ACCESS_CONTROL_ENABLED, true)
+      .set(ACCESS_CONTROL_VIEW_USERS, viewUsers.mkString(","))
+      .set(ACCESS_CONTROL_MODIFY_USERS, modifyUsers.mkString(","))
+      .set(SUPERUSERS, superUsers.mkString(","))
+
+    val accessManager = new AccessManager(conf)
+    accessManager.isAccessControlOn should be (true)
+
+    // check view access
+    viewUsers.foreach { u => accessManager.checkViewPermissions(u) should be (true) }
+    modifyUsers.foreach { u => accessManager.checkViewPermissions(u) should be (true) }
+    superUsers.foreach { u => accessManager.checkViewPermissions(u) should be (true) }
+    allowedUsers.foreach { u => accessManager.checkViewPermissions(u) should be (false) }
+
+    accessManager.checkViewPermissions(null) should be (true)
+    accessManager.checkViewPermissions("user8") should be (false)
+
+    // check modify access
+    viewUsers.foreach { u => accessManager.checkModifyPermissions(u) should be (false) }
+    modifyUsers.foreach { u => accessManager.checkModifyPermissions(u) should be (true) }
+    superUsers.foreach { u => accessManager.checkModifyPermissions(u) should be (true) }
+    allowedUsers.foreach { u => accessManager.checkModifyPermissions(u) should be (false) }
+
+    accessManager.checkModifyPermissions(null) should be (true)
+    accessManager.checkModifyPermissions("user8") should be (false)
+
+    // check super access
+    viewUsers.foreach { u => accessManager.checkSuperUser(u) should be (false) }
+    modifyUsers.foreach { u => accessManager.checkSuperUser(u) should be (false) }
+    superUsers.foreach { u => accessManager.checkSuperUser(u) should be (true) }
+    allowedUsers.foreach { u => accessManager.checkSuperUser(u) should be (false) }
+
+    accessManager.checkSuperUser(null) should be (true)
+    accessManager.checkSuperUser("user8") should be (false)
+  }
+
+  test("wildcard access permission") {
+    val conf = new LivyConf()
+      .set(ACCESS_CONTROL_ENABLED, true)
+      .set(ACCESS_CONTROL_VIEW_USERS, "*")
+      .set(ACCESS_CONTROL_MODIFY_USERS, "*")
+      .set(SUPERUSERS, "*")
+
+    val accessManager = new AccessManager(conf)
+    accessManager.isAccessControlOn should be (true)
+
+    accessManager.checkViewPermissions("anyUser") should be (true)
+    accessManager.checkModifyPermissions("anyUser") should be (true)
+    accessManager.checkSuperUser("anyUser") should be (true)
+  }
+
+  test("default allowed users") {
+    val conf = new LivyConf()
+      .set(ACCESS_CONTROL_ENABLED, true)
+      .set(ACCESS_CONTROL_VIEW_USERS, viewUsers.mkString(","))
+      .set(ACCESS_CONTROL_MODIFY_USERS, modifyUsers.mkString(","))
+      .set(SUPERUSERS, superUsers.mkString(","))
+
+    val accessManager = new AccessManager(conf)
+
+    // check if configured users are allowed
+    viewUsers.foreach { u => accessManager.isUserAllowed(u) should be (true) }
+    modifyUsers.foreach { u => accessManager.isUserAllowed(u) should be (true) }
+    superUsers.foreach { u => accessManager.isUserAllowed(u) should be (true) }
+
+    accessManager.isUserAllowed("anyUser") should be (true)
+    accessManager.isUserAllowed(null) should be (true)
+  }
+
+  test("configured users are not in the allowed list") {
+    val conf = new LivyConf()
+      .set(ACCESS_CONTROL_ENABLED, true)
+      .set(ACCESS_CONTROL_VIEW_USERS, viewUsers.mkString(","))
+      .set(ACCESS_CONTROL_MODIFY_USERS, modifyUsers.mkString(","))
+      .set(SUPERUSERS, superUsers.mkString(","))
+      .set(ACCESS_CONTROL_ALLOWED_USERS, allowedUsers.mkString(","))
+
+    val accessManager = new AccessManager(conf)
+    viewUsers.foreach { u => accessManager.isUserAllowed(u) should be (true) }
+    modifyUsers.foreach { u => accessManager.isUserAllowed(u) should be (true) }
+    superUsers.foreach { u => accessManager.isUserAllowed(u) should be (true) }
+    allowedUsers.foreach { u => accessManager.isUserAllowed(u) should be (true) }
+
+    accessManager.isUserAllowed("anyUser") should be (false)
+  }
+}

+ 8 - 0
server/src/test/scala/org/apache/livy/server/BaseSessionServletSpec.scala

@@ -42,18 +42,26 @@ abstract class BaseSessionServletSpec[S <: Session, R <: RecoveryMetadata]
   /** Name of the admin user. */
   protected val ADMIN = "__admin__"
 
+  private val VIEW_USER = "__view__"
+
+  private val MODIFY_USER = "__modify__"
+
   /** Create headers that identify a specific user in tests. */
   protected def makeUserHeaders(user: String): Map[String, String] = {
     defaultHeaders ++ Map(BaseSessionServletSpec.REMOTE_USER_HEADER -> user)
   }
 
   protected val adminHeaders = makeUserHeaders(ADMIN)
+  protected val viewUserHeaders = makeUserHeaders(VIEW_USER)
+  protected val modifyUserHeaders = makeUserHeaders(MODIFY_USER)
 
   /** Create a LivyConf with impersonation enabled and a superuser. */
   protected def createConf(): LivyConf = {
     new LivyConf()
       .set(LivyConf.IMPERSONATION_ENABLED, true)
       .set(LivyConf.SUPERUSERS, ADMIN)
+      .set(LivyConf.ACCESS_CONTROL_VIEW_USERS, VIEW_USER)
+      .set(LivyConf.ACCESS_CONTROL_MODIFY_USERS, MODIFY_USER)
       .set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir"))
   }
 

+ 81 - 19
server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala

@@ -50,15 +50,7 @@ object SessionServletSpec {
 
   case class MockSessionView(id: Int, owner: String, logs: Seq[String])
 
-}
-
-class SessionServletSpec
-  extends BaseSessionServletSpec[Session, RecoveryMetadata] {
-
-  import SessionServletSpec._
-
-  override def createServlet(): SessionServlet[Session, RecoveryMetadata] = {
-    val conf = createConf()
+  def createServlet(conf: LivyConf): SessionServlet[Session, RecoveryMetadata] = {
     val sessionManager = new SessionManager[Session, RecoveryMetadata](
       conf,
       { _ => assert(false).asInstanceOf[Session] },
@@ -66,7 +58,8 @@ class SessionServletSpec
       "test",
       Some(Seq.empty))
 
-    new SessionServlet(sessionManager, conf) with RemoteUserOverride {
+    val accessManager = new AccessManager(conf)
+    new SessionServlet(sessionManager, conf, accessManager) with RemoteUserOverride {
       override protected def createSession(req: HttpServletRequest): Session = {
         val params = bodyAs[Map[String, String]](req)
         checkImpersonation(params.get(PROXY_USER), req)
@@ -76,12 +69,22 @@ class SessionServletSpec
       override protected def clientSessionView(
           session: Session,
           req: HttpServletRequest): Any = {
-        val logs = if (hasAccess(session.owner, req)) session.logLines() else Nil
+        val logs = if (hasViewAccess(session.owner, req)) session.logLines() else Nil
         MockSessionView(session.id, session.owner, logs)
       }
     }
   }
 
+}
+
+class SessionServletSpec extends BaseSessionServletSpec[Session, RecoveryMetadata] {
+
+  import SessionServletSpec._
+
+  override def createServlet(): SessionServlet[Session, RecoveryMetadata] = {
+    SessionServletSpec.createServlet(createConf())
+  }
+
   private val aliceHeaders = makeUserHeaders("alice")
   private val bobHeaders = makeUserHeaders("bob")
 
@@ -116,25 +119,86 @@ class SessionServletSpec
       jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
         jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res =>
           assert(res.owner === "alice")
-          assert(res.logs === Nil)
+          assert(res.logs === IndexedSeq("log"))
         }
         delete(res.id, aliceHeaders, SC_OK)
       }
     }
 
-    it("should prevent non-owners from modifying sessions") {
+    it("should allow non-owners to modify sessions") {
       jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
-        delete(res.id, bobHeaders, SC_FORBIDDEN)
+        delete(res.id, bobHeaders, SC_OK)
+      }
+    }
+
+    it("should not allow regular users to impersonate others") {
+      jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = aliceHeaders,
+        expectedStatus = SC_FORBIDDEN) { _ => }
+    }
+
+    it("should allow admins to impersonate anyone") {
+      jpost[MockSessionView]("/", Map(PROXY_USER -> "bob"), headers = adminHeaders) { res =>
+        delete(res.id, adminHeaders, SC_OK)
+      }
+    }
+  }
+}
+
+class AclsEnabledSessionServletSpec extends BaseSessionServletSpec[Session, RecoveryMetadata] {
+
+  import SessionServletSpec._
+
+  override def createServlet(): SessionServlet[Session, RecoveryMetadata] = {
+    val conf = createConf().set(LivyConf.ACCESS_CONTROL_ENABLED, true)
+    SessionServletSpec.createServlet(conf)
+  }
+
+  private val aliceHeaders = makeUserHeaders("alice")
+  private val bobHeaders = makeUserHeaders("bob")
+
+  private def delete(id: Int, headers: Map[String, String], expectedStatus: Int): Unit = {
+    jdelete[Map[String, Any]](s"/$id", headers = headers, expectedStatus = expectedStatus) { _ =>
+      // Nothing to do.
+    }
+  }
+
+  describe("SessionServlet") {
+    it("should attach owner information to sessions") {
+      jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
+        assert(res.owner === "alice")
+        assert(res.logs === IndexedSeq("log"))
+        delete(res.id, aliceHeaders, SC_OK)
       }
     }
 
-    it("should allow admins to access all sessions") {
+    it("should only allow view accessible users to see non-sensitive information") {
       jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
-        jget[MockSessionView](s"/${res.id}", headers = adminHeaders) { res =>
+        jget[MockSessionView](s"/${res.id}", headers = bobHeaders) { res =>
           assert(res.owner === "alice")
+          // Other user cannot see the logs
+          assert(res.logs === Nil)
+        }
+
+        // Users with access permission could see the logs
+        jget[MockSessionView](s"/${res.id}", headers = viewUserHeaders) { res =>
           assert(res.logs === IndexedSeq("log"))
         }
-        delete(res.id, adminHeaders, SC_OK)
+        jget[MockSessionView](s"/${res.id}", headers = modifyUserHeaders) { res =>
+          assert(res.logs === IndexedSeq("log"))
+        }
+        jget[MockSessionView](s"/${res.id}", headers = adminHeaders) { res =>
+          assert(res.logs === IndexedSeq("log"))
+        }
+
+        delete(res.id, aliceHeaders, SC_OK)
+      }
+    }
+
+    it("should only allow modify accessible users from modifying sessions") {
+      jpost[MockSessionView]("/", Map(), headers = aliceHeaders) { res =>
+        delete(res.id, bobHeaders, SC_FORBIDDEN)
+        delete(res.id, viewUserHeaders, SC_FORBIDDEN)
+        delete(res.id, modifyUserHeaders, SC_OK)
       }
     }
 
@@ -149,7 +213,5 @@ class SessionServletSpec
         delete(res.id, adminHeaders, SC_OK)
       }
     }
-
   }
-
 }

+ 4 - 2
server/src/test/scala/org/apache/livy/server/batch/BatchServletSpec.scala

@@ -29,7 +29,7 @@ import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar.mock
 
 import org.apache.livy.Utils
-import org.apache.livy.server.BaseSessionServletSpec
+import org.apache.livy.server.{AccessManager, BaseSessionServletSpec}
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions.{BatchSessionManager, SessionState}
 import org.apache.livy.utils.AppInfo
@@ -54,10 +54,12 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover
   override def createServlet(): BatchSessionServlet = {
     val livyConf = createConf()
     val sessionStore = mock[SessionStore]
+    val accessManager = new AccessManager(livyConf)
     new BatchSessionServlet(
       new BatchSessionManager(livyConf, sessionStore, Some(Seq.empty)),
       sessionStore,
-      livyConf)
+      livyConf,
+      accessManager)
   }
 
   describe("Batch Servlet") {

+ 6 - 3
server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionServletSpec.scala

@@ -34,6 +34,7 @@ import org.scalatest.mock.MockitoSugar.mock
 import org.apache.livy.{ExecuteRequest, LivyConf}
 import org.apache.livy.client.common.HttpMessages.SessionInfo
 import org.apache.livy.rsc.driver.{Statement, StatementState}
+import org.apache.livy.server.AccessManager
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions._
 import org.apache.livy.utils.AppInfo
@@ -44,8 +45,9 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
 
   class MockInteractiveSessionServlet(
       sessionManager: InteractiveSessionManager,
-      conf: LivyConf)
-    extends InteractiveSessionServlet(sessionManager, mock[SessionStore], conf) {
+      conf: LivyConf,
+      accessManager: AccessManager)
+    extends InteractiveSessionServlet(sessionManager, mock[SessionStore], conf, accessManager) {
 
     private var statements = IndexedSeq[Statement]()
 
@@ -91,7 +93,8 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
   override def createServlet(): InteractiveSessionServlet = {
     val conf = createConf()
     val sessionManager = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq.empty))
-    new MockInteractiveSessionServlet(sessionManager, conf)
+    val accessManager = new AccessManager(conf)
+    new MockInteractiveSessionServlet(sessionManager, conf, accessManager)
   }
 
   it("should setup and tear down an interactive session") {

+ 4 - 2
server/src/test/scala/org/apache/livy/server/interactive/JobApiSpec.scala

@@ -33,7 +33,7 @@ import org.scalatest.mock.MockitoSugar.mock
 import org.apache.livy.{Job, JobHandle}
 import org.apache.livy.client.common.{BufferUtils, Serializer}
 import org.apache.livy.client.common.HttpMessages._
-import org.apache.livy.server.RemoteUserOverride
+import org.apache.livy.server.{AccessManager, RemoteUserOverride}
 import org.apache.livy.server.recovery.SessionStore
 import org.apache.livy.sessions.{InteractiveSessionManager, SessionState}
 import org.apache.livy.test.jobs.{Echo, GetCurrentUser}
@@ -48,7 +48,9 @@ class JobApiSpec extends BaseInteractiveServletSpec {
     val conf = createConf()
     val sessionStore = mock[SessionStore]
     val sessionManager = new InteractiveSessionManager(conf, sessionStore, Some(Seq.empty))
-    new InteractiveSessionServlet(sessionManager, sessionStore, conf) with RemoteUserOverride
+    val accessManager = new AccessManager(conf)
+    new InteractiveSessionServlet(sessionManager, sessionStore, conf, accessManager)
+      with RemoteUserOverride
   }
 
   def withSessionId(desc: String)(fn: (Int) => Unit): Unit = {