Taewoo Kim has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1073

Change subject: ASTERIXDB-1081, ASTERIXDB-1086, ASTERIXDB-1246: multiple paths 
in a plan can be handled.
......................................................................

ASTERIXDB-1081, ASTERIXDB-1086, ASTERIXDB-1246: multiple paths in a plan can be 
handled.

 - ASTERIXDB-1081: Fixed RemoveUnusedAssignAndAggregateRule to reflect multiple 
paths in the plan.
 - ASTERIXDB-1086: Fixed IntroduceProjectsRule to reflect multiple paths in 
the plan.
 - ASTERIXDB-1246: Fixed RemoveRedundantGroupByDecorVarsRule to remove decor 
variables
                   before the IntroduceProjectsRule fires.

Change-Id: I69e055572f024f28a857d4e64916b4806e63c083
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/issue741.plan
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
D 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
A 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVarsRule.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
7 files changed, 432 insertions(+), 180 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/73/1073/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index eb4751d..cf3fb8a 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -112,7 +112,7 @@
 import 
org.apache.hyracks.algebricks.rewriter.rules.PushUnnestDownThroughUnionRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ReinferAllTypesRule;
 import 
org.apache.hyracks.algebricks.rewriter.rules.RemoveCartesianProductWithEmptyBranchRule;
-import 
org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantGroupByDecorVars;
+import 
org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantGroupByDecorVarsRule;
 import 
org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantVariablesRule;
 import 
org.apache.hyracks.algebricks.rewriter.rules.RemoveUnnecessarySortMergeExchange;
 import 
org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
@@ -251,7 +251,7 @@
         consolidation.add(new IntroduceAggregateCombinerRule());
         consolidation.add(new CountVarToCountOneRule());
         consolidation.add(new RemoveUnusedAssignAndAggregateRule());
-        consolidation.add(new RemoveRedundantGroupByDecorVars());
+        consolidation.add(new RemoveRedundantGroupByDecorVarsRule());
         //PushUnnestDownUnion => RemoveRedundantListifyRule cause these rules 
are correlated
         consolidation.add(new PushUnnestDownThroughUnionRule());
         consolidation.add(new RemoveRedundantListifyRule());
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/issue741.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/issue741.plan
index 5b08bf5..4e40dd2 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/issue741.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inverted-index-join/issue741.plan
@@ -13,33 +13,20 @@
               -- STABLE_SORT [$$25(ASC)]  |PARTITIONED|
                 -- HASH_PARTITION_EXCHANGE [$$25]  |PARTITIONED|
                   -- STREAM_PROJECT  |PARTITIONED|
-                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- HYBRID_HASH_JOIN [$$36][$$25]  |PARTITIONED|
+                    -- STREAM_SELECT  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- STREAM_SELECT  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                        -- HASH_PARTITION_EXCHANGE [$$25]  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- STREAM_SELECT  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
+                          -- BTREE_SEARCH  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STABLE_SORT [$$39(ASC)]  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- BTREE_SEARCH  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$39(ASC)]  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- 
LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH  |PARTITIONED|
-                                            -- BROADCAST_EXCHANGE  
|PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- STREAM_SELECT  |PARTITIONED|
-                                                  -- STREAM_PROJECT  
|PARTITIONED|
-                                                    -- ASSIGN  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                        -- DATASOURCE_SCAN  
|PARTITIONED|
-                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH  
|PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- STREAM_SELECT  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                -- DATASOURCE_SCAN  
|PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                    -- EMPTY_TUPLE_SOURCE  
|PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan
index 7eb6d35..d836589 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan
@@ -27,7 +27,7 @@
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                 -- NESTED_LOOP  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
                                       -- ASSIGN  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- DATASOURCE_SCAN  |PARTITIONED|
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
index 6bc129e..14af10b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
@@ -20,13 +20,14 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -48,11 +49,14 @@
  */
 public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
 
-    private final Set<LogicalVariable> usedVars = new 
HashSet<LogicalVariable>();
-    private final Set<LogicalVariable> liveVars = new 
HashSet<LogicalVariable>();
-    private final Set<LogicalVariable> producedVars = new 
HashSet<LogicalVariable>();
-    private final List<LogicalVariable> projectVars = new 
ArrayList<LogicalVariable>();
+       private final Set<LogicalVariable> usedVars = new HashSet<>();
+       private final Set<LogicalVariable> liveVars = new HashSet<>();
+       private final Set<LogicalVariable> producedVars = new HashSet<>();
+       private final List<LogicalVariable> projectVars = new ArrayList<>();
     protected boolean hasRun = false;
+       // Keep track of used variables after the current operator, including 
used
+       // variables in itself.
+       private final Map<AbstractLogicalOperator, HashSet<LogicalVariable>> 
allUsedVarsAfterOpMap = new HashMap<>();
 
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) {
@@ -60,13 +64,47 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) throws AlgebricksException {
+       public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+                       throws AlgebricksException {
         if (hasRun) {
             return false;
         }
         hasRun = true;
+
+               // Collect all used variables after each operator, including 
the used
+               // variables in itself in the plan.
+               Set<LogicalVariable> parentUsedVars = new 
HashSet<LogicalVariable>();
+               collectUsedVars(opRef, parentUsedVars);
+
+               // Introduce projects
         return introduceProjects(null, -1, opRef, 
Collections.<LogicalVariable> emptySet(), context);
     }
+
+       // Collect all used variables after each operator, including the used
+       // variables in itself in the plan.
+       // Collecting information is required since there can be multiple paths 
in
+       // the plan and introduceProjects()
+       // can deal with only one path at a time during conducting
+       // depth-first-search.
+       protected void collectUsedVars(Mutable<ILogicalOperator> opRef, 
Set<LogicalVariable> parentUsedVars)
+                       throws AlgebricksException {
+               AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
+               HashSet<LogicalVariable> usedVarsPerOp = new HashSet<>();
+               VariableUtilities.getUsedVariables(op, usedVarsPerOp);
+               usedVarsPerOp.addAll(parentUsedVars);
+
+               if (allUsedVarsAfterOpMap.get(op) == null) {
+                       allUsedVarsAfterOpMap.put(op, usedVarsPerOp);
+               } else {
+                       allUsedVarsAfterOpMap.get(op).addAll(usedVarsPerOp);
+               }
+
+               for (int i = 0; i < op.getInputs().size(); i++) {
+                       Mutable<ILogicalOperator> inputOpRef = 
op.getInputs().get(i);
+                       collectUsedVars(inputOpRef, usedVarsPerOp);
+               }
+
+       }
 
     protected boolean introduceProjects(AbstractLogicalOperator parentOp, int 
parentInputIndex,
             Mutable<ILogicalOperator> opRef, Set<LogicalVariable> 
parentUsedVars, IOptimizationContext context)
@@ -78,10 +116,18 @@
         VariableUtilities.getUsedVariables(op, usedVars);
 
         // In the top-down pass, maintain a set of variables that are used in 
op and all its parents.
-        HashSet<LogicalVariable> parentsUsedVars = new 
HashSet<LogicalVariable>();
+               // This is a necessary step for the newly created project 
operator
+               // during this optimization,
+               // since we already have all information from collectUsedVars() 
method
+               // for the other operators.
+               HashSet<LogicalVariable> parentsUsedVars = new HashSet<>();
         parentsUsedVars.addAll(parentUsedVars);
         parentsUsedVars.addAll(usedVars);
 
+               if (allUsedVarsAfterOpMap.get(op) != null) {
+                       parentsUsedVars.addAll(allUsedVarsAfterOpMap.get(op));
+               }
+
         // Descend into children.
         for (int i = 0; i < op.getInputs().size(); i++) {
             Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(i);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
deleted file mode 100644
index ebdc88a..0000000
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.rewriter.rules;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-/**
- * Removes duplicate variables from a group-by operator's decor list.
- */
-public class RemoveRedundantGroupByDecorVars implements IAlgebraicRewriteRule {
-
-    private final Set<LogicalVariable> vars = new HashSet<LogicalVariable>();
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
-            return false;
-        }
-        if (context.checkIfInDontApplySet(this, op)) {
-            return false;
-        }
-        vars.clear();
-
-        boolean modified = false;
-        GroupByOperator groupOp = (GroupByOperator) op;
-        Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = 
groupOp.getDecorList().iterator();
-        while (iter.hasNext()) {
-            Pair<LogicalVariable, Mutable<ILogicalExpression>> decor = 
iter.next();
-            if (decor.first != null || 
decor.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                continue;
-            }
-            VariableReferenceExpression varRefExpr = 
(VariableReferenceExpression) decor.second.getValue();
-            LogicalVariable var = varRefExpr.getVariableReference();
-            if (vars.contains(var)) {
-                iter.remove();
-                modified = true;
-            } else {
-                vars.add(var);
-            }
-        }
-        if (modified) {
-            context.addToDontApplySet(this, op);
-        }
-        return modified;
-    }
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) throws AlgebricksException {
-        return false;
-    }
-}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVarsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVarsRule.java
new file mode 100644
index 0000000..e419776
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVarsRule.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Removes duplicate and/or unnecessary variables from a group-by operator's
+ * decor list.
+ */
+public class RemoveRedundantGroupByDecorVarsRule implements 
IAlgebraicRewriteRule {
+
+       protected Set<LogicalVariable> usedVars = new HashSet<>();
+       protected Mutable<ILogicalOperator> groupByRef = null;
+       protected GroupByOperator groupByOp = null;
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) {
+               return false;
+       }
+
+       @Override
+       public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+                       throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
+               // Begin from the root operator to collect used variables after 
a
+               // possible group-by operator.
+               if (op.getOperatorTag() != 
LogicalOperatorTag.DISTRIBUTE_RESULT) {
+                       if (op.getOperatorTag() != LogicalOperatorTag.SINK) {
+                               return false;
+                       }
+        }
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+               usedVars.clear();
+               boolean planTransformed = false;
+
+               planTransformed = checkAndApplyTheRule(opRef, context);
+
+               return planTransformed;
+       }
+
+       // Collect used variables in each operator in the plan until the 
optimizer
+       // sees a GroupBy operator.
+       // It first removes duplicated variables in the decor list.
+       // Then, it eliminates useless variables in the decor list
+       // that are not going to be used after the given groupBy operator.
+       protected boolean checkAndApplyTheRule(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+                       throws AlgebricksException {
+               AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
+               Set<LogicalVariable> usedVarsFromThisOp = new HashSet<>();
+               Set<LogicalVariable> collectedUsedVarsBeforeThisOpFromRoot = 
new HashSet<>();
+               boolean redundantVarsRemoved = false;
+               boolean uselessVarsRemoved = false;
+
+               // Found Group-By operator?
+               if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
+                       GroupByOperator groupByOp = (GroupByOperator) op;
+                       Set<LogicalVariable> decorVars = new HashSet<>();
+
+                       // First, get rid of duplicated variables from a 
group-by operator's
+                       // decor list.
+                       Iterator<Pair<LogicalVariable, 
Mutable<ILogicalExpression>>> iter = groupByOp.getDecorList().iterator();
+                       while (iter.hasNext()) {
+                               Pair<LogicalVariable, 
Mutable<ILogicalExpression>> decor = iter.next();
+                               if (decor.first != null
+                                               || 
decor.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                                       continue;
+                               }
+                               VariableReferenceExpression varRefExpr = 
(VariableReferenceExpression) decor.second.getValue();
+                               LogicalVariable var = 
varRefExpr.getVariableReference();
+                               if (decorVars.contains(var)) {
+                                       iter.remove();
+                                       redundantVarsRemoved = true;
+                               } else {
+                                       decorVars.add(var);
+                               }
+                       }
+
+                       // Next, get rid of useless decor variables in the 
GROUP-BY
+                       // operator.
+                       List<Pair<LogicalVariable, 
Mutable<ILogicalExpression>>> newDecorList = new ArrayList<>();
+                       for (Pair<LogicalVariable, Mutable<ILogicalExpression>> 
p : groupByOp.getDecorList()) {
+                               LogicalVariable decorVar = 
GroupByOperator.getDecorVariable(p);
+                               // If a variable in the decor list will not be 
used after this
+                               // operator, then it needs to removed.
+                               if (!usedVars.contains(decorVar)) {
+                                       uselessVarsRemoved = true;
+                               } else {
+                                       // Maintain the variable since it will 
be used.
+                                       newDecorList.add(p);
+                               }
+                       }
+
+                       // If we have identified useless decor variables,
+                       // then the decor list needs to be reset without those 
variables.
+                       if (uselessVarsRemoved) {
+                               groupByOp.getDecorList().clear();
+                               groupByOp.getDecorList().addAll(newDecorList);
+            }
+
+                       // If the plan transformation is successful, we don't 
need to
+                       // traverse the plan any more,
+                       // since if there are more GROUP-BY operators, the next 
trigger on
+                       // this plan will find them.
+                       if (redundantVarsRemoved || uselessVarsRemoved) {
+                               
context.computeAndSetTypeEnvironmentForOperator(groupByOp);
+                               context.addToDontApplySet(this, op);
+                               return redundantVarsRemoved || 
uselessVarsRemoved;
+            }
+        }
+
+               // Either we have found a GroupBy operator but no removal is 
happened
+               // or there we haven't found a GroupBy operator yet.
+               // Thus, we add used variables for this operator and keep going 
through
+               // the plan.
+               VariableUtilities.getUsedVariables(op, usedVarsFromThisOp);
+               collectedUsedVarsBeforeThisOpFromRoot.addAll(usedVars);
+               usedVars.addAll(usedVarsFromThisOp);
+
+               // Recursively check the plan and try to optimize it.
+               for (int i = 0; i < op.getInputs().size(); i++) {
+                       boolean groupByChanged = 
checkAndApplyTheRule(op.getInputs().get(i), context);
+                       if (groupByChanged) {
+                               return true;
+                       }
+        }
+
+               // This rule can't be applied to this operator or its 
descendants.
+               // Thus, remove the effects of this operator so that the
+               // depth-first-search can return to the parent.
+               usedVars.clear();
+               usedVars.addAll(collectedUsedVarsBeforeThisOpFromRoot);
+
+        return false;
+    }
+
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index b25497c..0c74e47 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -21,12 +21,12 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -50,47 +50,122 @@
  */
 public class RemoveUnusedAssignAndAggregateRule implements 
IAlgebraicRewriteRule {
 
+       // Keep the variables that are produced by ASSIGN, UNNEST, AGGREGATE, 
and
+       // UNION operators.
+       Map<Mutable<ILogicalOperator>, Set<LogicalVariable>> assignedVarMap = 
new LinkedHashMap<>();
+       Set<LogicalVariable> assignedVarSet = new HashSet<>();
+
+       // Keep the variables that are used after ASSIGN, UNNEST, AGGREGATE, and
+       // UNION operators.
+       Map<Mutable<ILogicalOperator>, Set<LogicalVariable>> 
accumulatedUsedVarFromRootMap = new LinkedHashMap<>();
+
+       boolean isTransformed = false;
+
+       // Keep the variable-mapping of a UNION operator
+       // This is required to keep the variables of the left or right branch 
of the
+       // UNION operator
+       // if the output variable of the UNION operator is survived.
+       Set<LogicalVariable> survivedUnionSourceVarSet = new 
HashSet<LogicalVariable>();
+
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) {
         return false;
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context) throws AlgebricksException {
+       public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+                       throws AlgebricksException {
         if (context.checkIfInDontApplySet(this, opRef.getValue())) {
             return false;
         }
-        Set<LogicalVariable> toRemove = new HashSet<LogicalVariable>();
-        collectUnusedAssignedVars((AbstractLogicalOperator) opRef.getValue(), 
toRemove, true, context);
-        boolean smthToRemove = !toRemove.isEmpty();
-        if (smthToRemove) {
-            removeUnusedAssigns(opRef, toRemove, context);
+
+               clear();
+               Set<LogicalVariable> accumulatedUsedVarFromRootSet = new 
HashSet<>();
+               collectUnusedAssignedVars(opRef, accumulatedUsedVarFromRootSet, 
true, context);
+
+               // If there are ASSIGN, UNNEST, AGGREGATE, and UNION operators 
in the
+               // plan,
+               // we try to remove these operators if the produced variables 
from these
+               // operators are not used.
+               if (!assignedVarMap.isEmpty()) {
+                       removeUnusedAssigns(opRef, context);
+               }
+
+               return isTransformed;
+       }
+
+       /**
+        * Collects the information for the given operator and removes its
+        * assigned variables from consideration if they are used afterwards.
+        */
+       private Set<LogicalVariable> 
removeAssignVarFromConsideration(Mutable<ILogicalOperator> opRef) {
+               Set<LogicalVariable> assignVarsSetForThisOp = null;
+               Set<LogicalVariable> usedVarsSetForThisOp = null;
+
+               if (accumulatedUsedVarFromRootMap.containsKey(opRef)) {
+                       usedVarsSetForThisOp = 
accumulatedUsedVarFromRootMap.get(opRef);
+               }
+
+               if (assignedVarMap.containsKey(opRef)) {
+                       assignVarsSetForThisOp = assignedVarMap.get(opRef);
+               }
+
+               if (assignVarsSetForThisOp != null && 
!assignVarsSetForThisOp.isEmpty()) {
+                       Iterator<LogicalVariable> varIter = 
assignVarsSetForThisOp.iterator();
+                       while (varIter.hasNext()) {
+                               LogicalVariable v = varIter.next();
+                               if ((usedVarsSetForThisOp != null && 
usedVarsSetForThisOp.contains(v))
+                                               || 
survivedUnionSourceVarSet.contains(v)) {
+                                       varIter.remove();
+                               }
+                       }
+               }
+
+               // The source variables of the UNIONALL operator should survive
+               // since we are sure that the output of the UNIONALL operator is
+               // used afterwards.
+               if (opRef.getValue().getOperatorTag() == 
LogicalOperatorTag.UNIONALL) {
+                       Iterator<Triple<LogicalVariable, LogicalVariable, 
LogicalVariable>> iter = ((UnionAllOperator) opRef
+                                       
.getValue()).getVariableMappings().iterator();
+                       while (iter.hasNext()) {
+                               Triple<LogicalVariable, LogicalVariable, 
LogicalVariable> varMapping = iter.next();
+                               survivedUnionSourceVarSet.add(varMapping.first);
+                               
survivedUnionSourceVarSet.add(varMapping.second);
+                       }
         }
-        return !toRemove.isEmpty();
+
+               return assignVarsSetForThisOp;
     }
 
-    private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, 
Set<LogicalVariable> toRemove,
-            IOptimizationContext context) throws AlgebricksException {
+       private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+                       throws AlgebricksException {
+
         AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
-        while (removeFromAssigns(op, toRemove, context) == 0) {
+
+               Set<LogicalVariable> assignVarsSetForThisOp = 
removeAssignVarFromConsideration(opRef);
+
+               while (removeFromAssigns(op, assignVarsSetForThisOp, context) 
== 0) {
             if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
                 break;
             }
             op = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
             opRef.setValue(op);
+                       assignVarsSetForThisOp = 
removeAssignVarFromConsideration(opRef);
         }
+
         Iterator<Mutable<ILogicalOperator>> childIter = 
op.getInputs().iterator();
         while (childIter.hasNext()) {
             Mutable<ILogicalOperator> cRef = childIter.next();
-            removeUnusedAssigns(cRef, toRemove, context);
+                       removeUnusedAssigns(cRef, context);
         }
+
         if (op.hasNestedPlans()) {
             AbstractOperatorWithNestedPlans opWithNest = 
(AbstractOperatorWithNestedPlans) op;
             Iterator<ILogicalPlan> planIter = 
opWithNest.getNestedPlans().iterator();
             while (planIter.hasNext()) {
                 ILogicalPlan p = planIter.next();
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
-                    removeUnusedAssigns(r, toRemove, context);
+                                       removeUnusedAssigns(r, context);
                 }
             }
 
@@ -123,6 +198,7 @@
                 AssignOperator assign = (AssignOperator) op;
                 if (removeUnusedVarsAndExprs(toRemove, assign.getVariables(), 
assign.getExpressions())) {
                     context.computeAndSetTypeEnvironmentForOperator(assign);
+                               isTransformed = true;
                 }
                 return assign.getVariables().size();
             }
@@ -130,14 +206,17 @@
                 AggregateOperator agg = (AggregateOperator) op;
                 if (removeUnusedVarsAndExprs(toRemove, agg.getVariables(), 
agg.getExpressions())) {
                     context.computeAndSetTypeEnvironmentForOperator(agg);
+                               isTransformed = true;
                 }
                 return agg.getVariables().size();
             }
             case UNNEST: {
                 UnnestOperator uOp = (UnnestOperator) op;
                 LogicalVariable pVar = uOp.getPositionalVariable();
-                if (pVar != null && toRemove.contains(pVar)) {
+                       if (pVar != null && toRemove != null && 
toRemove.contains(pVar)) {
                     uOp.setPositionalVariable(null);
+                               assignedVarSet.remove(pVar);
+                               isTransformed = true;
                 }
                 break;
             }
@@ -145,9 +224,12 @@
                 UnionAllOperator unionOp = (UnionAllOperator) op;
                 if (removeUnusedVarsFromUnionAll(unionOp, toRemove)) {
                     context.computeAndSetTypeEnvironmentForOperator(unionOp);
+                               isTransformed = true;
                 }
                 return unionOp.getVariableMappings().size();
             }
+               default:
+                       break;
         }
         return -1;
     }
@@ -156,71 +238,81 @@
         Iterator<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> 
iter = unionOp.getVariableMappings()
                 .iterator();
         boolean modified = false;
-        Set<LogicalVariable> removeFromRemoveSet = new 
HashSet<LogicalVariable>();
-        while (iter.hasNext()) {
-            Triple<LogicalVariable, LogicalVariable, LogicalVariable> 
varMapping = iter.next();
-            if (toRemove.contains(varMapping.third)) {
-                iter.remove();
-                modified = true;
+               if (toRemove != null && !toRemove.isEmpty()) {
+                       while (iter.hasNext()) {
+                               Triple<LogicalVariable, LogicalVariable, 
LogicalVariable> varMapping = iter.next();
+                               if (toRemove.contains(varMapping.third)) {
+                                       iter.remove();
+                                       assignedVarSet.remove(varMapping.third);
+                                       modified = true;
+                               } else {
+                                       // When the output variable of the UNION survives,
+                                       // its source variables should not be removed.
+                                       
survivedUnionSourceVarSet.add(varMapping.first);
+                                       
survivedUnionSourceVarSet.add(varMapping.second);
+                               }
             }
-            // In any case, make sure we do not removing these variables.
-            removeFromRemoveSet.add(varMapping.first);
-            removeFromRemoveSet.add(varMapping.second);
         }
-        toRemove.removeAll(removeFromRemoveSet);
         return modified;
     }
 
     private boolean removeUnusedVarsAndExprs(Set<LogicalVariable> toRemove, 
List<LogicalVariable> varList,
             List<Mutable<ILogicalExpression>> exprList) {
         boolean changed = false;
-        Iterator<LogicalVariable> varIter = varList.iterator();
-        Iterator<Mutable<ILogicalExpression>> exprIter = exprList.iterator();
-        while (varIter.hasNext()) {
-            LogicalVariable v = varIter.next();
-            exprIter.next();
-            if (toRemove.contains(v)) {
-                varIter.remove();
-                exprIter.remove();
-                changed = true;
+               if (toRemove != null && !toRemove.isEmpty()) {
+                       Iterator<LogicalVariable> varIter = varList.iterator();
+                       Iterator<Mutable<ILogicalExpression>> exprIter = 
exprList.iterator();
+                       while (varIter.hasNext()) {
+                               LogicalVariable v = varIter.next();
+                               exprIter.next();
+                               if (toRemove.contains(v)) {
+                                       varIter.remove();
+                                       exprIter.remove();
+                                       assignedVarSet.remove(v);
+                                       changed = true;
+                               }
             }
         }
         return changed;
     }
 
-    private void collectUnusedAssignedVars(AbstractLogicalOperator op, 
Set<LogicalVariable> toRemove, boolean first,
-            IOptimizationContext context) throws AlgebricksException {
+       private void collectUnusedAssignedVars(Mutable<ILogicalOperator> opRef,
+                       Set<LogicalVariable> accumulatedUsedVarFromRootSet, 
boolean first, IOptimizationContext context)
+                       throws AlgebricksException {
+               AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
         if (!first) {
             context.addToDontApplySet(this, op);
         }
-        for (Mutable<ILogicalOperator> c : op.getInputs()) {
-            collectUnusedAssignedVars((AbstractLogicalOperator) c.getValue(), 
toRemove, false, context);
-        }
-        if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans opWithNested = 
(AbstractOperatorWithNestedPlans) op;
-            for (ILogicalPlan plan : opWithNested.getNestedPlans()) {
-                for (Mutable<ILogicalOperator> r : plan.getRoots()) {
-                    collectUnusedAssignedVars((AbstractLogicalOperator) 
r.getValue(), toRemove, false, context);
-                }
-            }
-        }
-        boolean removeUsedVars = true;
+
+               Set<LogicalVariable> assignVarsSetInThisOp = new HashSet<>();
+               Set<LogicalVariable> usedVarsSetInThisOp = new HashSet<>();
+
+               // Add used variables in this operator to the accumulated used 
variables
+               // set?
+               boolean addUsedVarsInThisOp = true;
+               // ASSIGN, AGGREGATE, UNNEST, OR UNIONALL operator found?
+               boolean assignOpFound = false;
+
+               // Collect assigned variables in this operator.
         switch (op.getOperatorTag()) {
             case ASSIGN: {
                 AssignOperator assign = (AssignOperator) op;
-                toRemove.addAll(assign.getVariables());
+                       assignVarsSetInThisOp.addAll(assign.getVariables());
+                       assignOpFound = true;
                 break;
             }
             case AGGREGATE: {
                 AggregateOperator agg = (AggregateOperator) op;
-                toRemove.addAll(agg.getVariables());
+                       assignVarsSetInThisOp.addAll(agg.getVariables());
+                       assignOpFound = true;
                 break;
             }
             case UNNEST: {
                 UnnestOperator uOp = (UnnestOperator) op;
                 LogicalVariable pVar = uOp.getPositionalVariable();
                 if (pVar != null) {
-                    toRemove.add(pVar);
+                               assignVarsSetInThisOp.add(pVar);
+                               assignOpFound = true;
                 }
                 break;
             }
@@ -228,17 +320,55 @@
                 UnionAllOperator unionOp = (UnionAllOperator) op;
                 for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> 
varMapping : unionOp
                         .getVariableMappings()) {
-                    toRemove.add(varMapping.third);
+                               assignVarsSetInThisOp.add(varMapping.third);
                 }
-                removeUsedVars = false;
+                       assignOpFound = true;
+                       // Don't add used variables in UNIONALL.
+                       addUsedVarsInThisOp = false;
+                       break;
+               }
+               default:
                 break;
+               }
+
+               if (assignOpFound) {
+                       assignedVarMap.put(opRef, assignVarsSetInThisOp);
+                       assignedVarSet.addAll(assignVarsSetInThisOp);
+               }
+
+               if (addUsedVarsInThisOp) {
+                       VariableUtilities.getUsedVariables(op, 
usedVarsSetInThisOp);
+                       
accumulatedUsedVarFromRootSet.addAll(usedVarsSetInThisOp);
+                       // We may have visited this operator before if there 
are multiple
+                       // paths in the plan.
+                       if (accumulatedUsedVarFromRootMap.containsKey(opRef)) {
+                               
accumulatedUsedVarFromRootMap.get(opRef).addAll(usedVarsSetInThisOp);
+                       } else {
+                               accumulatedUsedVarFromRootMap.put(opRef, new 
HashSet<LogicalVariable>(accumulatedUsedVarFromRootSet));
             }
+               } else {
+                       accumulatedUsedVarFromRootMap.put(opRef, new 
HashSet<LogicalVariable>(accumulatedUsedVarFromRootSet));
         }
-        if (removeUsedVars) {
-            List<LogicalVariable> used = new LinkedList<LogicalVariable>();
-            VariableUtilities.getUsedVariables(op, used);
-            toRemove.removeAll(used);
+
+               for (Mutable<ILogicalOperator> c : op.getInputs()) {
+                       collectUnusedAssignedVars(c, new 
HashSet<LogicalVariable>(accumulatedUsedVarFromRootSet), false, context);
+               }
+               if (op.hasNestedPlans()) {
+                       AbstractOperatorWithNestedPlans opWithNested = 
(AbstractOperatorWithNestedPlans) op;
+                       for (ILogicalPlan plan : opWithNested.getNestedPlans()) 
{
+                               for (Mutable<ILogicalOperator> r : 
plan.getRoots()) {
+                                       collectUnusedAssignedVars(r, new 
HashSet<LogicalVariable>(accumulatedUsedVarFromRootSet), false,
+                                                       context);
+                               }
+                       }
         }
     }
 
+       private void clear() {
+               assignedVarMap.clear();
+               assignedVarSet.clear();
+               accumulatedUsedVarFromRootMap.clear();
+               survivedUnionSourceVarSet.clear();
+               isTransformed = false;
+       }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1073
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I69e055572f024f28a857d4e64916b4806e63c083
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Taewoo Kim <wangs...@yahoo.com>

Reply via email to