فهرست منبع

Automatically load default configuration from the classpath.

To make it easier to manage client configurations, allow LivyClientBuilder
to load configuration from files present in the classpath, mimicking what
other Hadoop APIs also do. The class will look for two files (livy-client.conf
and spark-defaults.conf) and load their contents if they're present.

On the server side, the ClientSession implementation was changed to
use this instead of trying to load Spark configuration from the system
properties. This should allow server-specific defaults to be configured
while allowing clients to override individual configs using the
session create request.

In the client-local implementation, I broke Spark and Livy configurations
into two separate files, to (arguably) simplify the code a bit. This is
not strictly related to the client configs change, but I also removed
the code where client-local loaded spark-defaults.conf on its own, since
that's no longer needed.
Marcelo Vanzin 9 سال پیش
والد
کامیت
35b96d092b

+ 67 - 17
api/src/main/java/com/cloudera/livy/LivyClientBuilder.java

@@ -17,29 +17,71 @@
 
 package com.cloudera.livy;
 
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.io.Reader;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.Map;
 import java.util.Properties;
 import java.util.ServiceLoader;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
  * A builder for Livy clients.
  */
 public final class LivyClientBuilder {
 
+  public static final String LIVY_URI_KEY = "livy.uri";
+
   private static final Logger LOG = Logger.getLogger(LivyClientBuilder.class.getName());
 
-  private final URI uri;
   private final Properties config;
 
-  public LivyClientBuilder(URI uri) {
-    if (uri == null) {
-      throw new IllegalArgumentException("URI must be provided.");
-    }
-    this.uri = uri;
+  /**
+   * Creates a new builder that will automatically load the default Livy and Spark configuration
+   * from the classpath.
+   */
+  public LivyClientBuilder() throws IOException {
+    this(true);
+  }
+
+  /**
+   * Creates a new builder that will optionally load the default Livy and Spark configuration
+   * from the classpath.
+   *
+   * Livy client configuration is stored in a file called "livy-client.conf", and Spark client
+   * configuration is stored in a file called "spark-defaults.conf", both in the root of the
+   * application's classpath. Livy configuration takes precedence over Spark's (in case
+   * configuration entries are duplicated), and configuration set in this builder object will
+   * override the values in those files.
+   */
+  public LivyClientBuilder(boolean loadDefaults) throws IOException {
     this.config = new Properties();
+
+    if (loadDefaults) {
+      String[] confFiles = { "spark-defaults.conf", "livy-client.conf" };
+
+      for (String file : confFiles) {
+        URL url = classLoader().getResource(file);
+        if (url != null) {
+          Reader r = new InputStreamReader(url.openStream(), UTF_8);
+          try {
+            config.load(r);
+          } finally {
+            r.close();
+          }
+        }
+      }
+    }
+  }
+
+  public LivyClientBuilder setURI(URI uri) {
+    config.setProperty(LIVY_URI_KEY, uri.toString());
+    return this;
   }
 
   public LivyClientBuilder setConf(String key, String value) {
@@ -57,21 +99,21 @@ public final class LivyClientBuilder {
     return this;
   }
 
-  public LivyClientBuilder setIfMissing(String key, String value) {
-    if (!config.containsKey(key)) {
-      setConf(key, value);
-    }
-    return this;
-  }
-
   public LivyClient build() {
-    ClassLoader cl = Thread.currentThread().getContextClassLoader();
-    if (cl == null) {
-      cl = getClass().getClassLoader();
+    String uriStr = config.getProperty(LIVY_URI_KEY);
+    if (uriStr == null) {
+      throw new IllegalArgumentException("URI must be provided.");
+    }
+    URI uri;
+    try {
+      uri = new URI(uriStr);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("Invalid URI.", e);
     }
 
     LivyClient client = null;
-    ServiceLoader<LivyClientFactory> loader = ServiceLoader.load(LivyClientFactory.class, cl);
+    ServiceLoader<LivyClientFactory> loader = ServiceLoader.load(LivyClientFactory.class,
+      classLoader());
     if (!loader.iterator().hasNext()) {
       throw new IllegalStateException("No LivyClientFactory implementation was found.");
     }
@@ -101,4 +143,12 @@ public final class LivyClientBuilder {
     return client;
   }
 
+  private ClassLoader classLoader() {
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    if (cl == null) {
+      cl = getClass().getClassLoader();
+    }
+    return cl;
+  }
+
 }

+ 13 - 5
api/src/test/java/com/cloudera/livy/TestLivyClientBuilder.java

@@ -32,7 +32,8 @@ public class TestLivyClientBuilder {
     props.setProperty("prop3", "prop3");
 
     TestClientFactory.Client client = (TestClientFactory.Client)
-      new LivyClientBuilder(new URI("match"))
+      new LivyClientBuilder(false)
+        .setURI(new URI("match"))
         .setConf("prop1", "prop1")
         .setConf("prop2", "prop2")
         .setAll(props)
@@ -45,23 +46,30 @@ public class TestLivyClientBuilder {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testMissingUri() {
-    new LivyClientBuilder(null);
+  public void testMissingUri() throws Exception {
+    new LivyClientBuilder(false).build();
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testMismatch() throws Exception {
-    assertNull(new LivyClientBuilder(new URI("mismatch")).build());
+    assertNull(new LivyClientBuilder(false).setURI(new URI("mismatch")).build());
   }
 
   @Test
   public void testFactoryError() throws Exception {
     try {
-      assertNull(new LivyClientBuilder(new URI("error")).build());
+      assertNull(new LivyClientBuilder(false).setURI(new URI("error")).build());
     } catch (IllegalArgumentException e) {
       assertNotNull(e.getCause());
       assertTrue(e.getCause() instanceof IllegalStateException);
     }
   }
 
+  @Test
+  public void testDefaultConfig() throws Exception {
+    TestClientFactory.Client client = (TestClientFactory.Client)
+      new LivyClientBuilder().build();
+    assertEquals("override", client.config.getProperty("spark.config"));
+  }
+
 }

+ 2 - 0
api/src/test/resources/livy-client.conf

@@ -0,0 +1,2 @@
+livy.uri=match
+spark.config=override

+ 1 - 0
api/src/test/resources/spark-defaults.conf

@@ -0,0 +1 @@
+spark.config=default

+ 1 - 1
client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala

@@ -88,7 +88,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll {
      // WebServer does this internally instead of respecting "0.0.0.0", so try to use the same
       // address.
       val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}/"
-      client = new LivyClientBuilder(new URI(uri)).build()
+      client = new LivyClientBuilder(false).setURI(new URI(uri)).build()
     }
 
     withClient("should run and monitor asynchronous jobs") {

+ 44 - 56
client-local/src/main/java/com/cloudera/livy/client/local/LocalClient.java

@@ -29,13 +29,17 @@ import java.io.Writer;
 import java.net.URI;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.nio.file.attribute.PosixFilePermission.*;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
@@ -67,7 +71,6 @@ public class LocalClient implements LivyClient {
 
   private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
 
-  private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
   private static final String SPARK_JARS_KEY = "spark.jars";
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
   private static final String SPARK_HOME_KEY = "spark.home";
@@ -202,12 +205,8 @@ public class LocalClient implements LivyClient {
           args.add(secret);
 
           for (Map.Entry<String, String> e : conf) {
-            String key = e.getKey();
-            if (!key.startsWith("spark.")) {
-              key = LocalConf.SPARK_CONF_PREFIX + key;
-            }
             args.add("--conf");
-            args.add(String.format("%s=%s", key, e.getValue()));
+            args.add(String.format("%s=%s", e.getKey(), e.getValue()));
           }
           try {
             RemoteDriver.main(args.toArray(new String[args.size()]));
@@ -228,56 +227,14 @@ public class LocalClient implements LivyClient {
         sparkHome = System.getProperty(SPARK_HOME_KEY);
       }
 
-      String osxTestOpts = "";
-      if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac")) {
-        osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
-      }
-
-      String driverJavaOpts = Joiner.on(" ").skipNulls().join(
-          osxTestOpts, conf.get(DRIVER_OPTS_KEY));
-      String executorJavaOpts = Joiner.on(" ").skipNulls().join(
-          osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
-
-      // Create a file with all the job properties to be read by spark-submit. Change the
-      // file's permissions so that only the owner can read it. This avoid having the
-      // connection secret show up in the child process's command line.
-      File properties = File.createTempFile("spark-submit.", ".properties");
-      if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
-        throw new IOException("Cannot change permissions of job properties file.");
-      }
-      properties.deleteOnExit();
+      conf.set(CLIENT_ID, clientId);
+      conf.set(CLIENT_SECRET, secret);
 
-      Properties allProps = new Properties();
-      // first load the defaults from spark-defaults.conf if available
-      try {
-        URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
-        if (sparkDefaultsUrl != null) {
-          LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
-          allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
-        }
-      } catch (Exception e) {
-        String msg = "Exception trying to load spark-defaults.conf: " + e;
-        throw new IOException(msg, e);
-      }
-      // then load the SparkClientImpl config
-      for (Map.Entry<String, String> e : conf) {
-        String key = e.getKey();
-        if (!key.startsWith("spark.")) {
-          key = LocalConf.SPARK_CONF_PREFIX + key;
-        }
-        allProps.put(key, e.getValue());
-      }
-      allProps.put(LocalConf.SPARK_CONF_PREFIX + CLIENT_ID.key(), clientId);
-      allProps.put(LocalConf.SPARK_CONF_PREFIX + CLIENT_SECRET.key(), secret);
-      allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
-      allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
-
-      Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
-      try {
-        allProps.store(writer, "Spark Context configuration");
-      } finally {
-        writer.close();
-      }
+      // Create two config files: one with Spark configuration, provided to spark-submit, and
+      // one with Livy configuration, provided to RemoteDriver. Make the files readable only
+      // by their owner, since they may contain private information.
+      File sparkConf = writeConfToFile(true);
+      File livyConf = writeConfToFile(false);
 
       // Define how to pass options to the child process. If launching in client (or local)
       // mode, the driver options need to be passed directly on the command line. Otherwise,
@@ -345,7 +302,7 @@ public class LocalClient implements LivyClient {
       }
 
       argv.add("--properties-file");
-      argv.add(properties.getAbsolutePath());
+      argv.add(sparkConf.getAbsolutePath());
       argv.add("--class");
       argv.add(RemoteDriver.class.getName());
 
@@ -381,6 +338,8 @@ public class LocalClient implements LivyClient {
       argv.add(serverAddress);
       argv.add("--remote-port");
       argv.add(serverPort);
+      argv.add("--config-file");
+      argv.add(livyConf.getAbsolutePath());
 
       LOG.info("Running client driver with argv: {}", Joiner.on(" ").join(argv));
       final Process child = new ProcessBuilder(argv.toArray(new String[argv.size()])).start();
@@ -423,6 +382,35 @@ public class LocalClient implements LivyClient {
     thread.start();
   }
 
+  /**
+   * Write either Livy or Spark configuration to a file readable only by the process's owner.
+   */
+  private File writeConfToFile(boolean spark) throws IOException {
+    Properties confView = new Properties();
+    for (Map.Entry<String, String> e : conf) {
+      String key = e.getKey();
+      boolean isSparkOpt = key.startsWith(LocalConf.SPARK_CONF_PREFIX);
+      if ((spark && isSparkOpt) || (!spark && !isSparkOpt)) {
+        confView.setProperty(key, e.getValue());
+      }
+    }
+
+    String prefix = spark ? "spark." : "livy.";
+    File file = File.createTempFile(prefix, ".properties");
+    Files.setPosixFilePermissions(file.toPath(), EnumSet.of(OWNER_READ, OWNER_WRITE));
+    file.deleteOnExit();
+
+    prefix = spark ? "Spark" : "Livy";
+    Writer writer = new OutputStreamWriter(new FileOutputStream(file), UTF_8);
+    try {
+      confView.store(writer, prefix + " Configuration");
+    } finally {
+      writer.close();
+    }
+
+    return file;
+  }
+
   private class ClientProtocol extends BaseProtocol {
 
     <T> JobHandleImpl<T> submit(Job<T> job) {

+ 2 - 6
client-local/src/main/java/com/cloudera/livy/client/local/LocalConf.java

@@ -36,13 +36,9 @@ import com.cloudera.livy.client.common.ClientConf;
 
 public class LocalConf extends ClientConf<LocalConf> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(LocalConf.class);
+  public static final String SPARK_CONF_PREFIX = "spark.";
 
-  /**
-   * Prefix for Livy configurations embedded in SparkConf properties, since SparkConf
-   * disallows anything that does not start with "spark.".
-   */
-  public static final String SPARK_CONF_PREFIX = "spark.__livy__.";
+  private static final Logger LOG = LoggerFactory.getLogger(LocalConf.class);
 
   public static enum Entry implements ConfEntry {
     CLIENT_ID("client.auth.id", null),

+ 26 - 14
client-local/src/main/java/com/cloudera/livy/client/local/driver/RemoteDriver.java

@@ -18,13 +18,16 @@
 package com.cloudera.livy.client.local.driver;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
 import java.io.IOException;
+import java.io.Reader;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
-
-import scala.Tuple2;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -79,6 +82,7 @@ public class RemoteDriver {
     localTmpDir = Files.createTempDir();
 
     SparkConf conf = new SparkConf();
+    LocalConf livyConf = new LocalConf(null);
     String serverAddress = null;
     int serverPort = -1;
     for (int idx = 0; idx < args.length; idx += 2) {
@@ -88,12 +92,29 @@ public class RemoteDriver {
       } else if (key.equals("--remote-port")) {
         serverPort = Integer.parseInt(getArg(args, idx));
       } else if (key.equals("--client-id")) {
-        conf.set(LocalConf.SPARK_CONF_PREFIX + CLIENT_ID.key(), getArg(args, idx));
+        livyConf.set(CLIENT_ID, getArg(args, idx));
       } else if (key.equals("--secret")) {
-        conf.set(LocalConf.SPARK_CONF_PREFIX + CLIENT_SECRET.key(), getArg(args, idx));
+        livyConf.set(CLIENT_SECRET.key(), getArg(args, idx));
       } else if (key.equals("--conf")) {
         String[] val = getArg(args, idx).split("[=]", 2);
-        conf.set(val[0], val[1]);
+        if (val[0].startsWith(LocalConf.SPARK_CONF_PREFIX)) {
+          conf.set(val[0], val[1]);
+        } else {
+          livyConf.set(val[0], val[1]);
+        }
+      } else if (key.equals("--config-file")) {
+        String path = getArg(args, idx);
+        Properties p = new Properties();
+        Reader r = new InputStreamReader(new FileInputStream(path), UTF_8);
+        try {
+          p.load(r);
+        } finally {
+          r.close();
+        }
+
+        for (String k : p.stringPropertyNames()) {
+          livyConf.set(k, p.getProperty(k));
+        }
       } else {
         throw new IllegalArgumentException("Invalid command line: "
           + Joiner.on(" ").join(args));
@@ -104,15 +125,6 @@ public class RemoteDriver {
 
     LOG.info("Connecting to: {}:{}", serverAddress, serverPort);
 
-    LocalConf livyConf = new LocalConf(null);
-    for (Tuple2<String, String> e : conf.getAll()) {
-      if (e._1().startsWith(LocalConf.SPARK_CONF_PREFIX)) {
-        String key = e._1().substring(LocalConf.SPARK_CONF_PREFIX.length());
-        livyConf.set(key, e._2());
-        LOG.debug("Remote Driver config: {} = {}", key, e._2());
-      }
-    }
-
     String clientId = livyConf.get(CLIENT_ID);
     Preconditions.checkArgument(clientId != null, "No client ID provided.");
     String secret = livyConf.get(CLIENT_SECRET);

+ 1 - 1
client-local/src/test/java/com/cloudera/livy/client/local/TestSparkClient.java

@@ -364,7 +364,7 @@ public class TestSparkClient {
     LivyClient client = null;
     try {
       test.config(conf);
-      client = new LivyClientBuilder(new URI("local:spark"))
+      client = new LivyClientBuilder(false).setURI(new URI("local:spark"))
         .setAll(conf)
         .build();
       test.call(client);

+ 1 - 1
server/src/test/scala/com/cloudera/livy/server/client/ClientServletSpec.scala

@@ -57,7 +57,7 @@ class ClientServletSpec extends BaseSessionServletSpec[ClientSession] {
     it("should create client sessions") {
       val classpath = sys.props("java.class.path")
       val conf = new HashMap[String, String]
-      conf.put("master", "local")
+      conf.put("spark.master", "local")
       conf.put("livy.local.jars", "")
       conf.put("spark.driver.extraClassPath", classpath)
       conf.put("spark.executor.extraClassPath", classpath)

+ 3 - 6
spark/src/main/scala/com/cloudera/livy/spark/client/ClientSession.scala

@@ -27,8 +27,6 @@ import scala.concurrent.{ExecutionContext, Future}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.SparkConf
-
 import com.cloudera.livy.{LivyClientBuilder, Logging}
 import com.cloudera.livy.client.common.HttpMessages._
 import com.cloudera.livy.client.local.LocalClient
@@ -44,12 +42,11 @@ class ClientSession(val sessionId: Int, createRequest: CreateClientRequest)
 
   private val client = {
     info("Creating LivyClient for sessionId: " + sessionId)
-    val builder = new LivyClientBuilder(new URI("local:spark"))
-    new SparkConf(true).getAll.foreach { case (k, v) => builder.setConf(k, v) }
-    builder
+    new LivyClientBuilder()
+      .setConf("spark.master", "yarn-cluster")
       .setAll(createRequest.conf)
+      .setURI(new URI("local:spark"))
       .setConf("livy.client.sessionId", sessionId.toString)
-      .setIfMissing("spark.master", "yarn-cluster")
       .build()
   }.asInstanceOf[LocalClient]