Dmitry Lychagin has submitted this change and it was merged. Change subject: [ASTERIXDB-2051][COMP] Fix PushSubplanIntoGroupByRule for complex cases.
[ASTERIXDB-2051][COMP] Fix PushSubplanIntoGroupByRule for complex cases. - user model changes: no - storage format changes: no - interface changes: no Details: - re-implement PushSubplanIntoGroupByRule and let it handle general cases; - add an option to LogicalOperatorDeepCopyWithNewVariablesVisitor for not re-mapping free variables in the given plan subtree. Change-Id: I969c40112be0506981357a9c41bf9675ae12ffb9 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1992 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Dmitry Lychagin <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java A asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp M asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan A asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan M asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java 18 files changed, 437 insertions(+), 208 deletions(-) Approvals: Jenkins: Verified; ; Verified Dmitry Lychagin: Looks good to me, approved Objections: Anon. E. Moose #1000171: Jenkins: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java index 725de12..19c6da7 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java @@ -629,13 +629,13 @@ // Create first copy. LogicalOperatorDeepCopyWithNewVariablesVisitor firstDeepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor( - context, context, newProbeSubTreeVarMap); + context, context, newProbeSubTreeVarMap, true); ILogicalOperator newProbeSubTree = firstDeepCopyVisitor.deepCopy(probeSubTree.getRoot()); inferTypes(newProbeSubTree, context); Mutable<ILogicalOperator> newProbeSubTreeRootRef = new MutableObject<ILogicalOperator>(newProbeSubTree); // Create second copy. 
LogicalOperatorDeepCopyWithNewVariablesVisitor secondDeepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor( - context, context, joinInputSubTreeVarMap); + context, context, joinInputSubTreeVarMap, true); ILogicalOperator joinInputSubTree = secondDeepCopyVisitor.deepCopy(probeSubTree.getRoot()); inferTypes(joinInputSubTree, context); probeSubTree.getRootRef().setValue(joinInputSubTree); diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp new file mode 100644 index 0000000..4b94bf6 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : This test case is to verify the fix for issue810 + * https://code.google.com/p/asterixdb/issues/detail?id=810 + * Expected Res : SUCCESS + * Date : 16th Nov. 
2014 + */ + +DROP DATAVERSE tpch IF EXISTS; +CREATE DATAVERSE tpch; + +USE tpch; + + +CREATE TYPE LineItemType AS CLOSED { + l_orderkey : integer, + l_partkey : integer, + l_suppkey : integer, + l_linenumber : integer, + l_quantity : double, + l_extendedprice : double, + l_discount : double, + l_tax : double, + l_returnflag : string, + l_linestatus : string, + l_shipdate : string, + l_commitdate : string, + l_receiptdate : string, + l_shipinstruct : string, + l_shipmode : string, + l_comment : string +}; + +CREATE DATASET LineItem(LineItemType) PRIMARY KEY l_orderkey,l_linenumber; + + +SELECT l_returnflag AS l_returnflag, + l_linestatus AS l_linestatus, + coll_count(cheap) AS count_cheaps, + coll_count(expensive) AS count_expensives +FROM LineItem AS l +/* +hash */ +GROUP BY l.l_returnflag AS l_returnflag,l.l_linestatus AS l_linestatus +GROUP AS g +LET cheap = ( + SELECT ELEMENT m + FROM (FROM g SELECT VALUE l) AS m + WHERE m.l_discount > 0.05 +), +expensive = ( + SELECT ELEMENT g.l + FROM g + WHERE g.l.l_discount <= 0.05 +) +ORDER BY l_returnflag,l_linestatus +; diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan index d9ce6c8..26d6103 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan @@ -8,12 +8,12 @@ -- ONE_TO_ONE_EXCHANGE |PARTITIONED| -- STABLE_SORT [topK: 100] [$$o_totalprice(DESC), $$o_orderdate(ASC)] |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- SORT_GROUP_BY[$$72, $$73] |PARTITIONED| + -- SORT_GROUP_BY[$$73, $$74] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- HASH_PARTITION_EXCHANGE [$$72, $$73] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$73, $$74] |PARTITIONED| -- SORT_GROUP_BY[$$56, $$57] 
|PARTITIONED| { -- AGGREGATE |LOCAL| @@ -49,12 +49,12 @@ -- STREAM_PROJECT |PARTITIONED| -- STREAM_SELECT |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- SORT_GROUP_BY[$$69] |PARTITIONED| + -- SORT_GROUP_BY[$$70] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- HASH_PARTITION_EXCHANGE [$$69] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$70] |PARTITIONED| -- PRE_CLUSTERED_GROUP_BY[$$58] |PARTITIONED| { -- AGGREGATE |LOCAL| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan index ab10f2d..f2adbf6 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan @@ -3,7 +3,7 @@ -- STREAM_PROJECT |PARTITIONED| -- ASSIGN |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- PRE_CLUSTERED_GROUP_BY[$$27] |PARTITIONED| + -- PRE_CLUSTERED_GROUP_BY[$$28] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| @@ -20,9 +20,9 @@ -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- STABLE_SORT [$$27(ASC)] |PARTITIONED| - -- HASH_PARTITION_EXCHANGE [$$27] |PARTITIONED| - -- SORT_GROUP_BY[$$18, $$24] |PARTITIONED| + -- STABLE_SORT [$$28(ASC)] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$28] |PARTITIONED| + -- SORT_GROUP_BY[$$18, $$25] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan index cad2fbb..238736a 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan @@ -3,7 +3,7 @@ -- STREAM_PROJECT |PARTITIONED| 
-- ASSIGN |PARTITIONED| -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED| - -- PRE_CLUSTERED_GROUP_BY[$$42, $$43] |PARTITIONED| + -- PRE_CLUSTERED_GROUP_BY[$$44, $$45] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| @@ -13,8 +13,8 @@ -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- STABLE_SORT [$$42(ASC), $$43(ASC)] |PARTITIONED| - -- HASH_PARTITION_EXCHANGE [$$42, $$43] |PARTITIONED| + -- STABLE_SORT [$$44(ASC), $$45(ASC)] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$44, $$45] |PARTITIONED| -- PRE_CLUSTERED_GROUP_BY[$$30, $$31] |PARTITIONED| { -- AGGREGATE |LOCAL| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan new file mode 100644 index 0000000..9a7df35 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan @@ -0,0 +1,37 @@ +-- DISTRIBUTE_RESULT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED| + -- PRE_CLUSTERED_GROUP_BY[$$44, $$45] |PARTITIONED| + { + -- AGGREGATE |LOCAL| + -- NESTED_TUPLE_SOURCE |LOCAL| + } + { + -- AGGREGATE |LOCAL| + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STABLE_SORT [$$44(ASC), $$45(ASC)] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$44, $$45] |PARTITIONED| + -- PRE_CLUSTERED_GROUP_BY[$$31, $$32] |PARTITIONED| + { + -- AGGREGATE |LOCAL| + -- STREAM_SELECT |LOCAL| + -- NESTED_TUPLE_SOURCE |LOCAL| + } + { + -- AGGREGATE |LOCAL| + -- STREAM_SELECT |LOCAL| + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STABLE_SORT [$$31(ASC), $$32(ASC)] |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- 
ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan index 1c56f1c..18b4218 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan @@ -3,7 +3,7 @@ -- STREAM_PROJECT |PARTITIONED| -- ASSIGN |PARTITIONED| -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED| - -- PRE_CLUSTERED_GROUP_BY[$$42, $$43] |PARTITIONED| + -- PRE_CLUSTERED_GROUP_BY[$$44, $$45] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| @@ -13,8 +13,8 @@ -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- STABLE_SORT [$$42(ASC), $$43(ASC)] |PARTITIONED| - -- HASH_PARTITION_EXCHANGE [$$42, $$43] |PARTITIONED| + -- STABLE_SORT [$$44(ASC), $$45(ASC)] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$44, $$45] |PARTITIONED| -- PRE_CLUSTERED_GROUP_BY[$$32, $$33] |PARTITIONED| { -- AGGREGATE |LOCAL| @@ -31,9 +31,7 @@ -- ONE_TO_ONE_EXCHANGE |PARTITIONED| -- ASSIGN |PARTITIONED| -- STREAM_PROJECT |PARTITIONED| - -- ASSIGN |PARTITIONED| - -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- DATASOURCE_SCAN |PARTITIONED| - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- EMPTY_TUPLE_SOURCE |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan index 13fb7e1..2d75ff0 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan +++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan @@ -3,12 +3,12 @@ -- STREAM_PROJECT |PARTITIONED| -- ASSIGN |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- SORT_GROUP_BY[$$19] |PARTITIONED| + -- SORT_GROUP_BY[$$20] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- HASH_PARTITION_EXCHANGE [$$19] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$20] |PARTITIONED| -- PRE_CLUSTERED_GROUP_BY[$$16] |PARTITIONED| { -- AGGREGATE |LOCAL| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan index fe1f67a..4405b9d 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan @@ -13,12 +13,12 @@ -- ONE_TO_ONE_EXCHANGE |PARTITIONED| -- STABLE_SORT [$$nation_key(ASC)] |PARTITIONED| -- HASH_PARTITION_EXCHANGE [$$nation_key] |PARTITIONED| - -- SORT_GROUP_BY[$$69, $$70] |PARTITIONED| + -- SORT_GROUP_BY[$$70, $$71] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- HASH_PARTITION_EXCHANGE [$$69, $$70] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$70, $$71] |PARTITIONED| -- SORT_GROUP_BY[$$50, $$54] |PARTITIONED| { -- AGGREGATE |LOCAL| @@ -31,7 +31,7 @@ -- HASH_PARTITION_EXCHANGE [$$56] |PARTITIONED| -- STREAM_PROJECT |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- HYBRID_HASH_JOIN [$$54][$$63] |PARTITIONED| + -- HYBRID_HASH_JOIN [$$54][$$64] |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| -- STREAM_PROJECT |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| @@ -48,7 +48,7 @@ -- DATASOURCE_SCAN |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| -- EMPTY_TUPLE_SOURCE |PARTITIONED| - -- HASH_PARTITION_EXCHANGE [$$63] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$64] |PARTITIONED| -- STREAM_PROJECT |PARTITIONED| -- ASSIGN |PARTITIONED| -- 
ONE_TO_ONE_EXCHANGE |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan index e229e75..1d24a80 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan @@ -3,7 +3,7 @@ -- STREAM_PROJECT |PARTITIONED| -- ASSIGN |PARTITIONED| -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED| - -- PRE_CLUSTERED_GROUP_BY[$$75, $$76] |PARTITIONED| + -- PRE_CLUSTERED_GROUP_BY[$$69, $$70] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| @@ -17,8 +17,8 @@ -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- STABLE_SORT [$$75(ASC), $$76(ASC)] |PARTITIONED| - -- HASH_PARTITION_EXCHANGE [$$75, $$76] |PARTITIONED| + -- STABLE_SORT [$$69(ASC), $$70(ASC)] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$69, $$70] |PARTITIONED| -- PRE_CLUSTERED_GROUP_BY[$$42, $$43] |PARTITIONED| { -- AGGREGATE |LOCAL| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan index ca941bf..3cfd8af 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan @@ -3,7 +3,7 @@ -- STREAM_PROJECT |PARTITIONED| -- ASSIGN |PARTITIONED| -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ] |PARTITIONED| - -- PRE_CLUSTERED_GROUP_BY[$$34, $$35] |PARTITIONED| + -- PRE_CLUSTERED_GROUP_BY[$$36, $$37] |PARTITIONED| { -- AGGREGATE |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| @@ -13,8 +13,8 @@ -- NESTED_TUPLE_SOURCE |LOCAL| } -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- STABLE_SORT [$$34(ASC), $$35(ASC)] |PARTITIONED| - -- HASH_PARTITION_EXCHANGE [$$34, 
$$35] |PARTITIONED| + -- STABLE_SORT [$$36(ASC), $$37(ASC)] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$36, $$37] |PARTITIONED| -- PRE_CLUSTERED_GROUP_BY[$$23, $$24] |PARTITIONED| { -- AGGREGATE |LOCAL| diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan index a5124c3..2eb7603 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan +++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan @@ -27,34 +27,34 @@ -- UNNEST |LOCAL| -- NESTED_TUPLE_SOURCE |LOCAL| } - -- SUBPLAN |PARTITIONED| - { - -- AGGREGATE |LOCAL| - -- STREAM_SELECT |LOCAL| - -- ASSIGN |LOCAL| - -- UNNEST |LOCAL| - -- SUBPLAN |LOCAL| - { - -- AGGREGATE |LOCAL| - -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |LOCAL| - -- UNNEST |LOCAL| - -- NESTED_TUPLE_SOURCE |LOCAL| - } - -- NESTED_TUPLE_SOURCE |LOCAL| - } - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97] |PARTITIONED| - { - -- AGGREGATE |LOCAL| - -- NESTED_TUPLE_SOURCE |LOCAL| - } - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC), $$97(ASC)] |PARTITIONED| - -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97] |PARTITIONED| - -- STREAM_PROJECT |PARTITIONED| - -- ASSIGN |PARTITIONED| - -- STREAM_PROJECT |PARTITIONED| - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- DATASOURCE_SCAN |PARTITIONED| - -- ONE_TO_ONE_EXCHANGE |PARTITIONED| - -- EMPTY_TUPLE_SOURCE |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97] |PARTITIONED| + { + -- AGGREGATE |LOCAL| + -- NESTED_TUPLE_SOURCE |LOCAL| + } + { + -- AGGREGATE |LOCAL| + -- STREAM_SELECT |LOCAL| + -- ASSIGN |LOCAL| + -- UNNEST |LOCAL| + -- SUBPLAN |LOCAL| + { + -- AGGREGATE |LOCAL| + -- IN_MEMORY_STABLE_SORT [$$j(ASC)] |LOCAL| + -- UNNEST |LOCAL| + -- 
NESTED_TUPLE_SOURCE |LOCAL| + } + -- AGGREGATE |LOCAL| + -- NESTED_TUPLE_SOURCE |LOCAL| + } + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC), $$97(ASC)] |PARTITIONED| + -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97] |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ASSIGN |PARTITIONED| + -- STREAM_PROJECT |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- DATASOURCE_SCAN |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| + -- EMPTY_TUPLE_SOURCE |PARTITIONED| diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java index cb89a73..cef387a 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; @@ -43,12 +44,15 @@ private final IVariableContext varContext; private final Map<LogicalVariable, LogicalVariable> inVarMapping; private final Map<LogicalVariable, LogicalVariable> outVarMapping; + private final Set<LogicalVariable> freeVars; public LogicalExpressionDeepCopyWithNewVariablesVisitor(IVariableContext varContext, - Map<LogicalVariable, LogicalVariable> inVarMapping, Map<LogicalVariable, LogicalVariable> variableMapping) { + Map<LogicalVariable, 
LogicalVariable> inVarMapping, Map<LogicalVariable, LogicalVariable> variableMapping, + Set<LogicalVariable> freeVars) { this.varContext = varContext; this.inVarMapping = inVarMapping; this.outVarMapping = variableMapping; + this.freeVars = freeVars; } public ILogicalExpression deepCopy(ILogicalExpression expr) throws AlgebricksException { @@ -146,6 +150,9 @@ public ILogicalExpression visitVariableReferenceExpression(VariableReferenceExpression expr, Void arg) throws AlgebricksException { LogicalVariable var = expr.getVariableReference(); + if (freeVars.contains(var)) { + return expr; + } LogicalVariable givenVarReplacement = inVarMapping.get(var); if (givenVarReplacement != null) { outVarMapping.put(var, givenVarReplacement); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java index 0da9110..934577f 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java @@ -19,9 +19,11 @@ package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; @@ -38,10 +40,10 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator; @@ -52,7 +54,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator; @@ -64,10 +65,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; import 
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor; /** @@ -87,7 +90,13 @@ // Key: New variable in the new plan. Value: The old variable in the // original plan. - private final LinkedHashMap<LogicalVariable, LogicalVariable> outputVarToInputVarMapping; + private final LinkedHashMap<LogicalVariable, LogicalVariable> outputVarToInputVarMapping = new LinkedHashMap<>(); + + // Free variables: variables that shouldn't be deep copied, i.e., mapped. + private final Set<LogicalVariable> freeVars = new HashSet<>(); + + // Whether free variables in the given plan subtree should be reused. + private final boolean reuseFreeVars; /** * @param varContext @@ -96,12 +105,20 @@ * the type context. */ public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext) { - this.varContext = varContext; - this.typeContext = typeContext; - this.inputVarToOutputVarMapping = new LinkedHashMap<>(); - this.outputVarToInputVarMapping = new LinkedHashMap<>(); - this.exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, - outputVarToInputVarMapping, inputVarToOutputVarMapping); + this(varContext, typeContext, new LinkedHashMap<>(), false); + } + + /** + * @param varContext + * , the variable context. + * @param typeContext + * the type context. + * @param reuseFreeVars + * whether free variables in the given plan tree should be reused. 
+ */ + public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext, + boolean reuseFreeVars) { + this(varContext, typeContext, new LinkedHashMap<>(), reuseFreeVars); } /** @@ -113,15 +130,17 @@ * Variable mapping keyed by variables in the original plan. * Those variables are replaced by their corresponding value in * the map in the copied plan. + * @param reuseFreeVars + * whether free variables in the given plan tree should be reused. */ public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext, - LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping) { + LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping, boolean reuseFreeVars) { this.varContext = varContext; this.typeContext = typeContext; this.inputVarToOutputVarMapping = inVarMapping; - this.outputVarToInputVarMapping = new LinkedHashMap<>(); - exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping, - inputVarToOutputVarMapping); + this.exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping, + inputVarToOutputVarMapping, freeVars); + this.reuseFreeVars = reuseFreeVars; } private void copyAnnotations(ILogicalOperator src, ILogicalOperator dest) { @@ -136,6 +155,11 @@ private ILogicalOperator deepCopy(ILogicalOperator op, ILogicalOperator arg) throws AlgebricksException { if (op == null) { return null; + } + if (reuseFreeVars) { + // If the reuseFreeVars flag is set, we collect all free variables in the + // given operator subtree and do not re-map them in the deep-copied plan. 
+ OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) op, freeVars); } ILogicalOperator opCopy = op.accept(this, arg); if (typeContext != null) { @@ -207,6 +231,9 @@ if (var == null) { return null; } + if (freeVars.contains(var)) { + return var; + } LogicalVariable givenVarReplacement = outputVarToInputVarMapping.get(var); if (givenVarReplacement != null) { inputVarToOutputVarMapping.put(var, givenVarReplacement); @@ -247,6 +274,7 @@ } public void reset() { + freeVars.clear(); inputVarToOutputVarMapping.clear(); outputVarToInputVarMapping.clear(); } @@ -561,10 +589,6 @@ deepCopyVariable(op.getPositionalVariable()), op.getPositionalVariableType(), op.getPositionWriter()); deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); return opCopy; - } - - public LinkedHashMap<LogicalVariable, LogicalVariable> getOutputToInputVariableMapping() { - return outputVarToInputVarMapping; } public LinkedHashMap<LogicalVariable, LogicalVariable> getInputToOutputVariableMapping() { diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java index 8a8fe6d..8d00696 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java @@ -20,10 +20,12 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import 
org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; @@ -38,6 +40,8 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors. + LogicalOperatorDeepCopyWithNewVariablesVisitor; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; @@ -196,6 +200,14 @@ return new ALogicalPlanImpl(newRoots); } + public static Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyWithNewVars( + ILogicalOperator root, IOptimizationContext ctx) throws AlgebricksException { + LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new + LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, null, true); + ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root); + return Pair.of(newRoot, deepCopyVisitor.getInputToOutputVariableMapping()); + } + private static void setDataSource(ILogicalPlan plan, ILogicalOperator dataSource) { for (Mutable<ILogicalOperator> rootRef : plan.getRoots()) { setDataSource(rootRef, dataSource); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java index 8b36a68..463f214 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java +++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java @@ -75,7 +75,7 @@ * collection provided. * * @param op - * @param vars + * @param freeVars * - The collection to which the free variables will be added. */ public static void getFreeVariablesInSelfOrDesc(AbstractLogicalOperator op, Set<LogicalVariable> freeVars) diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java index e73f2a5..ed4196b 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java @@ -322,7 +322,7 @@ // Finds the reference of the bottom-most operator in the pipeline that // should not be pushed to the combiner group-by. Mutable<ILogicalOperator> currentOpRef = new MutableObject<ILogicalOperator>(nestedGby); - Mutable<ILogicalOperator> bottomOpRef = findBottomOpRefStayInOldGby(currentOpRef); + Mutable<ILogicalOperator> bottomOpRef = findBottomOpRefStayInOldGby(nestedGby, currentOpRef); // Adds the used variables in the pipeline from <code>currentOpRef</code> to <code>bottomOpRef</code> // into the group-by keys for the introduced combiner group-by operator. @@ -392,16 +392,29 @@ * Find the bottom-most nested operator reference in the query pipeline rooted at <code>currentOpRef</code> * that cannot be pushed into the combiner group-by operator. * - * @param currentOpRef + * @param nestedGby + * the nested group-by operator. + * @param currentOpRef + * the reference of the current op.
* @return the bottom-most reference of a select operator */ - private Mutable<ILogicalOperator> findBottomOpRefStayInOldGby(Mutable<ILogicalOperator> currentOpRef) + private Mutable<ILogicalOperator> findBottomOpRefStayInOldGby(GroupByOperator nestedGby, + Mutable<ILogicalOperator> currentOpRef) throws AlgebricksException { + Set<LogicalVariable> usedVarsInNestedGby = new HashSet<>(); + // Collects used variables in nested pipelines. + for (ILogicalPlan nestedPlan : nestedGby.getNestedPlans()) { + for (Mutable<ILogicalOperator> rootOpRef : nestedPlan.getRoots()) { + VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), usedVarsInNestedGby); + } + } Mutable<ILogicalOperator> bottomOpRef = currentOpRef; while (currentOpRef.getValue().getInputs().size() > 0) { - Set<LogicalVariable> producedVars = new HashSet<>(); - VariableUtilities.getProducedVariables(currentOpRef.getValue(), producedVars); - if (currentOpRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT || !producedVars.isEmpty()) { + // Used for checking the dependency between nestedGby and the current operator + Set<LogicalVariable> dependingVars = new HashSet<>(); + VariableUtilities.getProducedVariables(currentOpRef.getValue(), dependingVars); + dependingVars.removeAll(usedVarsInNestedGby); + if (currentOpRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT || !dependingVars.isEmpty()) { bottomOpRef = currentOpRef; } currentOpRef = currentOpRef.getValue().getInputs().get(0); diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java index 3a86565..af95ecd 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java +++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java @@ -20,13 +20,18 @@ package org.apache.hyracks.algebricks.rewriter.rules.subplan; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Deque; import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.ListSet; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -40,7 +45,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; +import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil; +import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; /** @@ -51,151 +58,209 @@ */ public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule { - /** Stores used variables above the current operator. 
*/ - private final Set<LogicalVariable> usedVarsSoFar = new HashSet<LogicalVariable>(); + /** The pointer to the topmost operator */ + private Mutable<ILogicalOperator> rootRef; + /** Whether the rule has ever been invoked */ + private boolean invoked = false; @Override - public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { - return false; + if (!invoked) { + rootRef = opRef; + invoked = true; + } + return rewriteForOperator(rootRef, opRef, context); } - @Override - public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { - ILogicalOperator parentOperator = opRef.getValue(); - if (context.checkIfInDontApplySet(this, parentOperator)) { - return false; - } - context.addToDontApplySet(this, parentOperator); - VariableUtilities.getUsedVariables(parentOperator, usedVarsSoFar); - if (parentOperator.getInputs().size() <= 0) { - return false; - } + // The core rewriting function for an operator. + private boolean rewriteForOperator(Mutable<ILogicalOperator> rootRef, Mutable<ILogicalOperator> opRef, + IOptimizationContext context) throws AlgebricksException { boolean changed = false; - GroupByOperator gby = null; + ILogicalOperator parentOperator = opRef.getValue(); for (Mutable<ILogicalOperator> ref : parentOperator.getInputs()) { - AbstractLogicalOperator op = (AbstractLogicalOperator) ref.getValue(); - /** Only processes subplan operator. */ - List<SubplanOperator> subplans = new ArrayList<SubplanOperator>(); - if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) { - while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) { - SubplanOperator currentSubplan = (SubplanOperator) op; - subplans.add(currentSubplan); - op = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + ILogicalOperator op = ref.getValue(); + // Only processes subplan operator. 
+ Deque<SubplanOperator> subplans = new ArrayDeque<>(); + if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) { + // Recursively rewrites the child plan. + changed |= rewriteForOperator(rootRef, ref, context); + continue; + } + while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) { + SubplanOperator currentSubplan = (SubplanOperator) op; + // Recursively rewrites the pipelines inside a nested subplan. + for (ILogicalPlan subplan : currentSubplan.getNestedPlans()) { + for (Mutable<ILogicalOperator> nestedRootRef : subplan.getRoots()) { + changed |= rewriteForOperator(nestedRootRef, nestedRootRef, context); + } } - /** Only processes the case a group-by operator is the input of the subplan operators. */ - if (op.getOperatorTag() == LogicalOperatorTag.GROUP) { - gby = (GroupByOperator) op; - List<ILogicalPlan> newGbyNestedPlans = new ArrayList<ILogicalPlan>(); - for (SubplanOperator subplan : subplans) { - List<ILogicalPlan> subplanNestedPlans = subplan.getNestedPlans(); - List<ILogicalPlan> gbyNestedPlans = gby.getNestedPlans(); - List<ILogicalPlan> subplanNestedPlansToRemove = new ArrayList<ILogicalPlan>(); - for (ILogicalPlan subplanNestedPlan : subplanNestedPlans) { - List<Mutable<ILogicalOperator>> rootOpRefs = subplanNestedPlan.getRoots(); - List<Mutable<ILogicalOperator>> rootOpRefsToRemove = new ArrayList<Mutable<ILogicalOperator>>(); - for (Mutable<ILogicalOperator> rootOpRef : rootOpRefs) { - /** Gets free variables in the root operator of a nested plan and its descent. 
*/ - Set<LogicalVariable> freeVars = new ListSet<LogicalVariable>(); - VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), freeVars); - Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>(); - VariableUtilities.getProducedVariablesInDescendantsAndSelf(rootOpRef.getValue(), - producedVars); - freeVars.removeAll(producedVars); - /** * Checks whether the above freeVars are all contained in live variables * of one nested plan inside the group-by operator. * If yes, then the subplan can be pushed into the nested plan of the group-by. */ - for (ILogicalPlan gbyNestedPlanOriginal : gbyNestedPlans) { - // add a subplan in the original gby - if (!newGbyNestedPlans.contains(gbyNestedPlanOriginal)) { - newGbyNestedPlans.add(gbyNestedPlanOriginal); - } - - // add a pushed subplan - ILogicalPlan gbyNestedPlan = OperatorManipulationUtil.deepCopy( - gbyNestedPlanOriginal, context); - List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots(); - for (int rootIndex = 0; rootIndex < gbyRootOpRefs.size(); rootIndex++) { - //set the nts for a original subplan - Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlanOriginal - .getRoots().get(rootIndex); - Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef); - NestedTupleSourceOperator originalNts = (NestedTupleSourceOperator) originalGbyNtsRef - .getValue(); - originalNts.setDataSourceReference(new MutableObject<ILogicalOperator>(gby)); - - //push a new subplan if possible - Mutable<ILogicalOperator> gbyRootOpRef = gbyRootOpRefs.get(rootIndex); - Set<LogicalVariable> liveVars = new ListSet<LogicalVariable>(); - VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars); - if (liveVars.containsAll(freeVars)) { - /** Does the actual push. */ - Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef); - ntsRef.setValue(gbyRootOpRef.getValue()); - // Removes unused vars. 
- AggregateOperator aggOp = (AggregateOperator) gbyRootOpRef.getValue(); - for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) { - if (!freeVars.contains(aggOp.getVariables().get(varIndex))) { - aggOp.getVariables().remove(varIndex); - aggOp.getExpressions().remove(varIndex); - } - } - - gbyRootOpRef.setValue(rootOpRef.getValue()); - rootOpRefsToRemove.add(rootOpRef); - - // Sets the nts for a new pushed plan. - Mutable<ILogicalOperator> oldGbyNtsRef = downToNts(gbyRootOpRef); - NestedTupleSourceOperator nts = (NestedTupleSourceOperator) oldGbyNtsRef - .getValue(); - nts.setDataSourceReference(new MutableObject<ILogicalOperator>(gby)); - - newGbyNestedPlans.add(gbyNestedPlan); - changed = true; - continue; - } - } - } - } - rootOpRefs.removeAll(rootOpRefsToRemove); - if (rootOpRefs.size() == 0) { - subplanNestedPlansToRemove.add(subplanNestedPlan); - } - } - subplanNestedPlans.removeAll(subplanNestedPlansToRemove); - } - if (changed) { - ref.setValue(gby); - gby.getNestedPlans().clear(); - gby.getNestedPlans().addAll(newGbyNestedPlans); - } + subplans.addFirst(currentSubplan); + op = op.getInputs().get(0).getValue(); + } + // Only processes the case a group-by operator is the input of the subplan operators. + if (op.getOperatorTag() != LogicalOperatorTag.GROUP) { + continue; + } + GroupByOperator gby = (GroupByOperator) op; + // Recursively rewrites the pipelines inside a nested subplan. + for (ILogicalPlan subplan : gby.getNestedPlans()) { + for (Mutable<ILogicalOperator> nestedRootRef : subplan.getRoots()) { + changed |= rewriteForOperator(nestedRootRef, nestedRootRef, context); } } + changed |= pushSubplansIntoGroupBy(rootRef, parentOperator, subplans, gby, context); } - if (changed) { - cleanup(gby); - context.computeAndSetTypeEnvironmentForOperator(gby); - context.computeAndSetTypeEnvironmentForOperator(parentOperator); + return changed; + } + + // Pushes subplans into the group by operator. 
+ private boolean pushSubplansIntoGroupBy(Mutable<ILogicalOperator> currentRootRef, ILogicalOperator parentOperator, + Deque<SubplanOperator> subplans, GroupByOperator gby, IOptimizationContext context) + throws AlgebricksException { + boolean changed = false; + List<ILogicalPlan> newGbyNestedPlans = new ArrayList<>(); + List<ILogicalPlan> originalNestedPlansInGby = gby.getNestedPlans(); + + // Adds all original subplans from the group by. + for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) { + newGbyNestedPlans.add(gbyNestedPlanOriginal); + } + + // Tries to push subplans into the group by. + Iterator<SubplanOperator> subplanOperatorIterator = subplans.iterator(); + while (subplanOperatorIterator.hasNext()) { + SubplanOperator subplan = subplanOperatorIterator.next(); + Iterator<ILogicalPlan> subplanNestedPlanIterator = subplan.getNestedPlans().iterator(); + while (subplanNestedPlanIterator.hasNext()) { + ILogicalPlan subplanNestedPlan = subplanNestedPlanIterator.next(); + List<Mutable<ILogicalOperator>> upperSubplanRootRefs = subplanNestedPlan.getRoots(); + Iterator<Mutable<ILogicalOperator>> upperSubplanRootRefIterator = upperSubplanRootRefs.iterator(); + while (upperSubplanRootRefIterator.hasNext()) { + Mutable<ILogicalOperator> rootOpRef = upperSubplanRootRefIterator.next(); + + // Collects free variables in the root operator of a nested plan and its descendants. + Set<LogicalVariable> freeVars = new ListSet<>(); + OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) rootOpRef.getValue(), + freeVars); + + // Checks whether the above freeVars are all contained in live variables of one nested plan + // inside the group-by operator. If yes, then the subplan can be pushed into the nested plan + // of the group-by.
+ for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) { + ILogicalPlan gbyNestedPlan = OperatorManipulationUtil.deepCopy(gbyNestedPlanOriginal, context); + List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots(); + for (int rootIndex = 0; rootIndex < gbyRootOpRefs.size(); rootIndex++) { + // Sets the nts for an original subplan. + Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlan.getRoots().get(rootIndex); + Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef); + NestedTupleSourceOperator originalNts = (NestedTupleSourceOperator) originalGbyNtsRef + .getValue(); + originalNts.setDataSourceReference(new MutableObject<>(gby)); + + // Pushes a new subplan if possible. + Mutable<ILogicalOperator> gbyRootOpRef = gbyRootOpRefs.get(rootIndex); + Set<LogicalVariable> liveVars = new ListSet<>(); + VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars); + if (!liveVars.containsAll(freeVars)) { + continue; + } + + AggregateOperator aggOp = (AggregateOperator) gbyRootOpRef.getValue(); + for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) { + if (!freeVars.contains(aggOp.getVariables().get(varIndex))) { + aggOp.getVariables().remove(varIndex); + aggOp.getExpressions().remove(varIndex); + } + } + + // Copies the original nested pipeline inside the group-by. + Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> copiedAggOpAndVarMap = + OperatorManipulationUtil.deepCopyWithNewVars(aggOp, context); + ILogicalOperator newBottomAgg = copiedAggOpAndVarMap.getLeft(); + + // Substitutes variables in the upper nested pipeline. + VariableUtilities.substituteVariablesInDescendantsAndSelf(rootOpRef.getValue(), + copiedAggOpAndVarMap.getRight(), context); + + // Does the actual push. + Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef); + ntsRef.setValue(newBottomAgg); + gbyRootOpRef.setValue(rootOpRef.getValue()); + + // Sets the nts for a new pushed plan.
+ Mutable<ILogicalOperator> oldGbyNtsRef = downToNts(new MutableObject<>(newBottomAgg)); + NestedTupleSourceOperator nts = (NestedTupleSourceOperator) oldGbyNtsRef.getValue(); + nts.setDataSourceReference(new MutableObject<>(gby)); + + OperatorManipulationUtil.computeTypeEnvironmentBottomUp(rootOpRef.getValue(), context); + newGbyNestedPlans.add(new ALogicalPlanImpl(rootOpRef)); + + upperSubplanRootRefIterator.remove(); + changed |= true; + break; + } + } + } + + if (upperSubplanRootRefs.isEmpty()) { + subplanNestedPlanIterator.remove(); + } + } + if (subplan.getNestedPlans().isEmpty()) { + subplanOperatorIterator.remove(); + } + } + + // Resets the nested subplans for the group-by operator. + gby.getNestedPlans().clear(); + gby.getNestedPlans().addAll(newGbyNestedPlans); + + // Connects the group-by operator with its parent operator. + ILogicalOperator parent = !subplans.isEmpty() ? subplans.getFirst() : parentOperator; + parent.getInputs().get(0).setValue(gby); + + // Removes unnecessary pipelines inside the group by operator. + cleanup(currentRootRef.getValue(), gby); + + // Computes type environments. + context.computeAndSetTypeEnvironmentForOperator(gby); + context.computeAndSetTypeEnvironmentForOperator(parent); return changed; } /** * Removes unused aggregation variables (and expressions) * - * @param gby + * @param rootOp, + * the root operator of a plan or nested plan. + * @param gby, + * the group-by operator. 
* @throws AlgebricksException */ - private void cleanup(GroupByOperator gby) throws AlgebricksException { - for (ILogicalPlan nestedPlan : gby.getNestedPlans()) { - for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) { - AggregateOperator aggOp = (AggregateOperator) rootRef.getValue(); + private void cleanup(ILogicalOperator rootOp, GroupByOperator gby) throws AlgebricksException { + Set<LogicalVariable> freeVars = new HashSet<>(); + OperatorPropertiesUtil.getFreeVariablesInPath(rootOp, gby, freeVars); + Iterator<ILogicalPlan> nestedPlanIterator = gby.getNestedPlans().iterator(); + while (nestedPlanIterator.hasNext()) { + ILogicalPlan nestedPlan = nestedPlanIterator.next(); + Iterator<Mutable<ILogicalOperator>> nestRootRefIterator = nestedPlan.getRoots().iterator(); + while (nestRootRefIterator.hasNext()) { + Mutable<ILogicalOperator> nestRootRef = nestRootRefIterator.next(); + AggregateOperator aggOp = (AggregateOperator) nestRootRef.getValue(); for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) { - if (!usedVarsSoFar.contains(aggOp.getVariables().get(varIndex))) { + if (!freeVars.contains(aggOp.getVariables().get(varIndex))) { aggOp.getVariables().remove(varIndex); aggOp.getExpressions().remove(varIndex); } } + if (aggOp.getVariables().isEmpty()) { + nestRootRefIterator.remove(); + } } - + if (nestedPlan.getRoots().isEmpty()) { + nestedPlanIterator.remove(); + } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1992 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I969c40112be0506981357a9c41bf9675ae12ffb9 Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
