Browse Source

LIVY-135. Add Scala-based tests to integration suite.

These tests are mostly targeted at making sure that Scala classes
(standard library types, case classes, etc) are properly serialized
by the Livy RPC code. They also show a little bit how to use the
Livy API in Scala, although it's not really optimal right now.

I also moved around some code in the test suite so that certain
errors don't cause subsequent tests to throw nasty exceptions,
causing them to be skipped instead.

Closes #125
Marcelo Vanzin 9 years ago
parent
commit
3ae23e6c67

+ 1 - 0
.travis.yml

@@ -1,3 +1,4 @@
+sudo: required
 language: scala
 
 jdk:

+ 1 - 1
integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala

@@ -50,7 +50,7 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
     .get
 
   protected def waitTillSessionIdle(sessionId: Int): Unit = {
-    eventually(timeout(30 seconds), interval(100 millis)) {
+    eventually(timeout(1 minute), interval(100 millis)) {
       val curState = livyClient.getSessionStatus(sessionId)
       assert(curState === SessionState.Idle().toString)
     }

+ 8 - 2
integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala

@@ -137,9 +137,15 @@ object MiniYarnMain extends MiniClusterBase {
     var yarnCluster = new MiniYARNCluster(getClass().getName(), config.nmCount,
       config.localDirCount, config.logDirCount)
     yarnCluster.init(baseConfig)
+
+    // Install a shutdown hook for stop the service and kill all running applications.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run(): Unit = yarnCluster.stop()
+    })
+
     yarnCluster.start()
 
-    // Woraround for YARN-2642.
+    // Workaround for YARN-2642.
     val yarnConfig = yarnCluster.getConfig()
     eventually(timeout(30 seconds), interval(100 millis)) {
       assert(yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0",
@@ -301,7 +307,7 @@ class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterU
       "-Dtest.appender=console",
       "-Djava.io.tmpdir=" + procTmp.getAbsolutePath(),
       "-cp", childClasspath + File.pathSeparator + configDir.getAbsolutePath(),
-      "-Xmx1g",
+      "-XX:MaxPermSize=256m",
       klass.getName().stripSuffix("$"),
       configDir.getAbsolutePath())
 

+ 33 - 13
integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala

@@ -29,7 +29,7 @@ import scala.util.Try
 
 import org.scalatest.BeforeAndAfterAll
 
-import com.cloudera.livy.{LivyClient, LivyClientBuilder}
+import com.cloudera.livy.{LivyClient, LivyClientBuilder, Logging}
 import com.cloudera.livy.client.common.HttpMessages._
 import com.cloudera.livy.sessions.SessionState
 import com.cloudera.livy.test.framework.BaseIntegrationTestSuite
@@ -43,7 +43,7 @@ private class SessionList {
   val sessions: List[SessionInfo] = Nil
 }
 
-class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
+class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll with Logging {
 
   private var client: LivyClient = _
   private var sessionId: Int = _
@@ -60,30 +60,32 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
     livyClient.stopSession(sessionId)
   }
 
-  test("create a new session") {
+  test("create a new session and upload test jar") {
     val tempClient = createClient(livyEndpoint)
 
-    // Figure out the session ID by poking at the REST endpoint. We should probably expose this
-    // in the Java API.
     try {
+      // Figure out the session ID by poking at the REST endpoint. We should probably expose this
+      // in the Java API.
       val list = sessionList()
       assert(list.total === 1)
-      sessionId = list.sessions(0).id
+      val tempSessionId = list.sessions(0).id
+
+      waitTillSessionIdle(tempSessionId)
+      waitFor(tempClient.uploadJar(new File(testLib)))
 
-      waitTillSessionIdle(sessionId)
       client = tempClient
+      sessionId = tempSessionId
     } finally {
       if (client == null) {
-        tempClient.stop(true)
+        try {
+          tempClient.stop(true)
+        } catch {
+          case e: Exception => warn("Error stopping client.", e)
+        }
       }
     }
   }
 
-  test("upload jar") {
-    assume(client != null, "Client not active.")
-    waitFor(client.uploadJar(new File(testLib)))
-  }
-
   test("upload file") {
     assume(client != null, "Client not active.")
 
@@ -136,6 +138,24 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
     assert(result === "hello")
   }
 
+  test("run scala jobs") {
+    assume(client2 != null, "Client not active.")
+
+    val jobs = Seq(
+      new ScalaEcho("abcde"),
+      new ScalaEcho(Seq(1, 2, 3, 4)),
+      new ScalaEcho(Map(1 -> 2, 3 -> 4)),
+      new ScalaEcho(ValueHolder("abcde")),
+      new ScalaEcho(ValueHolder(Seq(1, 2, 3, 4))),
+      new ScalaEcho(Some("abcde"))
+    )
+
+    jobs.foreach { job =>
+      val result = waitFor(client2.submit(job))
+      assert(result === job.value)
+    }
+  }
+
   test("destroy the session") {
     assume(client2 != null, "Client not active.")
     client2.stop(true)

+ 2 - 2
pom.xml

@@ -45,8 +45,8 @@
     <hadoop.version>2.6.0-cdh5.5.0</hadoop.version>
     <spark.version>1.5.0-cdh5.5.0</spark.version>
     <commons-codec.version>1.9</commons-codec.version>
-    <httpclient.version>4.5</httpclient.version>
-    <httpcore.version>4.4.1</httpcore.version>
+    <httpclient.version>4.5.2</httpclient.version>
+    <httpcore.version>4.4.4</httpcore.version>
     <jackson.version>2.4.4</jackson.version>
     <javax.servlet-api.version>3.1.0</javax.servlet-api.version>
     <jetty.version>9.2.10.v20150310</jetty.version>

+ 33 - 0
test-lib/src/main/scala/com/cloudera/livy/test/jobs/ScalaEcho.scala

@@ -0,0 +1,33 @@
+/*
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.livy.test.jobs
+
+import scala.reflect.ClassTag
+
+import com.cloudera.livy.{Job, JobContext}
+
+case class ValueHolder[T](value: T)
+
+class ScalaEcho[T: ClassTag](val value: T)(implicit val tag: ClassTag[T]) extends Job[T] {
+
+  override def call(jc: JobContext): T = {
+    jc.sc().sc.parallelize(Seq(value), 1)(tag).collect()(0)
+  }
+
+}