Repository: hive
Updated Branches: refs/heads/master f778e8c50 -> f067df6f5
HIVE-16758: Better Select Number of Replications (BELUGA BEHR, reviewed by Chao Sun) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f067df6f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f067df6f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f067df6f Branch: refs/heads/master Commit: f067df6f5817429d52a96fb78cdbcfbe83c1f497 Parents: f778e8c Author: BELUGA BEHR <dam6...@gmail.com> Authored: Tue Aug 8 10:27:18 2017 -0700 Committer: Chao Sun <sunc...@apache.org> Committed: Tue Aug 8 10:27:18 2017 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/SparkHashTableSinkOperator.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f067df6f/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java index c3b1d0a..7c1b714 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java @@ -49,11 +49,13 @@ public class SparkHashTableSinkOperator private final String CLASS_NAME = this.getClass().getName(); private final transient PerfLogger perfLogger = SessionState.getPerfLogger(); protected static final Logger LOG = LoggerFactory.getLogger(SparkHashTableSinkOperator.class.getName()); - public static final String DFS_REPLICATION_MAX = "dfs.replication.max"; - private int minReplication = 10; + private static final String MAPRED_FILE_REPLICATION = "mapreduce.client.submit.file.replication"; + private static final int DEFAULT_REPLICATION = 10; private final 
HashTableSinkOperator htsOperator; + private short numReplication; + /** Kryo ctor. */ protected SparkHashTableSinkOperator() { super(); @@ -72,9 +74,7 @@ public class SparkHashTableSinkOperator byte tag = conf.getTag(); inputOIs[tag] = inputObjInspectors[0]; conf.setTagOrder(new Byte[]{ tag }); - int dfsMaxReplication = hconf.getInt(DFS_REPLICATION_MAX, minReplication); - // minReplication value should not cross the value of dfs.replication.max - minReplication = Math.min(minReplication, dfsMaxReplication); + numReplication = (short) hconf.getInt(MAPRED_FILE_REPLICATION, DEFAULT_REPLICATION); htsOperator.setConf(conf); htsOperator.initialize(hconf, inputOIs); } @@ -136,7 +136,6 @@ public class SparkHashTableSinkOperator String dumpFilePrefix = conf.getDumpFilePrefix(); Path path = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName); FileSystem fs = path.getFileSystem(htsOperator.getConfiguration()); - short replication = fs.getDefaultReplication(path); fs.mkdirs(path); // Create the folder and its parents if not there while (true) { @@ -151,9 +150,7 @@ public class SparkHashTableSinkOperator // No problem, use a new name } } - // TODO find out numOfPartitions for the big table - int numOfPartitions = replication; - replication = (short) Math.max(minReplication, numOfPartitions); + htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag + " with group count: " + tableContainer.size() + " into file: " + path); try { @@ -162,7 +159,7 @@ public class SparkHashTableSinkOperator ObjectOutputStream out = null; MapJoinTableContainerSerDe mapJoinTableSerde = htsOperator.mapJoinTableSerdes[tag]; try { - os = fs.create(path, replication); + os = fs.create(path, numReplication); out = new ObjectOutputStream(new BufferedOutputStream(os, 4096)); mapJoinTableSerde.persist(out, tableContainer); } finally {