Repository: phoenix
Updated Branches:
  refs/heads/calcite 9adb3e00f -> 208150098


Bug fixes for multi joins


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/20815009
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/20815009
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/20815009

Branch: refs/heads/calcite
Commit: 20815009859b2a0e179a8831289fc3e56a6cec67
Parents: 9adb3e0
Author: maryannxue <wei....@intel.com>
Authored: Wed Mar 25 18:14:31 2015 -0400
Committer: maryannxue <wei....@intel.com>
Committed: Wed Mar 25 18:14:31 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteTest.java | 54 +++++++++++++-------
 .../org/apache/phoenix/calcite/PhoenixJoin.java | 41 +++++++++++++--
 .../phoenix/calcite/PhoenixTableScan.java       | 14 ++++-
 .../calcite/PhoenixToEnumerableConverter.java   |  2 -
 4 files changed, 85 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index a719271..a9ad76b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -188,8 +188,7 @@ public class CalciteTest extends BaseClientManagedTimeIT {
     @Test public void testTableScan() throws Exception {
         start().sql("select * from aTable where a_string = 'a'")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixFilter(condition=[=($2, 'a')])\n" +
-                           "    PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                           "  PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')])\n")
                 .resultIs(new Object[][] {
                           {"00D300000000XHP", "00A123122312312", "a"}, 
                           {"00D300000000XHP", "00A223122312312", "a"}, 
@@ -201,9 +200,7 @@ public class CalciteTest extends BaseClientManagedTimeIT {
     @Test public void testProject() throws Exception {
         start().sql("select entity_id, a_string, organization_id from aTable 
where a_string = 'a'")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixProject(ENTITY_ID=[$1], A_STRING=[$2], 
ORGANIZATION_ID=[$0])\n" +
-                           "    PhoenixFilter(condition=[=($2, 'a')])\n" +
-                           "      PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n")
+                           "  PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')], project=[[$1, $2, $0]])\n")
                 .resultIs(new Object[][] {
                           {"00A123122312312", "a", "00D300000000XHP"}, 
                           {"00A223122312312", "a", "00D300000000XHP"}, 
@@ -217,11 +214,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixProject(ENTITY_ID=[$4], A_STRING=[$2], 
ORGANIZATION_ID=[$3])\n" +
                            "    PhoenixJoin(condition=[AND(=($4, $1), =($3, 
$0))], joinType=[inner])\n" +
-                           "      PhoenixProject(ORGANIZATION_ID=[$0], 
ENTITY_ID=[$1], A_STRING=[$2])\n" +
-                           "        PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n" +
-                           "      PhoenixProject(ORGANIZATION_ID=[$0], 
ENTITY_ID=[$1], A_STRING=[$2])\n" +
-                           "        PhoenixFilter(condition=[=($2, 'a')])\n" +
-                           "          PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n")
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]], 
project=[[$0, $1, $2]])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')], project=[[$0, $1, $2]])\n")
                 .resultIs(new Object[][] {
                           {"00A123122312312", "a", "00D300000000XHP"}, 
                           {"00A223122312312", "a", "00D300000000XHP"}, 
@@ -233,10 +227,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixProject(item_id=[$0], NAME=[$1], 
supplier_id=[$3], NAME0=[$4])\n" +
                            "    PhoenixJoin(condition=[=($2, $3)], 
joinType=[inner])\n" +
-                           "      PhoenixProject(item_id=[$0], NAME=[$1], 
supplier_id=[$5])\n" +
-                           "        PhoenixTableScan(table=[[phoenix, 
ITEMTABLE]])\n" +
-                           "      PhoenixProject(supplier_id=[$0], 
NAME=[$1])\n" +
-                           "        PhoenixTableScan(table=[[phoenix, 
SUPPLIERTABLE]])\n")
+                           "      PhoenixTableScan(table=[[phoenix, 
ITEMTABLE]], project=[[$0, $1, $5]])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, 
SUPPLIERTABLE]], project=[[$0, $1]])\n")
                 .resultIs(new Object[][] {
                           {"0000000001", "T1", "0000000001", "S1"}, 
                           {"0000000002", "T2", "0000000001", "S1"}, 
@@ -250,13 +242,37 @@ public class CalciteTest extends BaseClientManagedTimeIT {
     @Test public void testMultiJoin() throws Exception {
         start().sql("select t1.entity_id, t2.a_string, t3.organization_id from 
aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id 
= t2.organization_id join atable t3 on t1.entity_id = t3.entity_id and 
t1.organization_id = t3.organization_id where t1.a_string = 'a'") 
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixProject(ENTITY_ID=[$1], A_STRING=[$20], 
ORGANIZATION_ID=[$36])\n" +
-                           "    PhoenixJoin(condition=[AND(=($1, $37), =($0, 
$36))], joinType=[inner])\n" +
-                           "      PhoenixJoin(condition=[AND(=($1, $19), =($0, 
$18))], joinType=[inner])\n" +
-                           "        PhoenixFilter(condition=[=($2, 'a')])\n" +
+                           "  PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], 
ORGANIZATION_ID=[$36])\n" +
+                           "    PhoenixJoin(condition=[AND(=($19, $1), =($18, 
$0))], joinType=[inner])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n" +
+                           "      PhoenixProject(ORGANIZATION_ID=[$18], 
ENTITY_ID=[$19], A_STRING=[$20], B_STRING=[$21], A_INTEGER=[$22], A_DATE=[$23], 
A_TIME=[$24], A_TIMESTAMP=[$25], X_DECIMAL=[$26], X_LONG=[$27], 
X_INTEGER=[$28], Y_INTEGER=[$29], A_BYTE=[$30], A_SHORT=[$31], A_FLOAT=[$32], 
A_DOUBLE=[$33], A_UNSIGNED_FLOAT=[$34], A_UNSIGNED_DOUBLE=[$35], 
ORGANIZATION_ID0=[$0], ENTITY_ID0=[$1], A_STRING0=[$2], B_STRING0=[$3], 
A_INTEGER0=[$4], A_DATE0=[$5], A_TIME0=[$6], A_TIMESTAMP0=[$7], 
X_DECIMAL0=[$8], X_LONG0=[$9], X_INTEGER0=[$10], Y_INTEGER0=[$11], 
A_BYTE0=[$12], A_SHORT0=[$13], A_FLOAT0=[$14], A_DOUBLE0=[$15], 
A_UNSIGNED_FLOAT0=[$16], A_UNSIGNED_DOUBLE0=[$17])\n" +
+                           "        PhoenixJoin(condition=[AND(=($19, $1), 
=($18, $0))], joinType=[inner])\n" +
                            "          PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, 
ATABLE]], filter=[=($2, 'a')])\n")
+                .resultIs(new Object[][] {
+                          {"00A123122312312", "a", "00D300000000XHP"}, 
+                          {"00A223122312312", "a", "00D300000000XHP"}, 
+                          {"00A323122312312", "a", "00D300000000XHP"}, 
+                          {"00A423122312312", "a", "00D300000000XHP"}})
+                .close();
+        start().sql("select t1.entity_id, t2.a_string, t3.organization_id from 
aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id 
= t2.organization_id join atable t3 on t1.entity_id = t3.entity_id and 
t1.organization_id = t3.organization_id") 
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], 
ORGANIZATION_ID=[$36])\n" +
+                           "    PhoenixJoin(condition=[AND(=($19, $1), =($18, 
$0))], joinType=[inner])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n" +
+                           "      PhoenixJoin(condition=[AND(=($1, $19), =($0, 
$18))], joinType=[inner])\n" +
                            "        PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n" +
-                           "      PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n")
+                           "        PhoenixTableScan(table=[[phoenix, 
ATABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"00A123122312312", "a", "00D300000000XHP"}, 
+                          {"00A223122312312", "a", "00D300000000XHP"}, 
+                          {"00A323122312312", "a", "00D300000000XHP"}, 
+                          {"00A423122312312", "a", "00D300000000XHP"}, 
+                          {"00B523122312312", "b", "00D300000000XHP"}, 
+                          {"00B623122312312", "b", "00D300000000XHP"}, 
+                          {"00B723122312312", "b", "00D300000000XHP"}, 
+                          {"00B823122312312", "b", "00D300000000XHP"}, 
+                          {"00C923122312312", "c", "00D300000000XHP"}})
                 .close();
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
index b666984..e5f9cda 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
@@ -6,12 +6,17 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
 import org.apache.phoenix.compile.JoinCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
@@ -43,19 +48,37 @@ public class PhoenixJoin extends Join implements PhoenixRel 
{
     }
 
     @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        double rowCount = RelMetadataQuery.getRowCount(this);
+        
+        for (RelNode input : getInputs()) {
+            double inputRowCount = input.getRows();
+            if (Double.isInfinite(inputRowCount)) {
+                rowCount = inputRowCount;
+            } else if (input == getLeft() && isHashJoinDoable()) {
+                rowCount += inputRowCount;
+            } else {
+                rowCount += Util.nLogN(inputRowCount);
+            }
+        }
+        RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0);
+
+        return cost.multiplyBy(PHOENIX_FACTOR);
+    }
+    
+    @Override
     public QueryPlan implement(Implementor implementor) {
         assert getLeft().getConvention() == PhoenixRel.CONVENTION;
         assert getRight().getConvention() == PhoenixRel.CONVENTION;
         PhoenixRel left = (PhoenixRel) getLeft();
         PhoenixRel right = (PhoenixRel) getRight();
-        boolean hashRHS = (left instanceof PhoenixTableScan) && getJoinType() 
!= JoinRelType.RIGHT;
-        if (!hashRHS)
+        if (!isHashJoinDoable())
             throw new UnsupportedOperationException();
         
         JoinInfo joinInfo = JoinInfo.of(left, right, getCondition());
         List<Expression> leftExprs = Lists.<Expression> newArrayList();
         List<Expression> rightExprs = Lists.<Expression> newArrayList();
-        implementor.pushContext(new ImplementorContext(true));
+        implementor.pushContext(new 
ImplementorContext(implementor.getCurrentContext().isRetainPKColumns()));
         QueryPlan leftPlan = implementor.visitInput(0, left);
         PTable leftTable = implementor.getTableRef().getTable();
         for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); 
iter.hasNext();) {
@@ -86,7 +109,8 @@ public class PhoenixJoin extends Join implements PhoenixRel {
             throw new RuntimeException(e);
         }
         implementor.setTableRef(new TableRef(joinedTable));
-        Expression postFilterExpr = 
CalciteUtils.toExpression(joinInfo.getRemaining(getCluster().getRexBuilder()), 
implementor);
+        RexNode postFilter = 
joinInfo.getRemaining(getCluster().getRexBuilder());
+        Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : 
CalciteUtils.toExpression(postFilter, implementor);
         @SuppressWarnings("unchecked")
         HashJoinInfo hashJoinInfo = new HashJoinInfo(
                 joinedTable, new ImmutableBytesPtr[] {new 
ImmutableBytesPtr()}, 
@@ -99,6 +123,15 @@ public class PhoenixJoin extends Join implements PhoenixRel 
{
         return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, 
hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, 
rightPlan, rightExprs, false, null, null)});
     }
     
+    private boolean isHashJoinDoable() {
+        // TODO check memory limit
+        RelNode rel = getLeft();
+        if (rel instanceof RelSubset) {
+            rel = ((RelSubset) rel).getBest();
+        }
+        return (rel instanceof PhoenixTableScan) && getJoinType() != 
JoinRelType.RIGHT;
+    }
+    
     private JoinType convertJoinType(JoinRelType type) {
         JoinType ret = null;
         switch (type) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
index 8c6153c..f681c88 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
@@ -69,6 +69,8 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
         for (RelOptRule rule : rules) {
             planner.addRule(rule);
         }
+        planner.addRule(PhoenixFilterScanMergeRule.INSTANCE);
+        planner.addRule(PhoenixProjectScanMergeRule.INSTANCE);
     }
 
     @Override
@@ -85,8 +87,18 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
             final Double selectivity = RelMetadataQuery.getSelectivity(this, 
filter);
             cost = cost.multiplyBy(selectivity);
         }
+        if (projects != null) {
+            final double projectFieldRatio = ((double) projects.size()) / 
getRowType().getFieldCount();
+            cost = cost.multiplyBy(projectFieldRatio);
+        }
         return cost;
     }
+    
+    @Override
+    public double getRows() {
+        return super.getRows()
+                * RelMetadataQuery.getSelectivity(this, filter);
+    }
 
     @Override
     public QueryPlan implement(Implementor implementor) {
@@ -128,7 +140,7 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
         KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
         List<Expression> exprs = Lists.<Expression> newArrayList();
         for (PColumn column : table.getColumns()) {
-            if (!SchemaUtil.isPKColumn(column)) {
+            if (!SchemaUtil.isPKColumn(column) || 
!implementor.getCurrentContext().isRetainPKColumns()) {
                 Expression expr = 
implementor.newColumnExpression(column.getPosition());
                 exprs.add(expr);
                 builder.addField(expr);                

http://git-wip-us.apache.org/repos/asf/phoenix/blob/20815009/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
index cad1d66..d1750e3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixToEnumerableConverter.java
@@ -72,8 +72,6 @@ public class PhoenixToEnumerableConverter extends 
ConverterImpl implements Enume
     }
     
     static QueryPlan makePlan(PhoenixRel rel) {
-        Program p = Programs.ofRules(PhoenixFilterScanMergeRule.INSTANCE, 
PhoenixProjectScanMergeRule.INSTANCE);
-        rel = (PhoenixRel) (p.run(rel.getCluster().getPlanner(), rel, 
RelTraitSet.createEmpty()));
         final PhoenixRel.Implementor phoenixImplementor = new 
PhoenixRelImplementorImpl();
         return phoenixImplementor.visitInput(0, rel);
     }

Reply via email to