@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import org.apache.hive.service.cli.SessionHandle
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.livy._
import org.apache.livy.server.interactive.InteractiveSession
@@ -34,11 +35,6 @@ import org.apache.livy.utils.LivySparkUtils
class RpcClient(livySession: InteractiveSession) extends Logging {
import RpcClient._
- private val isSpark1 = {
- val (sparkMajorVersion, _) =
- LivySparkUtils.formatSparkVersion(livySession.livyConf.get(LivyConf.LIVY_SPARK_VERSION))
- sparkMajorVersion == 1
- }
private val defaultIncrementalCollect =
livySession.livyConf.getBoolean(LivyConf.THRIFT_INCR_COLLECT_ENABLED).toString
@@ -63,7 +59,6 @@ class RpcClient(livySession: InteractiveSession) extends Logging {
rscClient.submit(executeSqlJob(sessionId(sessionHandle),
statementId,
statement,
- isSpark1,
defaultIncrementalCollect,
s"spark.${LivyConf.THRIFT_INCR_COLLECT_ENABLED}"))
}
@@ -104,7 +99,7 @@ class RpcClient(livySession: InteractiveSession) extends Logging {
def executeRegisterSession(sessionHandle: SessionHandle): JobHandle[_] = {
info(s"RSC client is executing register session $sessionHandle")
livySession.recordActivity()
- rscClient.submit(registerSessionJob(sessionId(sessionHandle), isSpark1))
+ rscClient.submit(registerSessionJob(sessionId(sessionHandle)))
}
/**
@@ -123,23 +118,18 @@ class RpcClient(livySession: InteractiveSession) extends Logging {
* order to enforce that we are not accessing any class attribute
*/
object RpcClient {
- // Maps a session ID to its SparkSession (or HiveContext/SQLContext according to the Spark
- // version used)
+ // Maps a session ID to its SparkSession.
val SESSION_SPARK_ENTRY_MAP = "livy.thriftserver.rpc_sessionIdToSparkSQLSession"
val STATEMENT_RESULT_ITER_MAP = "livy.thriftserver.rpc_statementIdToResultIter"
val STATEMENT_SCHEMA_MAP = "livy.thriftserver.rpc_statementIdToSchema"
- private def registerSessionJob(sessionId: String, isSpark1: Boolean): Job[_] = new Job[Boolean] {
+ private def registerSessionJob(sessionId: String): Job[_] = new Job[Boolean] {
override def call(jc: JobContext): Boolean = {
- val spark: Any = if (isSpark1) {
- Option(jc.hivectx()).getOrElse(jc.sqlctx())
- } else {
- jc.sparkSession()
- }
- val sessionSpecificSpark = spark.getClass.getMethod("newSession").invoke(spark)
+ val spark = jc.sparkSession[SparkSession]()
+ val sessionSpecificSpark = spark.newSession()
jc.sc().synchronized {
val existingMap =
- Try(jc.getSharedObject[HashMap[String, AnyRef]](SESSION_SPARK_ENTRY_MAP))
+ Try(jc.getSharedObject[HashMap[String, SparkSession]](SESSION_SPARK_ENTRY_MAP))
.getOrElse(new HashMap[String, AnyRef]())
jc.setSharedObject(SESSION_SPARK_ENTRY_MAP,
existingMap + ((sessionId, sessionSpecificSpark)))
@@ -147,9 +137,9 @@ object RpcClient {
.failed.foreach { _ =>
jc.setSharedObject(STATEMENT_SCHEMA_MAP, new HashMap[String, String]())
}
- Try(jc.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP))
+ Try(jc.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP))
.failed.foreach { _ =>
- jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, new HashMap[String, Iterator[_]]())
+ jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, new HashMap[String, Iterator[Row]]())
}
}
true
@@ -160,7 +150,7 @@ object RpcClient {
override def call(jobContext: JobContext): Boolean = {
jobContext.sc().synchronized {
val existingMap =
- jobContext.getSharedObject[HashMap[String, AnyRef]](SESSION_SPARK_ENTRY_MAP)
+ jobContext.getSharedObject[HashMap[String, SparkSession]](SESSION_SPARK_ENTRY_MAP)
jobContext.setSharedObject(SESSION_SPARK_ENTRY_MAP, existingMap - sessionId)
}
true
@@ -176,7 +166,7 @@ object RpcClient {
if (sparkContext.getLocalProperty("spark.jobGroup.id") == statementId) {
sparkContext.clearJobGroup()
}
- val iterMap = jc.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP)
+ val iterMap = jc.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP)
jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, iterMap - statementId)
val schemaMap = jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP)
jc.setSharedObject(STATEMENT_SCHEMA_MAP, schemaMap - statementId)
@@ -196,7 +186,7 @@ object RpcClient {
maxRows: Int): Job[ColumnOrientedResultSet] = new Job[ColumnOrientedResultSet] {
override def call(jobContext: JobContext): ColumnOrientedResultSet = {
val statementIterMap =
- jobContext.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP)
+ jobContext.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP)
val iter = statementIterMap(statementId)
if (null == iter) {
@@ -212,13 +202,13 @@ object RpcClient {
var curRow = 0
while (curRow < maxRows && iter.hasNext) {
val sparkRow = iter.next()
- val row = ArrayBuffer[Any]()
+ val row = ArrayBuffer[Object]()
var curCol: Integer = 0
while (curCol < numOfColumns) {
- row += sparkRow.getClass.getMethod("get", classOf[Int]).invoke(sparkRow, curCol)
+ row += sparkRow.get(curCol).asInstanceOf[Object]
curCol += 1
}
- resultSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+ resultSet.addRow(row.toArray)
curRow += 1
}
resultSet
@@ -229,7 +219,6 @@ object RpcClient {
private def executeSqlJob(sessionId: String,
statementId: String,
statement: String,
- isSpark1: Boolean,
defaultIncrementalCollect: String,
incrementalCollectEnabledProp: String): Job[_] = new Job[Boolean] {
override def call(jc: JobContext): Boolean = {
@@ -237,46 +226,31 @@ object RpcClient {
sparkContext.synchronized {
sparkContext.setJobGroup(statementId, statement)
}
- val spark = jc.getSharedObject[HashMap[String, AnyRef]](SESSION_SPARK_ENTRY_MAP)(sessionId)
+ val spark =
+ jc.getSharedObject[HashMap[String, SparkSession]](SESSION_SPARK_ENTRY_MAP)(sessionId)
try {
- val result = spark.getClass.getMethod("sql", classOf[String]).invoke(spark, statement)
- val schema = result.getClass.getMethod("schema").invoke(result)
- val jsonString = schema.getClass.getMethod("json").invoke(schema).asInstanceOf[String]
+ val result = spark.sql(statement)
+ val jsonSchema = result.schema.json
// Set the schema in the shared map
sparkContext.synchronized {
val existingMap = jc.getSharedObject[HashMap[String, String]](STATEMENT_SCHEMA_MAP)
- jc.setSharedObject(STATEMENT_SCHEMA_MAP, existingMap + ((statementId, jsonString)))
+ jc.setSharedObject(STATEMENT_SCHEMA_MAP, existingMap + ((statementId, jsonSchema)))
}
- val incrementalCollect = {
- if (isSpark1) {
- spark.getClass.getMethod("getConf", classOf[String], classOf[String])
- .invoke(spark,
- incrementalCollectEnabledProp,
- defaultIncrementalCollect)
- .asInstanceOf[String].toBoolean
- } else {
- val conf = spark.getClass.getMethod("conf").invoke(spark)
- conf.getClass.getMethod("get", classOf[String], classOf[String])
- .invoke(conf,
- incrementalCollectEnabledProp,
- defaultIncrementalCollect)
- .asInstanceOf[String].toBoolean
- }
- }
+ val incrementalCollect = spark.conf.get(incrementalCollectEnabledProp,
+ defaultIncrementalCollect).toBoolean
val iter = if (incrementalCollect) {
- val rdd = result.getClass.getMethod("rdd").invoke(result)
- rdd.getClass.getMethod("toLocalIterator").invoke(rdd).asInstanceOf[Iterator[_]]
+ result.rdd.toLocalIterator
} else {
- result.getClass.getMethod("collect").invoke(result).asInstanceOf[Array[_]].iterator
+ result.collect().iterator
}
// Set the iterator in the shared map
sparkContext.synchronized {
val existingMap =
- jc.getSharedObject[HashMap[String, Iterator[_]]](STATEMENT_RESULT_ITER_MAP)
+ jc.getSharedObject[HashMap[String, Iterator[Row]]](STATEMENT_RESULT_ITER_MAP)
jc.setSharedObject(STATEMENT_RESULT_ITER_MAP, existingMap + ((statementId, iter)))
}
} catch {