HIVE-12650: Improve error messages for Hive on Spark in case the cluster has no resources available (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/03b81bc9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/03b81bc9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/03b81bc9 Branch: refs/heads/llap Commit: 03b81bc9c40b6de4f238f6b7660488e711b869c4 Parents: e085b7e Author: Rui Li <rui...@intel.com> Authored: Fri Apr 1 14:36:18 2016 +0800 Committer: Rui Li <rui...@intel.com> Committed: Fri Apr 1 14:38:14 2016 +0800 ---------------------------------------------------------------------- .../ql/exec/spark/RemoteHiveSparkClient.java | 20 +++++++++++++++----- .../hadoop/hive/ql/exec/spark/SparkTask.java | 3 +++ .../exec/spark/status/LocalSparkJobMonitor.java | 2 +- .../spark/status/RemoteSparkJobMonitor.java | 5 ++++- .../hive/spark/client/SparkClientImpl.java | 9 ++++++++- 5 files changed, 31 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 30e53d2..3a1577f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -110,7 +111,12 @@ public class RemoteHiveSparkClient implements HiveSparkClient { int curExecutors = 0; long ts = System.currentTimeMillis(); do { - curExecutors = getExecutorCount(); + try { + curExecutors = getExecutorCount(MAX_PREWARM_TIME, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // let's don't fail on future timeout since we have a timeout for pre-warm + LOG.warn("Timed out getting executor count.", e); + } if (curExecutors >= minExecutors) { LOG.info("Finished prewarming Spark executors. The current number of executors is " + curExecutors); return; @@ -118,8 +124,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient { Thread.sleep(500); // sleep half a second } while (System.currentTimeMillis() - ts < MAX_PREWARM_TIME); - LOG.info("Timeout (" + MAX_PREWARM_TIME + - "s) occurred while prewarming executors. The current number of executors is " + curExecutors); + LOG.info("Timeout (" + MAX_PREWARM_TIME / 1000 + "s) occurred while prewarming executors. " + + "The current number of executors is " + curExecutors); } } @@ -143,6 +149,11 @@ public class RemoteHiveSparkClient implements HiveSparkClient { return minExecutors; } + private int getExecutorCount(long timeout, TimeUnit unit) throws Exception { + Future<Integer> handler = remoteClient.getExecutorCount(); + return handler.get(timeout, unit); + } + @Override public SparkConf getSparkConf() { return sparkConf; @@ -150,8 +161,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient { @Override public int getExecutorCount() throws Exception { - Future<Integer> handler = remoteClient.getExecutorCount(); - return handler.get(sparkClientTimtout, TimeUnit.SECONDS).intValue(); + return getExecutorCount(sparkClientTimtout, TimeUnit.SECONDS); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 26cce1b..7f87adf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -105,6 +105,9 @@ public class SparkTask extends Task<SparkWork> { } LOG.info("Execution completed successfully"); } else if (rc == 2) { // Cancel job if the monitor found job submission timeout. + // TODO: If the timeout is because of lack of resources in the cluster, we should + // ideally also cancel the app request here. But w/o facilities from Spark or YARN, + // it's difficult to do it on hive side alone. See HIVE-12650. jobRef.cancelJob(); } sparkJobStatus.cleanup(); http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java index d109c6f..5f0352a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java @@ -59,7 +59,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor { if (state == null) { long timeCount = (System.currentTimeMillis() - startTime)/1000; if (timeCount > monitorTimeoutInteval) { - LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it."); + console.printError("Job hasn't been submitted after " + timeCount + "s. Aborting it."); console.printError("Status: " + state); running = false; done = true; http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index 6990e80..11f263b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -66,7 +66,10 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { case QUEUED: long timeCount = (System.currentTimeMillis() - startTime) / 1000; if ((timeCount > monitorTimeoutInteval)) { - LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it."); + console.printError("Job hasn't been submitted after " + timeCount + "s." + + " Aborting it.\nPossible reasons include network issues, " + + "errors in remote driver or the cluster has no available resources, etc.\n" + + "Please check YARN or Spark driver's logs for further information."); console.printError("Status: " + state); running = false; done = true; http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 3d682a0..ae78bc3 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -47,6 +47,7 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hive.conf.HiveConf; @@ -100,7 +101,13 @@ class SparkClientImpl implements SparkClient { // The RPC server will take care of timeouts here. this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get(); } catch (Throwable e) { - LOG.warn("Error while waiting for client to connect.", e); + if (e.getCause() instanceof TimeoutException) { + LOG.error("Timed out waiting for client to connect.\nPossible reasons include network " + + "issues, errors in remote driver or the cluster has no available resources, etc." + + "\nPlease check YARN or Spark driver's logs for further information.", e); + } else { + LOG.error("Error while waiting for client to connect.", e); + } driverThread.interrupt(); try { driverThread.join();