LIVY-108. Remove Guava dependency from RSC.

This avoids two sets of issues: the first is the need to localize the
Guava jar every time an app is started, in case the jars haven't been
cached in HDFS. The second is that we can't really know which version
of Guava will be used; most of the time it will be Hadoop's, but user
configuration and dependencies might change that, and since Guava is
a very common dependency, let's not make users' lives more difficult.

Guava is not explicitly excluded from dependencies, because it's needed
to run the tests (and in some cases is even provided by different
transitive dependencies, such as hive-exec). The patch was tested with
the exclusions in place, to make sure the module compiles without Guava.
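
For illustration only, an exclusion of the kind used during that testing might
look like the following in a dependent module's pom.xml (the hadoop-client
artifact here is a hypothetical example, not part of this patch):

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <exclusions>
        <!-- Hypothetical: keep Guava off the RSC classpath while verifying the build. -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
        </exclusion>
      </exclusions>
    </dependency>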

Closes #110
Marcelo Vanzin, 9 years ago
commit dfdcbc2cc0

+ 0 - 7
pom.xml

@@ -45,7 +45,6 @@
     <hadoop.version>2.6.0-cdh5.5.0</hadoop.version>
     <spark.version>1.5.0-cdh5.5.0</spark.version>
     <commons-codec.version>1.9</commons-codec.version>
-    <guava.version>14.0.1</guava.version>
     <httpclient.version>4.5</httpclient.version>
     <httpcore.version>4.4.1</httpcore.version>
     <jackson.version>2.4.4</jackson.version>
@@ -242,12 +241,6 @@
         <version>${jackson.version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>com.google.guava</groupId>
-        <artifactId>guava</artifactId>
-        <version>${guava.version}</version>
-      </dependency>
-
       <dependency>
         <groupId>commons-codec</groupId>
         <artifactId>commons-codec</artifactId>

+ 0 - 4
rsc/pom.xml

@@ -50,10 +50,6 @@
       <groupId>com.esotericsoftware.kryo</groupId>
       <artifactId>kryo</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>

+ 2 - 4
rsc/src/main/java/com/cloudera/livy/rsc/BaseProtocol.java

@@ -17,8 +17,6 @@
 
 package com.cloudera.livy.rsc;
 
-import com.google.common.base.Throwables;
-
 import com.cloudera.livy.Job;
 import com.cloudera.livy.rsc.rpc.RpcDispatcher;
 
@@ -50,7 +48,7 @@ public abstract class BaseProtocol extends RpcDispatcher {
       if (cause == null) {
         this.cause = "";
       } else {
-        this.cause = Throwables.getStackTraceAsString(cause);
+        this.cause = Utils.stackTraceAsString(cause);
       }
     }
 
@@ -117,7 +115,7 @@ public abstract class BaseProtocol extends RpcDispatcher {
     public JobResult(String id, T result, Throwable error) {
       this.id = id;
       this.result = result;
-      this.error = error != null ? Throwables.getStackTraceAsString(error) : null;
+      this.error = error != null ? Utils.stackTraceAsString(error) : null;
     }
 
     public JobResult() {

+ 10 - 12
rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java

@@ -27,6 +27,7 @@ import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -37,9 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.nio.file.attribute.PosixFilePermission.*;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.spark.launcher.SparkLauncher;
 import org.slf4j.Logger;
@@ -96,17 +94,17 @@ class ContextLauncher implements ContextInfo {
       synchronized (handler) {
         while (handler.driverAddress == null && now < deadline) {
           handler.wait(Math.min(step, deadline - now));
-          Preconditions.checkState(handler.driverAddress != null || !child.isFailed(),
+          Utils.checkState(handler.driverAddress != null || !child.isFailed(),
             "Child has exited before address information was received.");
           now = System.nanoTime();
         }
       }
-      Preconditions.checkState(handler.driverAddress != null,
+      Utils.checkState(handler.driverAddress != null,
         "Timed out waiting for driver connection information.");
       driverAddress = handler.driverAddress;
     } catch (Exception e) {
       factory.getServer().unregisterClient(clientId);
-      throw Throwables.propagate(e);
+      throw Utils.propagate(e);
     } finally {
       handler.dispose();
     }
@@ -159,19 +157,19 @@ class ContextLauncher implements ContextInfo {
     String livyJars = conf.get(LIVY_JARS);
     if (livyJars == null) {
       String livyHome = System.getenv("LIVY_HOME");
-      Preconditions.checkState(livyHome != null,
+      Utils.checkState(livyHome != null,
         "Need one of LIVY_HOME or %s set.", LIVY_JARS.key());
       File rscJars = new File(livyHome, "rsc-jars");
       if (!rscJars.isDirectory()) {
         rscJars = new File(livyHome, "rsc/target/jars");
       }
-      Preconditions.checkState(rscJars.isDirectory(),
+      Utils.checkState(rscJars.isDirectory(),
         "Cannot find 'client-jars' directory under LIVY_HOME.");
       List<String> jars = new ArrayList<>();
       for (File f : rscJars.listFiles()) {
          jars.add(f.getAbsolutePath());
       }
-      livyJars = Joiner.on(",").join(jars);
+      livyJars = Utils.join(jars, ",");
     }
     merge(conf, SPARK_JARS_KEY, livyJars, ",");
 
@@ -207,7 +205,7 @@ class ContextLauncher implements ContextInfo {
           try {
             RSCDriverBootstrapper.main(new String[] { confFile.getAbsolutePath() });
           } catch (Exception e) {
-            throw Throwables.propagate(e);
+            throw Utils.propagate(e);
           }
         }
       };
@@ -230,7 +228,7 @@ class ContextLauncher implements ContextInfo {
       // mode, the driver options need to be passed directly on the command line. Otherwise,
       // SparkSubmit will take care of that for us.
       String master = conf.get("spark.master");
-      Preconditions.checkArgument(master != null, "spark.master is not defined.");
+      Utils.checkArgument(master != null, "spark.master is not defined.");
       launcher.setMaster(master);
       launcher.setPropertiesFile(confFile.getAbsolutePath());
       launcher.setMainClass(RSCDriverBootstrapper.class.getName());
@@ -243,7 +241,7 @@ class ContextLauncher implements ContextInfo {
   }
 
   private static void merge(RSCConf conf, String key, String livyConf, String sep) {
-    String confValue = Joiner.on(sep).skipNulls().join(livyConf, conf.get(key));
+    String confValue = Utils.join(Arrays.asList(livyConf, conf.get(key)), sep);
     conf.set(key, confValue);
   }
 

+ 0 - 1
rsc/src/main/java/com/cloudera/livy/rsc/JobHandleImpl.java

@@ -23,7 +23,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import com.google.common.collect.Lists;
 import io.netty.util.concurrent.Promise;
 
 import com.cloudera.livy.JobHandle;

+ 5 - 11
rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java

@@ -23,12 +23,9 @@ import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -42,6 +39,7 @@ import com.cloudera.livy.JobContext;
 import com.cloudera.livy.JobHandle;
 import com.cloudera.livy.LivyClient;
 import com.cloudera.livy.client.common.BufferUtils;
+import com.cloudera.livy.rsc.Utils;
 import com.cloudera.livy.rsc.driver.AddJarJob;
 import com.cloudera.livy.rsc.rpc.Rpc;
 import static com.cloudera.livy.rsc.RSCConf.Entry.*;
@@ -62,14 +60,11 @@ public class RSCClient implements LivyClient {
     this.ctx = ctx;
     this.factory = factory;
     this.conf = conf;
-    this.jobs = Maps.newConcurrentMap();
+    this.jobs = new ConcurrentHashMap<>();
     this.protocol = new ClientProtocol();
     this.eventLoopGroup = new NioEventLoopGroup(
         conf.getInt(RPC_MAX_THREADS),
-        new ThreadFactoryBuilder()
-            .setNameFormat("Client-RPC-Handler-" + ctx.getClientId() + "-%d")
-            .setDaemon(true)
-            .build());
+        Utils.newDaemonThreadFactory("Client-RPC-Handler-" + ctx.getClientId() + "-%d"));
 
     try {
       this.driverRpc = Rpc.createClient(conf,
@@ -81,7 +76,7 @@ public class RSCClient implements LivyClient {
         protocol).get();
     } catch (Throwable e) {
       ctx.dispose(true);
-      throw Throwables.propagate(e);
+      throw Utils.propagate(e);
     }
 
     driverRpc.addListener(new Rpc.Listener() {
@@ -161,7 +156,6 @@ public class RSCClient implements LivyClient {
     protocol.cancel(jobId);
   }
 
-  @VisibleForTesting
   ContextInfo getContextInfo() {
     return ctx;
   }

+ 3 - 7
rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java

@@ -22,10 +22,6 @@ import java.net.URI;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.apache.spark.SparkException;
-
 import com.cloudera.livy.LivyClient;
 import com.cloudera.livy.LivyClientFactory;
 import com.cloudera.livy.rsc.rpc.RpcServer;
@@ -70,7 +66,7 @@ public final class RSCClientFactory implements LivyClientFactory {
       if (needsServer) {
         unref();
       }
-      throw Throwables.propagate(e);
+      throw Utils.propagate(e);
     }
   }
 
@@ -84,12 +80,12 @@ public final class RSCClientFactory implements LivyClientFactory {
       return;
     }
 
-    Preconditions.checkState(server == null);
+    Utils.checkState(server == null, "Server already running but ref count is 0.");
     if (server == null) {
       try {
         server = new RpcServer(config);
       } catch (InterruptedException ie) {
-        throw Throwables.propagate(ie);
+        throw Utils.propagate(ie);
       }
     }
 

+ 4 - 10
rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java

@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Properties;
 import javax.security.sasl.Sasl;
 
-import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,16 +89,11 @@ public class RSCConf extends ClientConf<RSCConf> {
     Map<String, String> opts = new HashMap<>();
 
     // TODO: add more options?
-    Map<String, Entry> saslOpts = ImmutableMap.<String, Entry>builder()
-      .put(Sasl.QOP, Entry.SASL_QOP)
-      .build();
-
-    for (Map.Entry<String, Entry> e : saslOpts.entrySet()) {
-      String value = get(e.getValue());
-      if (value != null) {
-        opts.put(e.getKey(), value);
-      }
+    String qop = get(Entry.SASL_QOP);
+    if (qop != null) {
+      opts.put(Sasl.QOP, qop);
     }
+
     return opts;
   }
 

+ 105 - 0
rsc/src/main/java/com/cloudera/livy/rsc/Utils.java

@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.livy.rsc;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A few simple utility functions used by the code, mostly to avoid a direct dependency
+ * on Guava.
+ */
+public class Utils {
+
+  public static void checkArgument(boolean condition) {
+    if (!condition) {
+      throw new IllegalArgumentException();
+    }
+  }
+
+  public static void checkArgument(boolean condition, String msg, Object... args) {
+    if (!condition) {
+      throw new IllegalArgumentException(String.format(msg, args));
+    }
+  }
+
+  public static void checkState(boolean condition, String msg, Object... args) {
+    if (!condition) {
+      throw new IllegalStateException(String.format(msg, args));
+    }
+  }
+
+  public static void checkNotNull(Object o) {
+    if (o == null) {
+      throw new NullPointerException();
+    }
+  }
+
+  /** Always throws; the declared return type lets callers write "throw Utils.propagate(e)". */
+  public static RuntimeException propagate(Throwable t) {
+    if (t instanceof RuntimeException) {
+      throw (RuntimeException) t;
+    } else {
+      throw new RuntimeException(t);
+    }
+  }
+
+  public static ThreadFactory newDaemonThreadFactory(final String nameFormat) {
+    return new ThreadFactory() {
+
+      private final AtomicInteger threadId = new AtomicInteger();
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r);
+        t.setName(String.format(nameFormat, threadId.incrementAndGet()));
+        t.setDaemon(true);
+        return t;
+      }
+
+    };
+  }
+
+  public static String join(Iterable<String> strs, String sep) {
+    StringBuilder sb = new StringBuilder();
+    for (String s : strs) {
+      if (s != null) {
+        sb.append(s).append(sep);
+      }
+    }
+    if (sb.length() > 0) {
+      sb.setLength(sb.length() - sep.length());
+    }
+    return sb.toString();
+  }
+
+  public static String stackTraceAsString(Throwable t) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(t.getClass().getName()).append(": ").append(t.getMessage());
+    // Newline before each frame, so the first frame is not fused onto the
+    // message line and there is no trailing newline to trim afterwards.
+    for (StackTraceElement e : t.getStackTrace()) {
+      sb.append("\n");
+      sb.append(e.toString());
+    }
+    return sb.toString();
+  }
+
+  private Utils() { }
+
+}
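
For reference, a minimal usage sketch of the new helpers, mirroring the call
sites changed elsewhere in this patch (the values below are made up for
illustration):

    import java.util.Arrays;
    import java.util.concurrent.ThreadFactory;

    import com.cloudera.livy.rsc.Utils;

    public class UtilsExample {
      public static void main(String[] args) {
        // Preconditions.checkState(cond, msg) becomes Utils.checkState(cond, msg, args...).
        Utils.checkState(args != null, "Need one of LIVY_HOME or %s set.", "livy.jars");

        // Joiner.on(",").skipNulls().join(...) becomes Utils.join(...); null entries are skipped.
        String jars = Utils.join(Arrays.asList("a.jar", null, "b.jar"), ",");
        System.out.println(jars);  // prints: a.jar,b.jar

        // ThreadFactoryBuilder becomes Utils.newDaemonThreadFactory; "%d" is filled
        // with a per-factory thread counter, starting at 1.
        ThreadFactory tf = Utils.newDaemonThreadFactory("Client-RPC-Handler-%d");
        Thread t = tf.newThread(new Runnable() {
          @Override
          public void run() { /* no-op */ }
        });
        System.out.println(t.getName());  // prints: Client-RPC-Handler-1

        // Throwables.propagate(e) becomes Utils.propagate(e), and
        // Throwables.getStackTraceAsString(t) becomes Utils.stackTraceAsString(t).
        try {
          throw new IllegalArgumentException("boom");
        } catch (Exception e) {
          System.out.println(Utils.stackTraceAsString(e));
        }
      }
    }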

+ 2 - 4
rsc/src/main/java/com/cloudera/livy/rsc/driver/BypassJobWrapper.java

@@ -20,11 +20,9 @@ package com.cloudera.livy.rsc.driver;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.google.common.base.Throwables;
-import org.apache.spark.api.java.JavaFutureAction;
-
 import com.cloudera.livy.JobHandle;
 import com.cloudera.livy.rsc.BypassJobStatus;
+import com.cloudera.livy.rsc.Utils;
 
 class BypassJobWrapper extends JobWrapper<byte[]> {
 
@@ -70,7 +68,7 @@ class BypassJobWrapper extends JobWrapper<byte[]> {
   }
 
   synchronized BypassJobStatus getStatus() {
-    String stackTrace = error != null ? Throwables.getStackTraceAsString(error) : null;
+    String stackTrace = error != null ? Utils.stackTraceAsString(error) : null;
     return new BypassJobStatus(state, result, stackTrace);
   }
 

+ 0 - 81
rsc/src/main/java/com/cloudera/livy/rsc/driver/DriverSparkListener.java

@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.cloudera.livy.rsc.driver;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-import org.apache.spark.JavaSparkListener;
-import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerTaskEnd;
-
-class DriverSparkListener extends JavaSparkListener {
-
-  private final Map<Integer, Integer> stageToJobId = Maps.newHashMap();
-  private final RSCDriver driver;
-
-  DriverSparkListener(RSCDriver driver) {
-    this.driver = driver;
-  }
-
-  @Override
-  public void onJobStart(SparkListenerJobStart jobStart) {
-    synchronized (stageToJobId) {
-      for (int i = 0; i < jobStart.stageIds().length(); i++) {
-        stageToJobId.put((Integer) jobStart.stageIds().apply(i), jobStart.jobId());
-      }
-    }
-  }
-
-  @Override
-  public void onJobEnd(SparkListenerJobEnd jobEnd) {
-    synchronized (stageToJobId) {
-      for (Iterator<Map.Entry<Integer, Integer>> it = stageToJobId.entrySet().iterator();
-          it.hasNext();) {
-        Map.Entry<Integer, Integer> e = it.next();
-        if (e.getValue() == jobEnd.jobId()) {
-          it.remove();
-        }
-      }
-    }
-
-    JobWrapper<?> job = getWrapper(jobEnd.jobId());
-    if (job != null) {
-      job.jobDone();
-    }
-  }
-
-  /**
-   * Returns the client job ID for the given Spark job ID.
-   *
-   * This will only work for jobs monitored via JobContext#monitor(). Other jobs won't be
-   * matched, and this method will return `None`.
-   */
-  private JobWrapper<?> getWrapper(Integer sparkJobId) {
-    for (JobWrapper<?> job : driver.activeJobs.values()) {
-      if (job.hasSparkJobId(sparkJobId)) {
-        return job;
-      }
-    }
-    return null;
-  }
-
-}

+ 4 - 4
rsc/src/main/java/com/cloudera/livy/rsc/driver/JobContextImpl.java

@@ -27,9 +27,9 @@ import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.cloudera.livy.JobContext;
+import com.cloudera.livy.rsc.Utils;
 
 class JobContextImpl implements JobContext {
 
@@ -77,19 +77,19 @@ class JobContextImpl implements JobContext {
 
   @Override
   public synchronized JavaStreamingContext streamingctx(){
-    checkState(streamingctx != null, "method createStreamingContext must be called first.");
+    Utils.checkState(streamingctx != null, "method createStreamingContext must be called first.");
     return streamingctx;
   }
 
   @Override
   public synchronized void createStreamingContext(long batchDuration) {
-    checkState(streamingctx == null, "Streaming context is not null.");
+    Utils.checkState(streamingctx == null, "Streaming context is not null.");
     streamingctx = new JavaStreamingContext(sc, new Duration(batchDuration));
   }
 
   @Override
   public synchronized void stopStreamingCtx() {
-    checkState(streamingctx != null, "Streaming Context is null");
+    Utils.checkState(streamingctx != null, "Streaming Context is null");
     streamingctx.stop();
     streamingctx = null;
   }

+ 2 - 30
rsc/src/main/java/com/cloudera/livy/rsc/driver/JobWrapper.java

@@ -17,6 +17,7 @@
 
 package com.cloudera.livy.rsc.driver;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -24,7 +25,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.Lists;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +38,6 @@ public class JobWrapper<T> implements Callable<Void> {
   public final String jobId;
 
   private final RSCDriver driver;
-  private final List<JavaFutureAction<?>> sparkJobs;
   private final Job<T> job;
   private final AtomicInteger completed;
 
@@ -48,7 +47,6 @@ public class JobWrapper<T> implements Callable<Void> {
     this.driver = driver;
     this.jobId = jobId;
     this.job = job;
-    this.sparkJobs = Lists.newArrayList();
     this.completed = new AtomicInteger();
   }
 
@@ -57,19 +55,6 @@ public class JobWrapper<T> implements Callable<Void> {
     try {
       jobStarted();
       T result = job.call(driver.jobContext());
-      synchronized (completed) {
-        while (completed.get() < sparkJobs.size()) {
-          LOG.debug("Client job {} finished, {} of {} Spark jobs finished.",
-              jobId, completed.get(), sparkJobs.size());
-          completed.wait();
-        }
-      }
-
-      // make sure job has really succeeded
-      // at this point, future.get shall not block us
-      for (JavaFutureAction<?> future : sparkJobs) {
-        future.get();
-      }
       finished(result, null);
     } catch (Throwable t) {
       // Catch throwables in a best-effort to report job status back to the client. It's
@@ -96,20 +81,7 @@ public class JobWrapper<T> implements Callable<Void> {
   }
 
   boolean cancel() {
-    boolean cancelled = false;
-    for (JavaFutureAction<?> action : sparkJobs) {
-      cancelled |= action.cancel(true);
-    }
-    return cancelled | (future != null && future.cancel(true));
-  }
-
-  boolean hasSparkJobId(Integer sparkId) {
-    for (JavaFutureAction<?> future : sparkJobs) {
-      if (future.jobIds().contains(sparkId)) {
-        return true;
-      }
-    }
-    return false;
+    return future.cancel(true);
   }
 
   protected void finished(T result, Throwable error) {

+ 5 - 6
rsc/src/main/java/com/cloudera/livy/rsc/driver/RSCDriver.java

@@ -35,7 +35,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.commons.io.FileUtils;
@@ -49,6 +48,7 @@ import com.cloudera.livy.client.common.Serializer;
 import com.cloudera.livy.rsc.BaseProtocol;
 import com.cloudera.livy.rsc.BypassJobStatus;
 import com.cloudera.livy.rsc.RSCConf;
+import com.cloudera.livy.rsc.Utils;
 import com.cloudera.livy.rsc.rpc.Rpc;
 import com.cloudera.livy.rsc.rpc.RpcDispatcher;
 import com.cloudera.livy.rsc.rpc.RpcServer;
@@ -116,14 +116,14 @@ public class RSCDriver extends BaseProtocol {
 
   private void initializeServer() throws Exception {
     String clientId = livyConf.get(CLIENT_ID);
-    Preconditions.checkArgument(clientId != null, "No client ID provided.");
+    Utils.checkArgument(clientId != null, "No client ID provided.");
     String secret = livyConf.get(CLIENT_SECRET);
-    Preconditions.checkArgument(secret != null, "No secret provided.");
+    Utils.checkArgument(secret != null, "No secret provided.");
 
     String launcherAddress = livyConf.get(LAUNCHER_ADDRESS);
-    Preconditions.checkArgument(launcherAddress != null, "Missing launcher address.");
+    Utils.checkArgument(launcherAddress != null, "Missing launcher address.");
     int launcherPort = livyConf.getInt(LAUNCHER_PORT);
-    Preconditions.checkArgument(launcherPort > 0, "Missing launcher port.");
+    Utils.checkArgument(launcherPort > 0, "Missing launcher port.");
 
     LOG.info("Connecting to: {}:{}", launcherAddress, launcherPort);
 
@@ -175,7 +175,6 @@ public class RSCDriver extends BaseProtocol {
     JavaSparkContext sc = new JavaSparkContext(conf);
     LOG.info("Spark context finished initialization in {}ms",
       TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1));
-    sc.sc().addSparkListener(new DriverSparkListener(this));
     return sc;
   }
 

+ 3 - 3
rsc/src/main/java/com/cloudera/livy/rsc/rpc/KryoMessageCodec.java

@@ -28,7 +28,6 @@ import com.esotericsoftware.kryo.io.ByteBufferInputStream;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
-import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageCodec;
@@ -36,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.cloudera.livy.client.common.Serializer;
+import com.cloudera.livy.rsc.Utils;
 
 /**
  * Codec that serializes / deserializes objects using Kryo. Objects are encoded with a 4-byte
@@ -105,8 +105,8 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> {
   }
 
   private void checkSize(int msgSize) {
-    Preconditions.checkArgument(msgSize > 0, "Message size (%s bytes) must be positive.", msgSize);
-    Preconditions.checkArgument(maxMessageSize <= 0 || msgSize <= maxMessageSize,
+    Utils.checkArgument(msgSize > 0, "Message size (%s bytes) must be positive.", msgSize);
+    Utils.checkArgument(maxMessageSize <= 0 || msgSize <= maxMessageSize,
         "Message (%s bytes) exceeds maximum allowed size (%s bytes).", msgSize, maxMessageSize);
   }
 

+ 10 - 13
rsc/src/main/java/com/cloudera/livy/rsc/rpc/Rpc.java

@@ -21,6 +21,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -36,10 +37,6 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -63,6 +60,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.cloudera.livy.rsc.RSCConf;
+import com.cloudera.livy.rsc.Utils;
 import static com.cloudera.livy.rsc.RSCConf.Entry.*;
 
 /**
@@ -203,7 +201,6 @@ public class Rpc implements Closeable {
     return new Rpc(config, client, egroup);
   }
 
-  @VisibleForTesting
   static Rpc createEmbedded(RpcDispatcher dispatcher) {
     EmbeddedChannel c = new EmbeddedChannel(
         new LoggingHandler(Rpc.class),
@@ -224,14 +221,14 @@ public class Rpc implements Closeable {
   private volatile RpcDispatcher dispatcher;
 
   private Rpc(RSCConf config, Channel channel, EventExecutorGroup egroup) {
-    Preconditions.checkArgument(channel != null);
-    Preconditions.checkArgument(egroup != null);
+    Utils.checkArgument(channel != null);
+    Utils.checkArgument(egroup != null);
     this.config = config;
     this.channel = channel;
     this.channelLock = new Object();
     this.dispatcher = null;
     this.egroup = egroup;
-    this.listeners = Lists.newLinkedList();
+    this.listeners = new LinkedList<>();
     this.rpcClosed = new AtomicBoolean();
     this.rpcId = new AtomicLong();
 
@@ -267,8 +264,8 @@ public class Rpc implements Closeable {
    * @return A future used to monitor the operation.
    */
   public <T> Future<T> call(Object msg, Class<T> retType) {
-    Preconditions.checkArgument(msg != null);
-    Preconditions.checkState(channel.isOpen(), "RPC channel is closed.");
+    Utils.checkArgument(msg != null);
+    Utils.checkState(channel.isOpen(), "RPC channel is closed.");
     try {
       final long id = rpcId.getAndIncrement();
       final Promise<T> promise = createPromise();
@@ -291,7 +288,7 @@ public class Rpc implements Closeable {
       }
       return promise;
     } catch (Exception e) {
-      throw Throwables.propagate(e);
+      throw Utils.propagate(e);
     }
   }
 
@@ -307,8 +304,8 @@ public class Rpc implements Closeable {
   }
 
   void setDispatcher(RpcDispatcher dispatcher) {
-    Preconditions.checkNotNull(dispatcher);
-    Preconditions.checkState(this.dispatcher == null);
+    Utils.checkNotNull(dispatcher);
+    Utils.checkState(this.dispatcher == null, "Dispatcher already set.");
     this.dispatcher = dispatcher;
     channel.pipeline().addLast("dispatcher", dispatcher);
   }

+ 7 - 6
rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcDispatcher.java

@@ -22,16 +22,17 @@ import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.util.concurrent.Promise;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.cloudera.livy.rsc.Utils;
+
 /**
  * An implementation of ChannelInboundHandler that dispatches incoming messages to an instance
  * method based on the method signature.
@@ -48,7 +49,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
 
   private static final Logger LOG = LoggerFactory.getLogger(RpcDispatcher.class);
 
-  private final Map<Class<?>, Method> handlers = Maps.newConcurrentMap();
+  private final Map<Class<?>, Method> handlers = new ConcurrentHashMap<>();
   private final Collection<OutstandingRpc> rpcs = new ConcurrentLinkedQueue<OutstandingRpc>();
 
   private volatile Rpc.MessageHeader lastHeader;
@@ -117,7 +118,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
         } catch (NoSuchMethodException e2) {
           LOG.warn(String.format("[%s] Failed to find handler for msg '%s'.", name(),
             msg.getClass().getName()));
-          writeMessage(ctx, Rpc.MessageType.ERROR, Throwables.getStackTraceAsString(e.getCause()));
+          writeMessage(ctx, Rpc.MessageType.ERROR, Utils.stackTraceAsString(e.getCause()));
           return;
         }
       }
@@ -133,7 +134,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
       writeMessage(ctx, Rpc.MessageType.REPLY, payload);
     } catch (InvocationTargetException ite) {
       LOG.debug(String.format("[%s] Error in RPC handler.", name()), ite.getCause());
-      writeMessage(ctx, Rpc.MessageType.ERROR, Throwables.getStackTraceAsString(ite.getCause()));
+      writeMessage(ctx, Rpc.MessageType.ERROR, Utils.stackTraceAsString(ite.getCause()));
     }
   }
 
@@ -174,7 +175,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object>
       // There's an RPC waiting for a reply. Exception was most probably caught while processing
       // the RPC, so send an error.
       ctx.channel().write(new Rpc.MessageHeader(lastHeader.id, Rpc.MessageType.ERROR));
-      ctx.channel().writeAndFlush(Throwables.getStackTraceAsString(cause));
+      ctx.channel().writeAndFlush(Utils.stackTraceAsString(cause));
       lastHeader = null;
     }
 

+ 7 - 12
rsc/src/main/java/com/cloudera/livy/rsc/rpc/RpcServer.java

@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.security.SecureRandom;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -36,10 +37,6 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -56,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.cloudera.livy.rsc.RSCConf;
+import com.cloudera.livy.rsc.Utils;
 import static com.cloudera.livy.rsc.RSCConf.Entry.*;
 
 /**
@@ -79,10 +77,7 @@ public class RpcServer implements Closeable {
     this.config = lconf;
     this.group = new NioEventLoopGroup(
         this.config.getInt(RPC_MAX_THREADS),
-        new ThreadFactoryBuilder()
-            .setNameFormat("RPC-Handler-%d")
-            .setDaemon(true)
-            .build());
+        Utils.newDaemonThreadFactory("RPC-Handler-%d"));
     this.channel = new ServerBootstrap()
       .group(group)
       .channel(NioServerSocketChannel.class)
@@ -112,7 +107,7 @@ public class RpcServer implements Closeable {
       .sync()
       .channel();
     this.port = ((InetSocketAddress) channel.localAddress()).getPort();
-    this.pendingClients = Maps.newConcurrentMap();
+    this.pendingClients = new ConcurrentHashMap<>();
 
     String address = config.get(RPC_SERVER_ADDRESS);
     if (address == null) {
@@ -228,11 +223,11 @@ public class RpcServer implements Closeable {
     @Override
     protected Rpc.SaslMessage update(Rpc.SaslMessage challenge) throws IOException {
       if (clientId == null) {
-        Preconditions.checkArgument(challenge.clientId != null,
+        Utils.checkArgument(challenge.clientId != null,
           "Missing client ID in SASL handshake.");
         clientId = challenge.clientId;
         client = pendingClients.get(clientId);
-        Preconditions.checkArgument(client != null,
+        Utils.checkArgument(client != null,
           "Unexpected client ID '%s' in SASL handshake.", clientId);
       }
 
@@ -280,7 +275,7 @@ public class RpcServer implements Closeable {
 
     @Override
     public void handle(Callback[] callbacks) {
-      Preconditions.checkState(client != null, "Handshake not initialized yet.");
+      Utils.checkState(client != null, "Handshake not initialized yet.");
       for (Callback cb : callbacks) {
         if (cb instanceof NameCallback) {
           ((NameCallback)cb).setName(clientId);

+ 14 - 5
rsc/src/test/java/com/cloudera/livy/rsc/TestSparkClient.java

@@ -17,9 +17,11 @@
 
 package com.cloudera.livy.rsc;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URI;
@@ -32,9 +34,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
-import com.google.common.base.Objects;
-import com.google.common.base.Strings;
-import com.google.common.io.ByteStreams;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaFutureAction;
@@ -404,6 +403,16 @@ public class TestSparkClient {
     }
   }
 
+  private static byte[] toByteArray(InputStream in) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    byte[] buf = new byte[1024];
+    int read;
+    while ((read = in.read(buf)) >= 0) {
+      out.write(buf, 0, read);
+    }
+    return out.toByteArray();
+  }
+
   private static class HiveJob implements Job<ArrayList<String>> {
 
     @Override
@@ -540,7 +549,7 @@ public class TestSparkClient {
     public String call(Integer i) throws Exception {
       ClassLoader ccl = Thread.currentThread().getContextClassLoader();
       InputStream in = ccl.getResourceAsStream("test.resource");
-      byte[] bytes = ByteStreams.toByteArray(in);
+      byte[] bytes = toByteArray(in);
       in.close();
       return new String(bytes, 0, bytes.length, "UTF-8");
     }
@@ -563,7 +572,7 @@ public class TestSparkClient {
     @Override
     public String call(Integer i) throws Exception {
       InputStream in = new FileInputStream(SparkFiles.get(fileName));
-      byte[] bytes = ByteStreams.toByteArray(in);
+      byte[] bytes = toByteArray(in);
       in.close();
       return new String(bytes, 0, bytes.length, "UTF-8");
     }

+ 6 - 6
rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestKryoMessageCodec.java

@@ -18,10 +18,10 @@
 package com.cloudera.livy.rsc.rpc;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.embedded.EmbeddedChannel;
@@ -53,7 +53,7 @@ public class TestKryoMessageCodec {
       indices[i] = buf.writerIndex();
     }
 
-    List<Object> objects = Lists.newArrayList();
+    List<Object> objects = new ArrayList<>();
 
     // Don't read enough data for the first message to be decoded.
     codec.decode(null, buf.slice(0, indices[0] - 1), objects);
@@ -84,7 +84,7 @@ public class TestKryoMessageCodec {
     ByteBuf buf = newBuffer();
     codec.encode(null, new TestMessage(), buf);
 
-    List<Object> out = Lists.newArrayList();
+    List<Object> out = new ArrayList<>();
     codec.decode(null, buf, out);
 
     assertEquals(1, out.size());
@@ -109,7 +109,7 @@ public class TestKryoMessageCodec {
     unlimited.encode(null, new TestMessage(new byte[1025]), buf);
 
     try {
-      List<Object> out = Lists.newArrayList();
+      List<Object> out = new ArrayList<>();
       codec.decode(null, buf, out);
       fail("Should have failed to decode large message.");
     } catch (IllegalArgumentException e) {
@@ -124,7 +124,7 @@ public class TestKryoMessageCodec {
     buf.writeInt(-1);
 
     try {
-      List<Object> out = Lists.newArrayList();
+      List<Object> out = new ArrayList<>();
       codec.decode(null, buf, out);
       fail("Should have failed to decode message with negative size.");
     } catch (IllegalArgumentException e) {
@@ -174,7 +174,7 @@ public class TestKryoMessageCodec {
     codec.setEncryptionHandler(eh);
     codec.encode(null, message, buf);
 
-    List<Object> objects = Lists.newArrayList();
+    List<Object> objects = new ArrayList<>();
     codec.decode(null, buf, objects);
     return objects;
   }

+ 2 - 5
rsc/src/test/java/com/cloudera/livy/rsc/rpc/TestRpc.java

@@ -18,16 +18,13 @@
 package com.cloudera.livy.rsc.rpc;
 
 import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.security.sasl.SaslException;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.util.concurrent.Future;
@@ -52,7 +49,7 @@ public class TestRpc {
 
   @Before
   public void setUp() {
-    closeables = Lists.newArrayList();
+    closeables = new ArrayList<>();
     emptyConfig = new RSCConf(null);
   }