Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1992

Change subject: [ASTERIXDB-2051][COMP] Fix PushSubplanIntoGroupByRule for complex cases.
......................................................................

[ASTERIXDB-2051][COMP] Fix PushSubplanIntoGroupByRule for complex cases.

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- re-implement PushSubplanIntoGroupByRule and let it handle general cases;
- add an option to LogicalOperatorDeepCopyWithNewVariablesVisitor that keeps
  free variables of the given plan subtree as-is (i.e., they are not re-mapped
  to new variables during the deep copy).

Change-Id: I969c40112be0506981357a9c41bf9675ae12ffb9
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
A asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
M asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
A asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
M asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
18 files changed, 431 insertions(+), 205 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/92/1992/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 725de12..19c6da7 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -629,13 +629,13 @@
 
         // Create first copy.
         LogicalOperatorDeepCopyWithNewVariablesVisitor firstDeepCopyVisitor = 
new LogicalOperatorDeepCopyWithNewVariablesVisitor(
-                context, context, newProbeSubTreeVarMap);
+                context, context, newProbeSubTreeVarMap, true);
         ILogicalOperator newProbeSubTree = 
firstDeepCopyVisitor.deepCopy(probeSubTree.getRoot());
         inferTypes(newProbeSubTree, context);
         Mutable<ILogicalOperator> newProbeSubTreeRootRef = new 
MutableObject<ILogicalOperator>(newProbeSubTree);
         // Create second copy.
         LogicalOperatorDeepCopyWithNewVariablesVisitor secondDeepCopyVisitor = 
new LogicalOperatorDeepCopyWithNewVariablesVisitor(
-                context, context, joinInputSubTreeVarMap);
+                context, context, joinInputSubTreeVarMap, true);
         ILogicalOperator joinInputSubTree = 
secondDeepCopyVisitor.deepCopy(probeSubTree.getRoot());
         inferTypes(joinInputSubTree, context);
         probeSubTree.getRootRef().setValue(joinInputSubTree);
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
new file mode 100644
index 0000000..4b94bf6
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : Verifies the fix for ASTERIXDB-2051 (PushSubplanIntoGroupByRule, complex cases); a variant of issue810
+ * https://code.google.com/p/asterixdb/issues/detail?id=810
+ * Expected Res : SUCCESS; both LET subqueries over group variable g (cheap, expensive) become nested plans of the GROUP BY
+ * Date         : 16th Nov. 2014 (inherited from the issue810 test header; TODO confirm)
+ */
+
+DROP  DATAVERSE tpch IF EXISTS;
+CREATE DATAVERSE tpch;
+
+USE tpch;
+
+
+CREATE TYPE LineItemType AS CLOSED {
+  l_orderkey : integer,
+  l_partkey : integer,
+  l_suppkey : integer,
+  l_linenumber : integer,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+};
+
+CREATE DATASET LineItem(LineItemType) PRIMARY KEY l_orderkey,l_linenumber;
+
+
+SELECT l_returnflag AS l_returnflag,
+       l_linestatus AS l_linestatus,
+       coll_count(cheap) AS count_cheaps,
+       coll_count(expensive) AS count_expensives
+FROM LineItem AS l
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag,l.l_linestatus AS l_linestatus
+GROUP AS g
+LET cheap = (
+      SELECT ELEMENT m
+      FROM (FROM g SELECT VALUE l) AS m
+      WHERE m.l_discount > 0.05
+),
+expensive = (
+      SELECT ELEMENT g.l
+      FROM g
+      WHERE g.l.l_discount <= 0.05
+)
+ORDER BY l_returnflag,l_linestatus
+;
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
index d9ce6c8..26d6103 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
@@ -8,12 +8,12 @@
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STABLE_SORT [topK: 100] [$$o_totalprice(DESC), 
$$o_orderdate(ASC)]  |PARTITIONED|
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- SORT_GROUP_BY[$$72, $$73]  |PARTITIONED|
+                    -- SORT_GROUP_BY[$$73, $$74]  |PARTITIONED|
                             {
                               -- AGGREGATE  |LOCAL|
                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                             }
-                      -- HASH_PARTITION_EXCHANGE [$$72, $$73]  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$73, $$74]  |PARTITIONED|
                         -- SORT_GROUP_BY[$$56, $$57]  |PARTITIONED|
                                 {
                                   -- AGGREGATE  |LOCAL|
@@ -49,12 +49,12 @@
                                             -- STREAM_PROJECT  |PARTITIONED|
                                               -- STREAM_SELECT  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                  -- SORT_GROUP_BY[$$69]  
|PARTITIONED|
+                                                  -- SORT_GROUP_BY[$$70]  
|PARTITIONED|
                                                           {
                                                             -- AGGREGATE  
|LOCAL|
                                                               -- 
NESTED_TUPLE_SOURCE  |LOCAL|
                                                           }
-                                                    -- HASH_PARTITION_EXCHANGE 
[$$69]  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE 
[$$70]  |PARTITIONED|
                                                       -- 
PRE_CLUSTERED_GROUP_BY[$$58]  |PARTITIONED|
                                                               {
                                                                 -- AGGREGATE  
|LOCAL|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
index ab10f2d..f2adbf6 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$27]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$28]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -20,9 +20,9 @@
                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$27(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$27]  |PARTITIONED|
-                  -- SORT_GROUP_BY[$$18, $$24]  |PARTITIONED|
+              -- STABLE_SORT [$$28(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$28]  |PARTITIONED|
+                  -- SORT_GROUP_BY[$$18, $$25]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
                               -- NESTED_TUPLE_SOURCE  |LOCAL|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
index cad2fbb..238736a 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  
|PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$42, $$43]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$44, $$45]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -13,8 +13,8 @@
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$42(ASC), $$43(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$42, $$43]  |PARTITIONED|
+              -- STABLE_SORT [$$44(ASC), $$45(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$44, $$45]  |PARTITIONED|
                   -- PRE_CLUSTERED_GROUP_BY[$$30, $$31]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
new file mode 100644
index 0000000..9a7df35
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
@@ -0,0 +1,37 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  
|PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$44, $$45]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STABLE_SORT [$$44(ASC), $$45(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$44, $$45]  |PARTITIONED|
+                  -- PRE_CLUSTERED_GROUP_BY[$$31, $$32]  |PARTITIONED|
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- STREAM_SELECT  |LOCAL|
+                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- STREAM_SELECT  |LOCAL|
+                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STABLE_SORT [$$31(ASC), $$32(ASC)]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
index 1c56f1c..18b4218 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  
|PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$42, $$43]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$44, $$45]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -13,8 +13,8 @@
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$42(ASC), $$43(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$42, $$43]  |PARTITIONED|
+              -- STABLE_SORT [$$44(ASC), $$45(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$44, $$45]  |PARTITIONED|
                   -- PRE_CLUSTERED_GROUP_BY[$$32, $$33]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
@@ -31,9 +31,7 @@
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- ASSIGN  |PARTITIONED|
                             -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
index 13fb7e1..2d75ff0 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          -- SORT_GROUP_BY[$$19]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$20]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$19]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$20]  |PARTITIONED|
               -- PRE_CLUSTERED_GROUP_BY[$$16]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
index fe1f67a..4405b9d 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
@@ -13,12 +13,12 @@
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
               -- STABLE_SORT [$$nation_key(ASC)]  |PARTITIONED|
                 -- HASH_PARTITION_EXCHANGE [$$nation_key]  |PARTITIONED|
-                  -- SORT_GROUP_BY[$$69, $$70]  |PARTITIONED|
+                  -- SORT_GROUP_BY[$$70, $$71]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
                               -- NESTED_TUPLE_SOURCE  |LOCAL|
                           }
-                    -- HASH_PARTITION_EXCHANGE [$$69, $$70]  |PARTITIONED|
+                    -- HASH_PARTITION_EXCHANGE [$$70, $$71]  |PARTITIONED|
                       -- SORT_GROUP_BY[$$50, $$54]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
@@ -31,7 +31,7 @@
                                 -- HASH_PARTITION_EXCHANGE [$$56]  
|PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- HYBRID_HASH_JOIN [$$54][$$63]  
|PARTITIONED|
+                                      -- HYBRID_HASH_JOIN [$$54][$$64]  
|PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
@@ -48,7 +48,7 @@
                                                       -- DATASOURCE_SCAN  
|PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE 
 |PARTITIONED|
                                                           -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                        -- HASH_PARTITION_EXCHANGE [$$63]  
|PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$64]  
|PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
index e229e75..1d24a80 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  
|PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$75, $$76]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$69, $$70]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -17,8 +17,8 @@
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$75(ASC), $$76(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$75, $$76]  |PARTITIONED|
+              -- STABLE_SORT [$$69(ASC), $$70(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$69, $$70]  |PARTITIONED|
                   -- PRE_CLUSTERED_GROUP_BY[$$42, $$43]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
index ca941bf..3cfd8af 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  
|PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$34, $$35]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$36, $$37]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -13,8 +13,8 @@
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$34(ASC), $$35(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$34, $$35]  |PARTITIONED|
+              -- STABLE_SORT [$$36(ASC), $$37(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$36, $$37]  |PARTITIONED|
                   -- PRE_CLUSTERED_GROUP_BY[$$23, $$24]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
index a5124c3..2eb7603 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
@@ -27,34 +27,34 @@
                               -- UNNEST  |LOCAL|
                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                       }
-                -- SUBPLAN  |PARTITIONED|
-                        {
-                          -- AGGREGATE  |LOCAL|
-                            -- STREAM_SELECT  |LOCAL|
-                              -- ASSIGN  |LOCAL|
-                                -- UNNEST  |LOCAL|
-                                  -- SUBPLAN  |LOCAL|
-                                          {
-                                            -- AGGREGATE  |LOCAL|
-                                              -- IN_MEMORY_STABLE_SORT 
[$$j(ASC)]  |LOCAL|
-                                                -- UNNEST  |LOCAL|
-                                                  -- NESTED_TUPLE_SOURCE  
|LOCAL|
-                                          }
-                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                        }
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97]  
|PARTITIONED|
-                            {
-                              -- AGGREGATE  |LOCAL|
-                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                            }
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC), 
$$97(ASC)]  |PARTITIONED|
-                          -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97]  
|PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- STREAM_PROJECT  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97]  
|PARTITIONED|
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- STREAM_SELECT  |LOCAL|
+                                -- ASSIGN  |LOCAL|
+                                  -- UNNEST  |LOCAL|
+                                    -- SUBPLAN  |LOCAL|
+                                            {
+                                              -- AGGREGATE  |LOCAL|
+                                                -- IN_MEMORY_STABLE_SORT 
[$$j(ASC)]  |LOCAL|
+                                                  -- UNNEST  |LOCAL|
+                                                    -- NESTED_TUPLE_SOURCE  
|LOCAL|
+                                            }
+                                      -- AGGREGATE  |LOCAL|
+                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC), 
$$97(ASC)]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97]  
|PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index cb89a73..cef387a 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -43,12 +44,15 @@
     private final IVariableContext varContext;
     private final Map<LogicalVariable, LogicalVariable> inVarMapping;
     private final Map<LogicalVariable, LogicalVariable> outVarMapping;
+    private final Set<LogicalVariable> freeVars;
 
     public LogicalExpressionDeepCopyWithNewVariablesVisitor(IVariableContext 
varContext,
-            Map<LogicalVariable, LogicalVariable> inVarMapping, 
Map<LogicalVariable, LogicalVariable> variableMapping) {
+            Map<LogicalVariable, LogicalVariable> inVarMapping, 
Map<LogicalVariable, LogicalVariable> variableMapping,
+            Set<LogicalVariable> freeVars) {
         this.varContext = varContext;
         this.inVarMapping = inVarMapping;
         this.outVarMapping = variableMapping;
+        this.freeVars = freeVars;
     }
 
     public ILogicalExpression deepCopy(ILogicalExpression expr) throws 
AlgebricksException {
@@ -146,6 +150,9 @@
     public ILogicalExpression 
visitVariableReferenceExpression(VariableReferenceExpression expr, Void arg)
             throws AlgebricksException {
         LogicalVariable var = expr.getVariableReference();
+        if (freeVars.contains(var)) {
+            return expr;
+        }
         LogicalVariable givenVarReplacement = inVarMapping.get(var);
         if (givenVarReplacement != null) {
             outVarMapping.put(var, givenVarReplacement);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 0da9110..6555971 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -19,9 +19,11 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -38,10 +40,10 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -52,7 +54,6 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
@@ -64,10 +65,12 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import 
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import 
org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
 
 /**
@@ -87,7 +90,13 @@
 
     // Key: New variable in the new plan. Value: The old variable in the 
     // original plan.
-    private final LinkedHashMap<LogicalVariable, LogicalVariable> 
outputVarToInputVarMapping;
+    private final LinkedHashMap<LogicalVariable, LogicalVariable> 
outputVarToInputVarMapping = new LinkedHashMap<>();
+
+    // Free variables of the plan subtree being copied; they are kept as-is instead of being mapped to new ones.
+    private final Set<LogicalVariable> freeVars = new HashSet<>();
+
+    // Whether free variables in the given plan subtree should be reused (kept as-is) instead of re-mapped.
+    private final boolean reuseFreeVars;
 
     /**
      * @param varContext
@@ -96,12 +105,20 @@
      *            the type context.
      */
     public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext 
varContext, ITypingContext typeContext) {
-        this.varContext = varContext;
-        this.typeContext = typeContext;
-        this.inputVarToOutputVarMapping = new LinkedHashMap<>();
-        this.outputVarToInputVarMapping = new LinkedHashMap<>();
-        this.exprDeepCopyVisitor = new 
LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext,
-                outputVarToInputVarMapping, inputVarToOutputVarMapping);
+        this(varContext, typeContext, new LinkedHashMap<>(), false);
+    }
+
+    /**
+     * @param varContext
+     *            the variable context.
+     * @param typeContext
+     *            the type context.
+     * @param reuseFreeVars
+     *            whether free variables in the given plan tree should be 
reused.
+     */
+    public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext 
varContext, ITypingContext typeContext,
+            boolean reuseFreeVars) {
+        this(varContext, typeContext, new LinkedHashMap<>(), reuseFreeVars);
     }
 
     /**
@@ -113,15 +130,17 @@
      *            Variable mapping keyed by variables in the original plan.
      *            Those variables are replaced by their corresponding value in
      *            the map in the copied plan.
+     * @param reuseFreeVars
+     *            whether free variables in the given plan tree should be 
reused.
      */
     public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext 
varContext, ITypingContext typeContext,
-            LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping) {
+            LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping, 
boolean reuseFreeVars) {
         this.varContext = varContext;
         this.typeContext = typeContext;
         this.inputVarToOutputVarMapping = inVarMapping;
-        this.outputVarToInputVarMapping = new LinkedHashMap<>();
-        exprDeepCopyVisitor = new 
LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping,
-                inputVarToOutputVarMapping);
+        this.exprDeepCopyVisitor = new 
LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping,
+                inputVarToOutputVarMapping, freeVars);
+        this.reuseFreeVars = reuseFreeVars;
     }
 
     private void copyAnnotations(ILogicalOperator src, ILogicalOperator dest) {
@@ -136,6 +155,9 @@
     private ILogicalOperator deepCopy(ILogicalOperator op, ILogicalOperator 
arg) throws AlgebricksException {
         if (op == null) {
             return null;
+        }
+        if (reuseFreeVars) {
+            
OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) 
op, freeVars);
         }
         ILogicalOperator opCopy = op.accept(this, arg);
         if (typeContext != null) {
@@ -207,6 +229,9 @@
         if (var == null) {
             return null;
         }
+        if (freeVars.contains(var)) {
+            return var;
+        }
         LogicalVariable givenVarReplacement = 
outputVarToInputVarMapping.get(var);
         if (givenVarReplacement != null) {
             inputVarToOutputVarMapping.put(var, givenVarReplacement);
@@ -247,6 +272,7 @@
     }
 
     public void reset() {
+        freeVars.clear();
         inputVarToOutputVarMapping.clear();
         outputVarToInputVarMapping.clear();
     }
@@ -561,10 +587,6 @@
                 deepCopyVariable(op.getPositionalVariable()), 
op.getPositionalVariableType(), op.getPositionWriter());
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
-    }
-
-    public LinkedHashMap<LogicalVariable, LogicalVariable> 
getOutputToInputVariableMapping() {
-        return outputVarToInputVarMapping;
     }
 
     public LinkedHashMap<LogicalVariable, LogicalVariable> 
getInputToOutputVariableMapping() {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 8a8fe6d..e27c2b1 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -20,10 +20,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -38,6 +40,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -196,6 +199,27 @@
         return new ALogicalPlanImpl(newRoots);
     }
 
+    /**
+     * Deep-copies the operator tree rooted at <code>root</code>, giving new variables to every variable in the
+     * copy except the free variables of the tree, which are reused as-is.
+     *
+     * @param root
+     *            the root operator of the tree to copy.
+     * @param ctx
+     *            the optimization context, used as the variable context for creating new variables.
+     * @return the root of the copy, paired with the mapping from variables of the original tree to their
+     *         counterparts in the copy (free variables are not re-mapped). The copy visitor gets no typing
+     *         context, so callers compute type environments for the copied operators themselves.
+     * @throws AlgebricksException
+     */
+    public static Pair<ILogicalOperator, Map<LogicalVariable, 
LogicalVariable>> deepCopyWithNewVars(
+            ILogicalOperator root, IOptimizationContext ctx) throws 
AlgebricksException {
+        LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new 
LogicalOperatorDeepCopyWithNewVariablesVisitor(
+                ctx, null, true);
+        ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root);
+        return Pair.of(newRoot, 
deepCopyVisitor.getInputToOutputVariableMapping());
+    }
+
     private static void setDataSource(ILogicalPlan plan, ILogicalOperator 
dataSource) {
         for (Mutable<ILogicalOperator> rootRef : plan.getRoots()) {
             setDataSource(rootRef, dataSource);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 8b36a68..463f214 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -75,7 +75,7 @@
      * collection provided.
      *
      * @param op
-     * @param vars
+     * @param freeVars
      *            - The collection to which the free variables will be added.
      */
     public static void getFreeVariablesInSelfOrDesc(AbstractLogicalOperator 
op, Set<LogicalVariable> freeVars)
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index e73f2a5..de64bab 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -322,7 +322,7 @@
             // Finds the reference of the bottom-most operator in the pipeline 
that
             // should not be pushed to the combiner group-by.
             Mutable<ILogicalOperator> currentOpRef = new 
MutableObject<ILogicalOperator>(nestedGby);
-            Mutable<ILogicalOperator> bottomOpRef = 
findBottomOpRefStayInOldGby(currentOpRef);
+            Mutable<ILogicalOperator> bottomOpRef = 
findBottomOpRefStayInOldGby(nestedGby, currentOpRef);
 
             // Adds the used variables in the pipeline from 
<code>currentOpRef</code> to <code>bottomOpRef</code>
             // into the group-by keys for the introduced combiner group-by 
operator.
@@ -392,15 +392,27 @@
      * Find the bottom-most nested operator reference in the query pipeline 
rooted at <code>currentOpRef</code>
      * that cannot be pushed into the combiner group-by operator.
      *
-     * @param currentOpRef
+     * @param nestedGby
+     *            the nested group-by operator.
+     * @param currentOpRef
+     *            the reference of the current op.
      * @return the bottom-most reference of a select operator
      */
-    private Mutable<ILogicalOperator> 
findBottomOpRefStayInOldGby(Mutable<ILogicalOperator> currentOpRef)
+    private Mutable<ILogicalOperator> 
findBottomOpRefStayInOldGby(GroupByOperator nestedGby,
+            Mutable<ILogicalOperator> currentOpRef)
             throws AlgebricksException {
+        Set<LogicalVariable> usedVarsInNestedGby = new HashSet<>();
+        // Collects the variables used by the root operators of the nested plans.
+        for (ILogicalPlan nestedPlan : nestedGby.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> rootOpRef : nestedPlan.getRoots()) {
+                VariableUtilities.getUsedVariables(rootOpRef.getValue(), 
usedVarsInNestedGby);
+            }
+        }
         Mutable<ILogicalOperator> bottomOpRef = currentOpRef;
         while (currentOpRef.getValue().getInputs().size() > 0) {
             Set<LogicalVariable> producedVars = new HashSet<>();
             VariableUtilities.getProducedVariables(currentOpRef.getValue(), 
producedVars);
+            producedVars.removeAll(usedVarsInNestedGby);
             if (currentOpRef.getValue().getOperatorTag() == 
LogicalOperatorTag.SELECT || !producedVars.isEmpty()) {
                 bottomOpRef = currentOpRef;
             }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 3a86565..7c83dc8 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -20,13 +20,18 @@
 
 package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -40,7 +45,9 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
@@ -51,151 +58,209 @@
  */
 
 public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule {
-    /** Stores used variables above the current operator. */
-    private final Set<LogicalVariable> usedVarsSoFar = new 
HashSet<LogicalVariable>();
+    /** The pointer to the topmost operator */
+    private Mutable<ILogicalOperator> rootRef;
+    /** Whether the rule has ever been invoked */
+    private boolean invoked = false;
 
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
             throws AlgebricksException {
-        return false;
+        if (!invoked) {
+            rootRef = opRef;
+            invoked = true;
+        }
+        return rewriteForOperator(rootRef, opRef, context);
     }
 
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) throws AlgebricksException {
-        ILogicalOperator parentOperator = opRef.getValue();
-        if (context.checkIfInDontApplySet(this, parentOperator)) {
-            return false;
-        }
-        context.addToDontApplySet(this, parentOperator);
-        VariableUtilities.getUsedVariables(parentOperator, usedVarsSoFar);
-        if (parentOperator.getInputs().size() <= 0) {
-            return false;
-        }
+    // The core rewriting function for an operator.
+    private boolean rewriteForOperator(Mutable<ILogicalOperator> rootRef, 
Mutable<ILogicalOperator> opRef,
+            IOptimizationContext context) throws AlgebricksException {
         boolean changed = false;
-        GroupByOperator gby = null;
+        ILogicalOperator parentOperator = opRef.getValue();
         for (Mutable<ILogicalOperator> ref : parentOperator.getInputs()) {
-            AbstractLogicalOperator op = (AbstractLogicalOperator) 
ref.getValue();
-            /** Only processes subplan operator. */
-            List<SubplanOperator> subplans = new ArrayList<SubplanOperator>();
-            if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
-                while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
-                    SubplanOperator currentSubplan = (SubplanOperator) op;
-                    subplans.add(currentSubplan);
-                    op = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
+            ILogicalOperator op = ref.getValue();
+            // Only processes subplan operator.
+            Deque<SubplanOperator> subplans = new ArrayDeque<>();
+            if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+                // Recursively rewrites the child plan.
+                changed |= rewriteForOperator(rootRef, ref, context);
+                continue;
+            }
+            while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+                SubplanOperator currentSubplan = (SubplanOperator) op;
+                // Recursively rewrites the pipelines inside a nested subplan.
+                for (ILogicalPlan subplan : currentSubplan.getNestedPlans()) {
+                    for (Mutable<ILogicalOperator> nestedRootRef : 
subplan.getRoots()) {
+                        changed |= rewriteForOperator(nestedRootRef, 
nestedRootRef, context);
+                    }
                 }
-                /** Only processes the case a group-by operator is the input 
of the subplan operators. */
-                if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
-                    gby = (GroupByOperator) op;
-                    List<ILogicalPlan> newGbyNestedPlans = new 
ArrayList<ILogicalPlan>();
-                    for (SubplanOperator subplan : subplans) {
-                        List<ILogicalPlan> subplanNestedPlans = 
subplan.getNestedPlans();
-                        List<ILogicalPlan> gbyNestedPlans = 
gby.getNestedPlans();
-                        List<ILogicalPlan> subplanNestedPlansToRemove = new 
ArrayList<ILogicalPlan>();
-                        for (ILogicalPlan subplanNestedPlan : 
subplanNestedPlans) {
-                            List<Mutable<ILogicalOperator>> rootOpRefs = 
subplanNestedPlan.getRoots();
-                            List<Mutable<ILogicalOperator>> rootOpRefsToRemove 
= new ArrayList<Mutable<ILogicalOperator>>();
-                            for (Mutable<ILogicalOperator> rootOpRef : 
rootOpRefs) {
-                                /** Gets free variables in the root operator 
of a nested plan and its descent. */
-                                Set<LogicalVariable> freeVars = new 
ListSet<LogicalVariable>();
-                                
VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), 
freeVars);
-                                Set<LogicalVariable> producedVars = new 
ListSet<LogicalVariable>();
-                                
VariableUtilities.getProducedVariablesInDescendantsAndSelf(rootOpRef.getValue(),
-                                        producedVars);
-                                freeVars.removeAll(producedVars);
-                                /** * Checks whether the above freeVars are 
all contained in live variables * of one nested plan inside the group-by 
operator. * If yes, then the subplan can be pushed into the nested plan of the 
group-by. */
-                                for (ILogicalPlan gbyNestedPlanOriginal : 
gbyNestedPlans) {
-                                    // add a subplan in the original gby
-                                    if 
(!newGbyNestedPlans.contains(gbyNestedPlanOriginal)) {
-                                        
newGbyNestedPlans.add(gbyNestedPlanOriginal);
-                                    }
-
-                                    // add a pushed subplan
-                                    ILogicalPlan gbyNestedPlan = 
OperatorManipulationUtil.deepCopy(
-                                            gbyNestedPlanOriginal, context);
-                                    List<Mutable<ILogicalOperator>> 
gbyRootOpRefs = gbyNestedPlan.getRoots();
-                                    for (int rootIndex = 0; rootIndex < 
gbyRootOpRefs.size(); rootIndex++) {
-                                        //set the nts for a original subplan
-                                        Mutable<ILogicalOperator> 
originalGbyRootOpRef = gbyNestedPlanOriginal
-                                                .getRoots().get(rootIndex);
-                                        Mutable<ILogicalOperator> 
originalGbyNtsRef = downToNts(originalGbyRootOpRef);
-                                        NestedTupleSourceOperator originalNts 
= (NestedTupleSourceOperator) originalGbyNtsRef
-                                                .getValue();
-                                        originalNts.setDataSourceReference(new 
MutableObject<ILogicalOperator>(gby));
-
-                                        //push a new subplan if possible
-                                        Mutable<ILogicalOperator> gbyRootOpRef 
= gbyRootOpRefs.get(rootIndex);
-                                        Set<LogicalVariable> liveVars = new 
ListSet<LogicalVariable>();
-                                        
VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
-                                        if (liveVars.containsAll(freeVars)) {
-                                            /** Does the actual push. */
-                                            Mutable<ILogicalOperator> ntsRef = 
downToNts(rootOpRef);
-                                            
ntsRef.setValue(gbyRootOpRef.getValue());
-                                            // Removes unused vars.
-                                            AggregateOperator aggOp = 
(AggregateOperator) gbyRootOpRef.getValue();
-                                            for (int varIndex = 
aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
-                                                if 
(!freeVars.contains(aggOp.getVariables().get(varIndex))) {
-                                                    
aggOp.getVariables().remove(varIndex);
-                                                    
aggOp.getExpressions().remove(varIndex);
-                                                }
-                                            }
-
-                                            
gbyRootOpRef.setValue(rootOpRef.getValue());
-                                            rootOpRefsToRemove.add(rootOpRef);
-
-                                            // Sets the nts for a new pushed 
plan.
-                                            Mutable<ILogicalOperator> 
oldGbyNtsRef = downToNts(gbyRootOpRef);
-                                            NestedTupleSourceOperator nts = 
(NestedTupleSourceOperator) oldGbyNtsRef
-                                                    .getValue();
-                                            nts.setDataSourceReference(new 
MutableObject<ILogicalOperator>(gby));
-
-                                            
newGbyNestedPlans.add(gbyNestedPlan);
-                                            changed = true;
-                                            continue;
-                                        }
-                                    }
-                                }
-                            }
-                            rootOpRefs.removeAll(rootOpRefsToRemove);
-                            if (rootOpRefs.size() == 0) {
-                                
subplanNestedPlansToRemove.add(subplanNestedPlan);
-                            }
-                        }
-                        
subplanNestedPlans.removeAll(subplanNestedPlansToRemove);
-                    }
-                    if (changed) {
-                        ref.setValue(gby);
-                        gby.getNestedPlans().clear();
-                        gby.getNestedPlans().addAll(newGbyNestedPlans);
-                    }
+                subplans.addFirst(currentSubplan);
+                op = op.getInputs().get(0).getValue();
+            }
+            // Only processes the case a group-by operator is the input of the 
subplan operators.
+            if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+                continue;
+            }
+            GroupByOperator gby = (GroupByOperator) op;
+            // Recursively rewrites the pipelines inside a nested subplan.
+            for (ILogicalPlan subplan : gby.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> nestedRootRef : 
subplan.getRoots()) {
+                    changed |= rewriteForOperator(nestedRootRef, 
nestedRootRef, context);
                 }
             }
+            changed |= pushSubplansIntoGroupBy(rootRef, parentOperator, 
subplans, gby, context);
         }
-        if (changed) {
-            cleanup(gby);
-            context.computeAndSetTypeEnvironmentForOperator(gby);
-            context.computeAndSetTypeEnvironmentForOperator(parentOperator);
+        return changed;
+    }
+
+    // Pushes the nested pipelines of the given subplans into the group-by below them; returns true if any was pushed.
+    private boolean pushSubplansIntoGroupBy(Mutable<ILogicalOperator> currentRootRef, ILogicalOperator parentOperator,
+            Deque<SubplanOperator> subplans, GroupByOperator gby, IOptimizationContext context)
+            throws AlgebricksException {
+        boolean changed = false;
+        List<ILogicalPlan> newGbyNestedPlans = new ArrayList<>();
+        List<ILogicalPlan> originalNestedPlansInGby = gby.getNestedPlans();
+        // The deque is ordered bottom-up, so its last element is the operator that parentOperator consumes.
+        SubplanOperator originalTopSubplan = subplans.getLast();
+        // Adds all original subplans from the group by.
+        for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) {
+            newGbyNestedPlans.add(gbyNestedPlanOriginal);
         }
+
+        // Tries to push subplans into the group by.
+        Iterator<SubplanOperator> subplanOperatorIterator = subplans.iterator();
+        while (subplanOperatorIterator.hasNext()) {
+            SubplanOperator subplan = subplanOperatorIterator.next();
+            Iterator<ILogicalPlan> subplanNestedPlanIterator = subplan.getNestedPlans().iterator();
+            while (subplanNestedPlanIterator.hasNext()) {
+                ILogicalPlan subplanNestedPlan = subplanNestedPlanIterator.next();
+                List<Mutable<ILogicalOperator>> upperSubplanRootRefs = subplanNestedPlan.getRoots();
+                Iterator<Mutable<ILogicalOperator>> upperSubplanRootRefIterator = upperSubplanRootRefs.iterator();
+                while (upperSubplanRootRefIterator.hasNext()) {
+                    Mutable<ILogicalOperator> rootOpRef = upperSubplanRootRefIterator.next();
+                    // Collects free variables in the root operator of a nested plan and its descendants.
+                    Set<LogicalVariable> freeVars = new ListSet<>();
+                    OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) rootOpRef.getValue(),
+                            freeVars);
+                    // Checks whether the above freeVars are all contained in the live variables of one nested plan
+                    // inside the group-by operator. If yes, the pipeline is pushed into that nested plan. The search
+                    // stops at the first match, since a pipeline root can be moved (and removed) only once.
+                    pushSearch: for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) {
+                        ILogicalPlan gbyNestedPlan = OperatorManipulationUtil.deepCopy(gbyNestedPlanOriginal, context);
+                        List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
+                        for (int rootIndex = 0; rootIndex < gbyRootOpRefs.size(); rootIndex++) {
+                            // Sets the nts of the copied nested plan to the group-by operator.
+                            Mutable<ILogicalOperator> gbyRootOpRef = gbyRootOpRefs.get(rootIndex);
+                            Mutable<ILogicalOperator> copiedGbyNtsRef = downToNts(gbyRootOpRef);
+                            NestedTupleSourceOperator copiedNts = (NestedTupleSourceOperator) copiedGbyNtsRef
+                                    .getValue();
+                            copiedNts.setDataSourceReference(new MutableObject<>(gby));
+                            // Pushes a new subplan only if the copied nested plan provides all the free variables.
+                            Set<LogicalVariable> liveVars = new ListSet<>();
+                            VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
+                            if (!liveVars.containsAll(freeVars)) {
+                                continue;
+                            }
+                            // Keeps only the aggregate variables that the upper pipeline uses.
+                            AggregateOperator aggOp = (AggregateOperator) gbyRootOpRef.getValue();
+                            for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
+                                if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
+                                    aggOp.getVariables().remove(varIndex);
+                                    aggOp.getExpressions().remove(varIndex);
+                                }
+                            }
+                            // Copies the pruned pipeline with new variables (free variables are reused).
+                            Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> copiedAggOpAndVarMap = OperatorManipulationUtil
+                                    .deepCopyWithNewVars(aggOp, context);
+                            ILogicalOperator newBottomAgg = copiedAggOpAndVarMap.getLeft();
+                            // Substitutes variables in the upper nested pipeline with their new counterparts.
+                            VariableUtilities.substituteVariablesInDescendantsAndSelf(rootOpRef.getValue(),
+                                    copiedAggOpAndVarMap.getRight(), context);
+                            // Does the actual push: the copied pipeline replaces the nts of the upper pipeline.
+                            Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
+                            ntsRef.setValue(newBottomAgg);
+                            gbyRootOpRef.setValue(rootOpRef.getValue());
+                            // Sets the nts for the new pushed plan.
+                            Mutable<ILogicalOperator> oldGbyNtsRef = downToNts(new MutableObject<>(newBottomAgg));
+                            NestedTupleSourceOperator nts = (NestedTupleSourceOperator) oldGbyNtsRef.getValue();
+                            nts.setDataSourceReference(new MutableObject<>(gby));
+                            OperatorManipulationUtil.computeTypeEnvironmentBottomUp(rootOpRef.getValue(), context);
+                            newGbyNestedPlans.add(new ALogicalPlanImpl(rootOpRef));
+                            upperSubplanRootRefIterator.remove();
+                            changed = true;
+                            break pushSearch;
+                        }
+                    }
+                }
+                if (upperSubplanRootRefs.isEmpty()) {
+                    subplanNestedPlanIterator.remove();
+                }
+            }
+            if (subplan.getNestedPlans().isEmpty()) {
+                subplanOperatorIterator.remove();
+            }
+        }
+
+        // Resets the nested subplans for the group-by operator.
+        gby.getNestedPlans().clear();
+        gby.getNestedPlans().addAll(newGbyNestedPlans);
+        // Re-links the remaining subplans, bottom-up, on top of the group-by operator.
+        ILogicalOperator newTop = gby;
+        for (SubplanOperator remainingSubplan : subplans) {
+            remainingSubplan.getInputs().get(0).setValue(newTop);
+            newTop = remainingSubplan;
+        }
+        // Redirects the input of parentOperator that held the original chain (not necessarily input 0).
+        for (Mutable<ILogicalOperator> inputRef : parentOperator.getInputs()) {
+            if (inputRef.getValue() == originalTopSubplan) {
+                inputRef.setValue(newTop);
+            }
+        }
+        // Removes unnecessary pipelines inside the group by operator.
+        cleanup(currentRootRef.getValue(), gby);
+        // Computes type environments bottom-up.
+        context.computeAndSetTypeEnvironmentForOperator(gby);
+        for (SubplanOperator remainingSubplan : subplans) {
+            context.computeAndSetTypeEnvironmentForOperator(remainingSubplan);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(parentOperator);
         return changed;
     }
 
     /**
      * Removes unused aggregation variables (and expressions)
      *
-     * @param gby
+     * @param rootOp
+     *            the root operator of a plan or nested plan above {@code gby}.
+     * @param gby
+     *            the group-by operator whose unused aggregates (and emptied nested plans) are pruned.
      * @throws AlgebricksException
      */
-    private void cleanup(GroupByOperator gby) throws AlgebricksException {
-        for (ILogicalPlan nestedPlan : gby.getNestedPlans()) {
-            for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
-                AggregateOperator aggOp = (AggregateOperator) 
rootRef.getValue();
+    private void cleanup(ILogicalOperator rootOp, GroupByOperator gby) throws 
AlgebricksException {
+        Set<LogicalVariable> freeVars = new HashSet<>();
+        OperatorPropertiesUtil.getFreeVariablesInPath(rootOp, gby, freeVars);
+        Iterator<ILogicalPlan> nestedPlanIterator = 
gby.getNestedPlans().iterator();
+        while (nestedPlanIterator.hasNext()) {
+            ILogicalPlan nestedPlan = nestedPlanIterator.next();
+            Iterator<Mutable<ILogicalOperator>> nestRootRefIterator = 
nestedPlan.getRoots().iterator();
+            while (nestRootRefIterator.hasNext()) {
+                Mutable<ILogicalOperator> nestRootRef = 
nestRootRefIterator.next();
+                AggregateOperator aggOp = (AggregateOperator) 
nestRootRef.getValue();
                 for (int varIndex = aggOp.getVariables().size() - 1; varIndex 
>= 0; varIndex--) {
-                    if 
(!usedVarsSoFar.contains(aggOp.getVariables().get(varIndex))) {
+                    if 
(!freeVars.contains(aggOp.getVariables().get(varIndex))) {
                         aggOp.getVariables().remove(varIndex);
                         aggOp.getExpressions().remove(varIndex);
                     }
                 }
+                if (aggOp.getVariables().isEmpty()) {
+                    nestRootRefIterator.remove();
+                }
             }
-
+            if (nestedPlan.getRoots().isEmpty()) {
+                nestedPlanIterator.remove();
+            }
         }
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1992
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I969c40112be0506981357a9c41bf9675ae12ffb9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to