|
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import io.netty.channel.ChannelHandler.Sharable;
|
|
@@ -92,6 +93,7 @@ public class RSCDriver extends BaseProtocol {
|
|
|
protected final RSCConf livyConf;
|
|
|
|
|
|
private final AtomicReference<ScheduledFuture<?>> idleTimeout;
|
|
|
+ private final AtomicBoolean inShutdown;
|
|
|
|
|
|
public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception {
|
|
|
Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwx------");
|
|
@@ -110,6 +112,7 @@ public class RSCDriver extends BaseProtocol {
|
|
|
this.activeJobs = new ConcurrentHashMap<>();
|
|
|
this.bypassJobs = new ConcurrentLinkedDeque<>();
|
|
|
this.idleTimeout = new AtomicReference<>();
|
|
|
+ this.inShutdown = new AtomicBoolean(false);
|
|
|
}
|
|
|
|
|
|
private synchronized void shutdown() {
|
|
@@ -217,7 +220,9 @@ public class RSCDriver extends BaseProtocol {
|
|
|
@Override
|
|
|
public void onSuccess(Void unused) {
|
|
|
clients.remove(client);
|
|
|
- setupIdleTimeout();
|
|
|
+ if (!inShutdown.get()) {
|
|
|
+ setupIdleTimeout();
|
|
|
+ }
|
|
|
}
|
|
|
});
|
|
|
LOG.debug("Registered new connection from {}.", client.getChannel());
|
|
@@ -304,6 +309,7 @@ public class RSCDriver extends BaseProtocol {
|
|
|
}
|
|
|
|
|
|
private void shutdownServer() {
|
|
|
+ inShutdown.compareAndSet(false, true);
|
|
|
if (server != null) {
|
|
|
server.close();
|
|
|
}
|