Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1992
Change subject: [ASTERIXDB-2051][COMP] Fix PushSubplanIntoGroupByRule for
complex cases.
......................................................................
[ASTERIXDB-2051][COMP] Fix PushSubplanIntoGroupByRule for complex cases.
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- re-implement PushSubplanIntoGroupByRule so that it handles general cases;
- add an option to LogicalOperatorDeepCopyWithNewVariablesVisitor that keeps
the free variables of the given plan subtree unmapped (i.e., reuses them as-is).
Change-Id: I969c40112be0506981357a9c41bf9675ae12ffb9
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
A
asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
A
asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
M
asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
18 files changed, 431 insertions(+), 205 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/92/1992/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 725de12..19c6da7 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -629,13 +629,13 @@
// Create first copy.
LogicalOperatorDeepCopyWithNewVariablesVisitor firstDeepCopyVisitor =
new LogicalOperatorDeepCopyWithNewVariablesVisitor(
- context, context, newProbeSubTreeVarMap);
+ context, context, newProbeSubTreeVarMap, true);
ILogicalOperator newProbeSubTree =
firstDeepCopyVisitor.deepCopy(probeSubTree.getRoot());
inferTypes(newProbeSubTree, context);
Mutable<ILogicalOperator> newProbeSubTreeRootRef = new
MutableObject<ILogicalOperator>(newProbeSubTree);
// Create second copy.
LogicalOperatorDeepCopyWithNewVariablesVisitor secondDeepCopyVisitor =
new LogicalOperatorDeepCopyWithNewVariablesVisitor(
- context, context, joinInputSubTreeVarMap);
+ context, context, joinInputSubTreeVarMap, true);
ILogicalOperator joinInputSubTree =
secondDeepCopyVisitor.deepCopy(probeSubTree.getRoot());
inferTypes(joinInputSubTree, context);
probeSubTree.getRootRef().setValue(joinInputSubTree);
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
new file mode 100644
index 0000000..4b94bf6
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : This test case is to verify the fix for issue810
+ * https://code.google.com/p/asterixdb/issues/detail?id=810
+ * Expected Res : SUCCESS
+ * Date : 16th Nov. 2014
+ */
+
+DROP DATAVERSE tpch IF EXISTS;
+CREATE DATAVERSE tpch;
+
+USE tpch;
+
+
+CREATE TYPE LineItemType AS CLOSED {
+ l_orderkey : integer,
+ l_partkey : integer,
+ l_suppkey : integer,
+ l_linenumber : integer,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+};
+
+CREATE DATASET LineItem(LineItemType) PRIMARY KEY l_orderkey,l_linenumber;
+
+
+SELECT l_returnflag AS l_returnflag,
+ l_linestatus AS l_linestatus,
+ coll_count(cheap) AS count_cheaps,
+ coll_count(expensive) AS count_expensives
+FROM LineItem AS l
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag,l.l_linestatus AS l_linestatus
+GROUP AS g
+LET cheap = (
+ SELECT ELEMENT m
+ FROM (FROM g SELECT VALUE l) AS m
+ WHERE m.l_discount > 0.05
+),
+expensive = (
+ SELECT ELEMENT g.l
+ FROM g
+ WHERE g.l.l_discount <= 0.05
+)
+ORDER BY l_returnflag,l_linestatus
+;
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
index d9ce6c8..26d6103 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
@@ -8,12 +8,12 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [topK: 100] [$$o_totalprice(DESC),
$$o_orderdate(ASC)] |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- SORT_GROUP_BY[$$72, $$73] |PARTITIONED|
+ -- SORT_GROUP_BY[$$73, $$74] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- HASH_PARTITION_EXCHANGE [$$72, $$73] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$73, $$74] |PARTITIONED|
-- SORT_GROUP_BY[$$56, $$57] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
@@ -49,12 +49,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- STREAM_SELECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE
|PARTITIONED|
- -- SORT_GROUP_BY[$$69]
|PARTITIONED|
+ -- SORT_GROUP_BY[$$70]
|PARTITIONED|
{
-- AGGREGATE
|LOCAL|
--
NESTED_TUPLE_SOURCE |LOCAL|
}
- -- HASH_PARTITION_EXCHANGE
[$$69] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE
[$$70] |PARTITIONED|
--
PRE_CLUSTERED_GROUP_BY[$$58] |PARTITIONED|
{
-- AGGREGATE
|LOCAL|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
index ab10f2d..f2adbf6 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$27] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$28] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -20,9 +20,9 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$27(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$27] |PARTITIONED|
- -- SORT_GROUP_BY[$$18, $$24] |PARTITIONED|
+ -- STABLE_SORT [$$28(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$28] |PARTITIONED|
+ -- SORT_GROUP_BY[$$18, $$25] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
index cad2fbb..238736a 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]
|PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$42, $$43] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$44, $$45] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -13,8 +13,8 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$42(ASC), $$43(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$42, $$43] |PARTITIONED|
+ -- STABLE_SORT [$$44(ASC), $$45(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$44, $$45] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$30, $$31] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
new file mode 100644
index 0000000..9a7df35
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
@@ -0,0 +1,37 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]
|PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$44, $$45] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$44(ASC), $$45(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$44, $$45] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$31, $$32] |PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$31(ASC), $$32(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
index 1c56f1c..18b4218 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]
|PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$42, $$43] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$44, $$45] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -13,8 +13,8 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$42(ASC), $$43(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$42, $$43] |PARTITIONED|
+ -- STABLE_SORT [$$44(ASC), $$45(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$44, $$45] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$32, $$33] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
@@ -31,9 +31,7 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
index 13fb7e1..2d75ff0 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
@@ -3,12 +3,12 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- SORT_GROUP_BY[$$19] |PARTITIONED|
+ -- SORT_GROUP_BY[$$20] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- HASH_PARTITION_EXCHANGE [$$19] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$20] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$16] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
index fe1f67a..4405b9d 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
@@ -13,12 +13,12 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STABLE_SORT [$$nation_key(ASC)] |PARTITIONED|
-- HASH_PARTITION_EXCHANGE [$$nation_key] |PARTITIONED|
- -- SORT_GROUP_BY[$$69, $$70] |PARTITIONED|
+ -- SORT_GROUP_BY[$$70, $$71] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- HASH_PARTITION_EXCHANGE [$$69, $$70] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$70, $$71] |PARTITIONED|
-- SORT_GROUP_BY[$$50, $$54] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
@@ -31,7 +31,7 @@
-- HASH_PARTITION_EXCHANGE [$$56]
|PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- HYBRID_HASH_JOIN [$$54][$$63]
|PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$54][$$64]
|PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE
|PARTITIONED|
@@ -48,7 +48,7 @@
-- DATASOURCE_SCAN
|PARTITIONED|
-- ONE_TO_ONE_EXCHANGE
|PARTITIONED|
--
EMPTY_TUPLE_SOURCE |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$63]
|PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$64]
|PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE
|PARTITIONED|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
index e229e75..1d24a80 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]
|PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$75, $$76] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$69, $$70] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -17,8 +17,8 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$75(ASC), $$76(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$75, $$76] |PARTITIONED|
+ -- STABLE_SORT [$$69(ASC), $$70(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$69, $$70] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$42, $$43] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
index ca941bf..3cfd8af 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
@@ -3,7 +3,7 @@
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]
|PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$34, $$35] |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$36, $$37] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
@@ -13,8 +13,8 @@
-- NESTED_TUPLE_SOURCE |LOCAL|
}
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$34(ASC), $$35(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$34, $$35] |PARTITIONED|
+ -- STABLE_SORT [$$36(ASC), $$37(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$36, $$37] |PARTITIONED|
-- PRE_CLUSTERED_GROUP_BY[$$23, $$24] |PARTITIONED|
{
-- AGGREGATE |LOCAL|
diff --git
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
index a5124c3..2eb7603 100644
---
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
+++
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
@@ -27,34 +27,34 @@
-- UNNEST |LOCAL|
-- NESTED_TUPLE_SOURCE |LOCAL|
}
- -- SUBPLAN |PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- STREAM_SELECT |LOCAL|
- -- ASSIGN |LOCAL|
- -- UNNEST |LOCAL|
- -- SUBPLAN |LOCAL|
- {
- -- AGGREGATE |LOCAL|
- -- IN_MEMORY_STABLE_SORT
[$$j(ASC)] |LOCAL|
- -- UNNEST |LOCAL|
- -- NESTED_TUPLE_SOURCE
|LOCAL|
- }
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97]
|PARTITIONED|
- {
- -- AGGREGATE |LOCAL|
- -- NESTED_TUPLE_SOURCE |LOCAL|
- }
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC),
$$97(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97]
|PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97]
|PARTITIONED|
+ {
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ {
+ -- AGGREGATE |LOCAL|
+ -- STREAM_SELECT |LOCAL|
+ -- ASSIGN |LOCAL|
+ -- UNNEST |LOCAL|
+ -- SUBPLAN |LOCAL|
+ {
+ -- AGGREGATE |LOCAL|
+ -- IN_MEMORY_STABLE_SORT
[$$j(ASC)] |LOCAL|
+ -- UNNEST |LOCAL|
+ -- NESTED_TUPLE_SOURCE
|LOCAL|
+ }
+ -- AGGREGATE |LOCAL|
+ -- NESTED_TUPLE_SOURCE |LOCAL|
+ }
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC),
$$97(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97]
|PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index cb89a73..cef387a 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -43,12 +44,15 @@
private final IVariableContext varContext;
private final Map<LogicalVariable, LogicalVariable> inVarMapping;
private final Map<LogicalVariable, LogicalVariable> outVarMapping;
+ private final Set<LogicalVariable> freeVars;
public LogicalExpressionDeepCopyWithNewVariablesVisitor(IVariableContext
varContext,
- Map<LogicalVariable, LogicalVariable> inVarMapping,
Map<LogicalVariable, LogicalVariable> variableMapping) {
+ Map<LogicalVariable, LogicalVariable> inVarMapping,
Map<LogicalVariable, LogicalVariable> variableMapping,
+ Set<LogicalVariable> freeVars) {
this.varContext = varContext;
this.inVarMapping = inVarMapping;
this.outVarMapping = variableMapping;
+ this.freeVars = freeVars;
}
public ILogicalExpression deepCopy(ILogicalExpression expr) throws
AlgebricksException {
@@ -146,6 +150,9 @@
public ILogicalExpression
visitVariableReferenceExpression(VariableReferenceExpression expr, Void arg)
throws AlgebricksException {
LogicalVariable var = expr.getVariableReference();
+ if (freeVars.contains(var)) {
+ return expr;
+ }
LogicalVariable givenVarReplacement = inVarMapping.get(var);
if (givenVarReplacement != null) {
outVarMapping.put(var, givenVarReplacement);
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 0da9110..6555971 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -19,9 +19,11 @@
package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -38,10 +40,10 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -52,7 +54,6 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
@@ -64,10 +65,12 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import
org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
/**
@@ -87,7 +90,13 @@
// Key: New variable in the new plan. Value: The old variable in the
// original plan.
- private final LinkedHashMap<LogicalVariable, LogicalVariable>
outputVarToInputVarMapping;
+ private final LinkedHashMap<LogicalVariable, LogicalVariable>
outputVarToInputVarMapping = new LinkedHashMap<>();
+
+ // Free variables: variables that are reused as-is in the copy, i.e., never re-mapped to new variables.
+ private final Set<LogicalVariable> freeVars = new HashSet<>();
+
+ // Whether free variables in the given plan subtree should be reused.
+ private final boolean reuseFreeVars;
/**
* @param varContext
@@ -96,12 +105,20 @@
* the type context.
*/
public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext
varContext, ITypingContext typeContext) {
- this.varContext = varContext;
- this.typeContext = typeContext;
- this.inputVarToOutputVarMapping = new LinkedHashMap<>();
- this.outputVarToInputVarMapping = new LinkedHashMap<>();
- this.exprDeepCopyVisitor = new
LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext,
- outputVarToInputVarMapping, inputVarToOutputVarMapping);
+ this(varContext, typeContext, new LinkedHashMap<>(), false);
+ }
+
+ /**
+ * @param varContext
+ * the variable context.
+ * @param typeContext
+ * the type context.
+ * @param reuseFreeVars
+ * whether free variables in the given plan tree should be
reused.
+ */
+ public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext
varContext, ITypingContext typeContext,
+ boolean reuseFreeVars) {
+ this(varContext, typeContext, new LinkedHashMap<>(), reuseFreeVars);
}
/**
@@ -113,15 +130,17 @@
* Variable mapping keyed by variables in the original plan.
* Those variables are replaced by their corresponding value in
* the map in the copied plan.
+ * @param reuseFreeVars
+ * whether free variables in the given plan tree should be
reused.
*/
public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext
varContext, ITypingContext typeContext,
- LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping) {
+ LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping,
boolean reuseFreeVars) {
this.varContext = varContext;
this.typeContext = typeContext;
this.inputVarToOutputVarMapping = inVarMapping;
- this.outputVarToInputVarMapping = new LinkedHashMap<>();
- exprDeepCopyVisitor = new
LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping,
- inputVarToOutputVarMapping);
+ this.exprDeepCopyVisitor = new
LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping,
+ inputVarToOutputVarMapping, freeVars);
+ this.reuseFreeVars = reuseFreeVars;
}
private void copyAnnotations(ILogicalOperator src, ILogicalOperator dest) {
@@ -136,6 +155,9 @@
private ILogicalOperator deepCopy(ILogicalOperator op, ILogicalOperator
arg) throws AlgebricksException {
if (op == null) {
return null;
+ }
+ if (reuseFreeVars) {
+
OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator)
op, freeVars);
}
ILogicalOperator opCopy = op.accept(this, arg);
if (typeContext != null) {
@@ -207,6 +229,9 @@
if (var == null) {
return null;
}
+ if (freeVars.contains(var)) {
+ return var;
+ }
LogicalVariable givenVarReplacement =
outputVarToInputVarMapping.get(var);
if (givenVarReplacement != null) {
inputVarToOutputVarMapping.put(var, givenVarReplacement);
@@ -247,6 +272,7 @@
}
public void reset() {
+ freeVars.clear();
inputVarToOutputVarMapping.clear();
outputVarToInputVarMapping.clear();
}
@@ -561,10 +587,6 @@
deepCopyVariable(op.getPositionalVariable()),
op.getPositionalVariableType(), op.getPositionWriter());
deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
return opCopy;
- }
-
- public LinkedHashMap<LogicalVariable, LogicalVariable>
getOutputToInputVariableMapping() {
- return outputVarToInputVarMapping;
}
public LinkedHashMap<LogicalVariable, LogicalVariable>
getInputToOutputVariableMapping() {
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 8a8fe6d..e27c2b1 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -20,10 +20,12 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -38,6 +40,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -196,6 +199,14 @@
return new ALogicalPlanImpl(newRoots);
}
+ public static Pair<ILogicalOperator, Map<LogicalVariable,
LogicalVariable>> deepCopyWithNewVars(
+ ILogicalOperator root, IOptimizationContext ctx) throws
AlgebricksException {
+ LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new
LogicalOperatorDeepCopyWithNewVariablesVisitor(
+ ctx, null, true);
+ ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root);
+ return Pair.of(newRoot,
deepCopyVisitor.getInputToOutputVariableMapping());
+ }
+
private static void setDataSource(ILogicalPlan plan, ILogicalOperator
dataSource) {
for (Mutable<ILogicalOperator> rootRef : plan.getRoots()) {
setDataSource(rootRef, dataSource);
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 8b36a68..463f214 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -75,7 +75,7 @@
* collection provided.
*
* @param op
- * @param vars
+ * @param freeVars
* - The collection to which the free variables will be added.
*/
public static void getFreeVariablesInSelfOrDesc(AbstractLogicalOperator
op, Set<LogicalVariable> freeVars)
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index e73f2a5..de64bab 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.rewriter.rules;
+import java.security.acl.Group;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -322,7 +323,7 @@
// Finds the reference of the bottom-most operator in the pipeline
that
// should not be pushed to the combiner group-by.
Mutable<ILogicalOperator> currentOpRef = new
MutableObject<ILogicalOperator>(nestedGby);
- Mutable<ILogicalOperator> bottomOpRef =
findBottomOpRefStayInOldGby(currentOpRef);
+ Mutable<ILogicalOperator> bottomOpRef =
findBottomOpRefStayInOldGby(nestedGby, currentOpRef);
// Adds the used variables in the pipeline from
<code>currentOpRef</code> to <code>bottomOpRef</code>
// into the group-by keys for the introduced combiner group-by
operator.
@@ -392,15 +393,27 @@
* Find the bottom-most nested operator reference in the query pipeline
rooted at <code>currentOpRef</code>
* that cannot be pushed into the combiner group-by operator.
*
- * @param currentOpRef
+ * @param nestedGby
+ * the nested group-by operator.
+ * @param currentOpRef
+ * the reference of the current operator.
* @return the bottom-most reference of a select operator
*/
- private Mutable<ILogicalOperator>
findBottomOpRefStayInOldGby(Mutable<ILogicalOperator> currentOpRef)
+ private Mutable<ILogicalOperator>
findBottomOpRefStayInOldGby(GroupByOperator nestedGby,
+ Mutable<ILogicalOperator> currentOpRef)
throws AlgebricksException {
+ Set<LogicalVariable> usedVarsInNestedGby = new HashSet<>();
+ // Collects used variables in nested pipelines.
+ for (ILogicalPlan nestedPlan : nestedGby.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> rootOpRef : nestedPlan.getRoots()) {
+ VariableUtilities.getUsedVariables(rootOpRef.getValue(),
usedVarsInNestedGby);
+ }
+ }
Mutable<ILogicalOperator> bottomOpRef = currentOpRef;
while (currentOpRef.getValue().getInputs().size() > 0) {
Set<LogicalVariable> producedVars = new HashSet<>();
VariableUtilities.getProducedVariables(currentOpRef.getValue(),
producedVars);
+ producedVars.removeAll(usedVarsInNestedGby);
if (currentOpRef.getValue().getOperatorTag() ==
LogicalOperatorTag.SELECT || !producedVars.isEmpty()) {
bottomOpRef = currentOpRef;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 3a86565..7c83dc8 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -20,13 +20,18 @@
package org.apache.hyracks.algebricks.rewriter.rules.subplan;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Deque;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -40,7 +45,9 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
import
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
/**
@@ -51,151 +58,209 @@
*/
public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule {
- /** Stores used variables above the current operator. */
- private final Set<LogicalVariable> usedVarsSoFar = new
HashSet<LogicalVariable>();
+ /** The pointer to the topmost operator */
+ private Mutable<ILogicalOperator> rootRef;
+ /** Whether the rule has ever been invoked */
+ private boolean invoked = false;
@Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef,
IOptimizationContext context)
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef,
IOptimizationContext context)
throws AlgebricksException {
- return false;
+ if (!invoked) {
+ rootRef = opRef;
+ invoked = true;
+ }
+ return rewriteForOperator(rootRef, opRef, context);
}
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef,
IOptimizationContext context) throws AlgebricksException {
- ILogicalOperator parentOperator = opRef.getValue();
- if (context.checkIfInDontApplySet(this, parentOperator)) {
- return false;
- }
- context.addToDontApplySet(this, parentOperator);
- VariableUtilities.getUsedVariables(parentOperator, usedVarsSoFar);
- if (parentOperator.getInputs().size() <= 0) {
- return false;
- }
+ // The core rewriting function for an operator.
+ private boolean rewriteForOperator(Mutable<ILogicalOperator> rootRef,
Mutable<ILogicalOperator> opRef,
+ IOptimizationContext context) throws AlgebricksException {
boolean changed = false;
- GroupByOperator gby = null;
+ ILogicalOperator parentOperator = opRef.getValue();
for (Mutable<ILogicalOperator> ref : parentOperator.getInputs()) {
- AbstractLogicalOperator op = (AbstractLogicalOperator)
ref.getValue();
- /** Only processes subplan operator. */
- List<SubplanOperator> subplans = new ArrayList<SubplanOperator>();
- if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
- while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
- SubplanOperator currentSubplan = (SubplanOperator) op;
- subplans.add(currentSubplan);
- op = (AbstractLogicalOperator)
op.getInputs().get(0).getValue();
+ ILogicalOperator op = ref.getValue();
+ // Only processes subplan operator.
+ Deque<SubplanOperator> subplans = new ArrayDeque<>();
+ if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+ // Recursively rewrites the child plan.
+ changed |= rewriteForOperator(rootRef, ref, context);
+ continue;
+ }
+ while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+ SubplanOperator currentSubplan = (SubplanOperator) op;
+ // Recursively rewrites the pipelines inside a nested subplan.
+ for (ILogicalPlan subplan : currentSubplan.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> nestedRootRef :
subplan.getRoots()) {
+ changed |= rewriteForOperator(nestedRootRef,
nestedRootRef, context);
+ }
}
- /** Only processes the case a group-by operator is the input
of the subplan operators. */
- if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
- gby = (GroupByOperator) op;
- List<ILogicalPlan> newGbyNestedPlans = new
ArrayList<ILogicalPlan>();
- for (SubplanOperator subplan : subplans) {
- List<ILogicalPlan> subplanNestedPlans =
subplan.getNestedPlans();
- List<ILogicalPlan> gbyNestedPlans =
gby.getNestedPlans();
- List<ILogicalPlan> subplanNestedPlansToRemove = new
ArrayList<ILogicalPlan>();
- for (ILogicalPlan subplanNestedPlan :
subplanNestedPlans) {
- List<Mutable<ILogicalOperator>> rootOpRefs =
subplanNestedPlan.getRoots();
- List<Mutable<ILogicalOperator>> rootOpRefsToRemove
= new ArrayList<Mutable<ILogicalOperator>>();
- for (Mutable<ILogicalOperator> rootOpRef :
rootOpRefs) {
- /** Gets free variables in the root operator
of a nested plan and its descent. */
- Set<LogicalVariable> freeVars = new
ListSet<LogicalVariable>();
-
VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(),
freeVars);
- Set<LogicalVariable> producedVars = new
ListSet<LogicalVariable>();
-
VariableUtilities.getProducedVariablesInDescendantsAndSelf(rootOpRef.getValue(),
- producedVars);
- freeVars.removeAll(producedVars);
- /** * Checks whether the above freeVars are
all contained in live variables * of one nested plan inside the group-by
operator. * If yes, then the subplan can be pushed into the nested plan of the
group-by. */
- for (ILogicalPlan gbyNestedPlanOriginal :
gbyNestedPlans) {
- // add a subplan in the original gby
- if
(!newGbyNestedPlans.contains(gbyNestedPlanOriginal)) {
-
newGbyNestedPlans.add(gbyNestedPlanOriginal);
- }
-
- // add a pushed subplan
- ILogicalPlan gbyNestedPlan =
OperatorManipulationUtil.deepCopy(
- gbyNestedPlanOriginal, context);
- List<Mutable<ILogicalOperator>>
gbyRootOpRefs = gbyNestedPlan.getRoots();
- for (int rootIndex = 0; rootIndex <
gbyRootOpRefs.size(); rootIndex++) {
- //set the nts for a original subplan
- Mutable<ILogicalOperator>
originalGbyRootOpRef = gbyNestedPlanOriginal
- .getRoots().get(rootIndex);
- Mutable<ILogicalOperator>
originalGbyNtsRef = downToNts(originalGbyRootOpRef);
- NestedTupleSourceOperator originalNts
= (NestedTupleSourceOperator) originalGbyNtsRef
- .getValue();
- originalNts.setDataSourceReference(new
MutableObject<ILogicalOperator>(gby));
-
- //push a new subplan if possible
- Mutable<ILogicalOperator> gbyRootOpRef
= gbyRootOpRefs.get(rootIndex);
- Set<LogicalVariable> liveVars = new
ListSet<LogicalVariable>();
-
VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
- if (liveVars.containsAll(freeVars)) {
- /** Does the actual push. */
- Mutable<ILogicalOperator> ntsRef =
downToNts(rootOpRef);
-
ntsRef.setValue(gbyRootOpRef.getValue());
- // Removes unused vars.
- AggregateOperator aggOp =
(AggregateOperator) gbyRootOpRef.getValue();
- for (int varIndex =
aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
- if
(!freeVars.contains(aggOp.getVariables().get(varIndex))) {
-
aggOp.getVariables().remove(varIndex);
-
aggOp.getExpressions().remove(varIndex);
- }
- }
-
-
gbyRootOpRef.setValue(rootOpRef.getValue());
- rootOpRefsToRemove.add(rootOpRef);
-
- // Sets the nts for a new pushed
plan.
- Mutable<ILogicalOperator>
oldGbyNtsRef = downToNts(gbyRootOpRef);
- NestedTupleSourceOperator nts =
(NestedTupleSourceOperator) oldGbyNtsRef
- .getValue();
- nts.setDataSourceReference(new
MutableObject<ILogicalOperator>(gby));
-
-
newGbyNestedPlans.add(gbyNestedPlan);
- changed = true;
- continue;
- }
- }
- }
- }
- rootOpRefs.removeAll(rootOpRefsToRemove);
- if (rootOpRefs.size() == 0) {
-
subplanNestedPlansToRemove.add(subplanNestedPlan);
- }
- }
-
subplanNestedPlans.removeAll(subplanNestedPlansToRemove);
- }
- if (changed) {
- ref.setValue(gby);
- gby.getNestedPlans().clear();
- gby.getNestedPlans().addAll(newGbyNestedPlans);
- }
+ subplans.addFirst(currentSubplan);
+ op = op.getInputs().get(0).getValue();
+ }
+ // Only processes the case where a group-by operator is the input of the
subplan operators.
+ if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+ continue;
+ }
+ GroupByOperator gby = (GroupByOperator) op;
+ // Recursively rewrites the nested pipelines inside the group-by operator.
+ for (ILogicalPlan subplan : gby.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> nestedRootRef :
subplan.getRoots()) {
+ changed |= rewriteForOperator(nestedRootRef,
nestedRootRef, context);
}
}
+ changed |= pushSubplansIntoGroupBy(rootRef, parentOperator,
subplans, gby, context);
}
- if (changed) {
- cleanup(gby);
- context.computeAndSetTypeEnvironmentForOperator(gby);
- context.computeAndSetTypeEnvironmentForOperator(parentOperator);
+ return changed;
+ }
+
+ // Pushes subplans into the group by operator.
+ private boolean pushSubplansIntoGroupBy(Mutable<ILogicalOperator>
currentRootRef, ILogicalOperator parentOperator,
+ Deque<SubplanOperator> subplans, GroupByOperator gby,
IOptimizationContext context)
+ throws AlgebricksException {
+ boolean changed = false;
+ List<ILogicalPlan> newGbyNestedPlans = new ArrayList<>();
+ List<ILogicalPlan> originalNestedPlansInGby = gby.getNestedPlans();
+
+ // Adds all original subplans from the group by.
+ for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) {
+ newGbyNestedPlans.add(gbyNestedPlanOriginal);
}
+
+ // Tries to push subplans into the group by.
+ Iterator<SubplanOperator> subplanOperatorIterator =
subplans.iterator();
+ while (subplanOperatorIterator.hasNext()) {
+ SubplanOperator subplan = subplanOperatorIterator.next();
+ Iterator<ILogicalPlan> subplanNestedPlanIterator =
subplan.getNestedPlans().iterator();
+ while (subplanNestedPlanIterator.hasNext()) {
+ ILogicalPlan subplanNestedPlan =
subplanNestedPlanIterator.next();
+ List<Mutable<ILogicalOperator>> upperSubplanRootRefs =
subplanNestedPlan.getRoots();
+ Iterator<Mutable<ILogicalOperator>>
upperSubplanRootRefIterator = upperSubplanRootRefs.iterator();
+ while (upperSubplanRootRefIterator.hasNext()) {
+ Mutable<ILogicalOperator> rootOpRef =
upperSubplanRootRefIterator.next();
+
+ // Collects free variables in the root operator of a
nested plan and its descendants.
+ Set<LogicalVariable> freeVars = new ListSet<>();
+
OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator)
rootOpRef.getValue(),
+ freeVars);
+
+ // Checks whether the above freeVars are all contained in
live variables of one nested plan
+ // inside the group-by operator. If yes, then the subplan
can be pushed into the nested plan
+ // of the group-by.
+ for (ILogicalPlan gbyNestedPlanOriginal :
originalNestedPlansInGby) {
+ ILogicalPlan gbyNestedPlan =
OperatorManipulationUtil.deepCopy(gbyNestedPlanOriginal, context);
+ List<Mutable<ILogicalOperator>> gbyRootOpRefs =
gbyNestedPlan.getRoots();
+ for (int rootIndex = 0; rootIndex <
gbyRootOpRefs.size(); rootIndex++) {
+ // Sets the nts for an original subplan.
+ Mutable<ILogicalOperator> originalGbyRootOpRef =
gbyNestedPlan.getRoots().get(rootIndex);
+ Mutable<ILogicalOperator> originalGbyNtsRef =
downToNts(originalGbyRootOpRef);
+ NestedTupleSourceOperator originalNts =
(NestedTupleSourceOperator) originalGbyNtsRef
+ .getValue();
+ originalNts.setDataSourceReference(new
MutableObject<>(gby));
+
+ // Pushes a new subplan if possible.
+ Mutable<ILogicalOperator> gbyRootOpRef =
gbyRootOpRefs.get(rootIndex);
+ Set<LogicalVariable> liveVars = new ListSet<>();
+
VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
+ if (!liveVars.containsAll(freeVars)) {
+ continue;
+ }
+
+ AggregateOperator aggOp = (AggregateOperator)
gbyRootOpRef.getValue();
+ for (int varIndex = aggOp.getVariables().size() -
1; varIndex >= 0; varIndex--) {
+ if
(!freeVars.contains(aggOp.getVariables().get(varIndex))) {
+ aggOp.getVariables().remove(varIndex);
+ aggOp.getExpressions().remove(varIndex);
+ }
+ }
+
+ // Copy the original nested pipeline inside the
group-by.
+ Pair<ILogicalOperator, Map<LogicalVariable,
LogicalVariable>> copiedAggOpAndVarMap = OperatorManipulationUtil
+ .deepCopyWithNewVars(aggOp, context);
+ ILogicalOperator newBottomAgg =
copiedAggOpAndVarMap.getLeft();
+
+ // Substitutes variables in the upper nested
pipeline.
+
VariableUtilities.substituteVariablesInDescendantsAndSelf(rootOpRef.getValue(),
+ copiedAggOpAndVarMap.getRight(), context);
+
+ // Does the actual push.
+ Mutable<ILogicalOperator> ntsRef =
downToNts(rootOpRef);
+ ntsRef.setValue(newBottomAgg);
+ gbyRootOpRef.setValue(rootOpRef.getValue());
+
+ // Sets the nts for a new pushed plan.
+ Mutable<ILogicalOperator> oldGbyNtsRef =
downToNts(new MutableObject<>(newBottomAgg));
+ NestedTupleSourceOperator nts =
(NestedTupleSourceOperator) oldGbyNtsRef.getValue();
+ nts.setDataSourceReference(new
MutableObject<>(gby));
+
+
OperatorManipulationUtil.computeTypeEnvironmentBottomUp(rootOpRef.getValue(),
context);
+ newGbyNestedPlans.add(new
ALogicalPlanImpl(rootOpRef));
+
+ upperSubplanRootRefIterator.remove();
+ changed |= true;
+ break;
+ }
+ }
+ }
+
+ if (upperSubplanRootRefs.isEmpty()) {
+ subplanNestedPlanIterator.remove();
+ }
+ }
+ if (subplan.getNestedPlans().isEmpty()) {
+ subplanOperatorIterator.remove();
+ }
+ }
+
+ // Resets the nested subplans for the group-by operator.
+ gby.getNestedPlans().clear();
+ gby.getNestedPlans().addAll(newGbyNestedPlans);
+
+ // Connects the group-by operator with its parent operator.
+ ILogicalOperator parent = !subplans.isEmpty() ? subplans.getFirst() :
parentOperator;
+ parent.getInputs().get(0).setValue(gby);
+
+ // Removes unnecessary pipelines inside the group by operator.
+ cleanup(currentRootRef.getValue(), gby);
+
+ // Computes type environments.
+ context.computeAndSetTypeEnvironmentForOperator(gby);
+ context.computeAndSetTypeEnvironmentForOperator(parent);
return changed;
}
/**
* Removes unused aggregation variables (and expressions)
*
- * @param gby
+ * @param rootOp
+ * the root operator of a plan or nested plan.
+ * @param gby
+ * the group-by operator.
* @throws AlgebricksException
*/
- private void cleanup(GroupByOperator gby) throws AlgebricksException {
- for (ILogicalPlan nestedPlan : gby.getNestedPlans()) {
- for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
- AggregateOperator aggOp = (AggregateOperator)
rootRef.getValue();
+ private void cleanup(ILogicalOperator rootOp, GroupByOperator gby) throws
AlgebricksException {
+ Set<LogicalVariable> freeVars = new HashSet<>();
+ OperatorPropertiesUtil.getFreeVariablesInPath(rootOp, gby, freeVars);
+ Iterator<ILogicalPlan> nestedPlanIterator =
gby.getNestedPlans().iterator();
+ while (nestedPlanIterator.hasNext()) {
+ ILogicalPlan nestedPlan = nestedPlanIterator.next();
+ Iterator<Mutable<ILogicalOperator>> nestRootRefIterator =
nestedPlan.getRoots().iterator();
+ while (nestRootRefIterator.hasNext()) {
+ Mutable<ILogicalOperator> nestRootRef =
nestRootRefIterator.next();
+ AggregateOperator aggOp = (AggregateOperator)
nestRootRef.getValue();
for (int varIndex = aggOp.getVariables().size() - 1; varIndex
>= 0; varIndex--) {
- if
(!usedVarsSoFar.contains(aggOp.getVariables().get(varIndex))) {
+ if
(!freeVars.contains(aggOp.getVariables().get(varIndex))) {
aggOp.getVariables().remove(varIndex);
aggOp.getExpressions().remove(varIndex);
}
}
+ if (aggOp.getVariables().isEmpty()) {
+ nestRootRefIterator.remove();
+ }
}
-
+ if (nestedPlan.getRoots().isEmpty()) {
+ nestedPlanIterator.remove();
+ }
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1992
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I969c40112be0506981357a9c41bf9675ae12ffb9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>