HIVE-10434 - Cancel connection when remote Spark driver process has failed 
[Spark Branch] (Chao, reviewed by Marcelo)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/88dfd274
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/88dfd274
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/88dfd274

Branch: refs/heads/hbase-metastore
Commit: 88dfd274f23bb3941a7c64e6ed932336e6c84890
Parents: 769ccc1
Author: Chao Sun <sunc...@apache.org>
Authored: Fri Apr 24 15:16:36 2015 -0700
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Mon Jun 1 14:02:51 2015 -0700

----------------------------------------------------------------------
 .../apache/hive/spark/client/SparkClientImpl.java |  3 ++-
 .../apache/hive/spark/client/rpc/RpcServer.java   | 18 ++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/88dfd274/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 71e432d..1bcd221 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -180,7 +180,7 @@ class SparkClientImpl implements SparkClient {
     protocol.cancel(jobId);
   }
 
-  private Thread startDriver(RpcServer rpcServer, final String clientId, final String secret)
+  private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret)
       throws IOException {
     Runnable runnable;
     final String serverAddress = rpcServer.getAddress();
@@ -424,6 +424,7 @@ class SparkClientImpl implements SparkClient {
           try {
             int exitCode = child.waitFor();
             if (exitCode != 0) {
+              rpcServer.cancelClient(clientId, "Child process exited before connecting back");
               LOG.warn("Child process exited with code {}.", exitCode);
             }
           } catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/hive/blob/88dfd274/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java 
b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 32d4c46..68ee627 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -164,6 +164,24 @@ public class RpcServer implements Closeable {
   }
 
   /**
+   * Tells the RPC server to cancel the connection from an existing pending client
+   * @param clientId The identifier for the client
+   * @param msg The error message about why the connection should be canceled
+   */
+  public void cancelClient(final String clientId, final String msg) {
+    final ClientInfo cinfo = pendingClients.remove(clientId);
+    if (cinfo == null) {
+      return; // No pending entry for this client id; nothing to cancel.
+    }
+    cinfo.timeoutFuture.cancel(true);
+    if (!cinfo.promise.isDone()) {
+      // Pass msg as an argument (not in the pattern) so a '%' in it cannot break format().
+      cinfo.promise.setFailure(new RuntimeException(
+          String.format("Cancel client '%s'. Error: %s", clientId, msg)));
+    }
+  }
+
+  /**
    * Creates a secret for identifying a client connection.
    */
   public String createSecret() {

Reply via email to