
LIVY-130. Run pyspark and sparkr tests if a real Spark is available.

A new setting was added to allow a real Spark installation to be used
for integration tests. When set, it lets the pyspark and sparkr tests
run, and it also makes it easy to test Spark versions other than the
one referenced in the pom. I tested this with 1.6.1.
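
The profile added to integration-test/pom.xml below is activated by the
real.spark.home property, which overrides spark.home (and hence the
SPARK_HOME passed to the tests). A plausible invocation, assuming the
integration tests run through the standard Maven lifecycle (the module
flag and install path are illustrative, not from this commit):

    mvn verify -pl integration-test -Dreal.spark.home=/opt/spark-1.6.1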

Closes #123
Marcelo Vanzin, 9 years ago
parent
commit
7fe445429a

+ 15 - 0
integration-test/pom.xml

@@ -204,6 +204,7 @@
         <configuration>
           <environmentVariables>
             <LIVY_HOME>${execution.root}</LIVY_HOME>
+            <LIVY_TEST>false</LIVY_TEST>
           </environmentVariables>
           <systemProperties>
             <cluster.spec>${cluster.spec}</cluster.spec>
@@ -214,4 +215,18 @@
     </plugins>
   </build>
 
+  <profiles>
+    <profile>
+      <id>real-spark-home</id>
+      <activation>
+        <property>
+          <name>real.spark.home</name>
+        </property>
+      </activation>
+      <properties>
+        <spark.home>${real.spark.home}</spark.home>
+      </properties>
+    </profile>
+  </profiles>
+
 </project>

+ 27 - 15
integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala

@@ -31,7 +31,7 @@ import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 
 import com.cloudera.livy.server.interactive.CreateInteractiveRequest
-import com.cloudera.livy.sessions.{SessionKindModule, SessionState, Spark}
+import com.cloudera.livy.sessions._
 
 abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
   var cluster: Cluster = _
@@ -49,12 +49,6 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
     .find(new File(_).getName().startsWith("livy-test-lib-"))
     .get
 
-  test("initialize test cluster") {
-    cluster = ClusterPool.get.lease()
-    httpClient = new AsyncHttpClient()
-    livyClient = new LivyRestClient(httpClient, livyEndpoint)
-  }
-
   protected def waitTillSessionIdle(sessionId: Int): Unit = {
     eventually(timeout(30 seconds), interval(100 millis)) {
       val curState = livyClient.getSessionStatus(sessionId)
@@ -62,20 +56,40 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
     }
   }
 
+  /** Wrapper around test() to be used by pyspark tests. */
+  protected def pytest(desc: String)(testFn: => Unit): Unit = {
+    test(desc) {
+      assume(cluster.isRealSpark(), "PySpark tests require a real Spark installation.")
+      testFn
+    }
+  }
+
+  /** Wrapper around test() to be used by SparkR tests. */
+  protected def rtest(desc: String)(testFn: => Unit): Unit = {
+    test(desc) {
+      assume(cluster.isRealSpark(), "SparkR tests require a real Spark installation.")
+      testFn
+    }
+  }
+
+  test("initialize test cluster") {
+    cluster = ClusterPool.get.lease()
+    httpClient = new AsyncHttpClient()
+    livyClient = new LivyRestClient(httpClient, livyEndpoint)
+  }
+
   class LivyRestClient(httpClient: AsyncHttpClient, livyEndpoint: String) {
 
-    def startSession(): Int = {
+    def startSession(kind: Kind): Int = {
       withClue(cluster.getLivyLog()) {
         val requestBody = new CreateInteractiveRequest()
-        requestBody.kind = Spark()
+        requestBody.kind = kind
 
         val rep = httpClient.preparePost(s"$livyEndpoint/sessions")
           .setBody(mapper.writeValueAsString(requestBody))
           .execute()
           .get()
 
-        info("Interactive session submitted")
-
         val sessionId: Int = withClue(rep.getResponseBody) {
           rep.getStatusCode should equal(HttpServletResponse.SC_CREATED)
           val newSession = mapper.readValue(rep.getResponseBodyAsStream, classOf[Map[String, Any]])
@@ -83,7 +97,6 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
 
           newSession("id").asInstanceOf[Int]
         }
-        info(s"Session id $sessionId")
 
         sessionId
       }
@@ -112,10 +125,9 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
 
     def runStatement(sessionId: Int, stmt: String): Int = {
       withClue(cluster.getLivyLog()) {
-        val requestBody = "{ \"code\": \"" + stmt + "\" }"
-        info(requestBody)
+        val requestBody = Map("code" -> stmt)
         val rep = httpClient.preparePost(s"$livyEndpoint/sessions/$sessionId/statements")
-          .setBody(requestBody)
+          .setBody(mapper.writeValueAsString(requestBody))
           .execute()
           .get()
 

+ 1 - 0
integration-test/src/main/scala/com/cloudera/livy/test/framework/Cluster.scala

@@ -29,6 +29,7 @@ trait Cluster {
   def getYarnRmEndpoint: String
   def upload(srcPath: String, destPath: String): Unit
   def configDir(): File
+  def isRealSpark(): Boolean
 
   def runLivy(): Unit
   def stopLivy(): Unit

+ 20 - 4
integration-test/src/main/scala/com/cloudera/livy/test/framework/MiniCluster.scala

@@ -52,9 +52,14 @@ private class MiniClusterConfig(val config: Map[String, String]) {
 sealed trait MiniClusterUtils {
 
   protected def saveConfig(conf: Configuration, dest: File): Unit = {
+    val redacted = new Configuration(conf)
+    // This setting references a test class that is not available when using a real Spark
+    // installation, so remove it from client configs.
+    redacted.unset("net.topology.node.switch.mapping.impl")
+
     val out = new FileOutputStream(dest)
     try {
-      conf.writeXml(out)
+      redacted.writeXml(out)
     } finally {
       out.close()
     }
@@ -189,6 +194,10 @@ class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterU
 
   override def configDir(): File = _configDir
 
+  override def isRealSpark(): Boolean = {
+    new File(sys.env("SPARK_HOME") + File.separator + "RELEASE").isFile()
+  }
+
   // Explicitly remove the "test-lib" dependency from the classpath of child processes. We
   // want tests to explicitly upload this jar when necessary, to test those code paths.
   private val childClasspath = {
@@ -201,15 +210,22 @@ class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterU
   override def deploy(): Unit = {
     sparkConfDir = mkdir("spark-conf")
 
-    val sparkConf = Map(
+    // When running a real Spark cluster, don't set the classpath.
+    val extraCp = if (!isRealSpark()) {
+      Map(
+        SparkLauncher.DRIVER_EXTRA_CLASSPATH -> childClasspath,
+        SparkLauncher.EXECUTOR_EXTRA_CLASSPATH -> childClasspath)
+    } else {
+      Map()
+    }
+
+    val sparkConf = extraCp ++ Map(
       SparkLauncher.SPARK_MASTER -> "yarn-cluster",
       "spark.executor.instances" -> "1",
       "spark.scheduler.minRegisteredResourcesRatio" -> "0.0",
       "spark.ui.enabled" -> "false",
       SparkLauncher.DRIVER_MEMORY -> "512m",
       SparkLauncher.EXECUTOR_MEMORY -> "512m",
-      SparkLauncher.DRIVER_EXTRA_CLASSPATH -> childClasspath,
-      SparkLauncher.EXECUTOR_EXTRA_CLASSPATH -> childClasspath,
       SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console",
       SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console"
     )

+ 2 - 0
integration-test/src/main/scala/com/cloudera/livy/test/framework/RealCluster.scala

@@ -52,6 +52,8 @@ class RealCluster(
   private var livyHomePath: Option[String] = Some("/usr/bin/livy")
   private var pathsToCleanUp = ListBuffer.empty[String]
 
+  override def isRealSpark(): Boolean = true
+
   override def configDir(): File = throw new UnsupportedOperationException()
 
   def sshClient[T](body: SshClient => SSH.Result[T]): Validated[T] = {

+ 2 - 3
integration-test/src/test/resources/log4j.properties

@@ -18,7 +18,7 @@
 
 # Set everything to be logged to the file target/unit-tests.log
 test.appender=file
-log4j.rootCategory=INFO, ${test.appender}
+log4j.rootCategory=DEBUG, ${test.appender}
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
 log4j.appender.file.file=target/unit-tests.log
@@ -34,5 +34,4 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t: %m%n
 
 # Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-org.spark-project.jetty.LEVEL=WARN
+log4j.logger.org.eclipse.jetty=WARN

+ 28 - 0
integration-test/src/test/resources/pytest.py

@@ -0,0 +1,28 @@
+#!/usr/bin/env python
+#
+# Licensed to Cloudera, Inc. under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  Cloudera, Inc. licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+from pyspark import SparkContext
+
+output = sys.argv[1]
+sc = SparkContext(appName="PySpark Test")
+try:
+  sc.parallelize(range(100), 10).map(lambda x: (x, x * 2)).saveAsTextFile(output)
+finally:
+  sc.stop()

+ 34 - 0
integration-test/src/test/resources/rtest.R

@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(SparkR)
+
+# Initialize SparkContext and SQLContext
+sc <- sparkR.init(appName="SparkR-DataFrame-example")
+sqlContext <- sparkRSQL.init(sc)
+
+# Create a simple local data.frame
+localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))
+
+# Convert local data frame to a SparkDataFrame
+df <- createDataFrame(sqlContext, localDF)
+
+# Print its schema
+printSchema(df)
+
+# Stop the SparkContext now
+sparkR.stop()

+ 45 - 3
integration-test/src/test/scala/com/cloudera/livy/test/BatchIT.scala

@@ -25,6 +25,7 @@ import javax.servlet.http.HttpServletResponse._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
@@ -61,7 +62,7 @@ class BatchIT extends BaseIntegrationTestSuite {
   test("submit spark app") {
     assume(testLibPath != null, "Test lib not uploaded.")
     val output = "/" + UUID.randomUUID().toString()
-    val result = runBatch(classOf[SimpleSparkApp], args = List(output))
+    val result = runSpark(classOf[SimpleSparkApp], args = List(output))
     assert(result.state === SessionState.Success().toString)
     assert(fs.isDirectory(new Path(output)))
   }
@@ -69,7 +70,7 @@ class BatchIT extends BaseIntegrationTestSuite {
   test("submit an app that fails") {
     assume(testLibPath != null, "Test lib not uploaded.")
     val output = "/" + UUID.randomUUID().toString()
-    val result = runBatch(classOf[FailingApp], args = List(output))
+    val result = runSpark(classOf[FailingApp], args = List(output))
     assert(result.state === SessionState.Error().toString)
 
     // The file is written to make sure the app actually ran, instead of just failing for
@@ -77,11 +78,52 @@ class BatchIT extends BaseIntegrationTestSuite {
     assert(fs.isFile(new Path(output)))
   }
 
-  private def runBatch(klass: Class[_], args: List[String] = Nil): SessionInfo = {
+  pytest("submit a pyspark application") {
+    val hdfsPath = uploadResource("pytest.py")
+    val output = "/" + UUID.randomUUID().toString()
+    val result = runScript(hdfsPath.toString, args = List(output))
+    assert(result.state === SessionState.Success().toString)
+    assert(fs.isDirectory(new Path(output)))
+  }
+
+  // This is disabled since R scripts don't seem to work in yarn-cluster mode. There's a
+  // TODO comment in Spark's ApplicationMaster.scala.
+  ignore("submit a SparkR application") {
+    val hdfsPath = uploadResource("rtest.R")
+    val result = runScript(hdfsPath.toString)
+    assert(result.state === SessionState.Success().toString)
+  }
+
+  private def uploadResource(name: String): Path = {
+    val hdfsPath = new Path(UUID.randomUUID().toString() + name)
+    val fs = FileSystem.get(hdfsPath.toUri(), conf)
+    val in = getClass.getResourceAsStream("/" + name)
+    val out = fs.create(hdfsPath)
+    try {
+      IOUtils.copy(in, out)
+    } finally {
+      in.close()
+      out.close()
+    }
+    fs.makeQualified(hdfsPath)
+  }
+
+  private def runScript(script: String, args: List[String] = Nil): SessionInfo = {
+    val request = new CreateBatchRequest()
+    request.file = script
+    request.args = args
+    runBatch(request)
+  }
+
+  private def runSpark(klass: Class[_], args: List[String] = Nil): SessionInfo = {
     val request = new CreateBatchRequest()
     request.file = testLibPath.toString()
     request.className = Some(klass.getName())
     request.args = args
+    runBatch(request)
+  }
+
+  private def runBatch(request: CreateBatchRequest): SessionInfo = {
     request.conf = Map("spark.yarn.maxAppAttempts" -> "1")
 
     val response = httpClient.preparePost(s"$livyEndpoint/batches")

+ 46 - 11
integration-test/src/test/scala/com/cloudera/livy/test/InteractiveIT.scala

@@ -18,10 +18,13 @@
 
 package com.cloudera.livy.test
 
+import java.util.concurrent.atomic.AtomicInteger
+
 import scala.concurrent.duration._
 
 import org.scalatest.BeforeAndAfter
 
+import com.cloudera.livy.sessions._
 import com.cloudera.livy.test.framework.BaseIntegrationTestSuite
 
 private case class TestStatement(
@@ -35,23 +38,56 @@ class InteractiveIT extends BaseIntegrationTestSuite with BeforeAndAfter {
 
   after {
     livyClient.stopSession(sessionId)
+    sessionId = -1
   }
 
   test("basic interactive session") {
-    sessionId = livyClient.startSession()
+    sessionId = livyClient.startSession(Spark())
 
     val testStmts = List(
       new TestStatement("1+1", Some("res0: Int = 2")),
-      new TestStatement("val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)",
-        Some("hiveContext: org.apache.spark.sql.hive.HiveContext = " +
-          "org.apache.spark.sql.hive.HiveContext")))
+      new TestStatement("val sqlContext = new org.apache.spark.sql.SQLContext(sc)",
+        Some("sqlContext: org.apache.spark.sql.SQLContext = " +
+          "org.apache.spark.sql.SQLContext")))
+    runAndValidateStatements(testStmts)
+  }
 
-    waitTillSessionIdle(sessionId)
+  pytest("pyspark interactive session") {
+    sessionId = livyClient.startSession(PySpark())
 
-    // Run the statements
-    testStmts.foreach {
-      runAndValidateStatement(_)
-    }
+    val testStmts = List(
+      new TestStatement("1+1", Some("2")),
+      new TestStatement(
+        "sc.parallelize(range(100)).map(lambda x: x * 2).reduce(lambda x, y: x + y)",
+        Some("9900")))
+    runAndValidateStatements(testStmts)
+  }
+
+  rtest("R interactive session") {
+    sessionId = livyClient.startSession(SparkR())
+
+    // R's output sometimes includes the count of statements, which makes it annoying to test
+    // things. This helps a bit.
+    val curr = new AtomicInteger()
+    def count: Int = curr.incrementAndGet()
+
+    val testStmts = List(
+      new TestStatement("1+1", Some(s"[$count] 2")),
+      new TestStatement("sqlContext <- sparkRSQL.init(sc)", None),
+      new TestStatement(
+        """localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))""", None),
+      new TestStatement("df <- createDataFrame(sqlContext, localDF)", None),
+      new TestStatement("printSchema(df)", Some(
+      """|root
+         | |-- name: string (nullable = true)
+         | |-- age: double (nullable = true)""".stripMargin))
+    )
+    runAndValidateStatements(testStmts)
+  }
+
+  private def runAndValidateStatements(statements: Seq[TestStatement]) = {
+    waitTillSessionIdle(sessionId)
+    statements.foreach(runAndValidateStatement)
   }
 
   private def runAndValidateStatement(testStmt: TestStatement) = {
@@ -61,8 +97,7 @@ class InteractiveIT extends BaseIntegrationTestSuite with BeforeAndAfter {
 
     testStmt.expectedResult.map { s =>
       val result = livyClient.getStatementResult(sessionId, testStmt.stmtId)
-      assert(result.indexOf(s) >= 0,
-        s"Statement result doesn't match. Expected: $s. Actual: $result")
+      assert(result.contains(s))
     }
 
   }

+ 4 - 2
pom.xml

@@ -67,6 +67,8 @@
     <maxJavaVersion>1.8</maxJavaVersion>
     <test.redirectToFile>true</test.redirectToFile>
     <execution.root>${user.dir}</execution.root>
+    <spark.home>${execution.root}/dev/spark</spark.home>
+
     <!--
       Properties for the copyright header style checks. Modules that use the ASF header
       should override the "copyright.header" property.
@@ -638,7 +640,7 @@
           <configuration>
             <environmentVariables>
               <LIVY_TEST>true</LIVY_TEST>
-              <SPARK_HOME>${execution.root}/dev/spark</SPARK_HOME>
+              <SPARK_HOME>${spark.home}</SPARK_HOME>
             </environmentVariables>
             <systemProperties>
               <java.awt.headless>true</java.awt.headless>
@@ -661,7 +663,7 @@
           <configuration>
             <environmentVariables>
               <LIVY_TEST>true</LIVY_TEST>
-              <SPARK_HOME>${execution.root}/dev/spark</SPARK_HOME>
+              <SPARK_HOME>${spark.home}</SPARK_HOME>
             </environmentVariables>
             <systemProperties>
               <java.awt.headless>true</java.awt.headless>

+ 0 - 2
server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala

@@ -255,8 +255,6 @@ class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
       env.put(key, value)
     }
 
-    env.asScala.foreach { case (k, v) => info(s"  env: $k = $v") }
-
     _redirectOutput.foreach(pb.redirectOutput)
     _redirectError.foreach(pb.redirectError)
     _redirectErrorStream.foreach(pb.redirectErrorStream)
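
A note on the isRealSpark() probe added in MiniCluster.scala above: it
simply checks for the RELEASE marker file that binary Spark
distributions ship at their top level, so a quick manual sanity check
might look like this (illustrative sketch, not part of the commit):

    # Present in a real Spark distribution, absent in the dev stub.
    test -f "$SPARK_HOME/RELEASE" && echo "real Spark install"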