|
@@ -19,6 +19,7 @@
|
|
|
package com.cloudera.livy.test.framework
|
|
|
|
|
|
import java.io.{File, IOException}
|
|
|
+import java.nio.file.Files
|
|
|
import java.security.PrivilegedExceptionAction
|
|
|
import javax.servlet.http.HttpServletResponse._
|
|
|
|
|
@@ -51,6 +52,8 @@ private class RealClusterConfig(config: Map[String, String]) {
|
|
|
val sparkHome = config("env.spark_home")
|
|
|
val sparkConf = config.getOrElse("env.spark_conf", "/etc/spark/conf")
|
|
|
val hadoopConf = config.getOrElse("env.hadoop_conf", "/etc/hadoop/conf")
|
|
|
+
|
|
|
+ val javaHome = config.getOrElse("env.java_home", "/usr/java/default")
|
|
|
}
|
|
|
|
|
|
class RealCluster(_config: Map[String, String])
|
|
@@ -99,7 +102,10 @@ class RealCluster(_config: Map[String, String])
|
|
|
info(s"Running command: $cmd")
|
|
|
val result = sshClient(_.exec(cmd))
|
|
|
result.exitCode match {
|
|
|
- case Some(ec) if ec > 0 => throw new IOException(s"Command failed: $ec")
|
|
|
+ case Some(ec) if ec > 0 =>
|
|
|
+ throw new IOException(s"Command '$cmd' failed: $ec\n" +
|
|
|
+ s"stdout: ${result.stdOutAsString()}\n" +
|
|
|
+ s"stderr: ${result.stdErrAsString()}\n")
|
|
|
case _ =>
|
|
|
}
|
|
|
result
|
|
@@ -216,20 +222,22 @@ class RealCluster(_config: Map[String, String])
|
|
|
upload(livyConfFile.getAbsolutePath(), s"$livyHomePath/conf/livy.conf")
|
|
|
|
|
|
val env = Map(
|
|
|
+ "JAVA_HOME" -> config.javaHome,
|
|
|
"HADOOP_CONF_DIR" -> config.hadoopConf,
|
|
|
"SPARK_CONF_DIR" -> sparkConfDir,
|
|
|
"SPARK_HOME" -> config.sparkHome,
|
|
|
- "CLASSPATH" -> config.livyClasspath
|
|
|
+ "CLASSPATH" -> config.livyClasspath,
|
|
|
+ "LIVY_PID_DIR" -> s"$tempDirPath/pid",
|
|
|
+ "LIVY_LOG_DIR" -> s"$tempDirPath/logs",
|
|
|
+ "LIVY_MAX_LOG_FILES" -> "16",
|
|
|
+ "LIVY_IDENT_STRING" -> "it"
|
|
|
)
|
|
|
- .map { case (k, v) => s"$k=$v" }
|
|
|
- .mkString(" ")
|
|
|
-
|
|
|
- val livyServerPath = s"$livyHomePath/bin/livy-server"
|
|
|
+ val livyEnvFile = File.createTempFile("livy-env.", ".sh")
|
|
|
+ saveProperties(env, livyEnvFile)
|
|
|
+ upload(livyEnvFile.getAbsolutePath(), s"$livyHomePath/conf/livy-env.sh")
|
|
|
|
|
|
info(s"Starting Livy @ port ${config.livyPort}...")
|
|
|
- livyEpoch += 1
|
|
|
- val logPath = s"$tempDirPath/livy-$livyEpoch.log"
|
|
|
- exec(s"env $env nohup $livyServerPath > $logPath 2>&1 &")
|
|
|
+ exec(s"env -i $livyHomePath/bin/livy-server start")
|
|
|
livyIsRunning = true
|
|
|
|
|
|
val httpClient = new AsyncHttpClient()
|
|
@@ -242,7 +250,7 @@ class RealCluster(_config: Map[String, String])
|
|
|
override def stopLivy(): Unit = synchronized {
|
|
|
info("Stopping Livy Server")
|
|
|
try {
|
|
|
- exec(s"pkill -f com.cloudera.livy.server.LivyServer")
|
|
|
+ exec(s"$livyHomePath/bin/livy-server stop")
|
|
|
} catch {
|
|
|
case e: Exception =>
|
|
|
if (livyIsRunning) {
|
|
@@ -254,9 +262,11 @@ class RealCluster(_config: Map[String, String])
|
|
|
// Wait a tiny bit so that the process finishes closing its output files.
|
|
|
Thread.sleep(2)
|
|
|
|
|
|
- val logName = s"livy-$livyEpoch.log"
|
|
|
- val logPath = s"$tempDirPath/$logName"
|
|
|
- val localLog = sys.props("java.io.tmpdir") + File.separator + logName
|
|
|
+ livyEpoch += 1
|
|
|
+ val logName = "livy-it-server.out"
|
|
|
+ val localName = s"livy-it-server-$livyEpoch.out"
|
|
|
+ val logPath = s"$tempDirPath/logs/$logName"
|
|
|
+ val localLog = sys.props("java.io.tmpdir") + File.separator + localName
|
|
|
download(logPath, localLog)
|
|
|
info(s"Log for epoch $livyEpoch available at $localLog")
|
|
|
}
|