Repository: hive
Updated Branches:
  refs/heads/master 9fc51b0c4 -> 1c123126a
HIVE-14299: Log serialized plan size (Prasanth Jayachandran reviewed by Ashutosh Chauhan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1c123126 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1c123126 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1c123126 Branch: refs/heads/master Commit: 1c123126a80fc80912488b14946e1338a36b3c68 Parents: 9fc51b0 Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Sun Aug 14 16:09:46 2016 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Sun Aug 14 16:09:46 2016 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/SerializationUtilities.java | 1 + .../apache/hadoop/hive/ql/exec/Utilities.java | 35 +++++++++++++++++--- .../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 23 ++++--------- 3 files changed, 38 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1c123126/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java index 277683e..42c1003 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java @@ -38,6 +38,7 @@ import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor; import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; http://git-wip-us.apache.org/repos/asf/hive/blob/1c123126/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index e78c3d5..c97c335 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -407,6 +407,8 @@ public final class Utilities { if (gWork == null) { Path localPath = path; LOG.debug("local path = " + localPath); + final long serializedSize; + final String planMode; if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { LOG.debug("Loading plan from string: "+path.toUri().getPath()); String planString = conf.getRaw(path.toUri().getPath()); @@ -414,12 +416,17 @@ public final class Utilities { LOG.info("Could not find plan string in conf"); return null; } + serializedSize = planString.length(); + planMode = "RPC"; byte[] planBytes = Base64.decodeBase64(planString); in = new ByteArrayInputStream(planBytes); in = new InflaterInputStream(in); } else { LOG.debug("Open file to read in plan: " + localPath); - in = localPath.getFileSystem(conf).open(localPath); + FileSystem fs = localPath.getFileSystem(conf); + in = fs.open(localPath); + serializedSize = fs.getFileStatus(localPath).getLen(); + planMode = "FILE"; } if(MAP_PLAN_NAME.equals(name)){ @@ -451,6 +458,8 @@ public final class Utilities { throw new RuntimeException("Unknown work type: " + name); } } + LOG.info("Deserialized plan (via {}) - name: {} size: {}", planMode, + gWork.getName(), humanReadableByteCount(serializedSize)); gWorkMap.get(conf).put(path, gWork); } else if (LOG.isDebugEnabled()) { LOG.debug("Found plan in cache for name: " + name); @@ -539,6 +548,8 @@ public final class Utilities { OutputStream out = null; + final long serializedSize; + final 
String planMode; if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { // add it to the conf ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); @@ -550,9 +561,10 @@ public final class Utilities { } finally { IOUtils.closeStream(out); } - LOG.info("Setting plan: "+planPath.toUri().getPath()); - conf.set(planPath.toUri().getPath(), - Base64.encodeBase64String(byteOut.toByteArray())); + final String serializedPlan = Base64.encodeBase64String(byteOut.toByteArray()); + serializedSize = serializedPlan.length(); + planMode = "RPC"; + conf.set(planPath.toUri().getPath(), serializedPlan); } else { // use the default file system of the conf FileSystem fs = planPath.getFileSystem(conf); @@ -561,6 +573,9 @@ public final class Utilities { SerializationUtilities.serializePlan(kryo, w, out); out.close(); out = null; + long fileLen = fs.getFileStatus(planPath).getLen(); + serializedSize = fileLen; + planMode = "FILE"; } finally { IOUtils.closeStream(out); } @@ -583,6 +598,8 @@ public final class Utilities { } } + LOG.info("Serialized plan (via {}) - name: {} size: {}", planMode, w.getName(), + humanReadableByteCount(serializedSize)); // Cache the plan in this process gWorkMap.get(conf).put(planPath, w); return planPath; @@ -3697,4 +3714,14 @@ public final class Utilities { } return result; } + + public static String humanReadableByteCount(long bytes) { + int unit = 1000; // use binary units instead? 
+ if (bytes < unit) { + return bytes + "B"; + } + int exp = (int) (Math.log(bytes) / Math.log(unit)); + String suffix = "KMGTPE".charAt(exp-1) + ""; + return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1c123126/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 67cd38d..d617879 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -635,17 +635,6 @@ public class TezJobMonitor { } } - - private String humanReadableByteCount(long bytes) { - int unit = 1000; // use binary units instead? - if (bytes < unit) { - return bytes + "B"; - } - int exp = (int) (Math.log(bytes) / Math.log(unit)); - String suffix = "KMGTPE".charAt(exp-1) + ""; - return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix); - } - private void printLlapIOSummary(Map<String, Progress> progressMap, LogHelper console, DAGClient dagClient) { SortedSet<String> keys = new TreeSet<>(progressMap.keySet()); @@ -697,10 +686,10 @@ public class TezJobMonitor { selectedRowgroups, metadataCacheHit, metadataCacheMiss, - humanReadableByteCount(cacheHitBytes), - humanReadableByteCount(cacheMissBytes), - humanReadableByteCount(allocatedBytes), - humanReadableByteCount(allocatedUsedBytes), + Utilities.humanReadableByteCount(cacheHitBytes), + Utilities.humanReadableByteCount(cacheMissBytes), + Utilities.humanReadableByteCount(allocatedBytes), + Utilities.humanReadableByteCount(allocatedUsedBytes), secondsFormat.format(totalIoTime / 1000_000_000.0) + "s"); console.printInfo(queryFragmentStats); } @@ -750,10 +739,10 @@ public class TezJobMonitor { String fsCountersSummary = 
String.format(FS_COUNTERS_HEADER_FORMAT, vertexName, - humanReadableByteCount(bytesRead), + Utilities.humanReadableByteCount(bytesRead), readOps, largeReadOps, - humanReadableByteCount(bytesWritten), + Utilities.humanReadableByteCount(bytesWritten), writeOps); console.printInfo(fsCountersSummary); }