Browse Source

LIVY-71. Write session files to HDFS as the correct user.

When uploading files using `LivyClient.uploadJar` (or `uploadFile`),
the user files should not be readable by other users. Since we know
who the session is running as, impersonate that user before uploading
the file to HDFS. The file is stored in a session-specific staging
directory, which is deleted when the session is stopped.

The staging directory, by default, is under the user's home directory
(under the ".livy-sessions" subdirectory); the location can be modified
in the config file. When setting a custom location, the directory needs
to be world-writable, so that all users can create directories, since
Livy cannot change ownership of directories it creates as its own user.

Two other minor changes are included (found during testing):

- bin/livy-server now adds config directories (Spark and Hadoop) to Livy's
  classpath, so that Livy can read the correct Hadoop configuration (and
  Spark's, if needed later).
- client-http/pom.xml now shades livy-client-common since we need to relocate
  kryo classes in that artifact too.

Closes #98
Marcelo Vanzin 9 years ago
parent
commit
410adf40d7

+ 13 - 2
bin/livy-server

@@ -32,9 +32,20 @@ if [ ! -d "$LIBDIR" ]; then
   exit 1
 fi
 
-CLASSPATH="$LIBDIR/*:$LIVY_HOME/conf:$CLASSPATH"
+
+LIVY_CLASSPATH="$LIBDIR/*:$LIVY_HOME/conf"
+
+if [ -n "$SPARK_CONF_DIR" ]; then
+  LIVY_CLASSPATH="$LIVY_CLASSPATH:$SPARK_CONF_DIR"
+fi
+if [ -n "$HADOOP_CONF_DIR" ]; then
+  LIVY_CLASSPATH="$LIVY_CLASSPATH:$HADOOP_CONF_DIR"
+fi
+if [ -n "$YARN_CONF_DIR" ]; then
+  LIVY_CLASSPATH="$LIVY_CLASSPATH:$YARN_CONF_DIR"
+fi
 
 exec java \
 	$LIVY_SERVER_JAVA_OPTS \
-	-cp "$CLASSPATH" \
+	-cp "$LIVY_CLASSPATH:$CLASSPATH" \
 	com.cloudera.livy.server.Main

+ 0 - 1
client-http/pom.xml

@@ -117,7 +117,6 @@
                 </includes>
                 <excludes>
                   <exclude>com.cloudera.livy:livy-api</exclude>
-                  <exclude>com.cloudera.livy:livy-client-common</exclude>
                 </excludes>
               </artifactSet>
               <filters>

+ 1 - 13
core/src/main/scala/com/cloudera/livy/LivyConf.scala

@@ -41,11 +41,9 @@ object LivyConf {
   val SPARK_HOME = Entry("livy.server.spark-home", null)
   val SPARK_SUBMIT_KEY = Entry("livy.server.spark-submit", null)
   val IMPERSONATION_ENABLED = Entry("livy.impersonation.enabled", false)
-  val LIVY_HOME = Entry("livy.home", null)
   val FILE_UPLOAD_MAX_SIZE = Entry("livy.file.upload.max.size", 100L * 1024 * 1024)
   val SUPERUSERS = Entry("livy.superusers", null)
-
-  lazy val TEST_LIVY_HOME = Files.createTempDirectory("livyTemp").toUri.toString
+  val SESSION_STAGING_DIR = Entry("livy.session.staging-dir", null)
 }
 
 /**
@@ -78,16 +76,6 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
   /** Return the location of the spark home directory */
   def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))
 
-  def livyHome(): String = {
-    Option(get(LIVY_HOME)).orElse(sys.env.get("LIVY_HOME")).getOrElse {
-      if (LivyConf.TEST_MODE) {
-        LivyConf.TEST_LIVY_HOME
-      } else {
-        throw new IllegalStateException("livy.home must be specified!")
-      }
-    }
-  }
-
   /** Return the path to the spark-submit executable. */
   def sparkSubmit(): String = {
     Option(get(SPARK_SUBMIT_KEY))

+ 81 - 21
server/src/main/scala/com/cloudera/livy/server/client/ClientSession.scala

@@ -22,7 +22,8 @@ import java.io.InputStream
 import java.net.URI
 import java.nio.ByteBuffer
 import java.nio.file.Files
-import java.util.{HashMap => JHashMap}
+import java.security.PrivilegedExceptionAction
+import java.util.{HashMap => JHashMap, UUID}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
@@ -31,8 +32,10 @@ import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.security.UserGroupInformation
 
-import com.cloudera.livy.{LivyClientBuilder, Logging}
+import com.cloudera.livy.{LivyClientBuilder, LivyConf, Logging}
 import com.cloudera.livy.client.common.HttpMessages._
 import com.cloudera.livy.client.local.{LocalClient, LocalConf}
 import com.cloudera.livy.sessions.{Session, SessionState}
@@ -42,7 +45,7 @@ class ClientSession(
       owner: String,
       val proxyUser: Option[String],
       createRequest: CreateClientRequest,
-      livyHome: String)
+      livyConf: LivyConf)
     extends Session(id, owner) with Logging {
   implicit val executionContext = ExecutionContext.global
 
@@ -64,12 +67,9 @@ class ClientSession(
     builder.build()
   }.asInstanceOf[LocalClient]
 
-  private val fs = FileSystem.get(new Configuration())
-
-  // TODO: It is important that each session's home be readable only by the user that created
-  // that session and not by anyone else. Else, one session might be able to read files uploaded
-  // by another. Fix this when we add security support.
-  private val sessionHome = new Path(livyHome + "/" + id.toString)
+  // Directory where the session's staging files are created. The directory is only accessible
+  // to the session's effective user.
+  private var stagingDir: Path = null
 
   info("Livy client created.")
 
@@ -117,19 +117,57 @@ class ClientSession(
     operations.remove(id).foreach { client.cancel }
   }
 
-  private def copyResourceToHDFS(dataStream: InputStream, name: String): URI = {
-    val filePath = new Path(sessionHome, name)
-    val outFile = fs.create(filePath, true)
-    val buffer = new Array[Byte](512 * 1024)
-    var read = -1
+  private def doAsOwner[T](fn: => T): T = {
+    val user = proxyUser.getOrElse(owner)
+    if (user != null) {
+      val ugi = if (UserGroupInformation.isSecurityEnabled) {
+        UserGroupInformation.createProxyUser(user, UserGroupInformation.getCurrentUser())
+      } else {
+        UserGroupInformation.createRemoteUser(user)
+      }
+      ugi.doAs(new PrivilegedExceptionAction[T] {
+        override def run(): T = fn
+      })
+    } else {
+      fn
+    }
+  }
+
+  private def copyResourceToHDFS(dataStream: InputStream, name: String): URI = doAsOwner {
+    val fs = FileSystem.newInstance(new Configuration())
+
     try {
-      while ({read = dataStream.read(buffer); read != -1}) {
-        outFile.write(buffer, 0, read)
+      val filePath = new Path(getStagingDir(fs), name)
+      debug(s"Uploading user file to $filePath")
+
+      val outFile = fs.create(filePath, true)
+      val buffer = new Array[Byte](512 * 1024)
+      var read = -1
+      try {
+        while ({read = dataStream.read(buffer); read != -1}) {
+          outFile.write(buffer, 0, read)
+        }
+      } finally {
+        outFile.close()
       }
+      filePath.toUri
     } finally {
-      outFile.close()
+      fs.close()
+    }
+  }
+
+  private def getStagingDir(fs: FileSystem): Path = synchronized {
+    if (stagingDir == null) {
+      val stagingRoot = Option(livyConf.get(LivyConf.SESSION_STAGING_DIR)).getOrElse {
+        new Path(fs.getHomeDirectory(), ".livy-sessions").toString()
+      }
+
+      val sessionDir = new Path(stagingRoot, UUID.randomUUID().toString())
+      fs.mkdirs(sessionDir, new FsPermission("700"))
+      stagingDir = sessionDir
+      debug(s"Session $id staging directory is $stagingDir")
     }
-    filePath.toUri
+    stagingDir
   }
 
   private def performOperation(job: Array[Byte], sync: Boolean): Long = {
@@ -142,9 +180,31 @@ class ClientSession(
 
   override def stop(): Future[Unit] = {
     Future {
-      sessionState = SessionState.ShuttingDown()
-      client.stop(true)
-      sessionState = SessionState.Dead()
+      try {
+        sessionState = SessionState.ShuttingDown()
+        client.stop(true)
+        sessionState = SessionState.Dead()
+      } catch {
+        case e: Exception =>
+          warn(s"Error stopping session $id.", e)
+      }
+
+      try {
+        if (stagingDir != null) {
+          debug(s"Deleting session $id staging directory $stagingDir")
+          doAsOwner {
+            val fs = FileSystem.newInstance(new Configuration())
+            try {
+              fs.delete(stagingDir, true)
+            } finally {
+              fs.close()
+            }
+          }
+        }
+      } catch {
+        case e: Exception =>
+          warn(s"Error cleaning up session $id staging dir.", e)
+      }
     }
   }
 

+ 1 - 1
server/src/main/scala/com/cloudera/livy/server/client/ClientSessionServlet.scala

@@ -48,7 +48,7 @@ class ClientSessionServlet(livyConf: LivyConf)
         None
       }
     val proxyUser = checkImpersonation(requestedProxy, req)
-    new ClientSession(id, user, proxyUser, createRequest, livyConf.livyHome)
+    new ClientSession(id, user, proxyUser, createRequest, livyConf)
   }
 
   // This endpoint is used by the client-http module to "connect" to an existing session and

+ 4 - 2
server/src/test/resources/log4j.properties

@@ -33,6 +33,8 @@ log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%t: %m%n
 
-# Ignore messages below warning level from Jetty, because it's a bit verbose
+# Silence some noisy libraries.
+log4j.logger.org.apache.http=WARN
+log4j.logger.org.apache.spark=INFO
+log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.spark-project.jetty=WARN
-org.spark-project.jetty.LEVEL=WARN

+ 5 - 1
server/src/test/scala/com/cloudera/livy/server/BaseJsonServletSpec.scala

@@ -98,7 +98,11 @@ abstract class BaseJsonServletSpec extends ScalatraSuite with FunSpecLike {
 
   private def doTest[R: ClassTag](expectedStatus: Int, fn: R => Unit)
       (implicit klass: ClassTag[R]): Unit = {
-    status should be (expectedStatus)
+    if (status != expectedStatus) {
+      // Yeah this is weird, but we don't want to evaluate "response.body" if there's no error.
+      assert(status === expectedStatus,
+        s"Unexpected response status: $status != $expectedStatus (${response.body})")
+    }
     // Only try to parse the body if response is in the "OK" range (20x).
     if ((status / 100) * 100 == SC_OK) {
       val result =

+ 33 - 2
server/src/test/scala/com/cloudera/livy/server/client/ClientServletSpec.scala

@@ -31,6 +31,7 @@ import scala.concurrent.duration._
 import scala.io.Source
 import scala.language.postfixOps
 
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.api.java.function.VoidFunction
 import org.scalatest.concurrent.Eventually._
@@ -46,6 +47,23 @@ class ClientServletSpec
 
   private val PROXY = "__proxy__"
 
+  private var tempDir: File = _
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    if (tempDir != null) {
+      scala.util.Try(FileUtils.deleteDirectory(tempDir))
+      tempDir = null
+    }
+  }
+
+  override protected def createConf(): LivyConf = synchronized {
+    if (tempDir == null) {
+      tempDir = Files.createTempDirectory("client-test").toFile()
+    }
+    super.createConf().set(LivyConf.SESSION_STAGING_DIR, tempDir.toURI().toString())
+  }
+
   override def createServlet(): ClientSessionServlet = {
     new ClientSessionServlet(createConf()) with RemoteUserOverride
   }
@@ -134,6 +152,9 @@ class ClientServletSpec
           case _ => fail("Response is not an array.")
         }
       }
+
+      // Make sure the session's staging directory was cleaned up.
+      assert(tempDir.listFiles().length === 0)
     }
 
     it("should support user impersonation") {
@@ -159,8 +180,13 @@ class ClientServletSpec
           data.proxyUser should be (PROXY)
           val user = runJob(data.id, new GetUserJob(), headers = adminHeaders)
           user should be (PROXY)
+
+          // Test that files are uploaded to a new session directory.
+          assert(tempDir.listFiles().length === 0)
+          testResourceUpload("file", data.id)
         } finally {
           deleteSession(data.id)
+          assert(tempDir.listFiles().length === 0)
         }
       }
     }
@@ -185,12 +211,17 @@ class ClientServletSpec
 
   private def testResourceUpload(cmd: String, sessionId: Int): Unit = {
     val f = File.createTempFile("uploadTestFile", cmd)
-    val conf = new LivyConf()
+    val conf = createConf()
 
     Files.write(Paths.get(f.getAbsolutePath), "Test data".getBytes())
 
     jupload[Unit](s"/$sessionId/upload-$cmd", Map(cmd -> f), expectedStatus = SC_OK) { _ =>
-      val resultFile = new File(new URI(s"${conf.livyHome()}/$sessionId/${f.getName}"))
+      // There should be a single directory under the staging dir.
+      val subdirs = tempDir.listFiles()
+      assert(subdirs.length === 1)
+      val stagingDir = subdirs(0).toURI().toString()
+
+      val resultFile = new File(new URI(s"$stagingDir/${f.getName}"))
       resultFile.deleteOnExit()
       resultFile.exists() should be(true)
       Source.fromFile(resultFile).mkString should be("Test data")