Repository: drill Updated Branches: refs/heads/master 9c0738d94 -> 3442215fd
DRILL-2236: Optimize hash inner join by swapping inputs based on row count comparison. Add a planner option to enable/disable this feature. Revise code based on review comments. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3442215f Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3442215f Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3442215f Branch: refs/heads/master Commit: 3442215fd91e700f659bc055cd7c05b623bc59b3 Parents: 9c0738d Author: Jinfeng Ni <j...@maprtech.com> Authored: Thu Jan 29 13:24:28 2015 -0800 Committer: Jinfeng Ni <j...@maprtech.com> Committed: Mon Mar 2 10:03:31 2015 -0800 ---------------------------------------------------------------------- .../exec/planner/physical/HashJoinPrel.java | 54 +++++++++---- .../drill/exec/planner/physical/JoinPrel.java | 4 +- .../exec/planner/physical/MergeJoinPrel.java | 2 +- .../exec/planner/physical/PlannerSettings.java | 11 +++ .../physical/explain/NumberingRelWriter.java | 7 ++ .../physical/visitor/SwapHashJoinVisitor.java | 79 ++++++++++++++++++++ .../planner/sql/handlers/DefaultSqlHandler.java | 13 +++- .../server/options/SystemOptionManager.java | 2 + 8 files changed, 154 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java index a3c42de..f63057f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java @@ -20,6 +20,7 @@ package 
org.apache.drill.exec.planner.physical; import java.io.IOException; import java.util.List; +import net.hydromatic.optiq.runtime.FlatLists; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.JoinCondition; import org.apache.drill.exec.ExecConstants; @@ -46,18 +47,24 @@ import com.google.common.collect.Lists; public class HashJoinPrel extends JoinPrel { + private boolean swapped = false; + public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, - JoinRelType joinType) throws InvalidRelException { - super(cluster, traits, left, right, condition, joinType); + JoinRelType joinType) throws InvalidRelException { + this(cluster, traits, left, right, condition, joinType, false); + } + public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, + JoinRelType joinType, boolean swapped) throws InvalidRelException { + super(cluster, traits, left, right, condition, joinType); + this.swapped = swapped; RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys); } - @Override public JoinRelBase copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { try { - return new HashJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType); + return new HashJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType, this.swapped); }catch (InvalidRelException e) { throw new AssertionError(e); } @@ -100,11 +107,32 @@ public class HashJoinPrel extends JoinPrel { @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + // Depending on whether the left/right is swapped for hash inner join, pass in different + // combinations of parameters. + if (! 
swapped) { + return getHashJoinPop(creator, left, right, leftKeys, rightKeys); + } else { + return getHashJoinPop(creator, right, left, rightKeys, leftKeys); + } + } + + @Override + public SelectionVectorMode[] getSupportedEncodings() { + return SelectionVectorMode.DEFAULT; + } + + @Override + public SelectionVectorMode getEncoding() { + return SelectionVectorMode.NONE; + } + + private PhysicalOperator getHashJoinPop(PhysicalPlanCreator creator, RelNode left, RelNode right, + List<Integer> leftKeys, List<Integer> rightKeys) throws IOException{ final List<String> fields = getRowType().getFieldNames(); assert isUnique(fields); - final int leftCount = left.getRowType().getFieldCount(); - final List<String> leftFields = fields.subList(0, leftCount); - final List<String> rightFields = fields.subList(leftCount, fields.size()); + + final List<String> leftFields = left.getRowType().getFieldNames(); + final List<String> rightFields = right.getRowType().getFieldNames(); PhysicalOperator leftPop = ((Prel)left).getPhysicalOperator(creator); PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator); @@ -113,20 +141,18 @@ public class HashJoinPrel extends JoinPrel { List<JoinCondition> conditions = Lists.newArrayList(); - buildJoinConditions(conditions, leftFields, rightFields); + buildJoinConditions(conditions, leftFields, rightFields, leftKeys, rightKeys); HashJoinPOP hjoin = new HashJoinPOP(leftPop, rightPop, conditions, jtype); return creator.addMetadata(this, hjoin); } - @Override - public SelectionVectorMode[] getSupportedEncodings() { - return SelectionVectorMode.DEFAULT; + public void setSwapped(boolean swapped) { + this.swapped = swapped; } - @Override - public SelectionVectorMode getEncoding() { - return SelectionVectorMode.NONE; + public boolean isSwapped() { + return this.swapped; } } http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java index 3541db7..bfecd06 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java @@ -118,7 +118,9 @@ public abstract class JoinPrel extends DrillJoinRelBase implements Prel{ */ protected void buildJoinConditions(List<JoinCondition> conditions, List<String> leftFields, - List<String> rightFields) { + List<String> rightFields, + List<Integer> leftKeys, + List<Integer> rightKeys) { List<RexNode> conjuncts = RelOptUtil.conjunctions(this.getCondition()); short i=0; http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java index 394a82c..b7e86e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java @@ -99,7 +99,7 @@ public class MergeJoinPrel extends JoinPrel { List<JoinCondition> conditions = Lists.newArrayList(); - buildJoinConditions(conditions, leftFields, rightFields); + buildJoinConditions(conditions, leftFields, rightFields, leftKeys, rightKeys); MergeJoinPOP mjoin = new MergeJoinPOP(leftPop, rightPop, conditions, jtype); return creator.addMetadata(this, mjoin); http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index 96be07d..bbfbbcb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -49,6 +49,9 @@ public class PlannerSettings implements Context{ public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false); public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10); public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true); + public static final OptionValidator HASH_JOIN_SWAP = new BooleanValidator("planner.enable_hashjoin_swap", true); + public static final OptionValidator HASH_JOIN_SWAP_MARGIN_FACTOR = new RangeDoubleValidator("planner.join.hash_join_swap_margin_factor", 0, 100, 10d); + public static final OptionValidator IDENTIFIER_MAX_LENGTH = new RangeLongValidator("planner.identifier_max_length", 128 /* A minimum length is needed because option names are identifiers themselves */, Integer.MAX_VALUE, DEFAULT_IDENTIFIER_MAX_LENGTH); @@ -117,6 +120,14 @@ public class PlannerSettings implements Context{ return options.getOption(HASH_SINGLE_KEY.getOptionName()).bool_val; } + public boolean isHashJoinSwapEnabled() { + return options.getOption(HASH_JOIN_SWAP.getOptionName()).bool_val; + } + + public double getHashJoinSwapMarginFactor() { + return options.getOption(HASH_JOIN_SWAP_MARGIN_FACTOR.getOptionName()).float_val / 100d; + } + public long getBroadcastThreshold() { return options.getOption(BROADCAST_THRESHOLD.getOptionName()).num_val; } 
http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java index 6522ad9..387a442 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/explain/NumberingRelWriter.java @@ -23,8 +23,10 @@ import java.util.List; import java.util.Map; import net.hydromatic.linq4j.Ord; +import net.hydromatic.optiq.runtime.FlatLists; import net.hydromatic.optiq.runtime.Spacer; +import org.apache.drill.exec.planner.physical.HashJoinPrel; import org.apache.drill.exec.planner.physical.Prel; import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId; import org.eigenbase.rel.RelNode; @@ -62,6 +64,10 @@ class NumberingRelWriter implements RelWriter { RelNode rel, List<Pair<String, Object>> values) { List<RelNode> inputs = rel.getInputs(); + if (rel instanceof HashJoinPrel && ((HashJoinPrel) rel).isSwapped()) { + HashJoinPrel joinPrel = (HashJoinPrel) rel; + inputs = FlatLists.of(joinPrel.getRight(), joinPrel.getLeft()); + } if (!RelMetadataQuery.isVisibleInExplain( rel, @@ -106,6 +112,7 @@ class NumberingRelWriter implements RelWriter { } } if (detailLevel == SqlExplainLevel.ALL_ATTRIBUTES) { + s.append(" : rowType = " + rel.getRowType().toString()); s.append(": rowcount = ") .append(RelMetadataQuery.getRowCount(rel)) .append(", cumulative cost = ") http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java new file mode 100644 index 0000000..18d5e60 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.physical.visitor; + +import com.google.common.collect.Lists; +import org.apache.drill.exec.planner.physical.HashJoinPrel; +import org.apache.drill.exec.planner.physical.JoinPrel; +import org.apache.drill.exec.planner.physical.Prel; +import org.apache.drill.exec.planner.physical.PrelUtil; +import org.eigenbase.rel.JoinRelType; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rex.RexNode; + +import java.util.List; + +/** + * Visit Prel tree. Find all the HashJoinPrel nodes and set the flag to swap the Left/Right for HashJoinPrel + * when 1) It's inner join, 2) left rowcount is < (1 + percentage) * right_row_count. 
+ * The purpose of this visitor is to prevent the planner from putting the bigger dataset on the RIGHT side, + * which is not good performance-wise. + * + * @see org.apache.drill.exec.planner.physical.HashJoinPrel + */ + +public class SwapHashJoinVisitor extends BasePrelVisitor<Prel, Double, RuntimeException>{ + + private static SwapHashJoinVisitor INSTANCE = new SwapHashJoinVisitor(); + + public static Prel swapHashJoin(Prel prel, Double marginFactor){ + return prel.accept(INSTANCE, marginFactor); + } + + private SwapHashJoinVisitor() { + + } + + @Override + public Prel visitPrel(Prel prel, Double value) throws RuntimeException { + List<RelNode> children = Lists.newArrayList(); + for(Prel child : prel){ + child = child.accept(this, value); + children.add(child); + } + + return (Prel) prel.copy(prel.getTraitSet(), children); + } + + @Override + public Prel visitJoin(JoinPrel prel, Double value) throws RuntimeException { + JoinPrel newJoin = (JoinPrel) visitPrel(prel, value); + + if (prel instanceof HashJoinPrel) { + // Mark left/right as swapped when the INNER hash join's left row count < (1 + margin factor) * right row count. 
+ if (newJoin.getLeft().getRows() < (1 + value.doubleValue() ) * newJoin.getRight().getRows() && + newJoin.getJoinType() == JoinRelType.INNER) { + ( (HashJoinPrel) newJoin).setSwapped(true); + } + } + + return newJoin; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 35e7f5c..232778a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -56,6 +56,7 @@ import org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten; import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor; import org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions; import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter; +import org.apache.drill.exec.planner.physical.visitor.SwapHashJoinVisitor; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.planner.sql.parser.UnsupportedOperatorsVisitor; import org.apache.drill.exec.server.options.OptionManager; @@ -246,12 +247,20 @@ public class DefaultSqlHandler extends AbstractSqlHandler { phyRelNode = JoinPrelRenameVisitor.insertRenameProject(phyRelNode); /* - * 1.1) Break up all expressions with complex outputs into their own project operations + * 1.1) Swap left / right for INNER hash join, if left's row count is < (1 + margin) right's row count. + * We want to have smaller dataset on the right side, since hash table builds on right side. 
+ */ + if (context.getPlannerSettings().isHashJoinSwapEnabled()) { + phyRelNode = SwapHashJoinVisitor.swapHashJoin(phyRelNode, new Double(context.getPlannerSettings().getHashJoinSwapMarginFactor())); + } + + /* + * 1.2) Break up all expressions with complex outputs into their own project operations */ phyRelNode = ((Prel) phyRelNode).accept(new SplitUpComplexExpressions(planner.getTypeFactory(), context.getDrillOperatorTable(), context.getPlannerSettings().functionImplementationRegistry), null); /* - * 1.2) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project + * 1.3) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project */ phyRelNode = ((Prel) phyRelNode).accept(new RewriteProjectToFlatten(planner.getTypeFactory(), context.getDrillOperatorTable()), null); http://git-wip-us.apache.org/repos/asf/drill/blob/3442215f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index aa0a5ad..3d3e96f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -54,6 +54,8 @@ public class SystemOptionManager implements OptionManager { PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE, PlannerSettings.HASH_SINGLE_KEY, PlannerSettings.IDENTIFIER_MAX_LENGTH, + PlannerSettings.HASH_JOIN_SWAP, + PlannerSettings.HASH_JOIN_SWAP_MARGIN_FACTOR, ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,