HIVE-10434 - Cancel connection when remote Spark driver process has failed [Spark Branch] (Chao, reviewed by Marcelo)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/88dfd274
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/88dfd274
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/88dfd274

Branch: refs/heads/hbase-metastore
Commit: 88dfd274f23bb3941a7c64e6ed932336e6c84890
Parents: 769ccc1
Author: Chao Sun <sunc...@apache.org>
Authored: Fri Apr 24 15:16:36 2015 -0700
Committer: Xuefu Zhang <xzh...@cloudera.com>
Committed: Mon Jun 1 14:02:51 2015 -0700

----------------------------------------------------------------------
 .../apache/hive/spark/client/SparkClientImpl.java |  3 ++-
 .../apache/hive/spark/client/rpc/RpcServer.java   | 18 ++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/88dfd274/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 71e432d..1bcd221 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -180,7 +180,7 @@ class SparkClientImpl implements SparkClient {
     protocol.cancel(jobId);
   }
 
-  private Thread startDriver(RpcServer rpcServer, final String clientId, final String secret)
+  private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret)
       throws IOException {
     Runnable runnable;
     final String serverAddress = rpcServer.getAddress();
@@ -424,6 +424,7 @@ class SparkClientImpl implements SparkClient {
           try {
             int exitCode = child.waitFor();
             if (exitCode != 0) {
+              rpcServer.cancelClient(clientId, "Child process exited before connecting back");
               LOG.warn("Child process exited with code {}.", exitCode);
             }
           } catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/hive/blob/88dfd274/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 32d4c46..68ee627 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -164,6 +164,24 @@ public class RpcServer implements Closeable {
   }
 
   /**
+   * Tells the RPC server to cancel the connection from an existing pending client
+   * @param clientId The identifier for the client
+   * @param msg The error message about why the connection should be canceled
+   */
+  public void cancelClient(final String clientId, final String msg) {
+    final ClientInfo cinfo = pendingClients.remove(clientId);
+    if (cinfo == null) {
+      // Nothing to be done here.
+      return;
+    }
+    cinfo.timeoutFuture.cancel(true);
+    if (!cinfo.promise.isDone()) {
+      cinfo.promise.setFailure(new RuntimeException(
+          String.format("Cancel client '%s'. Error: " + msg, clientId)));
+    }
+  }
+
+  /**
    * Creates a secret for identifying a client connection.
    */
   public String createSecret() {