|
@@ -25,6 +25,7 @@ import scala.concurrent._
|
|
|
import scala.concurrent.duration._
|
|
|
import scala.language.postfixOps
|
|
|
import scala.util.Try
|
|
|
+import scala.util.control.NonFatal
|
|
|
|
|
|
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, FinalApplicationStatus, YarnApplicationState}
|
|
|
import org.apache.hadoop.yarn.client.api.YarnClient
|
|
@@ -70,7 +71,7 @@ object SparkYarnApp extends Logging {
|
|
|
override def run(): Unit = {
|
|
|
while (true) {
|
|
|
if (!leakedAppTags.isEmpty) {
|
|
|
- // kill the app if found it and remove it if exceeding a threashold
|
|
|
+ // kill the app if it is found, and remove it if it exceeds a threshold
|
|
|
val iter = leakedAppTags.entrySet().iterator()
|
|
|
var isRemoved = false
|
|
|
val now = System.currentTimeMillis()
|
|
@@ -179,9 +180,11 @@ class SparkYarnApp private[utils] (
|
|
|
if (deadline.isOverdue) {
|
|
|
process.foreach(_.destroy())
|
|
|
leakedAppTags.put(appTag, System.currentTimeMillis())
|
|
|
- throw new Exception(s"No YARN application is found with tag $appTagLowerCase in " +
|
|
|
- livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT)/1000 + " seconds. " +
|
|
|
- "Please check your cluster status, it is may be very busy.")
|
|
|
+ throw new IllegalStateException(s"No YARN application is found with tag" +
|
|
|
+ s" $appTagLowerCase in ${livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT)/1000}" +
|
|
|
+ " seconds. This may be because 1) spark-submit fail to submit application to YARN; " +
|
|
|
+ "or 2) YARN cluster doesn't have enough resources to start the application in time. " +
|
|
|
+ "Please check Livy log and YARN log to know the details.")
|
|
|
} else {
|
|
|
Clock.sleep(pollInterval.toMillis)
|
|
|
getAppIdFromTag(appTagLowerCase, pollInterval, deadline)
|
|
@@ -290,12 +293,12 @@ class SparkYarnApp private[utils] (
|
|
|
|
|
|
debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
|
|
|
} catch {
|
|
|
- case e: InterruptedException =>
|
|
|
+ case _: InterruptedException =>
|
|
|
yarnDiagnostics = ArrayBuffer("Session stopped by user.")
|
|
|
changeState(SparkApp.State.KILLED)
|
|
|
- case e: Throwable =>
|
|
|
- error(s"Error whiling refreshing YARN state: $e")
|
|
|
- yarnDiagnostics = ArrayBuffer(e.toString, e.getStackTrace().mkString(" "))
|
|
|
+ case NonFatal(e) =>
|
|
|
+ error(s"Error whiling refreshing YARN state", e)
|
|
|
+ yarnDiagnostics = ArrayBuffer(e.getMessage)
|
|
|
changeState(SparkApp.State.FAILED)
|
|
|
}
|
|
|
}
|