HIVE-12650: Improve error messages for Hive on Spark in case the cluster has no resources available (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/03b81bc9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/03b81bc9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/03b81bc9

Branch: refs/heads/llap
Commit: 03b81bc9c40b6de4f238f6b7660488e711b869c4
Parents: e085b7e
Author: Rui Li <rui...@intel.com>
Authored: Fri Apr 1 14:36:18 2016 +0800
Committer: Rui Li <rui...@intel.com>
Committed: Fri Apr 1 14:38:14 2016 +0800

----------------------------------------------------------------------
 .../ql/exec/spark/RemoteHiveSparkClient.java    | 20 +++++++++++++++-----
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |  3 +++
 .../exec/spark/status/LocalSparkJobMonitor.java |  2 +-
 .../spark/status/RemoteSparkJobMonitor.java     |  5 ++++-
 .../hive/spark/client/SparkClientImpl.java      |  9 ++++++++-
 5 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 30e53d2..3a1577f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -110,7 +111,12 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
       int curExecutors = 0;
       long ts = System.currentTimeMillis();
       do {
-        curExecutors = getExecutorCount();
+        try {
+          curExecutors = getExecutorCount(MAX_PREWARM_TIME, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          // don't fail on future timeout since we have our own timeout for pre-warm
+          LOG.warn("Timed out getting executor count.", e);
+        }
         if (curExecutors >= minExecutors) {
           LOG.info("Finished prewarming Spark executors. The current number of 
executors is " + curExecutors);
           return;
@@ -118,8 +124,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
         Thread.sleep(500); // sleep half a second
       } while (System.currentTimeMillis() - ts < MAX_PREWARM_TIME);
 
-      LOG.info("Timeout (" + MAX_PREWARM_TIME + 
-          "s) occurred while prewarming executors. The current number of 
executors is " + curExecutors);
+      LOG.info("Timeout (" + MAX_PREWARM_TIME / 1000 + "s) occurred while 
prewarming executors. " +
+          "The current number of executors is " + curExecutors);
     }
   }
 
@@ -143,6 +149,11 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     return minExecutors;
   }
 
+  private int getExecutorCount(long timeout, TimeUnit unit) throws Exception {
+    Future<Integer> handler = remoteClient.getExecutorCount();
+    return handler.get(timeout, unit);
+  }
+
   @Override
   public SparkConf getSparkConf() {
     return sparkConf;
@@ -150,8 +161,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
 
   @Override
   public int getExecutorCount() throws Exception {
-    Future<Integer> handler = remoteClient.getExecutorCount();
-    return handler.get(sparkClientTimtout, TimeUnit.SECONDS).intValue();
+    return getExecutorCount(sparkClientTimtout, TimeUnit.SECONDS);
   }
 
   @Override

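The substance of this change is that getExecutorCount() now takes an explicit bound: Future.get(timeout, unit) throws TimeoutException instead of blocking past the prewarm deadline, and the prewarm loop tolerates the timeout and retries. A minimal, self-contained sketch of the same pattern follows; the single-thread pool and the sleeping callable are stand-ins for Hive's remote client, not its API, and MAX_PREWARM_TIME here is a hypothetical value.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedPrewarmPoll {
  private static final long MAX_PREWARM_TIME = 5000; // ms, hypothetical value
  private static final int MIN_EXECUTORS = 1;

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    long ts = System.currentTimeMillis();
    int curExecutors = 0;
    do {
      // Stand-in for remoteClient.getExecutorCount(): a slow remote call.
      Future<Integer> handler = pool.submit(() -> { Thread.sleep(200); return 1; });
      try {
        // Bound the wait so a hung call cannot stall prewarming past its deadline.
        curExecutors = handler.get(MAX_PREWARM_TIME, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        // As in the patch: tolerate the timeout; the outer loop enforces its own deadline.
        System.err.println("Timed out getting executor count.");
      }
      if (curExecutors >= MIN_EXECUTORS) {
        System.out.println("Finished prewarming. Current number of executors: " + curExecutors);
        break;
      }
      Thread.sleep(500); // sleep half a second between polls
    } while (System.currentTimeMillis() - ts < MAX_PREWARM_TIME);
    pool.shutdownNow();
  }
}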
http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 26cce1b..7f87adf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -105,6 +105,9 @@ public class SparkTask extends Task<SparkWork> {
         }
         LOG.info("Execution completed successfully");
       } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
+        // TODO: If the timeout is due to lack of resources in the cluster, we should
+        // ideally also cancel the app request here. But without facilities from Spark or
+        // YARN, it's difficult to do this on the Hive side alone. See HIVE-12650.
         jobRef.cancelJob();
       }
       sparkJobStatus.cleanup();

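For orientation, the TODO lands in SparkTask's handling of the monitor's return code, where rc == 2 means the monitor gave up waiting for job submission. A hypothetical sketch of that contract; JobRef here is a simplified stand-in for Hive's SparkJobRef, not its real interface.

// Hypothetical stand-in for Hive's SparkJobRef; only the cancel hook matters here.
interface JobRef {
  void cancelJob();
}

class MonitorReturnCodeHandler {
  static void handle(int rc, JobRef jobRef) {
    if (rc == 0) {
      System.out.println("Execution completed successfully");
    } else if (rc == 2) { // the monitor detected a job submission timeout
      // Cancel the Spark job. As the TODO notes, the YARN application request
      // itself cannot easily be cancelled from the Hive side; see HIVE-12650.
      jobRef.cancelJob();
    }
  }
}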
http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index d109c6f..5f0352a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -59,7 +59,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
         if (state == null) {
           long timeCount = (System.currentTimeMillis() - startTime)/1000;
           if (timeCount > monitorTimeoutInteval) {
-            LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
+            console.printError("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
             console.printError("Status: " + state);
             running = false;
             done = true;

http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 6990e80..11f263b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -66,7 +66,10 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
         case QUEUED:
           long timeCount = (System.currentTimeMillis() - startTime) / 1000;
           if ((timeCount > monitorTimeoutInteval)) {
-            LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
+            console.printError("Job hasn't been submitted after " + timeCount + "s." +
+                " Aborting it.\nPossible reasons include network issues, errors in the " +
+                "remote driver, or the cluster having no available resources.\n" +
+                "Please check the YARN or Spark driver's logs for further information.");
             console.printError("Status: " + state);
             running = false;
             done = true;

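Both monitor changes (local and remote) swap a log-only LOG.info for console.printError, so the diagnosis reaches the user's session output rather than only the log file. A simplified sketch of the shared timeout check; a plain PrintStream stands in for Hive's SessionState console, and the interval value is hypothetical (Hive reads it from configuration).

import java.io.PrintStream;

class SubmissionTimeoutCheck {
  // Hypothetical value; Hive reads its interval from configuration.
  static final long MONITOR_TIMEOUT_INTERVAL = 60; // seconds

  // Returns true if the job failed to get submitted within the interval.
  static boolean timedOut(long startTimeMillis, PrintStream console) {
    long timeCount = (System.currentTimeMillis() - startTimeMillis) / 1000;
    if (timeCount > MONITOR_TIMEOUT_INTERVAL) {
      // Surface the diagnosis to the user's console, not just the log, as the patch does.
      console.println("Job hasn't been submitted after " + timeCount + "s. Aborting it.\n"
          + "Possible reasons include network issues, errors in the remote driver, "
          + "or the cluster having no available resources.\n"
          + "Please check the YARN or Spark driver's logs for further information.");
      return true;
    }
    return false;
  }
}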
http://git-wip-us.apache.org/repos/asf/hive/blob/03b81bc9/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 3d682a0..ae78bc3 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -100,7 +101,13 @@ class SparkClientImpl implements SparkClient {
       // The RPC server will take care of timeouts here.
      this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get();
     } catch (Throwable e) {
-      LOG.warn("Error while waiting for client to connect.", e);
+      if (e.getCause() instanceof TimeoutException) {
+        LOG.error("Timed out waiting for client to connect.\nPossible reasons 
include network " +
+            "issues, errors in remote driver or the cluster has no available 
resources, etc." +
+            "\nPlease check YARN or Spark driver's logs for further 
information.", e);
+      } else {
+        LOG.error("Error while waiting for client to connect.", e);
+      }
       driverThread.interrupt();
       try {
         driverThread.join();

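The SparkClientImpl change relies on how Future.get surfaces failures: when the RPC layer fails the registration with a TimeoutException, get() wraps it as the cause of an ExecutionException, which is what the e.getCause() check inspects. A standalone sketch of that unwrapping, under the same assumption; the CompletableFuture below simulates the registration future, it is not Hive's RPC API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class ConnectErrorDispatch {
  public static void main(String[] args) {
    // Simulate a registration future that the RPC layer failed with a timeout.
    CompletableFuture<String> registration = new CompletableFuture<>();
    registration.completeExceptionally(new TimeoutException("client connect timed out"));

    try {
      registration.get(); // Future.get wraps the failure in ExecutionException
    } catch (InterruptedException | ExecutionException e) {
      if (e.getCause() instanceof TimeoutException) {
        // Timeout-specific message, as in the patch.
        System.err.println("Timed out waiting for client to connect.\n"
            + "Possible reasons include network issues, errors in the remote driver, "
            + "or the cluster having no available resources.\n"
            + "Please check the YARN or Spark driver's logs for further information.");
      } else {
        System.err.println("Error while waiting for client to connect: " + e.getCause());
      }
    }
  }
}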