Repository: hive
Updated Branches:
  refs/heads/master ccbc5c383 -> d813b487a


HIVE-20512: Improve record and memory usage logging in SparkRecordHandler (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d813b487
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d813b487
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d813b487

Branch: refs/heads/master
Commit: d813b487ae4c04661a239a3a6b84444a9bae12cf
Parents: ccbc5c3
Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com>
Authored: Tue Nov 13 20:02:53 2018 -0600
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Tue Nov 13 20:02:53 2018 -0600

----------------------------------------------------------------------
 .../ql/exec/spark/SparkMapRecordHandler.java    |  11 +-
 .../exec/spark/SparkMergeFileRecordHandler.java |   2 +
 .../hive/ql/exec/spark/SparkRecordHandler.java  | 109 +++++++++++++------
 .../ql/exec/spark/SparkReduceRecordHandler.java |  14 +--
 4 files changed, 83 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
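For context: the patch replaces the old per-row, threshold-based logging (log at
rows 1, 10, 100, ... and then every million rows) with a single daemon thread
that logs the processed row count and heap usage on a time-based schedule,
doubling the interval after each log until it reaches a cap. Below is a minimal
standalone sketch of that pattern, not the Hive class itself: the class name and
use of System.out are illustrative, the anyRow guard and the
DiscardPolicy/shutdown-policy tuning from the patch are omitted, and the
15-second initial interval and 15-minute cap mirror the patch's constants.

  import java.lang.management.ManagementFactory;
  import java.lang.management.MemoryMXBean;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;

  public class PeriodicLoggerSketch {
    private static final long MAX_INTERVAL_MS = 900_000; // 15-minute cap, as in the patch
    private volatile long intervalMs = 15_000;           // 15-second initial interval
    private volatile long rowNumber = 0;                 // single writer, so volatile is enough
    private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
          Thread t = new Thread(r, "MemoryAndRowInfoLogger");
          t.setDaemon(true);                             // must never block JVM exit
          return t;
        });

    void incrementRowNumber() {
      ++rowNumber;                                       // the only per-row cost
    }

    void start() {
      executor.schedule(this::logAndReschedule, intervalMs, TimeUnit.MILLISECONDS);
    }

    private void logAndReschedule() {
      // Double the interval up to the cap, log, then re-schedule ourselves.
      intervalMs = Math.min(MAX_INTERVAL_MS, 2 * intervalMs);
      System.out.println("Processed " + rowNumber + " rows: used memory = "
          + memoryMXBean.getHeapMemoryUsage().getUsed());
      executor.schedule(this::logAndReschedule, intervalMs, TimeUnit.MILLISECONDS);
    }
  }

Moving the bookkeeping off the row-processing path cuts the per-row overhead to
a single volatile increment, which is the point of the change.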


http://git-wip-us.apache.org/repos/asf/hive/blob/d813b487/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 88dd12c..530131f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -140,9 +140,8 @@ public class SparkMapRecordHandler extends SparkRecordHandler {
       // Since there is no concept of a group, we don't invoke
       // startGroup/endGroup for a mapper
       mo.process((Writable) value);
-      if (LOG.isInfoEnabled()) {
-        logMemoryInfo();
-      }
+      incrementRowNumber();
+
     } catch (Throwable e) {
       abort = true;
       Utilities.setMapWork(jc, null);
@@ -164,11 +163,11 @@ public class SparkMapRecordHandler extends SparkRecordHandler {
 
   @Override
   public void close() {
+    super.close();
     // No row was processed
     if (!anyRow) {
       LOG.trace("Close called. no row processed by map.");
     }
-
     // check if there are IOExceptions
     if (!abort) {
       abort = execContext.getIoCxt().getIOExceptions();
@@ -188,10 +187,6 @@ public class SparkMapRecordHandler extends SparkRecordHandler {
         }
       }
 
-      if (LOG.isInfoEnabled()) {
-        logCloseInfo();
-      }
-
       ReportStats rps = new ReportStats(rp, jc);
       mo.preorderMap(rps);
       return;

http://git-wip-us.apache.org/repos/asf/hive/blob/d813b487/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
index 8880bb6..409e3cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
@@ -92,6 +92,7 @@ public class SparkMergeFileRecordHandler extends SparkRecordHandler {
   public void processRow(Object key, Object value) throws IOException {
     row[0] = key;
     row[1] = value;
+    incrementRowNumber();
     try {
       mergeOp.process(row, 0);
     } catch (HiveException e) {
@@ -108,6 +109,7 @@ public class SparkMergeFileRecordHandler extends SparkRecordHandler {
 
   @Override
   public void close() {
+    super.close();
     LOG.info("Closing Merge Operator " + mergeOp.getName());
     try {
       mergeOp.closeOp(abort);

http://git-wip-us.apache.org/repos/asf/hive/blob/d813b487/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index cb5bd7a..f7ea212 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -33,6 +34,10 @@ import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public abstract class SparkRecordHandler {
   protected static final String CLASS_NAME = SparkRecordHandler.class.getName();
@@ -40,16 +45,38 @@ public abstract class SparkRecordHandler {
   private static final Logger LOG = LoggerFactory.getLogger(SparkRecordHandler.class);
 
   // used to log memory usage periodically
-  protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
 
   protected JobConf jc;
   protected OutputCollector<?, ?> oc;
   protected Reporter rp;
   protected boolean abort = false;
-  private long rowNumber = 0;
-  private long nextLogThreshold = 1;
-
-  protected boolean anyRow = false;
+  /**
+   * Using volatile for rowNumber and logThresholdInterval instead of
+   * Atomic types, even though they are used in a non-atomic context,
+   * because each is updated by only a single thread at a time and
+   * there is no contention on these variables.
+   */
+  private volatile long rowNumber = 0;
+  private volatile long logThresholdInterval = 15000;
+  boolean anyRow = false;
+  private final long maxLogThresholdInterval = 900000;
+  // We use this ScheduledFuture while closing to cancel any logger thread that is scheduled.
+  private ScheduledFuture memoryAndRowLogFuture;
+
+  private final ScheduledThreadPoolExecutor memoryAndRowLogExecutor = getMemoryAndRowLogExecutor();
+
+  private ScheduledThreadPoolExecutor getMemoryAndRowLogExecutor() {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
+        new ThreadFactoryBuilder()
+            .setNameFormat("MemoryAndRowInfoLogger")
+            .setDaemon(true)
+            .setUncaughtExceptionHandler((Thread t, Throwable e) -> LOG.error(t + " throws exception: " + e))
+            .build(),
+        new ThreadPoolExecutor.DiscardPolicy());
+    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    return executor;
+  }
 
   public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
     jc = job;
@@ -60,13 +87,12 @@ public abstract class SparkRecordHandler {
     rp = reporter;
 
     LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
+    MemoryInfoLogger memoryInfoLogger = new MemoryInfoLogger();
+    memoryInfoLogger.run();
     try {
-      LOG.info("conf classpath = "
-        + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
-      LOG.info("thread classpath = "
-        + Arrays.asList(((URLClassLoader) Thread.currentThread()
-        .getContextClassLoader()).getURLs()));
+      LOG.info("conf classpath = " + Arrays.asList(((URLClassLoader) 
job.getClassLoader()).getURLs()));
+      LOG.info("thread classpath = " + Arrays
+          .asList(((URLClassLoader) 
Thread.currentThread().getContextClassLoader()).getURLs()));
     } catch (Exception e) {
       LOG.info("cannot get classpath: " + e.getMessage());
     }
@@ -83,39 +109,54 @@ public abstract class SparkRecordHandler {
   public abstract <E> void processRow(Object key, Iterator<E> values) throws IOException;
 
   /**
-   * Logger processed row number and used memory info.
+   * Increments rowNumber to indicate # of rows processed.
    */
-  protected void logMemoryInfo() {
-    rowNumber++;
-    if (rowNumber == nextLogThreshold) {
-      long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
-      LOG.info("processing " + rowNumber
-        + " rows: used memory = " + usedMemory);
-      nextLogThreshold = getNextLogThreshold(rowNumber);
+  void incrementRowNumber() {
+    ++rowNumber;
+  }
+
+  /**
+   * Logs every 'logThresholdInterval' milliseconds and doubles the
+   * logThresholdInterval value after each time it logs until it
+   * reaches maxLogThresholdInterval.
+   */
+  class MemoryInfoLogger implements Runnable {
+    @Override
+    public void run() {
+      if (anyRow) {
+        logThresholdInterval = Math.min(maxLogThresholdInterval, 2 * logThresholdInterval);
+        logMemoryInfo();
+      }
+      memoryAndRowLogFuture =
+          memoryAndRowLogExecutor.schedule(new MemoryInfoLogger(), logThresholdInterval, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  public void close() {
+    memoryAndRowLogExecutor.shutdown();
+    memoryAndRowLogFuture.cancel(false);
+    try {
+      if (!memoryAndRowLogExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+        memoryAndRowLogExecutor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      memoryAndRowLogExecutor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+
+    if (LOG.isInfoEnabled()) {
+      logMemoryInfo();
     }
   }
 
-  public abstract void close();
   public abstract boolean getDone();
 
   /**
    * Logger information to be logged at the end.
    */
-  protected void logCloseInfo() {
+  private void logMemoryInfo() {
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    LOG.info("processed " + rowNumber + " rows: used memory = "
-      + usedMemory);
-  }
-
-  private long getNextLogThreshold(long currentThreshold) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (currentThreshold >= 1000000) {
-      return currentThreshold + 1000000;
-    }
-
-    return 10 * currentThreshold;
+    LOG.info("Processed " + rowNumber + " rows: used memory = " + usedMemory);
   }
 
   public boolean isAbort() {

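The shutdown sequence in the new close() above is the standard ExecutorService
termination idiom: stop accepting work, wait briefly for in-flight tasks, then
force-cancel and restore the interrupt status. A self-contained sketch of just
that idiom (the method and class names are illustrative, not from the patch):

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.TimeUnit;

  final class ShutdownSketch {
    static void shutdownQuietly(ExecutorService executor) {
      executor.shutdown();                   // no new tasks accepted
      try {
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
          executor.shutdownNow();            // force-cancel whatever is still queued
        }
      } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();  // preserve the caller's interrupt status
      }
    }
  }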
http://git-wip-us.apache.org/repos/asf/hive/blob/d813b487/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 20e7ea0..07cb5cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -401,9 +401,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
       row.clear();
       row.add(keyObject);
       row.add(valueObject[tag]);
-      if (LOG.isInfoEnabled()) {
-        logMemoryInfo();
-      }
+      incrementRowNumber();
       try {
         reducer.process(row, tag);
       } catch (Exception e) {
@@ -571,9 +569,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
     }
 
     batchBytes = 0;
-    if (LOG.isInfoEnabled()) {
-      logMemoryInfo();
-    }
+    incrementRowNumber();
   }
 
   private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException {
@@ -593,12 +589,11 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
 
   @Override
   public void close() {
-
+    super.close();
     // No row was processed
     if (!anyRow) {
       LOG.trace("Close called without any rows processed");
     }
-
     try {
       if (vectorized) {
         if (batch.size > 0) {
@@ -617,9 +612,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
           reducer.endGroup();
         }
       }
-      if (LOG.isInfoEnabled()) {
-        logCloseInfo();
-      }
 
       reducer.close(abort);
 
