Repository: phoenix Updated Branches: refs/heads/calcite 208150098 -> 8097c8b9d
First aggregate query passed Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8097c8b9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8097c8b9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8097c8b9 Branch: refs/heads/calcite Commit: 8097c8b9deb083bece69f170be319b95d3930df5 Parents: 2081500 Author: maryannxue <wei....@intel.com> Authored: Wed Apr 1 12:30:15 2015 -0400 Committer: maryannxue <wei....@intel.com> Committed: Wed Apr 1 12:30:15 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteTest.java | 28 ++++ .../apache/phoenix/calcite/CalciteUtils.java | 50 ++++++ .../phoenix/calcite/PhoenixAggregate.java | 160 ++++++++++++++++++- .../apache/phoenix/calcite/PhoenixSchema.java | 2 +- .../phoenix/execute/DelegateQueryPlan.java | 4 + .../apache/phoenix/execute/HashJoinPlan.java | 8 + .../java/org/apache/phoenix/util/TestUtil.java | 4 +- 7 files changed, 251 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java index a9ad76b..70d44f6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java @@ -15,6 +15,7 @@ import java.sql.*; import java.util.List; import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME; +import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME; import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME; import static org.junit.Assert.*; @@ -275,6 +276,33 @@ public class CalciteTest extends BaseClientManagedTimeIT { {"00C923122312312", "c", "00D300000000XHP"}}) .close(); } + + @Test public void testAggregate() { + start().sql("select a_string, count(entity_id) from atable group by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], project=[[$2]])\n") + .resultIs(new Object[][] { + {"a", 4L}, + {"b", 4L}, + {"c", 1L}}) + .close(); + } + + @Test public void testSubquery() { + start().sql("SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixProject(order_id=[$0])\n" + + " PhoenixJoin(condition=[AND(=($2, $6), =($4, $7))], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, ORDERTABLE]])\n" + + " PhoenixAggregate(group=[{0}], EXPR$0=[MAX($1)])\n" + + " PhoenixProject(item_id0=[$6], QUANTITY=[$4])\n" + + " PhoenixJoin(condition=[=($6, $2)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, ORDERTABLE]])\n" + + " PhoenixAggregate(group=[{0}])\n" + + " PhoenixTableScan(table=[[phoenix, ORDERTABLE]], project=[[$2]])\n") + .close(); + } @Test public void testConnectUsingModel() throws Exception { final Start start = new Start() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java index b6eaf37..4962bb5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java @@ -9,6 +9,8 @@ import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlKind; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -17,6 +19,10 @@ import org.apache.phoenix.expression.ComparisonExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.expression.function.AggregateFunction; +import org.apache.phoenix.expression.function.CountAggregateFunction; +import org.apache.phoenix.expression.function.FunctionExpression; +import org.apache.phoenix.expression.function.SumAggregateFunction; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -84,6 +90,36 @@ public class CalciteUtils { }); } + + private static final Map<String, FunctionFactory> FUNCTION_MAP = Maps + .newHashMapWithExpectedSize(ExpressionType.values().length); + private static final FunctionFactory getFactory(SqlFunction func) { + FunctionFactory fFactory = FUNCTION_MAP.get(func.getName()); + if (fFactory == null) { + throw new UnsupportedOperationException("Unsupported SqlFunction: " + + func); + } + return fFactory; + } + static { + FUNCTION_MAP.put("COUNT", new FunctionFactory() { + @Override + public FunctionExpression newFunction(SqlFunction sqlFunc, + List<Expression> args) { + if (args.isEmpty()) { + args = Lists.asList(LiteralExpression.newConstant(1), new Expression[0]); + } + return new CountAggregateFunction(args); + } + }); + FUNCTION_MAP.put("SUM", new FunctionFactory() { + @Override + public FunctionExpression newFunction(SqlFunction sqlFunc, + List<Expression> args) { + return new SumAggregateFunction(args); + } + }); + } static Expression toExpression(RexNode node, Implementor implementor) { ExpressionFactory eFactory = getFactory(node); @@ -91,7 +127,21 @@ public class CalciteUtils { return expression; } + static AggregateFunction toAggregateFunction(SqlAggFunction aggFunc, List<Integer> args, Implementor implementor) { + FunctionFactory fFactory = getFactory(aggFunc); + List<Expression> exprs = Lists.newArrayListWithExpectedSize(args.size()); + for (Integer index : args) { + exprs.add(implementor.newColumnExpression(index)); + } + + return (AggregateFunction) (fFactory.newFunction(aggFunc, exprs)); + } + public static interface ExpressionFactory { public Expression newExpression(RexNode node, Implementor implementor); } + + public static interface FunctionFactory { + public FunctionExpression newFunction(SqlFunction sqlFunc, List<Expression> args); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java index fb113cc..7a38f25 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java @@ -1,15 +1,48 @@ package org.apache.phoenix.calcite; +import java.sql.SQLException; import java.util.List; import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.util.ImmutableBitSet; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.ExpressionProjector; +import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.execute.AggregatePlan; +import org.apache.phoenix.execute.ClientAggregatePlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.expression.CoerceExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.expression.aggregator.ClientAggregators; +import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.expression.function.AggregateFunction; +import org.apache.phoenix.expression.function.SingleAggregateFunction; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.RowKeyValueAccessor; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PDecimal; +import org.apache.phoenix.schema.types.PVarchar; + +import com.google.common.collect.Lists; /** * Implementation of {@link org.apache.calcite.rel.core.Aggregate} @@ -32,6 +65,11 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel { throw new InvalidRelException("unsupported group type: " + getGroupType()); } } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR); + } @Override public PhoenixAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) { @@ -47,7 +85,125 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel { @Override public QueryPlan implement(Implementor implementor) { assert getConvention() == getInput().getConvention(); - implementor.visitInput(0, (PhoenixRel) getInput()); - throw new UnsupportedOperationException(); + if (groupSets.size() > 1) { + throw new UnsupportedOperationException(); + } + + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + TableRef tableRef = implementor.getTableRef(); + ScanPlan basePlan = null; + if (plan instanceof ScanPlan) { + basePlan = (ScanPlan) plan; + } else if (plan instanceof HashJoinPlan) { + QueryPlan delegate = ((HashJoinPlan) plan).getDelegate(); + if (delegate instanceof ScanPlan) { + basePlan = (ScanPlan) delegate; + } + } + // TopN, we can not merge with the base plan. + if (!plan.getOrderBy().getOrderByExpressions().isEmpty() && plan.getLimit() != null) { + basePlan = null; + } + PhoenixStatement stmt = plan.getContext().getStatement(); + StatementContext context; + try { + context = basePlan == null ? new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)) : basePlan.getContext(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + List<Integer> ordinals = groupSet.asList(); + // TODO check order-preserving + String groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS; + // TODO sort group by keys. not sure if there is a way to avoid this sorting, + // otherwise we would have add an extra projection. + List<Expression> exprs = Lists.newArrayListWithExpectedSize(ordinals.size()); + List<Expression> keyExprs = exprs; + for (int i = 0; i < ordinals.size(); i++) { + Expression expr = implementor.newColumnExpression(ordinals.get(i)); + exprs.add(expr); + PDataType keyType = getKeyType(expr); + if (keyType == expr.getDataType()) { + continue; + } + if (keyExprs == exprs) { + keyExprs = Lists.newArrayList(exprs); + } + try { + keyExprs.set(i, CoerceExpression.create(expr, keyType)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(exprs).setKeyExpressions(keyExprs).build(); + + // TODO sort aggFuncs. same problem with group by key sorting. + List<SingleAggregateFunction> aggFuncs = Lists.newArrayList(); + for (AggregateCall call : aggCalls) { + AggregateFunction aggFunc = CalciteUtils.toAggregateFunction(call.getAggregation(), call.getArgList(), implementor); + if (!(aggFunc instanceof SingleAggregateFunction)) { + throw new UnsupportedOperationException(); + } + aggFuncs.add((SingleAggregateFunction) aggFunc); + } + int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty()); + context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex)); + ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex); + context.getAggregationManager().setAggregators(clientAggregators); + + SelectStatement select = SelectStatement.SELECT_STAR; + RowProjector rowProjector = createRowProjector(keyExprs, aggFuncs); + if (basePlan == null) { + return new ClientAggregatePlan(context, select, tableRef, rowProjector, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan); + } + + QueryPlan aggPlan = new AggregatePlan(context, select, basePlan.getTableRef(), rowProjector, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null); + if (plan instanceof ScanPlan) + return aggPlan; + + HashJoinPlan hashJoinPlan = (HashJoinPlan) plan; + return HashJoinPlan.create(select, aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); + } + + private static RowProjector createRowProjector(List<Expression> keyExprs, List<SingleAggregateFunction> aggFuncs) { + List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + for (int i = 0; i < keyExprs.size(); i++) { + Expression keyExpr = keyExprs.get(i); + RowKeyValueAccessor accessor = new RowKeyValueAccessor(keyExprs, i); + Expression expr = new RowKeyColumnExpression(keyExpr, accessor, keyExpr.getDataType()); + columnProjectors.add(new ExpressionProjector(expr.toString(), "", expr, false)); + } + for (SingleAggregateFunction aggFunc : aggFuncs) { + columnProjectors.add(new ExpressionProjector(aggFunc.toString(), "", aggFunc, false)); + } + return new RowProjector(columnProjectors, 0, false); + } + + private static PDataType getKeyType(Expression expression) { + PDataType type = expression.getDataType(); + if (!expression.isNullable() || !type.isFixedWidth()) { + return type; + } + if (type.isCastableTo(PDecimal.INSTANCE)) { + return PDecimal.INSTANCE; + } + if (type.isCastableTo(PVarchar.INSTANCE)) { + return PVarchar.INSTANCE; + } + // This might happen if someone tries to group by an array + throw new IllegalStateException("Multiple occurrences of type " + type + " may not occur in a GROUP BY clause"); + } + + private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) { + int minNullableIndex = aggFuncs.size(); + for (int i = 0; i < aggFuncs.size(); i++) { + SingleAggregateFunction aggFunc = aggFuncs.get(i); + if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) { + minNullableIndex = i; + break; + } + } + return minNullableIndex; } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index 9c4f47e..c51308e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@ -66,7 +66,7 @@ public class PhoenixSchema implements Schema { @Override public Set<String> getTableNames() { - return ImmutableSet.of("ATABLE", "ITEMTABLE", "SUPPLIERTABLE"); + return ImmutableSet.of("ATABLE", "ITEMTABLE", "SUPPLIERTABLE", "ORDERTABLE", "CUSTOMERTABLE"); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java index 4d50ba0..f487533 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java @@ -36,6 +36,10 @@ public abstract class DelegateQueryPlan implements QueryPlan { public DelegateQueryPlan(QueryPlan delegate) { this.delegate = delegate; } + + public QueryPlan getDelegate() { + return delegate; + } @Override public StatementContext getContext() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index aea075d..0f6edc3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -110,6 +110,14 @@ public class HashJoinPlan extends DelegateQueryPlan { this.subPlans = subPlans; this.recompileWhereClause = recompileWhereClause; } + + public HashJoinInfo getJoinInfo() { + return this.joinInfo; + } + + public SubPlan[] getSubPlans() { + return this.subPlans; + } @Override public ResultIterator iterator() throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 220d465..2b7a62b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -194,8 +194,8 @@ public class TestUtil { public static final String JOIN_ITEM_TABLE = "ItemTable"; public static final String JOIN_SUPPLIER_TABLE = "SupplierTable"; public static final String JOIN_COITEM_TABLE = "CoitemTable"; - public static final String JOIN_ORDER_TABLE_FULL_NAME = '"' + JOIN_SCHEMA + "\".\"" + JOIN_ORDER_TABLE + '"'; - public static final String JOIN_CUSTOMER_TABLE_FULL_NAME = '"' + JOIN_SCHEMA + "\".\"" + JOIN_CUSTOMER_TABLE + '"'; + public static final String JOIN_ORDER_TABLE_FULL_NAME = JOIN_ORDER_TABLE; // '"' + JOIN_SCHEMA + "\".\"" + JOIN_ORDER_TABLE + '"'; + public static final String JOIN_CUSTOMER_TABLE_FULL_NAME = JOIN_CUSTOMER_TABLE; // '"' + JOIN_SCHEMA + "\".\"" + JOIN_CUSTOMER_TABLE + '"'; public static final String JOIN_ITEM_TABLE_FULL_NAME = JOIN_ITEM_TABLE; //'"' + JOIN_SCHEMA + "\".\"" + JOIN_ITEM_TABLE + '"'; public static final String JOIN_SUPPLIER_TABLE_FULL_NAME = JOIN_SUPPLIER_TABLE; //'"' + JOIN_SCHEMA + "\".\"" + JOIN_SUPPLIER_TABLE + '"'; public static final String JOIN_COITEM_TABLE_FULL_NAME = '"' + JOIN_SCHEMA + "\".\"" + JOIN_COITEM_TABLE + '"';