瀏覽代碼

LIVY-133. Remove default master from interactive session config.

This makes it so that admins have to provide a proper Spark config
to start things up in yarn cluster mode, which is really the only
recommended way to run things.

I also removed setting the master from tests that allow that (those
that run through spark-submit); tests that create a SparkContext
in-process still need to define the master manually.

While testing I found out that the code that caused RSCClient initialization
to fail fast was not working anymore, so I fixed that too. I also found a
race in the integration tests, where an assert might fail because a session
from a previous test was still in the process of shutting down.

Closes #124
Marcelo Vanzin 9 年之前
父節點
當前提交
d306a73c87

+ 7 - 1
integration-test/src/main/scala/com/cloudera/livy/test/framework/BaseIntegrationTestSuite.scala

@@ -105,7 +105,13 @@ abstract class BaseIntegrationTestSuite extends FunSuite with Matchers {
     /** Stops a session. If an id < 0 is provided, do nothing. */
     def stopSession(sessionId: Int): Unit = {
       if (sessionId >= 0) {
-        httpClient.prepareDelete(s"$livyEndpoint/sessions/$sessionId").execute()
+        val sessionUri = s"$livyEndpoint/sessions/$sessionId"
+        httpClient.prepareDelete(sessionUri).execute().get()
+
+        eventually(timeout(30 seconds), interval(1 second)) {
+          var res = httpClient.prepareGet(sessionUri).execute().get()
+          assert(res.getStatusCode() === HttpServletResponse.SC_NOT_FOUND)
+        }
       }
     }
 

+ 13 - 6
integration-test/src/test/scala/com/cloudera/livy/test/JobApiIT.scala

@@ -61,15 +61,22 @@ class JobApiIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
   }
 
   test("create a new session") {
-    client = createClient(livyEndpoint)
+    val tempClient = createClient(livyEndpoint)
 
     // Figure out the session ID by poking at the REST endpoint. We should probably expose this
     // in the Java API.
-    val list = sessionList()
-    assert(list.total === 1)
-    sessionId = list.sessions(0).id
-
-    waitTillSessionIdle(sessionId)
+    try {
+      val list = sessionList()
+      assert(list.total === 1)
+      sessionId = list.sessions(0).id
+
+      waitTillSessionIdle(sessionId)
+      client = tempClient
+    } finally {
+      if (client == null) {
+        tempClient.stop(true)
+      }
+    }
   }
 
   test("upload jar") {

+ 1 - 13
repl/pom.xml

@@ -166,24 +166,14 @@
                 </executions>
             </plugin>
 
-            <plugin>
+             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
                 <configuration>
-                    <archive>
-                        <manifest>
-                            <mainClass>com.cloudera.livy.repl.Main</mainClass>
-                        </manifest>
-                    </archive>
                     <outputDirectory>${project.build.directory}/jars</outputDirectory>
                 </configuration>
             </plugin>
 
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-            </plugin>
-
             <plugin>
                 <groupId>org.scalatest</groupId>
                 <artifactId>scalatest-maven-plugin</artifactId>
@@ -191,8 +181,6 @@
                     <systemProperties>
                         <spark.app.name>Livy</spark.app.name>
                         <spark.master>local</spark.master>
-                        <spark.ui.enabled>false</spark.ui.enabled>
-                        <settings.usejavacp.value>true</settings.usejavacp.value>
                     </systemProperties>
                 </configuration>
             </plugin>

+ 0 - 1
repl/src/test/scala/com/cloudera/livy/repl/ReplDriverSuite.scala

@@ -40,7 +40,6 @@ class ReplDriverSuite extends FunSuite {
 
   test("start a repl session using the rsc") {
     val client = new LivyClientBuilder()
-      .setConf(SparkLauncher.SPARK_MASTER, "local")
       .setConf(SparkLauncher.DRIVER_MEMORY, "512m")
       .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, sys.props("java.class.path"))
       .setConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, sys.props("java.class.path"))

+ 23 - 33
rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java

@@ -93,7 +93,12 @@ class ContextLauncher {
       factory.getServer().registerClient(clientId, secret, handler);
       String replMode = conf.get("repl");
       boolean repl = replMode != null && replMode.equals("true");
-      this.child = startDriver(factory.getServer(), conf, clientId, secret);
+
+      conf.set(LAUNCHER_ADDRESS, factory.getServer().getAddress());
+      conf.set(LAUNCHER_PORT, factory.getServer().getPort());
+      conf.set(CLIENT_ID, clientId);
+      conf.set(CLIENT_SECRET, secret);
+      this.child = startDriver(conf, promise);
 
       // Set up a timeout to fail the promise if we don't hear back from the context
       // after a configurable timeout.
@@ -131,17 +136,8 @@ class ContextLauncher {
     }
   }
 
-  private static ChildProcess startDriver(
-      final RpcServer rpcServer,
-      final RSCConf conf,
-      final String clientId,
-      final String secret) throws IOException {
-    // Write out the config file used by the remote context.
-    conf.set(LAUNCHER_ADDRESS, rpcServer.getAddress());
-    conf.set(LAUNCHER_PORT, rpcServer.getPort());
-    conf.set(CLIENT_ID, clientId);
-    conf.set(CLIENT_SECRET, secret);
-
+  private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
+      throws IOException {
     String livyJars = conf.get(LIVY_JARS);
     if (livyJars == null) {
       String livyHome = System.getenv("LIVY_HOME");
@@ -197,18 +193,11 @@ class ContextLauncher {
           }
         }
       };
-      return new ChildProcess(conf, child, confFile);
+      return new ChildProcess(conf, promise, child, confFile);
     } else {
       final SparkLauncher launcher = new SparkLauncher();
       launcher.setSparkHome(System.getenv(SPARK_HOME_ENV));
       launcher.setAppResource("spark-internal");
-
-      // Define how to pass options to the child process. If launching in client (or local)
-      // mode, the driver options need to be passed directly on the command line. Otherwise,
-      // SparkSubmit will take care of that for us.
-      String master = conf.get("spark.master");
-      Utils.checkArgument(master != null, "spark.master is not defined.");
-      launcher.setMaster(master);
       launcher.setPropertiesFile(confFile.getAbsolutePath());
       launcher.setMainClass(RSCDriverBootstrapper.class.getName());
 
@@ -216,7 +205,7 @@ class ContextLauncher {
         launcher.addSparkArg("--proxy-user", conf.get(PROXY_USER));
       }
 
-      return new ChildProcess(conf, launcher.launch(), confFile);
+      return new ChildProcess(conf, promise, launcher.launch(), confFile);
     }
   }
 
@@ -335,11 +324,12 @@ class ContextLauncher {
       ContextInfo info = new ContextInfo(msg.host, msg.port, clientId, secret);
       if (promise.trySuccess(info)) {
         timeout.cancel(true);
+        LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(),
+          msg.host, msg.port);
+      } else {
+        LOG.warn("Connection established but promise is already finalized.");
       }
 
-      LOG.debug("Received driver info for client {}: {}/{}.", client.getChannel(),
-        msg.host, msg.port);
-
       ctx.executor().submit(new Runnable() {
         @Override
         public void run() {
@@ -354,31 +344,31 @@ class ContextLauncher {
   private static class ChildProcess {
 
     private final RSCConf conf;
+    private final Promise<?> promise;
     private final Process child;
     private final Thread monitor;
     private final Thread stdout;
     private final Thread stderr;
     private final File confFile;
-    private volatile boolean childFailed;
 
-    public ChildProcess(RSCConf conf, Runnable child, File confFile) {
+    public ChildProcess(RSCConf conf, Promise<?> promise, Runnable child, File confFile) {
       this.conf = conf;
+      this.promise = promise;
       this.monitor = monitor(child, CHILD_IDS.incrementAndGet());
       this.child = null;
       this.stdout = null;
       this.stderr = null;
       this.confFile = confFile;
-      this.childFailed = false;
     }
 
-    public ChildProcess(RSCConf conf, final Process childProc, File confFile) {
+    public ChildProcess(RSCConf conf, Promise<?> promise, final Process childProc, File confFile) {
       int childId = CHILD_IDS.incrementAndGet();
       this.conf = conf;
+      this.promise = promise;
       this.child = childProc;
       this.stdout = redirect("stdout-redir-" + childId, child.getInputStream());
       this.stderr = redirect("stderr-redir-" + childId, child.getErrorStream());
       this.confFile = confFile;
-      this.childFailed = false;
 
       Runnable monitorTask = new Runnable() {
         @Override
@@ -387,7 +377,7 @@ class ContextLauncher {
             int exitCode = child.waitFor();
             if (exitCode != 0) {
               LOG.warn("Child process exited with code {}.", exitCode);
-              childFailed = true;
+              fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
             }
           } catch (InterruptedException ie) {
             LOG.warn("Waiting thread interrupted, killing child process.");
@@ -401,8 +391,8 @@ class ContextLauncher {
       this.monitor = monitor(monitorTask, childId);
     }
 
-    public boolean isFailed() {
-      return childFailed;
+    private void fail(Throwable error) {
+      promise.tryFailure(error);
     }
 
     public void kill() {
@@ -474,7 +464,7 @@ class ContextLauncher {
         @Override
         public void uncaughtException(Thread t, Throwable e) {
           LOG.warn("Child task threw exception.", e);
-          childFailed = true;
+          fail(e);
         }
       });
       thread.start();

+ 4 - 3
rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java

@@ -107,7 +107,7 @@ public class RSCClient implements LivyClient {
             public void onSuccess(Void unused) {
               if (isAlive) {
                 LOG.warn("Client RPC channel closed unexpectedly.");
-                isAlive = false;
+                stop(false);
               }
             }
           });
@@ -212,8 +212,9 @@ public class RSCClient implements LivyClient {
         }
 
         // Report failure for all pending jobs, so that clients can react.
-        for (JobHandleImpl<?> job : jobs.values()) {
-          job.setFailure(new IOException("RSCClient instance stopped."));
+        for (Map.Entry<String, JobHandleImpl<?>> e : jobs.entrySet()) {
+          LOG.info("Failing pending job {} due to shutdown.", e.getKey());
+          e.getValue().setFailure(new IOException("RSCClient instance stopped."));
         }
 
         eventLoopGroup.shutdownGracefully();

+ 0 - 1
rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java

@@ -77,7 +77,6 @@ public class TestSparkClient {
       conf.put("spark.app.name", "SparkClientSuite Local App");
     } else {
       String classpath = System.getProperty("java.class.path");
-      conf.put(SparkLauncher.SPARK_MASTER, "local");
       conf.put("spark.app.name", "SparkClientSuite Remote App");
       conf.put(SparkLauncher.DRIVER_MEMORY, "512m");
       conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, classpath);

+ 0 - 1
server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala

@@ -71,7 +71,6 @@ class InteractiveSession(
     info(s"Creating LivyClient for sessionId: $id")
     val builder = new LivyClientBuilder()
       .setConf("spark.app.name", s"livy-session-$id")
-      .setConf("spark.master", "yarn-cluster")
       .setAll(Option(request.conf).map(_.asJava).getOrElse(new JHashMap()))
       .setConf("livy.client.sessionId", id.toString)
       .setConf(RSCConf.Entry.DRIVER_CLASS.key(), "com.cloudera.livy.repl.ReplDriver")

+ 1 - 2
server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala

@@ -49,8 +49,7 @@ class InteractiveSessionSpec extends FunSpec with Matchers with BeforeAndAfterAl
     val req = new CreateInteractiveRequest()
     req.kind = PySpark()
     req.conf = Map(
-      RSCConf.Entry.LIVY_JARS.key() -> "",
-      SparkLauncher.SPARK_MASTER -> "local"
+      RSCConf.Entry.LIVY_JARS.key() -> ""
     )
     new InteractiveSession(0, null, None, livyConf, req)
   }