|
@@ -22,6 +22,8 @@ import java.io.File
|
|
|
import java.lang.ProcessBuilder.Redirect
|
|
|
import java.nio.file.{Paths, Files}
|
|
|
|
|
|
+import scala.collection.JavaConverters._
|
|
|
+
|
|
|
import com.cloudera.livy.sessions.interactive.InteractiveSession
|
|
|
import com.cloudera.livy.sessions.{PySpark, SessionFactory, SessionKindSerializer}
|
|
|
import com.cloudera.livy.spark.SparkProcessBuilder.{AbsolutePath, RelativePath}
|
|
@@ -30,14 +32,14 @@ import com.cloudera.livy.{LivyConf, Utils}
|
|
|
import org.json4s.{DefaultFormats, Formats, JValue}
|
|
|
|
|
|
object InteractiveSessionFactory {
|
|
|
- private val LivyReplDriverClassPath = "livy.repl.driverClassPath"
|
|
|
- private val LivyReplJar = "livy.repl.jar"
|
|
|
- private val LivyServerUrl = "livy.server.serverUrl"
|
|
|
- private val SparkDriverExtraJavaOptions = "spark.driver.extraJavaOptions"
|
|
|
- private val SparkLivyCallbackUrl = "spark.livy.callbackUrl"
|
|
|
- private val SparkLivyPort = "spark.livy.port"
|
|
|
- private val SparkSubmitPyFiles = "spark.submit.pyFiles"
|
|
|
- private val SparkYarnIsPython = "spark.yarn.isPython"
|
|
|
+ val LivyReplDriverClassPath = "livy.repl.driverClassPath"
|
|
|
+ val LivyReplJars = "livy.repl.jars"
|
|
|
+ val LivyServerUrl = "livy.server.serverUrl"
|
|
|
+ val SparkDriverExtraJavaOptions = "spark.driver.extraJavaOptions"
|
|
|
+ val SparkLivyCallbackUrl = "spark.livy.callbackUrl"
|
|
|
+ val SparkLivyPort = "spark.livy.port"
|
|
|
+ val SparkSubmitPyFiles = "spark.submit.pyFiles"
|
|
|
+ val SparkYarnIsPython = "spark.yarn.isPython"
|
|
|
}
|
|
|
|
|
|
abstract class InteractiveSessionFactory(processFactory: SparkProcessBuilderFactory)
|
|
@@ -53,7 +55,7 @@ abstract class InteractiveSessionFactory(processFactory: SparkProcessBuilderFact
|
|
|
def create(id: Int, request: CreateInteractiveRequest): InteractiveSession = {
|
|
|
val builder = sparkBuilder(id, request)
|
|
|
val kind = request.kind.toString
|
|
|
- val process = builder.start(AbsolutePath(livyJar(processFactory.livyConf)), List(kind))
|
|
|
+ val process = builder.start(None, List(kind))
|
|
|
|
|
|
create(id, process, request)
|
|
|
}
|
|
@@ -72,7 +74,10 @@ abstract class InteractiveSessionFactory(processFactory: SparkProcessBuilderFact
|
|
|
request.executorMemory.foreach(builder.executorMemory)
|
|
|
request.numExecutors.foreach(builder.numExecutors)
|
|
|
request.files.map(RelativePath).foreach(builder.file)
|
|
|
- request.jars.map(RelativePath).foreach(builder.jar)
|
|
|
+
|
|
|
+ val jars = request.jars.map(RelativePath) ++ livyJars(processFactory.livyConf).map(RelativePath)
|
|
|
+ jars.foreach(builder.jar)
|
|
|
+
|
|
|
request.proxyUser.foreach(builder.proxyUser)
|
|
|
request.queue.foreach(builder.queue)
|
|
|
request.name.foreach(builder.name)
|
|
@@ -120,9 +125,15 @@ abstract class InteractiveSessionFactory(processFactory: SparkProcessBuilderFact
|
|
|
builder
|
|
|
}
|
|
|
|
|
|
- private def livyJar(livyConf: LivyConf) = {
|
|
|
- livyConf.getOption(LivyReplJar)
|
|
|
- .getOrElse(Utils.jarOfClass(getClass).head)
|
|
|
+ private def livyJars(livyConf: LivyConf): Seq[String] = {
|
|
|
+ livyConf.getOption(LivyReplJars).map(_.split(",").toSeq).getOrElse {
|
|
|
+ val home = sys.env("LIVY_HOME")
|
|
|
+ val jars = Option(new File(home, "repl-jars"))
|
|
|
+ .filter(_.isDirectory())
|
|
|
+ .getOrElse(new File(home, "repl/target/jars"))
|
|
|
+ require(jars.isDirectory(), "Cannot find Livy REPL jars.")
|
|
|
+ jars.listFiles().map(_.getAbsolutePath()).toSeq
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private def findPySparkArchives(): Seq[String] = {
|