Repository: hive
Updated Branches:
  refs/heads/master 4137c212c -> a036e52df


HIVE-17684: HoS memory issues with MapJoinMemoryExhaustionHandler (Misha 
Dmitriev, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a036e52d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a036e52d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a036e52d

Branch: refs/heads/master
Commit: a036e52dff85215850dee365e4093b436d301f42
Parents: 4137c21
Author: Misha Dmitriev <mi...@cloudera.com>
Authored: Tue Sep 25 18:05:03 2018 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Tue Sep 25 18:05:16 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/GcTimeMonitor.java       | 261 +++++++++++++++++++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  12 +-
 data/conf/hive-site.xml                         |  10 +
 data/conf/spark/standalone/hive-site.xml        |   7 +-
 pom.xml                                         |   1 +
 .../ql/exec/DefaultMemoryExhaustionChecker.java |  45 ++++
 .../hive/ql/exec/HashTableSinkOperator.java     |   9 +-
 .../hive/ql/exec/MemoryExhaustionChecker.java   |  28 ++
 .../ql/exec/MemoryExhaustionCheckerFactory.java |  39 +++
 .../ql/exec/SparkMemoryExhaustionChecker.java   |  91 +++++++
 10 files changed, 494 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java 
b/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java
new file mode 100644
index 0000000..edba6f9
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/GcTimeMonitor.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Based on org.apache.hadoop.util.GcTimeMonitor. However, this class detects
+ * GC pauses using the same method as JvmPauseMonitor (by comparing the actual
+ * and expected thread sleep time) rather than by reading information from
+ * GarbageCollectionMXBean. The latter may sometimes report time spent in
+ * concurrent GC operations rather than GC pauses. This may result in 
inaccurate
+ * results when trying to estimate the time that the JVM is "frozen" due to GC.
+ *
+ * This class monitors the percentage of time the JVM is paused in GC within
+ * the specified observation window, say 1 minute. The user can provide a
+ * hook which will be called whenever this percentage exceeds the specified
+ * threshold.
+ */
+public class GcTimeMonitor extends Thread {
+
+  private final long maxGcTimePercentage;
+  private final long observationWindowNanos, sleepIntervalMs;
+  private final GcTimeAlertHandler alertHandler;
+
+  // Ring buffers containing GC timings and timestamps when timings were taken
+  private final TsAndData[] gcDataBuf;
+  private int bufSize, startIdx, endIdx;
+
+  private long startTimeNanos;
+  private final GcData curData = new GcData();
+  private volatile boolean shouldRun = true;
+
+  /**
+   * Creates an instance of GcTimeMonitor. Once it is started, it stays alive
+   * and monitors the GC time percentage until shutdown() is called. If you do
+   * not put a limit on the number of GcTimeMonitor instances that you create,
+   * and alertHandler != null, you must call shutdown() once the given instance
+   * is no longer needed. Otherwise, you may create a memory leak, because each
+   * running GcTimeMonitor keeps its alertHandler object in memory, which in
+   * turn may reference and keep in memory many more other objects.
+   *
+   * @param observationWindowMs the interval over which the percentage
+   *   of GC time should be calculated. A practical value would be somewhere
+   *   between 30 sec and several minutes.
+   * @param sleepIntervalMs how frequently this thread should wake up to check
+   *   GC timings. This is also a frequency with which alertHandler will be
+   *   invoked if GC time percentage exceeds the specified limit. A practical
+   *   value would likely be 500..1000 ms.
+   * @param maxGcTimePercentage A GC time percentage limit (0..100) within
+   *   observationWindowMs. Once this is exceeded, alertHandler will be
+   *   invoked every sleepIntervalMs milliseconds until GC time percentage
+   *   falls below this limit.
+   * @param alertHandler a single method in this interface is invoked when GC
+   *   time percentage exceeds the specified limit. May be null, in which case
+   *   no alerts are raised.
+   * @throws IllegalArgumentException if an argument is out of range, or if
+   *   observationWindowMs / sleepIntervalMs is so large that the ring buffer
+   *   would need more than 128 * 1024 entries.
+   */
+  public GcTimeMonitor(long observationWindowMs, long sleepIntervalMs,
+      int maxGcTimePercentage, GcTimeAlertHandler alertHandler) {
+    Preconditions.checkArgument(observationWindowMs > 0);
+    Preconditions.checkArgument(
+        sleepIntervalMs > 0 && sleepIntervalMs < observationWindowMs);
+    Preconditions.checkArgument(
+        maxGcTimePercentage >= 0 && maxGcTimePercentage <= 100);
+
+    // Prevent the user from accidentally creating an abnormally big buffer,
+    // which will result in slow calculations and likely inaccuracy.
+    // The limit is checked while the value is still a long: narrowing to int
+    // first could wrap a huge ratio around to a small or negative number that
+    // slips past the check (or fails later with NegativeArraySizeException).
+    long samplesPerWindow = observationWindowMs / sleepIntervalMs;
+    Preconditions.checkArgument(samplesPerWindow <= 128 * 1024 - 2);
+
+    this.observationWindowNanos = observationWindowMs * 1000000;
+    this.sleepIntervalMs = sleepIntervalMs;
+    this.maxGcTimePercentage = maxGcTimePercentage;
+    this.alertHandler = alertHandler;
+
+    // Two spare slots on top of the number of samples that fit in the window.
+    bufSize = (int) (samplesPerWindow + 2);
+    gcDataBuf = new TsAndData[bufSize];
+    for (int i = 0; i < bufSize; i++) {
+      gcDataBuf[i] = new TsAndData();
+    }
+
+    this.setDaemon(true);
+    this.setName("GcTimeMonitor obsWindow = " + observationWindowMs +
+        ", sleepInterval = " + sleepIntervalMs +
+        ", maxGcTimePerc = " + maxGcTimePercentage);
+  }
+
+  /**
+   * Monitoring loop. Repeatedly sleeps for sleepIntervalMs, measures how long
+   * the sleep really took, refreshes the GC statistics from that measurement
+   * and, if a handler is registered and the GC time percentage is above the
+   * configured maximum, passes a copy of the statistics to the handler.
+   * The loop ends once shutdown() has been called or the thread is interrupted.
+   */
+  @Override
+  public void run() {
+    startTimeNanos = System.nanoTime();
+    // Seed the ring buffer with a "no pause yet" entry at the start time.
+    gcDataBuf[startIdx].setValues(startTimeNanos, 0);
+
+    while (shouldRun) {
+      final long beforeSleepNanos = System.nanoTime();
+      try {
+        Thread.sleep(sleepIntervalMs);
+      } catch (InterruptedException ie) {
+        // Interruption is treated as a request to stop monitoring.
+        return;
+      }
+      final long afterSleepNanos = System.nanoTime();
+
+      calculateGCTimePercentageWithinObservedInterval(
+          beforeSleepNanos, afterSleepNanos);
+
+      if (alertHandler == null) {
+        continue;
+      }
+      if (curData.gcTimePercentage > maxGcTimePercentage) {
+        // Hand out a copy so the handler never sees later updates.
+        alertHandler.alert(curData.clone());
+      }
+    }
+  }
+
+  /**
+   * Asks the monitoring loop to stop. This only clears the volatile shouldRun
+   * flag; it does not interrupt the thread, so run() notices the request the
+   * next time it evaluates its loop condition, i.e. after the sleep that is
+   * currently in progress.
+   */
+  public void shutdown() {
+    shouldRun = false;
+  }
+
+  /**
+   * Returns a copy of the most recent data measured by this monitor.
+   * The copy is made by GcData.clone(), which is synchronized on the same
+   * object as GcData.update(), so the returned values belong to a single
+   * measurement. The caller owns the returned object; it is not updated by
+   * later measurements.
+   */
+  public GcData getLatestGcData() {
+    return curData.clone();
+  }
+
+  /**
+   * Records the pause observed during the last sleep and recomputes the
+   * percentage of time spent in pauses within the observation window.
+   *
+   * The pause is estimated as the time by which the sleep overran its nominal
+   * duration. The measurement is appended to the ring buffer, entries that
+   * fell out of the observation window are dropped from its start, and the
+   * pauses of all remaining entries, including the one just added, are summed.
+   *
+   * @param intervalStartTsNanos System.nanoTime() taken right before the sleep
+   * @param intervalEndTsNanos System.nanoTime() taken right after the sleep
+   */
+  private void calculateGCTimePercentageWithinObservedInterval(
+      long intervalStartTsNanos, long intervalEndTsNanos) {
+    // Clamp at zero: if the sleep is measured as marginally shorter than
+    // requested, that must not be recorded as a negative pause, which would
+    // reduce the accumulated totals.
+    long gcTimeWithinSleepIntervalNanos = Math.max(0L,
+        intervalEndTsNanos - intervalStartTsNanos - sleepIntervalMs * 1000000);
+    long totalGcTimeNanos =
+        curData.totalGcTimeNanos + gcTimeWithinSleepIntervalNanos;
+
+    long gcMonitorRunTimeNanos = intervalEndTsNanos - startTimeNanos;
+
+    endIdx = (endIdx + 1) % bufSize;
+    gcDataBuf[endIdx].setValues(
+        intervalEndTsNanos, gcTimeWithinSleepIntervalNanos);
+
+    // Update the observation window so that it covers the last
+    // observationWindowNanos period. For that, move startIdx forward until we
+    // reach the first buffer entry with timestamp within the observation
+    // window.
+    long startObsWindowTsNanos = intervalEndTsNanos - observationWindowNanos;
+    while (gcDataBuf[startIdx].tsNanos < startObsWindowTsNanos
+        && startIdx != endIdx) {
+      startIdx = (startIdx + 1) % bufSize;
+    }
+
+    // Calculate total GC time within the observation window.
+    // We should be careful about GC time that passed before the first
+    // timestamp in our observation window: only the part of the first entry's
+    // pause that can lie inside the window is counted.
+    long gcTimeWithinObservationWindowNanos = Math.min(
+        gcDataBuf[startIdx].gcPauseNanos,
+        gcDataBuf[startIdx].tsNanos - startObsWindowTsNanos);
+    // Add every entry after startIdx up to AND INCLUDING endIdx. The entry at
+    // endIdx holds the pause that was just measured and lies entirely inside
+    // the window, so leaving it out would make the result lag by one sample.
+    for (int i = startIdx; i != endIdx;) {
+      i = (i + 1) % bufSize;
+      gcTimeWithinObservationWindowNanos += gcDataBuf[i].gcPauseNanos;
+    }
+
+    // Until the monitor has run for a full window, the percentage is taken
+    // relative to the actual run time instead of the window length.
+    curData.update(gcMonitorRunTimeNanos, totalGcTimeNanos,
+        (int) (gcTimeWithinObservationWindowNanos * 100 /
+            Math.min(observationWindowNanos, gcMonitorRunTimeNanos)));
+  }
+
+  /**
+   * The user can provide an instance of a class implementing this interface
+   * when initializing a GcTimeMonitor to receive alerts when GC time
+   * percentage exceeds the specified threshold.
+   */
+  public interface GcTimeAlertHandler {
+    /**
+     * Invoked from the GcTimeMonitor thread after a measurement whose GC time
+     * percentage is greater than the monitor's configured maximum.
+     *
+     * @param gcData a copy of the monitor's data for that measurement
+     */
+    void alert(GcData gcData);
+  }
+
+  /**
+   * Snapshot of the pause statistics computed by a GcTimeMonitor at one
+   * measurement. Instances handed out by the monitor are copies.
+   */
+  public static class GcData implements Cloneable {
+    private long gcMonitorRunTimeNanos;
+    private long totalGcTimeNanos;
+    private int gcTimePercentage;
+
+    /**
+     * Returns the time, in milliseconds, since the associated GcTimeMonitor
+     * started running.
+     */
+    public long getGcMonitorRunTimeMs() {
+      return gcMonitorRunTimeNanos / 1000000;
+    }
+
+    /**
+     * Returns the GC pause time, in milliseconds, accumulated by the
+     * associated GcTimeMonitor since it started running.
+     */
+    public long getAccumulatedGcTimeMs() {
+      return totalGcTimeNanos / 1000000;
+    }
+
+    /**
+     * Returns the percentage (0..100) of time that the JVM spent in GC pauses
+     * within the observation window of the associated GcTimeMonitor.
+     */
+    public int getGcTimePercentage() {
+      return gcTimePercentage;
+    }
+
+    /** Replaces all three values at once, under this object's lock. */
+    private synchronized void update(long runTimeNanos, long gcTimeNanos,
+        int percentage) {
+      gcMonitorRunTimeNanos = runTimeNanos;
+      totalGcTimeNanos = gcTimeNanos;
+      gcTimePercentage = percentage;
+    }
+
+    /**
+     * Returns a field-by-field copy, taken under the same lock as update() so
+     * that the copied values come from a single measurement.
+     */
+    @Override
+    public synchronized GcData clone() {
+      final Object copy;
+      try {
+        copy = super.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new RuntimeException(e);
+      }
+      return (GcData) copy;
+    }
+  }
+
+  /** One ring-buffer entry: a timestamp and the pause measured up to it. */
+  private static class TsAndData {
+    // Timestamp when this measurement was taken
+    private long tsNanos;
+    // Total GC pause time within the interval between ts and the timestamp of
+    // the previous measurement.
+    private long gcPauseNanos;
+
+    void setValues(long tsNanos, long gcPauseNanos) {
+      this.tsNanos = tsNanos;
+      this.gcPauseNanos = gcPauseNanos;
+    }
+  }
+
+  /**
+   * Simple 'main' to facilitate manual testing of the pause monitor.
+   *
+   * This main function just leaks memory. Running this class will quickly
+   * result in a "GC hell" and subsequent alerts from the GcTimeMonitor.
+   */
+  public static void main(String []args) throws Exception {
+    new GcTimeMonitor(20 * 1000, 500, 20,
+          new GcTimeMonitor.GcTimeAlertHandler() {
+            @Override
+            public void alert(GcData gcData) {
+              System.err.println(
+                  "GcTimeMonitor alert. Current GC time percentage = " +
+                  gcData.getGcTimePercentage() +
+                  ", total run time = " + (gcData.getGcMonitorRunTimeMs() / 
1000) + " sec" +
+                  ", total GC time = " + (gcData.getAccumulatedGcTimeMs() / 
1000) + " sec");
+            }
+          }).start();
+
+    List<String> list = Lists.newArrayList();
+    for (int i = 0; ; i++) {
+      list.add("This is a long string to fill memory quickly " + i);
+      if (i % 100000 == 0) {
+        System.out.println("Added " + i + " strings");
+        Thread.sleep(100);
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 348e07b..3b8e820 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2068,13 +2068,21 @@ public class HiveConf extends Configuration {
     
HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE("hive.mapjoin.followby.gby.localtask.max.memory.usage",
 (float) 0.55,
         "This number means how much memory the local task can take to hold the 
key/value into an in-memory hash table \n" +
         "when this map join is followed by a group by. If the local task's 
memory usage is more than this number, \n" +
-        "the local task will abort by itself. It means the data of the small 
table is too large to be held in memory."),
+        "the local task will abort by itself. It means the data of the small 
table is too large " +
+        "to be held in memory. Does not apply to Hive-on-Spark (replaced by " +
+        "hive.mapjoin.max.gc.time.percentage)"),
     HIVEHASHTABLEMAXMEMORYUSAGE("hive.mapjoin.localtask.max.memory.usage", 
(float) 0.90,
         "This number means how much memory the local task can take to hold the 
key/value into an in-memory hash table. \n" +
         "If the local task's memory usage is more than this number, the local 
task will abort by itself. \n" +
-        "It means the data of the small table is too large to be held in 
memory."),
+        "It means the data of the small table is too large to be held in 
memory. Does not apply to " +
+        "Hive-on-Spark (replaced by hive.mapjoin.max.gc.time.percentage)"),
     HIVEHASHTABLESCALE("hive.mapjoin.check.memory.rows", (long)100000,
         "The number means after how many rows processed it needs to check the 
memory usage"),
+    HIVEHASHTABLEMAXGCTIMEPERCENTAGE("hive.mapjoin.max.gc.time.percentage", 
(float) 0.60,
+        new RangeValidator(0.0f, 1.0f), "This number means how much time (what 
percentage, " +
+        "0..1, of wallclock time) the JVM is allowed to spend in garbage 
collection when running " +
+        "the local task. If GC time percentage exceeds this number, the local 
task will abort by " +
+        "itself. Applies to Hive-on-Spark only"),
 
     HIVEDEBUGLOCALTASK("hive.debug.localtask",false, ""),
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/data/conf/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
index 0c3adb4..0daf9ad 100644
--- a/data/conf/hive-site.xml
+++ b/data/conf/hive-site.xml
@@ -204,6 +204,16 @@
 </property>
 
 <property>
+  <name>hive.mapjoin.max.gc.time.percentage</name>
+  <value>0.99</value>
+  <description>
+  Maximum percentage of wallclock time that the JVM can spend in GC.
+  If this limit is exceeded, the local task will abort by itself.
+  Tests may run in a heavily loaded environment, so this number is set very high to avoid spurious task aborts.
+  </description>
+</property>
+
+<property>
   <name>hive.input.format</name>
   <value>org.apache.hadoop.hive.ql.io.CombineHiveInputFormat</value>
   <description>The default input format, if it is not specified, the system 
assigns it. It is set to HiveInputFormat for hadoop versions 17, 18 and 19, 
whereas it is set to CombineHiveInputFormat for hadoop 20. The user can always 
overwrite it - if there is a bug in CombineHiveInputFormat, it can always be 
manually set to HiveInputFormat. </description>

http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/data/conf/spark/standalone/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/standalone/hive-site.xml 
b/data/conf/spark/standalone/hive-site.xml
index 79e388e..317631c 100644
--- a/data/conf/spark/standalone/hive-site.xml
+++ b/data/conf/spark/standalone/hive-site.xml
@@ -236,7 +236,12 @@
 
 <property>
   <name>spark.driver.extraClassPath</name>
-  
<value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar</value>
+  
<value>${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar:${maven.local.repository}/org/antlr/antlr-runtime/${antlr.version}/antlr-runtime-${antlr.version}.jar:${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
+</property>
+
+<property>
+  <name>spark.executor.extraClassPath</name>
+  
<value>${maven.local.repository}/org/apache/hadoop/hadoop-common/${hadoop.version}/hadoop-common-${hadoop.version}.jar</value>
 </property>
 
 <property>

http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9c15328..5e100fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1329,6 +1329,7 @@
             
<java.security.krb5.conf>${test.conf.dir}/krb5.conf</java.security.krb5.conf>
             <!-- Required by spark to work around SPARK-14958 -->
             <antlr.version>${antlr.version}</antlr.version>
+            <hadoop.version>${hadoop.version}</hadoop.version>
             <qfile>${qfile}</qfile>
             <initScript>${initScript}</initScript>
             <clustermode>${clustermode}</clustermode>

http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java
new file mode 100644
index 0000000..b50a66e
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultMemoryExhaustionChecker.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+/**
+ * A {@link MemoryExhaustionChecker} that uses a {@link 
MapJoinMemoryExhaustionHandler}
+ * to check memory overhead.
+ */
+class DefaultMemoryExhaustionChecker implements MemoryExhaustionChecker {
+
+   private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+
+   DefaultMemoryExhaustionChecker(SessionState.LogHelper console,
+                                  HashTableSinkDesc hashTableSinkDesc) {
+    super();
+    this.memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console,
+            hashTableSinkDesc.getHashtableMemoryUsage());
+  }
+
+  @Override
+  public void checkMemoryOverhead(long rowNumber, long hashTableScale, int 
tableContainerSize) {
+    if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
+      this.memoryExhaustionHandler.checkMemoryStatus(tableContainerSize, 
rowNumber);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index f01b67e..36c9335 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
@@ -103,7 +102,7 @@ public class HashTableSinkOperator extends 
TerminalOperator<HashTableSinkDesc> i
   private long rowNumber = 0;
   protected transient LogHelper console;
   private long hashTableScale;
-  private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+  private MemoryExhaustionChecker memoryExhaustionChecker;
 
   /** Kryo ctor. */
   protected HashTableSinkOperator() {
@@ -126,7 +125,7 @@ public class HashTableSinkOperator extends 
TerminalOperator<HashTableSinkDesc> i
     super.initializeOp(hconf);
     boolean isSilent = HiveConf.getBoolVar(hconf, 
HiveConf.ConfVars.HIVESESSIONSILENT);
     console = new LogHelper(LOG, isSilent);
-    memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, 
conf.getHashtableMemoryUsage());
+    memoryExhaustionChecker = 
MemoryExhaustionCheckerFactory.getChecker(console, hconf, conf);
     emptyRowContainer.addRow(emptyObjectArray);
 
     // for small tables only; so get the big table position first
@@ -255,9 +254,7 @@ public class HashTableSinkOperator extends 
TerminalOperator<HashTableSinkDesc> i
         rowContainer = emptyRowContainer;
       }
       rowNumber++;
-      if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
-        memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), 
rowNumber);
-      }
+      memoryExhaustionChecker.checkMemoryOverhead(rowNumber, hashTableScale, 
tableContainer.size());
       tableContainer.put(key, rowContainer);
     } else if (rowContainer == emptyRowContainer) {
       rowContainer = rowContainer.copy();

http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java
new file mode 100644
index 0000000..7500430
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionChecker.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+/**
+ * Checks the memory overhead when running {@link HashTableSinkOperator}. 
Throws a
+ * {@link org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError}
+ * if too much memory is being used.
+ */
+interface MemoryExhaustionChecker {
+
+  /**
+   * Checks whether the operator may keep adding rows to its hash table.
+   * Implementations are free to ignore any of the arguments.
+   *
+   * @param rowNumber the caller's running row counter
+   * @param hashTableScale the caller's check interval, in rows
+   *   (presumably hive.mapjoin.check.memory.rows; confirm against the caller)
+   * @param tableContainerSize current number of entries in the hash table
+   */
+  void checkMemoryOverhead(long rowNumber, long hashTableScale, int 
tableContainerSize);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java
new file mode 100644
index 0000000..086df68
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryExhaustionCheckerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+class MemoryExhaustionCheckerFactory {
+
+  private MemoryExhaustionCheckerFactory() {
+    // No default constructor allowed
+  }
+
+  static MemoryExhaustionChecker getChecker(SessionState.LogHelper console, 
Configuration conf,
+                                            HashTableSinkDesc 
hashTableSinkDesc) {
+    if ("spark".equals(HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
+      return SparkMemoryExhaustionChecker.get(conf);
+    } else {
+      return new DefaultMemoryExhaustionChecker(console, hashTableSinkDesc);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/a036e52d/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java
new file mode 100644
index 0000000..b1b3e09
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkMemoryExhaustionChecker.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.GcTimeMonitor;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link MemoryExhaustionChecker} specific to Hive-on-Spark. Unlike the
+ * {@link DefaultMemoryExhaustionChecker} it uses a {@link GcTimeMonitor}
+ * to monitor how much time (what percentage of run time within the last
+ * minute or so) is spent in GC. If this value exceeds the configured value
+ * in {@link HiveConf.ConfVars#HIVEHASHTABLEMAXGCTIMEPERCENTAGE}, a
+ * {@link MapJoinMemoryExhaustionError} is thrown.
+ */
+class SparkMemoryExhaustionChecker implements MemoryExhaustionChecker {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkMemoryExhaustionChecker.class);
+
+  private static SparkMemoryExhaustionChecker INSTANCE;
+
+  // The GC time alert functionality below is used by the checkGcOverhead() 
method.
+  // This method may be called very frequently, and if it called
+  // GcTimeMonitor.getLatestGcData() every time, it could result in unnecessary
+  // overhead due to synchronization and new object creation. So instead,
+  // GcTimeMonitor itself sets the "red flag" in lastAlertGcTimePercentage,
+  // and checkGcOverhead() may check it as frequently as needed.
+  private volatile int lastAlertGcTimePercentage;
+  private final int criticalGcTimePercentage;
+
+  private SparkMemoryExhaustionChecker(Configuration conf) {
+    super();
+    criticalGcTimePercentage = (int) (HiveConf.getFloatVar(
+        conf, HiveConf.ConfVars.HIVEHASHTABLEMAXGCTIMEPERCENTAGE) * 100);
+    GcTimeMonitor hiveGcTimeMonitor = new 
HiveGcTimeMonitor(criticalGcTimePercentage);
+    hiveGcTimeMonitor.start();
+  }
+
+  static synchronized SparkMemoryExhaustionChecker get(Configuration conf) {
+    if (INSTANCE == null) {
+      INSTANCE = new SparkMemoryExhaustionChecker(conf);
+    }
+    return INSTANCE;
+  }
+
+  @Override
+  public void checkMemoryOverhead(long rowNumber, long hashTableScale, int 
tableContainerSize) {
+    if (lastAlertGcTimePercentage >= criticalGcTimePercentage) {
+      String msg = "GC time percentage = " + lastAlertGcTimePercentage + "% 
exceeded threshold "
+              + criticalGcTimePercentage + "%";
+      throw new MapJoinMemoryExhaustionError(msg);
+    }
+  }
+
+  // GC time monitoring
+  private class HiveGcTimeMonitor extends GcTimeMonitor {
+
+    HiveGcTimeMonitor(int criticalGcTimePercentage) {
+      super(45 * 1000, 200, criticalGcTimePercentage, new 
HiveGcTimeAlertHandler());
+    }
+  }
+
+  private class HiveGcTimeAlertHandler implements 
GcTimeMonitor.GcTimeAlertHandler {
+
+    @Override
+    public void alert(GcTimeMonitor.GcData gcData) {
+      lastAlertGcTimePercentage = gcData.getGcTimePercentage();
+      LOG.warn("GcTimeMonitor alert called. Current GC time = " + 
lastAlertGcTimePercentage + "%");
+    }
+  }
+}

Reply via email to