Repository: phoenix
Updated Branches:
  refs/heads/calcite 9adb3e00f -> 208150098
Bug fixes for multi joins Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/20815009 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/20815009 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/20815009 Branch: refs/heads/calcite Commit: 20815009859b2a0e179a8831289fc3e56a6cec67 Parents: 9adb3e0 Author: maryannxue <wei....@intel.com> Authored: Wed Mar 25 18:14:31 2015 -0400 Committer: maryannxue <wei....@intel.com> Committed: Wed Mar 25 18:14:31 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteTest.java | 54 +++++++++++++------- .../org/apache/phoenix/calcite/PhoenixJoin.java | 41 +++++++++++++-- .../phoenix/calcite/PhoenixTableScan.java | 14 ++++- .../calcite/PhoenixToEnumerableConverter.java | 2 - 4 files changed, 85 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java index a719271..a9ad76b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java @@ -188,8 +188,7 @@ public class CalciteTest extends BaseClientManagedTimeIT { @Test public void testTableScan() throws Exception { start().sql("select * from aTable where a_string = 'a'") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixFilter(condition=[=($2, 'a')])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n") .resultIs(new Object[][] { {"00D300000000XHP", "00A123122312312", 
"a"}, {"00D300000000XHP", "00A223122312312", "a"}, @@ -201,9 +200,7 @@ public class CalciteTest extends BaseClientManagedTimeIT { @Test public void testProject() throws Exception { start().sql("select entity_id, a_string, organization_id from aTable where a_string = 'a'") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixProject(ENTITY_ID=[$1], A_STRING=[$2], ORGANIZATION_ID=[$0])\n" + - " PhoenixFilter(condition=[=($2, 'a')])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')], project=[[$1, $2, $0]])\n") .resultIs(new Object[][] { {"00A123122312312", "a", "00D300000000XHP"}, {"00A223122312312", "a", "00D300000000XHP"}, @@ -217,11 +214,8 @@ public class CalciteTest extends BaseClientManagedTimeIT { .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixProject(ENTITY_ID=[$4], A_STRING=[$2], ORGANIZATION_ID=[$3])\n" + " PhoenixJoin(condition=[AND(=($4, $1), =($3, $0))], joinType=[inner])\n" + - " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]])\n" + - " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" + - " PhoenixFilter(condition=[=($2, 'a')])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + " PhoenixTableScan(table=[[phoenix, ATABLE]], project=[[$0, $1, $2]])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')], project=[[$0, $1, $2]])\n") .resultIs(new Object[][] { {"00A123122312312", "a", "00D300000000XHP"}, {"00A223122312312", "a", "00D300000000XHP"}, @@ -233,10 +227,8 @@ public class CalciteTest extends BaseClientManagedTimeIT { .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" + " PhoenixJoin(condition=[=($2, $3)], joinType=[inner])\n" + - " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" + - " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" + - " 
PhoenixProject(supplier_id=[$0], NAME=[$1])\n" + - " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n") + " PhoenixTableScan(table=[[phoenix, ITEMTABLE]], project=[[$0, $1, $5]])\n" + + " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]], project=[[$0, $1]])\n") .resultIs(new Object[][] { {"0000000001", "T1", "0000000001", "S1"}, {"0000000002", "T2", "0000000001", "S1"}, @@ -250,13 +242,37 @@ public class CalciteTest extends BaseClientManagedTimeIT { @Test public void testMultiJoin() throws Exception { start().sql("select t1.entity_id, t2.a_string, t3.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id join atable t3 on t1.entity_id = t3.entity_id and t1.organization_id = t3.organization_id where t1.a_string = 'a'") .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixProject(ENTITY_ID=[$1], A_STRING=[$20], ORGANIZATION_ID=[$36])\n" + - " PhoenixJoin(condition=[AND(=($1, $37), =($0, $36))], joinType=[inner])\n" + - " PhoenixJoin(condition=[AND(=($1, $19), =($0, $18))], joinType=[inner])\n" + - " PhoenixFilter(condition=[=($2, 'a')])\n" + + " PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], ORGANIZATION_ID=[$36])\n" + + " PhoenixJoin(condition=[AND(=($19, $1), =($18, $0))], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n" + + " PhoenixProject(ORGANIZATION_ID=[$18], ENTITY_ID=[$19], A_STRING=[$20], B_STRING=[$21], A_INTEGER=[$22], A_DATE=[$23], A_TIME=[$24], A_TIMESTAMP=[$25], X_DECIMAL=[$26], X_LONG=[$27], X_INTEGER=[$28], Y_INTEGER=[$29], A_BYTE=[$30], A_SHORT=[$31], A_FLOAT=[$32], A_DOUBLE=[$33], A_UNSIGNED_FLOAT=[$34], A_UNSIGNED_DOUBLE=[$35], ORGANIZATION_ID0=[$0], ENTITY_ID0=[$1], A_STRING0=[$2], B_STRING0=[$3], A_INTEGER0=[$4], A_DATE0=[$5], A_TIME0=[$6], A_TIMESTAMP0=[$7], X_DECIMAL0=[$8], X_LONG0=[$9], X_INTEGER0=[$10], Y_INTEGER0=[$11], A_BYTE0=[$12], A_SHORT0=[$13], A_FLOAT0=[$14], A_DOUBLE0=[$15], A_UNSIGNED_FLOAT0=[$16], A_UNSIGNED_DOUBLE0=[$17])\n" + + " 
PhoenixJoin(condition=[AND(=($19, $1), =($18, $0))], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n") + .resultIs(new Object[][] { + {"00A123122312312", "a", "00D300000000XHP"}, + {"00A223122312312", "a", "00D300000000XHP"}, + {"00A323122312312", "a", "00D300000000XHP"}, + {"00A423122312312", "a", "00D300000000XHP"}}) + .close(); + start().sql("select t1.entity_id, t2.a_string, t3.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id join atable t3 on t1.entity_id = t3.entity_id and t1.organization_id = t3.organization_id") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], ORGANIZATION_ID=[$36])\n" + + " PhoenixJoin(condition=[AND(=($19, $1), =($18, $0))], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n" + + " PhoenixJoin(condition=[AND(=($1, $19), =($0, $18))], joinType=[inner])\n" + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n" + - " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] { + {"00A123122312312", "a", "00D300000000XHP"}, + {"00A223122312312", "a", "00D300000000XHP"}, + {"00A323122312312", "a", "00D300000000XHP"}, + {"00A423122312312", "a", "00D300000000XHP"}, + {"00B523122312312", "b", "00D300000000XHP"}, + {"00B623122312312", "b", "00D300000000XHP"}, + {"00B723122312312", "b", "00D300000000XHP"}, + {"00B823122312312", "b", "00D300000000XHP"}, + {"00C923122312312", "c", "00D300000000XHP"}}) .close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java index b666984..e5f9cda 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java @@ -6,12 +6,17 @@ import java.util.List; import java.util.Set; import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.Util; import org.apache.phoenix.compile.JoinCompiler; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.HashJoinPlan; @@ -43,19 +48,37 @@ public class PhoenixJoin extends Join implements PhoenixRel { } @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + double rowCount = RelMetadataQuery.getRowCount(this); + + for (RelNode input : getInputs()) { + double inputRowCount = input.getRows(); + if (Double.isInfinite(inputRowCount)) { + rowCount = inputRowCount; + } else if (input == getLeft() && isHashJoinDoable()) { + rowCount += inputRowCount; + } else { + rowCount += Util.nLogN(inputRowCount); + } + } + RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0); + + return cost.multiplyBy(PHOENIX_FACTOR); + } + + @Override public QueryPlan implement(Implementor implementor) { assert getLeft().getConvention() == PhoenixRel.CONVENTION; assert getRight().getConvention() == PhoenixRel.CONVENTION; PhoenixRel left = (PhoenixRel) getLeft(); PhoenixRel right = (PhoenixRel) getRight(); - boolean hashRHS = (left instanceof PhoenixTableScan) && getJoinType() != JoinRelType.RIGHT; - 
if (!hashRHS) + if (!isHashJoinDoable()) throw new UnsupportedOperationException(); JoinInfo joinInfo = JoinInfo.of(left, right, getCondition()); List<Expression> leftExprs = Lists.<Expression> newArrayList(); List<Expression> rightExprs = Lists.<Expression> newArrayList(); - implementor.pushContext(new ImplementorContext(true)); + implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns())); QueryPlan leftPlan = implementor.visitInput(0, left); PTable leftTable = implementor.getTableRef().getTable(); for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();) { @@ -86,7 +109,8 @@ public class PhoenixJoin extends Join implements PhoenixRel { throw new RuntimeException(e); } implementor.setTableRef(new TableRef(joinedTable)); - Expression postFilterExpr = CalciteUtils.toExpression(joinInfo.getRemaining(getCluster().getRexBuilder()), implementor); + RexNode postFilter = joinInfo.getRemaining(getCluster().getRexBuilder()); + Expression postFilterExpr = postFilter.isAlwaysTrue() ? 
null : CalciteUtils.toExpression(postFilter, implementor); @SuppressWarnings("unchecked") HashJoinInfo hashJoinInfo = new HashJoinInfo( joinedTable, new ImmutableBytesPtr[] {new ImmutableBytesPtr()}, @@ -99,6 +123,15 @@ public class PhoenixJoin extends Join implements PhoenixRel { return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false, null, null)}); } + private boolean isHashJoinDoable() { + // TODO check memory limit + RelNode rel = getLeft(); + if (rel instanceof RelSubset) { + rel = ((RelSubset) rel).getBest(); + } + return (rel instanceof PhoenixTableScan) && getJoinType() != JoinRelType.RIGHT; + } + private JoinType convertJoinType(JoinRelType type) { JoinType ret = null; switch (type) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java index 8c6153c..f681c88 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java @@ -69,6 +69,8 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { for (RelOptRule rule : rules) { planner.addRule(rule); } + planner.addRule(PhoenixFilterScanMergeRule.INSTANCE); + planner.addRule(PhoenixProjectScanMergeRule.INSTANCE); } @Override @@ -85,8 +87,18 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { final Double selectivity = RelMetadataQuery.getSelectivity(this, filter); cost = cost.multiplyBy(selectivity); } + if (projects != null) { + final double projectFieldRatio = ((double) projects.size()) / getRowType().getFieldCount(); + cost = 
cost.multiplyBy(projectFieldRatio); + } return cost; } + + @Override + public double getRows() { + return super.getRows() + * RelMetadataQuery.getSelectivity(this, filter); + } @Override public QueryPlan implement(Implementor implementor) { @@ -128,7 +140,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); List<Expression> exprs = Lists.<Expression> newArrayList(); for (PColumn column : table.getColumns()) { - if (!SchemaUtil.isPKColumn(column)) { + if (!SchemaUtil.isPKColumn(column) || !implementor.getCurrentContext().isRetainPKColumns()) { Expression expr = implementor.newColumnExpression(column.getPosition()); exprs.add(expr); builder.addField(expr); http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java index cad1d66..d1750e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java @@ -72,8 +72,6 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume } static QueryPlan makePlan(PhoenixRel rel) { - Program p = Programs.ofRules(PhoenixFilterScanMergeRule.INSTANCE, PhoenixProjectScanMergeRule.INSTANCE); - rel = (PhoenixRel) (p.run(rel.getCluster().getPlanner(), rel, RelTraitSet.createEmpty())); final PhoenixRel.Implementor phoenixImplementor = new PhoenixRelImplementorImpl(); return phoenixImplementor.visitInput(0, rel); }