Repository: hive
Updated Branches:
  refs/heads/master ccbc5c383 -> d813b487a
HIVE-20512: Improve record and memory usage logging in SparkRecordHandler (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d813b487 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d813b487 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d813b487 Branch: refs/heads/master Commit: d813b487ae4c04661a239a3a6b84444a9bae12cf Parents: ccbc5c3 Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com> Authored: Tue Nov 13 20:02:53 2018 -0600 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Tue Nov 13 20:02:53 2018 -0600 ---------------------------------------------------------------------- .../ql/exec/spark/SparkMapRecordHandler.java | 11 +- .../exec/spark/SparkMergeFileRecordHandler.java | 2 + .../hive/ql/exec/spark/SparkRecordHandler.java | 109 +++++++++++++------ .../ql/exec/spark/SparkReduceRecordHandler.java | 14 +-- 4 files changed, 83 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d813b487/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java index 88dd12c..530131f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java @@ -140,9 +140,8 @@ public class SparkMapRecordHandler extends SparkRecordHandler { // Since there is no concept of a group, we don't invoke // startGroup/endGroup for a mapper mo.process((Writable) value); - if (LOG.isInfoEnabled()) { - logMemoryInfo(); - } + incrementRowNumber(); + } catch (Throwable e) { abort = true; 
Utilities.setMapWork(jc, null); @@ -164,11 +163,11 @@ public class SparkMapRecordHandler extends SparkRecordHandler { @Override public void close() { + super.close(); // No row was processed if (!anyRow) { LOG.trace("Close called. no row processed by map."); } - // check if there are IOExceptions if (!abort) { abort = execContext.getIoCxt().getIOExceptions(); @@ -188,10 +187,6 @@ public class SparkMapRecordHandler extends SparkRecordHandler { } } - if (LOG.isInfoEnabled()) { - logCloseInfo(); - } - ReportStats rps = new ReportStats(rp, jc); mo.preorderMap(rps); return; http://git-wip-us.apache.org/repos/asf/hive/blob/d813b487/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java index 8880bb6..409e3cc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java @@ -92,6 +92,7 @@ public class SparkMergeFileRecordHandler extends SparkRecordHandler { public void processRow(Object key, Object value) throws IOException { row[0] = key; row[1] = value; + incrementRowNumber(); try { mergeOp.process(row, 0); } catch (HiveException e) { @@ -108,6 +109,7 @@ public class SparkMergeFileRecordHandler extends SparkRecordHandler { @Override public void close() { + super.close(); LOG.info("Closing Merge Operator " + mergeOp.getName()); try { mergeOp.closeOp(abort); http://git-wip-us.apache.org/repos/asf/hive/blob/d813b487/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java index cb5bd7a..f7ea212 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.spark; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.MapredContext; @@ -33,6 +34,10 @@ import java.lang.management.MemoryMXBean; import java.net.URLClassLoader; import java.util.Arrays; import java.util.Iterator; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public abstract class SparkRecordHandler { protected static final String CLASS_NAME = SparkRecordHandler.class.getName(); @@ -40,16 +45,38 @@ public abstract class SparkRecordHandler { private static final Logger LOG = LoggerFactory.getLogger(SparkRecordHandler.class); // used to log memory usage periodically - protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); protected JobConf jc; protected OutputCollector<?, ?> oc; protected Reporter rp; protected boolean abort = false; - private long rowNumber = 0; - private long nextLogThreshold = 1; - - protected boolean anyRow = false; + /** + * Using volatile for rowNumber and logThresholdInterval instead of + * Atomic even though they are used in non-atomic context. This is because + * we know that they will be updated only by a single thread at a time and + * there is no contention on these variables. 
+ */ + private volatile long rowNumber = 0; + private volatile long logThresholdInterval = 15000; + boolean anyRow = false; + private final long maxLogThresholdInterval = 900000; + // We use this ScheduledFuture while closing to cancel any logger thread that is scheduled. + private ScheduledFuture memoryAndRowLogFuture; + + private final ScheduledThreadPoolExecutor memoryAndRowLogExecutor = getMemoryAndRowLogExecutor(); + + private ScheduledThreadPoolExecutor getMemoryAndRowLogExecutor() { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder() + .setNameFormat("MemoryAndRowInfoLogger") + .setDaemon(true) + .setUncaughtExceptionHandler((Thread t, Throwable e) -> LOG.error(t + " throws exception: " + e)) + .build(), + new ThreadPoolExecutor.DiscardPolicy()); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + return executor; + } public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception { jc = job; @@ -60,13 +87,12 @@ public abstract class SparkRecordHandler { rp = reporter; LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax()); - + MemoryInfoLogger memoryInfoLogger = new MemoryInfoLogger(); + memoryInfoLogger.run(); try { - LOG.info("conf classpath = " - + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); - LOG.info("thread classpath = " - + Arrays.asList(((URLClassLoader) Thread.currentThread() - .getContextClassLoader()).getURLs())); + LOG.info("conf classpath = " + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs())); + LOG.info("thread classpath = " + Arrays + .asList(((URLClassLoader) Thread.currentThread().getContextClassLoader()).getURLs())); } catch (Exception e) { LOG.info("cannot get classpath: " + e.getMessage()); } @@ -83,39 +109,54 @@ public abstract class SparkRecordHandler { public abstract <E> void processRow(Object key, Iterator<E> values) throws IOException; /** - * Logger processed row 
number and used memory info. + * Increments rowNumber to indicate # of rows processed. */ - protected void logMemoryInfo() { - rowNumber++; - if (rowNumber == nextLogThreshold) { - long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); - LOG.info("processing " + rowNumber - + " rows: used memory = " + usedMemory); - nextLogThreshold = getNextLogThreshold(rowNumber); + void incrementRowNumber() { + ++rowNumber; + } + + /** + * Logs every 'logThresholdInterval' milliseconds and doubles the + * logThresholdInterval value after each time it logs until it + * reaches maxLogThresholdInterval. + * */ + class MemoryInfoLogger implements Runnable { + @Override + public void run() { + if (anyRow) { + logThresholdInterval = Math.min(maxLogThresholdInterval, 2 * logThresholdInterval); + logMemoryInfo(); + } + memoryAndRowLogFuture = + memoryAndRowLogExecutor.schedule(new MemoryInfoLogger(), logThresholdInterval, TimeUnit.MILLISECONDS); + } + } + + public void close() { + memoryAndRowLogExecutor.shutdown(); + memoryAndRowLogFuture.cancel(false); + try { + if (!memoryAndRowLogExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + memoryAndRowLogExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + memoryAndRowLogExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + + if (LOG.isInfoEnabled()) { + logMemoryInfo(); } } - public abstract void close(); public abstract boolean getDone(); /** * Logger information to be logged at the end. */ - protected void logCloseInfo() { + private void logMemoryInfo() { long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); - LOG.info("processed " + rowNumber + " rows: used memory = " - + usedMemory); - } - - private long getNextLogThreshold(long currentThreshold) { - // A very simple counter to keep track of number of rows processed by the - // reducer. 
It dumps - // every 1 million times, and quickly before that - if (currentThreshold >= 1000000) { - return currentThreshold + 1000000; - } - - return 10 * currentThreshold; + LOG.info("Processed " + rowNumber + " rows: used memory = " + usedMemory); } public boolean isAbort() { http://git-wip-us.apache.org/repos/asf/hive/blob/d813b487/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java index 20e7ea0..07cb5cb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java @@ -401,9 +401,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { row.clear(); row.add(keyObject); row.add(valueObject[tag]); - if (LOG.isInfoEnabled()) { - logMemoryInfo(); - } + incrementRowNumber(); try { reducer.process(row, tag); } catch (Exception e) { @@ -571,9 +569,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { } batchBytes = 0; - if (LOG.isInfoEnabled()) { - logMemoryInfo(); - } + incrementRowNumber(); } private Object deserializeValue(BytesWritable valueWritable, byte tag) throws HiveException { @@ -593,12 +589,11 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { @Override public void close() { - + super.close(); // No row was processed if (!anyRow) { LOG.trace("Close called without any rows processed"); } - try { if (vectorized) { if (batch.size > 0) { @@ -617,9 +612,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { reducer.endGroup(); } } - if (LOG.isInfoEnabled()) { - logCloseInfo(); - } reducer.close(abort);