Browse Source

[LIVY-563] Propagate RSC configuration when creating sessions.

Even though not all RSC configs apply to the remote driver, a few do,
so propagate all of them when starting a new session.

Includes new unit test.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #168 from vanzin/LIVY-563.
Marcelo Vanzin 6 years ago
parent
commit
5abc043708

+ 1 - 1
rsc/src/main/java/org/apache/livy/rsc/RSCConf.java

@@ -35,7 +35,7 @@ public class RSCConf extends ClientConf<RSCConf> {
   public static final String SPARK_CONF_PREFIX = "spark.";
   public static final String LIVY_SPARK_PREFIX = SPARK_CONF_PREFIX + "__livy__.";
 
-  private static final String RSC_CONF_PREFIX = "livy.rsc.";
+  public static final String RSC_CONF_PREFIX = "livy.rsc.";
 
   public static enum Entry implements ConfEntry {
     CLIENT_ID("client.auth.id", null),

+ 9 - 0
server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala

@@ -346,6 +346,15 @@ object InteractiveSession extends Logging {
       mergeHiveSiteAndHiveDeps(sparkMajorVersion)
     }
 
+    // Pick all the RSC-specific configs that have not been explicitly set otherwise, and
+    // put them in the resulting properties, so that the remote driver can use them.
+    livyConf.iterator().asScala.foreach { e =>
+      val (key, value) = (e.getKey(), e.getValue())
+      if (key.startsWith(RSCConf.RSC_CONF_PREFIX) && !builderProperties.contains(key)) {
+        builderProperties(key) = value
+      }
+    }
+
     builderProperties
   }
 }

+ 13 - 1
server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala

@@ -130,7 +130,6 @@ class InteractiveSessionSpec extends FunSpec
         "dummy.jar"))
     }
 
-
     it("should set rsc jars through livy conf") {
       val rscJars = Set(
         "dummy.jar",
@@ -177,6 +176,19 @@ class InteractiveSessionSpec extends FunSpec
       session.state should (be(SessionState.Starting) or be(SessionState.Idle))
     }
 
+    it("should propagate RSC configuration properties") {
+      val livyConf = new LivyConf(false)
+        .set(LivyConf.REPL_JARS, "dummy.jar")
+        .set(RSCConf.Entry.SASL_QOP.key(), "foo")
+        .set(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key(), "TRACE")
+        .set(LivyConf.LIVY_SPARK_VERSION, sys.env("LIVY_SPARK_VERSION"))
+        .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10")
+
+      val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark, livyConf)
+      assert(properties(RSCConf.Entry.SASL_QOP.key()) === "foo")
+      assert(properties(RSCConf.Entry.RPC_CHANNEL_LOG_LEVEL.key()) === "TRACE")
+    }
+
     withSession("should execute `1 + 2` == 3") { session =>
       val pyResult = executeStatement("1 + 2", Some("pyspark"))
       pyResult should equal (Extraction.decompose(Map(