Repository: hive Updated Branches: refs/heads/master 68bdf9eb4 -> 14bb84088
HIVE-19986: Add logging of runtime statistics indicating when Hdfs Erasure Coding is used by MR (Andrew Sherman, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/14bb8408 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/14bb8408 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/14bb8408 Branch: refs/heads/master Commit: 14bb84088f65467d2ea0cc828a48cd33e3e6666c Parents: 68bdf9e Author: Andrew Sherman <asher...@cloudera.com> Authored: Wed Jul 25 16:23:51 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Wed Jul 25 16:44:58 2018 -0500 ---------------------------------------------------------------------- .../jdbc/TestJdbcWithMiniHS2ErasureCoding.java | 55 ++++++++++++++++++++ .../org/apache/hadoop/hive/ql/MapRedStats.java | 43 ++++++++++++++- .../hive/ql/exec/mr/HadoopJobExecHelper.java | 2 +- .../hive/ql/processors/ErasureProcessor.java | 10 +++- .../apache/hadoop/hive/shims/Hadoop23Shims.java | 17 ++++++ .../apache/hadoop/hive/shims/HadoopShims.java | 10 ++++ 6 files changed, 134 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java index b0a0145..efb3759 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2ErasureCoding.java @@ -19,6 +19,8 @@ package org.apache.hive.jdbc; import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; import 
java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; @@ -31,11 +33,17 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.processors.ErasureProcessor; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsErasureCodingShim; import org.apache.hadoop.hive.shims.HadoopShims.MiniDFSShim; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.WriterAppender; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.layout.PatternLayout; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -174,6 +182,53 @@ public class TestJdbcWithMiniHS2ErasureCoding { } /** + * Test MR stats. 
+ */ + @Test + public void testMapRedStats() throws Exception { + // Do log4j magic to save log output + StringWriter writer = new StringWriter(); + Appender appender = addAppender(writer, "testMapRedStats"); + try (Statement stmt = hs2Conn.createStatement()) { + String table = "mapredstats"; + stmt.execute("set hive.execution.engine=mr"); + stmt.execute(" CREATE TABLE " + table + " (a int) STORED AS PARQUET"); + stmt.execute("INSERT INTO TABLE " + table + " VALUES (3)"); + try (ResultSet rs = stmt.executeQuery("select a from " + table + " order by a")) { + while (rs.next()) { + int val = rs.getInt(1); + assertEquals(3, val); + } + } + } + String output = writer.toString(); + // check for standard stats + assertTrue(output.contains("HDFS Read:")); + assertTrue(output.contains("HDFS Write:")); + + // check for erasure coding stat + HadoopShims.HdfsErasureCodingShim erasureShim = ErasureProcessor.getErasureShim(conf); + if (erasureShim.isMapReduceStatAvailable()) { + assertTrue(output.contains("HDFS EC Read:")); + } + } + + /** + * Add an appender to log4j. + * http://logging.apache.org/log4j/2.x/manual/customconfig.html#AddingToCurrent + */ + private Appender addAppender(final Writer writer, final String writerName) { + final LoggerContext context = LoggerContext.getContext(false); + final Configuration config = context.getConfiguration(); + final PatternLayout layout = PatternLayout.createDefaultLayout(config); + final Appender appender = + WriterAppender.createAppender(layout, null, writer, writerName, false, true); + appender.start(); + config.getRootLogger().addAppender(appender, null, null); + return appender; + } + + /** * Add an Erasure Coding Policy to a Path. 
*/ private static void addErasurePolicy(MiniDFSShim dfs, String pathString, String policyName) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java b/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java index 483c3d9..ac45ec4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java @@ -18,8 +18,15 @@ package org.apache.hadoop.hive.ql; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.processors.ErasureProcessor; +import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.JobConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * MapRedStats. @@ -30,6 +37,9 @@ import org.apache.hadoop.mapred.Counters.Counter; * */ public class MapRedStats { + private static final String CLASS_NAME = MapRedStats.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + private JobConf jobConf; int numMap; int numReduce; long cpuMSec; @@ -40,7 +50,8 @@ public class MapRedStats { private long numModifiedRows; - public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId) { + public MapRedStats(JobConf jobConf, int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId) { + this.jobConf = jobConf; this.numMap = numMap; this.numReduce = numReduce; this.cpuMSec = cpuMSec; @@ -144,10 +155,40 @@ public class MapRedStats { if (hdfsWrittenCntr != null && (hdfsWritten = hdfsWrittenCntr.getValue()) >= 0) { sb.append(" HDFS Write: " + hdfsWritten); } + + HadoopShims.HdfsErasureCodingShim erasureShim = getHdfsErasureCodingShim(); + + if (erasureShim != null && 
erasureShim.isMapReduceStatAvailable()) { + // Erasure Coding stats - added in HADOOP-15507, expected in Hadoop 3.2.0 + Counter hdfsReadEcCntr = counters.findCounter("FileSystemCounters", + "HDFS_BYTES_READ_EC"); // FileSystemCounter.BYTES_READ_EC + if (hdfsReadEcCntr != null) { + long hdfsReadEc = hdfsReadEcCntr.getValue(); + if (hdfsReadEc >= 0) { + sb.append(" HDFS EC Read: " + hdfsReadEc); + } + } + } } sb.append(" " + (success ? "SUCCESS" : "FAIL")); return sb.toString(); } + + /** + * Get the Erasure Coding Shim. + * @return a HdfsErasureCodingShim + */ + private HadoopShims.HdfsErasureCodingShim getHdfsErasureCodingShim() { + HadoopShims.HdfsErasureCodingShim erasureShim = null; + try { + erasureShim = ErasureProcessor.getErasureShim(jobConf); + } catch (IOException e) { + // this should not happen + LOG.warn("Could not get Erasure Coding shim for reason: " + e.getMessage()); + // fall through + } + return erasureShim; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index c31e22f..eb6cbf7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -419,7 +419,7 @@ public class HadoopJobExecHelper { } } - MapRedStats mapRedStats = new MapRedStats(numMap, numReduce, cpuMsec, success, rj.getID().toString()); + MapRedStats mapRedStats = new MapRedStats(job, numMap, numReduce, cpuMsec, success, rj.getID().toString()); mapRedStats.setCounters(ctrs); // update based on the final value of the counters http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java index 46114f5..04cc8b0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/ErasureProcessor.java @@ -31,6 +31,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -69,9 +70,16 @@ public class ErasureProcessor implements CommandProcessor { private HadoopShims.HdfsErasureCodingShim erasureCodingShim; ErasureProcessor(HiveConf config) throws IOException { + this.erasureCodingShim = getErasureShim(config); + } + + /** + * Get an instance of HdfsErasureCodingShim from a config. 
+ */ + public static HadoopShims.HdfsErasureCodingShim getErasureShim(Configuration config) throws IOException { HadoopShims hadoopShims = ShimLoader.getHadoopShims(); FileSystem fileSystem = FileSystem.get(config); - this.erasureCodingShim = hadoopShims.createHdfsErasureCodingShim(fileSystem, config); + return hadoopShims.createHdfsErasureCodingShim(fileSystem, config); } private CommandLine parseCommandArgs(final Options opts, String[] args) throws ParseException { http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java ---------------------------------------------------------------------- diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 02490f1..98c3eef 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.shims; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -78,6 +79,7 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.WebHCatJTShim23; import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; +import org.apache.hadoop.mapreduce.FileSystemCounter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -1607,5 +1609,20 @@ public class Hadoop23Shims extends HadoopShimsSecure { public void disableErasureCodingPolicy(String ecPolicyName) throws IOException { hdfsAdmin.disableErasureCodingPolicy(ecPolicyName); } + + /** + * @return true if the runtime MR stat for Erasure Coding is available. 
+ */ + @Override + public boolean isMapReduceStatAvailable() { + // Look for FileSystemCounter.BYTES_READ_EC, this is present in hadoop 3.2 + Field field = null; + try { + field = FileSystemCounter.class.getField("BYTES_READ_EC"); + } catch (NoSuchFieldException e) { + // This version of Hadoop does not support EC stats for MR + } + return (field != null); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/14bb8408/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java ---------------------------------------------------------------------- diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 2e84ca9..84e6430 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -691,6 +691,11 @@ public interface HadoopShims { * @param ecPolicyName the name of the erasure coding policy */ void disableErasureCodingPolicy(String ecPolicyName) throws IOException; + + /** + * @return true if the runtime MR stat for Erasure Coding is available. + */ + boolean isMapReduceStatAvailable(); } /** @@ -728,6 +733,11 @@ public interface HadoopShims { public void disableErasureCodingPolicy(String ecPolicyName) throws IOException { } + @Override + public boolean isMapReduceStatAvailable() { + return false; + } + } /**