Repository: hive Updated Branches: refs/heads/master 6d4b19b55 -> a3bac4d7e
HIVE-20847: Review of NullScan Code (Beluga Behr, reviewed by Vineet Garg) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3bac4d7 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3bac4d7 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3bac4d7 Branch: refs/heads/master Commit: a3bac4d7e508268ff9855e16e965e364a3ab46c2 Parents: 6d4b19b Author: Beluga Behr <dam6...@gmail.com> Authored: Fri Jan 25 17:56:52 2019 -0800 Committer: Vineet Garg <vg...@apache.org> Committed: Fri Jan 25 17:58:59 2019 -0800 ---------------------------------------------------------------------- .../optimizer/physical/NullScanOptimizer.java | 83 ++++++++------- .../physical/NullScanTaskDispatcher.java | 101 ++++++++++--------- 2 files changed, 95 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a3bac4d7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java index bc8afbf..282805d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java @@ -18,20 +18,19 @@ package org.apache.hadoop.hive.ql.optimizer.physical; +import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.Set; import java.util.Stack; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.hive.ql.exec.FilterOperator; import org.apache.hadoop.hive.ql.exec.LimitOperator; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -45,6 +44,8 @@ import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer.Walker import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This optimizer attempts following two optimizations: @@ -54,27 +55,32 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; */ public class NullScanOptimizer implements PhysicalPlanResolver { - private static final Logger LOG = LoggerFactory.getLogger(NullScanOptimizer.class.getName()); - @Override - public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + private static final Logger LOG = + LoggerFactory.getLogger(NullScanOptimizer.class); - Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); - opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%.*" + - FilterOperator.getOperatorName() + "%"), new WhereFalseProcessor()); + @Override + public PhysicalContext resolve(PhysicalContext pctx) + throws SemanticException { + Map<Rule, NodeProcessor> opRules = new LinkedHashMap<>(); + opRules.put( + new RuleRegExp("R1", + TableScanOperator.getOperatorName() + "%.*" + + FilterOperator.getOperatorName() + "%"), + new WhereFalseProcessor()); Dispatcher disp = new NullScanTaskDispatcher(pctx, opRules); GraphWalker ogw = new DefaultGraphWalker(disp); - ArrayList<Node> topNodes = new ArrayList<Node>(); - topNodes.addAll(pctx.getRootTasks()); + List<Node> topNodes = new ArrayList<>(pctx.getRootTasks()); ogw.startWalking(topNodes, null); opRules.clear(); - opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName()+ "%"),new TSMarker()); - opRules.put(new RuleRegExp("R2", LimitOperator.getOperatorName()+ "%"), new Limit0Processor()); + opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"), + new TSMarker()); + opRules.put(new RuleRegExp("R2", LimitOperator.getOperatorName() + "%"), + new Limit0Processor()); disp = new NullScanTaskDispatcher(pctx, opRules); ogw = new DefaultGraphWalker(disp); - topNodes = new ArrayList<Node>(); - topNodes.addAll(pctx.getRootTasks()); + topNodes = new ArrayList<>(pctx.getRootTasks()); ogw.startWalking(topNodes, null); return pctx; } @@ -82,30 +88,28 @@ public class NullScanOptimizer implements PhysicalPlanResolver { //We need to make sure that Null Operator (LIM or FIL) is present in all branches of multi-insert query before //applying the optimization. This method does full tree traversal starting from TS and will return true only if //it finds target Null operator on each branch. - static private boolean isNullOpPresentInAllBranches(TableScanOperator ts, Node causeOfNullNode) { - Node curNode = null; - List<? extends Node> curChd = null; - LinkedList<Node> middleNodes = new LinkedList<Node>(); - middleNodes.addLast(ts); + private static boolean isNullOpPresentInAllBranches(TableScanOperator ts, Node causeOfNullNode) { + Queue<Node> middleNodes = new ArrayDeque<>(); + middleNodes.add(ts); while (!middleNodes.isEmpty()) { - curNode = middleNodes.remove(); - curChd = curNode.getChildren(); + Node curNode = middleNodes.remove(); + List<? extends Node> curChd = curNode.getChildren(); for (Node chd: curChd) { - if (chd.getChildren() == null || chd.getChildren().isEmpty() || chd == causeOfNullNode) { - if (chd != causeOfNullNode) { // If there is an end node that not the limit0/wherefalse.. + List<? extends Node> children = chd.getChildren(); + if (CollectionUtils.isEmpty(children) || chd == causeOfNullNode) { + // If there is an end node that not the limit0/wherefalse.. + if (chd != causeOfNullNode) { return false; } - } - else { - middleNodes.addLast(chd); + } else { + middleNodes.add(chd); } } - } return true; } - static private class WhereFalseProcessor implements NodeProcessor { + private static class WhereFalseProcessor implements NodeProcessor { @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, @@ -126,7 +130,7 @@ public class NullScanOptimizer implements PhysicalPlanResolver { if (op instanceof TableScanOperator) { if (isNullOpPresentInAllBranches((TableScanOperator)op, filter)) { ctx.setMayBeMetadataOnly((TableScanOperator)op); - LOG.info("Found where false TableScan. " + op); + LOG.debug("Found where false TableScan. {}", op); } } } @@ -135,32 +139,33 @@ public class NullScanOptimizer implements PhysicalPlanResolver { } } - static private class Limit0Processor implements NodeProcessor { + private static class Limit0Processor implements NodeProcessor { @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { LimitOperator limitOp = (LimitOperator)nd; - if(!(limitOp.getConf().getLimit() == 0)) { + if (!(limitOp.getConf().getLimit() == 0)) { return null; } - HashSet<TableScanOperator> tsOps = ((WalkerCtx)procCtx).getMayBeMetadataOnlyTableScans(); + Set<TableScanOperator> tsOps = + ((WalkerCtx) procCtx).getMayBeMetadataOnlyTableScans(); if (tsOps != null) { for (Iterator<TableScanOperator> tsOp = tsOps.iterator(); tsOp.hasNext();) { - if (!isNullOpPresentInAllBranches(tsOp.next(),limitOp)) + if (!isNullOpPresentInAllBranches(tsOp.next(), limitOp)) { tsOp.remove(); + } } } - LOG.info("Found Limit 0 TableScan. " + nd); + LOG.debug("Found Limit 0 TableScan. {}", nd); ((WalkerCtx)procCtx).convertMetadataOnly(); return null; } - } - static private class TSMarker implements NodeProcessor { + private static class TSMarker implements NodeProcessor { @Override public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, http://git-wip-us.apache.org/repos/asf/hive/blob/a3bac4d7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java index 6c0e71d..ec9813d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java @@ -19,33 +19,31 @@ package org.apache.hadoop.hive.ql.optimizer.physical; import java.io.IOException; - -import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.ql.exec.Utilities; - -import org.apache.hadoop.hive.ql.io.ZeroRowsInputFormat; - import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.Stack; +import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.NullScanFileSystem; import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat; +import org.apache.hadoop.hive.ql.io.ZeroRowsInputFormat; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.GraphWalker; @@ -61,39 +59,44 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.NullStructSerDe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Iterate over all tasks one by one and removes all input paths from task if conditions as - * defined in rules match. + * Iterate over all tasks one by one and removes all input paths from task if + * conditions as defined in rules match. */ public class NullScanTaskDispatcher implements Dispatcher { - static final Logger LOG = LoggerFactory.getLogger(NullScanTaskDispatcher.class.getName()); + static final Logger LOG = + LoggerFactory.getLogger(NullScanTaskDispatcher.class); private final PhysicalContext physicalContext; private final Map<Rule, NodeProcessor> rules; - public NullScanTaskDispatcher(PhysicalContext context, Map<Rule, NodeProcessor> rules) { + public NullScanTaskDispatcher(PhysicalContext context, + Map<Rule, NodeProcessor> rules) { super(); - physicalContext = context; + this.physicalContext = context; this.rules = rules; } private String getAliasForTableScanOperator(MapWork work, TableScanOperator tso) { - - for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : - work.getAliasToWork().entrySet()) { + for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : work + .getAliasToWork().entrySet()) { if (entry.getValue() == tso) { return entry.getKey(); } } - return null; } - private PartitionDesc changePartitionToMetadataOnly(PartitionDesc desc, Path path) { - if (desc == null) return null; + private PartitionDesc changePartitionToMetadataOnly(PartitionDesc desc, + Path path) { + if (desc == null) { + return null; + } boolean isEmpty = false; try { isEmpty = Utilities.isEmptyPath(physicalContext.getConf(), path); @@ -104,25 +107,23 @@ public class NullScanTaskDispatcher implements Dispatcher { isEmpty ? ZeroRowsInputFormat.class : OneNullRowInputFormat.class); desc.setOutputFileFormatClass(HiveIgnoreKeyTextOutputFormat.class); desc.getProperties().setProperty(serdeConstants.SERIALIZATION_LIB, - NullStructSerDe.class.getName()); + NullStructSerDe.class.getName()); return desc; } - private void processAlias(MapWork work, Path path, ArrayList<String> aliasesAffected, - ArrayList<String> aliases) { + private void processAlias(MapWork work, Path path, + Collection<String> aliasesAffected, Set<String> aliases) { // the aliases that are allowed to map to a null scan. - ArrayList<String> allowed = new ArrayList<String>(); - for (String alias : aliasesAffected) { - if (aliases.contains(alias)) { - allowed.add(alias); - } - } - if (allowed.size() > 0) { + Collection<String> allowed = aliasesAffected.stream() + .filter(a -> aliases.contains(a)).collect(Collectors.toList()); + if (!allowed.isEmpty()) { PartitionDesc partDesc = work.getPathToPartitionInfo().get(path).clone(); - PartitionDesc newPartition = changePartitionToMetadataOnly(partDesc, path); + PartitionDesc newPartition = + changePartitionToMetadataOnly(partDesc, path); // Prefix partition with something to avoid it being a hidden file. - Path fakePath = new Path(NullScanFileSystem.getBase() + newPartition.getTableName() - + "/part" + encode(newPartition.getPartSpec())); + Path fakePath = + new Path(NullScanFileSystem.getBase() + newPartition.getTableName() + + "/part" + encode(newPartition.getPartSpec())); StringInternUtils.internUriStringsInPath(fakePath); work.addPathToPartitionInfo(fakePath, newPartition); work.addPathToAlias(fakePath, new ArrayList<>(allowed)); @@ -134,12 +135,11 @@ public class NullScanTaskDispatcher implements Dispatcher { } } - private void processAlias(MapWork work, HashSet<TableScanOperator> tableScans) { - ArrayList<String> aliases = new ArrayList<String>(); + private void processAlias(MapWork work, Set<TableScanOperator> tableScans) { + Set<String> aliases = new HashSet<>(); for (TableScanOperator tso : tableScans) { // use LinkedHashMap<String, Operator<? extends OperatorDesc>> - // getAliasToWork() - // should not apply this for non-native table + // getAliasToWork() should not apply this for non-native table if (tso.getConf().getTableMetadata().getStorageHandler() != null) { continue; } @@ -148,10 +148,10 @@ public class NullScanTaskDispatcher implements Dispatcher { tso.getConf().setIsMetadataOnly(true); } // group path alias according to work - LinkedHashMap<Path, ArrayList<String>> candidates = new LinkedHashMap<>(); + Map<Path, ArrayList<String>> candidates = new HashMap<>(); for (Path path : work.getPaths()) { ArrayList<String> aliasesAffected = work.getPathToAliases().get(path); - if (aliasesAffected != null && aliasesAffected.size() > 0) { + if (CollectionUtils.isNotEmpty(aliasesAffected)) { candidates.put(path, aliasesAffected); } } @@ -183,10 +183,10 @@ public class NullScanTaskDispatcher implements Dispatcher { }); for (MapWork mapWork : mapWorks) { - LOG.debug("Looking at: "+mapWork.getName()); - Collection<Operator<? extends OperatorDesc>> topOperators - = mapWork.getAliasToWork().values(); - if (topOperators.size() == 0) { + LOG.debug("Looking at: {}", mapWork.getName()); + Collection<Operator<? extends OperatorDesc>> topOperators = + mapWork.getAliasToWork().values(); + if (topOperators.isEmpty()) { LOG.debug("No top operators"); return null; } @@ -199,11 +199,11 @@ public class NullScanTaskDispatcher implements Dispatcher { GraphWalker ogw = new PreOrderOnceWalker(disp); // Create a list of topOp nodes - ArrayList<Node> topNodes = new ArrayList<Node>(); + ArrayList<Node> topNodes = new ArrayList<>(); // Get the top Nodes for this task - for (Operator<? extends OperatorDesc> - workOperator : topOperators) { - if (parseContext.getTopOps().values().contains(workOperator)) { + Collection<TableScanOperator> topOps = parseContext.getTopOps().values(); + for (Operator<? extends OperatorDesc> workOperator : topOperators) { + if (topOps.contains(workOperator)) { topNodes.add(workOperator); } } @@ -215,10 +215,11 @@ public class NullScanTaskDispatcher implements Dispatcher { ogw.startWalking(topNodes, null); - LOG.debug(String.format("Found %d null table scans", - walkerCtx.getMetadataOnlyTableScans().size())); - if (walkerCtx.getMetadataOnlyTableScans().size() > 0) + int scanTableSize = walkerCtx.getMetadataOnlyTableScans().size(); + LOG.debug("Found {} null table scans", scanTableSize); + if (scanTableSize > 0) { processAlias(mapWork, walkerCtx.getMetadataOnlyTableScans()); + } } return null; }