incubator-livy git commit: [LIVY-466][RSC] Fix RSCDriver exception during RPC shutdown

2018-05-02 Thread jshao
Repository: incubator-livy
Updated Branches:
  refs/heads/master 9d381bdf0 -> e3f45a057


[LIVY-466][RSC] Fix RSCDriver exception during RPC shutdown

## What changes were proposed in this pull request?

During RSCDriver's shutdown, it will first shutdown RPC server, and then all 
the RPC clients. When RPC client is closed, it will register a timeout to avoid 
orphaned RSCDriver, but this is not necessary during RSCDriver's shutdown, so 
here fixing this issue. The details can be seen in 
[JIRA](https://issues.apache.org/jira/browse/LIVY-466).

## How was this patch tested?

Local verification.

Author: jerryshao 

Closes #90 from jerryshao/LIVY-466.


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/e3f45a05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/e3f45a05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/e3f45a05

Branch: refs/heads/master
Commit: e3f45a057cc45bca5bceb04af8ea9218b35fa621
Parents: 9d381bd
Author: jerryshao 
Authored: Thu May 3 14:37:07 2018 +0800
Committer: jerryshao 
Committed: Thu May 3 14:37:07 2018 +0800

--
 rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/e3f45a05/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
--
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
index f727570..eeba300 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import io.netty.channel.ChannelHandler.Sharable;
@@ -92,6 +93,7 @@ public class RSCDriver extends BaseProtocol {
   protected final RSCConf livyConf;
 
   private final AtomicReference> idleTimeout;
+  private final AtomicBoolean inShutdown;
 
   public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception {
 Set perms = 
PosixFilePermissions.fromString("rwx--");
@@ -110,6 +112,7 @@ public class RSCDriver extends BaseProtocol {
 this.activeJobs = new ConcurrentHashMap<>();
 this.bypassJobs = new ConcurrentLinkedDeque<>();
 this.idleTimeout = new AtomicReference<>();
+this.inShutdown = new AtomicBoolean(false);
   }
 
   private synchronized void shutdown() {
@@ -217,7 +220,9 @@ public class RSCDriver extends BaseProtocol {
   @Override
   public void onSuccess(Void unused) {
 clients.remove(client);
-setupIdleTimeout();
+if (!inShutdown.get()) {
+  setupIdleTimeout();
+}
   }
 });
 LOG.debug("Registered new connection from {}.", client.getChannel());
@@ -304,6 +309,7 @@ public class RSCDriver extends BaseProtocol {
   }
 
   private void shutdownServer() {
+inShutdown.compareAndSet(false, true);
 if (server != null) {
   server.close();
 }



incubator-livy git commit: [LIVY-466][RSC] Fix RSCDriver exception during RPC shutdown

2018-05-02 Thread jshao
Repository: incubator-livy
Updated Branches:
  refs/heads/branch-0.5 bdb954798 -> 8c9698113


[LIVY-466][RSC] Fix RSCDriver exception during RPC shutdown

## What changes were proposed in this pull request?

During RSCDriver's shutdown, it will first shutdown RPC server, and then all 
the RPC clients. When RPC client is closed, it will register a timeout to avoid 
orphaned RSCDriver, but this is not necessary during RSCDriver's shutdown, so 
here fixing this issue. The details can be seen in 
[JIRA](https://issues.apache.org/jira/browse/LIVY-466).

## How was this patch tested?

Local verification.

Author: jerryshao 

Closes #90 from jerryshao/LIVY-466.

(cherry picked from commit e3f45a057cc45bca5bceb04af8ea9218b35fa621)
Signed-off-by: jerryshao 


Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/8c969811
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/8c969811
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/8c969811

Branch: refs/heads/branch-0.5
Commit: 8c96981134a862353b751a29398ae81d1a3a4450
Parents: bdb9547
Author: jerryshao 
Authored: Thu May 3 14:37:07 2018 +0800
Committer: jerryshao 
Committed: Thu May 3 14:37:20 2018 +0800

--
 rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/8c969811/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
--
diff --git a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java 
b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
index f727570..eeba300 100644
--- a/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
+++ b/rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import io.netty.channel.ChannelHandler.Sharable;
@@ -92,6 +93,7 @@ public class RSCDriver extends BaseProtocol {
   protected final RSCConf livyConf;
 
   private final AtomicReference> idleTimeout;
+  private final AtomicBoolean inShutdown;
 
   public RSCDriver(SparkConf conf, RSCConf livyConf) throws Exception {
 Set perms = 
PosixFilePermissions.fromString("rwx--");
@@ -110,6 +112,7 @@ public class RSCDriver extends BaseProtocol {
 this.activeJobs = new ConcurrentHashMap<>();
 this.bypassJobs = new ConcurrentLinkedDeque<>();
 this.idleTimeout = new AtomicReference<>();
+this.inShutdown = new AtomicBoolean(false);
   }
 
   private synchronized void shutdown() {
@@ -217,7 +220,9 @@ public class RSCDriver extends BaseProtocol {
   @Override
   public void onSuccess(Void unused) {
 clients.remove(client);
-setupIdleTimeout();
+if (!inShutdown.get()) {
+  setupIdleTimeout();
+}
   }
 });
 LOG.debug("Registered new connection from {}.", client.getChannel());
@@ -304,6 +309,7 @@ public class RSCDriver extends BaseProtocol {
   }
 
   private void shutdownServer() {
+inShutdown.compareAndSet(false, true);
 if (server != null) {
   server.close();
 }