Author: daijy
Date: Tue Jan 26 00:30:40 2010
New Revision: 903027

URL: http://svn.apache.org/viewvc?rev=903027&view=rev
Log:
PIG-1184: PruneColumns optimization does not handle the case of foreach flatten correctly if flattened bag is not used later
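
For reference, the failing scenario is the one covered by the new test case in this commit (Pig Latin, adapted from TestPruneColumn.testForEachFlatten; the load path here is a placeholder):

    A = load 'input' as (a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray)}, a3:bag{t:tuple(loc:chararray)});
    B = foreach A generate a0, a1, flatten(a2), flatten(a3), 10;
    C = foreach B generate a0, $4;

The flattened columns from a2 and a3 are never referenced in C, but flattening them still multiplies the number of rows B (and therefore C) produces, so the column pruner must not prune them.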

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=903027&r1=903026&r2=903027&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jan 26 00:30:40 2010
@@ -144,6 +144,9 @@
 PIG-1176: Column Pruner issues in union of loader with and without schema
 (daijy)
 
+PIG-1184: PruneColumns optimization does not handle the case of foreach
+flatten correctly if flattened bag is not used later (daijy)
+
 Release 0.6.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=903027&r1=903026&r2=903027&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Tue Jan 26 00:30:40 2010
@@ -803,28 +803,23 @@
         if (mSchema == null)
             return null;
         
-        if (mSchema.size()<=column)
-        {
-            return null;
-        }
-        
         return mSchemaPlanMapping.get(column);
     }
     
     public boolean isInputFlattened(int column) throws FrontendException {
-        LogicalPlan plan = getRelevantPlan(column);
-        if (plan==null) {
-            int errCode = 2195;
-            throw new FrontendException("Fail to get foreach plan for input 
column "+column,
-                    errCode, PigException.BUG);
-        }
-        int index = mForEachPlans.indexOf(plan);
-        if (index==-1) {
-            int errCode = 2195;
-            throw new FrontendException("Fail to get foreach plan for input 
column "+column,
-                    errCode, PigException.BUG);
+        for (int i=0;i<mForEachPlans.size();i++) {
+            LogicalPlan forEachPlan = mForEachPlans.get(i);
+            TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(forEachPlan);
+            projectFinder.visit();
+            for (LOProject project : projectFinder.getProjectList()) {
+                if (project.getCol()==column) {
+                    if (mFlatten.get(i))
+                        return true;
+                }
+            }
         }
-        return mFlatten.get(index);
+
+        return false;
     }
     
     @Override
@@ -942,16 +937,16 @@
             {
                 continue;
             }
-            boolean allPruned = true;
+            boolean anyPruned = false;
             for (LOProject loProject : projectFinder.getProjectSet()) {
                 Pair<Integer, Integer> pair = new Pair<Integer, Integer>(0,
                         loProject.getCol());
-                if (!columns.contains(pair)) {
-                    allPruned = false;
+                if (columns.contains(pair)) {
+                    anyPruned = true;
                     break;
                 }
             }
-            if (allPruned) {
+            if (anyPruned) {
                 planToRemove.add(i);
             }
         }
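
Note on the pruneColumns() change above: an inner foreach plan is now dropped as soon as any column it projects has been pruned upstream (anyPruned), instead of only when every projected column has been pruned (allPruned), since the plan cannot be evaluated once one of its inputs is gone. A hypothetical Pig Latin sketch (not part of this commit's tests):

    A = load 'input' as (a0, a1, a2);
    B = foreach A generate a0+a1, a1;
    C = foreach B generate $1;

If a0 is pruned because a0+a1 is not needed downstream while a1 is still required, the plan computing a0+a1 references a pruned column and must be removed.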

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=903027&r1=903026&r2=903027&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Tue Jan 26 00:30:40 2010
@@ -53,6 +53,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.RelationalOperator;
 import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.MapKeysInfo;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -361,20 +362,40 @@
                     }
                 }
             }
-
-                
+            
             // Merge with required input fields of this logical operator.
             // RequiredInputFields come from two sources, one is mapping from required output to input,
             // the other is from the operator itself. Here we use getRequiredFields to get the second part,
             // and merge with the first part
             List<RequiredFields> requiredFieldsListOfLOOp;
             
-            // For LOForEach, requiredFields is not really required fields. Here required fields means the input
-            // fields required by the entire output columns, such as filter condition in LOFilter, group columns in LOCoGroup.
-            // For LOForEach, output columns are generated by the foreach plan it belongs to, there is nothing globally required.
-            // So we need to fix the semantic gap here. If the operator is LOForEach, requiredFields is null.
+            // For LOForEach, requiredFields are all flattened fields. Even if a flattened field is not
+            // used later, flattening it may expand the number of rows in the result, so flattened
+            // fields shall not be pruned.
+            // LOForEach.getRequiredFields does not give the required fields here. RequiredFields means the
+            // field is required by all the outputs; the pipeline does not work correctly without that field.
+            // LOForEach.getRequiredFields gives all the input fields referred to in the LOForEach statement,
+            // but those fields can still be pruned (which means, not required).
+            // Eg:
+            // B = foreach A generate a0, a1, a2+a3;
+            // LOForEach.getRequiredFields gives (a0, a1, a2, a3);
+            // However, a2 and a3 can be pruned if we do not need a2+a3 from LOForEach.
+            // So here, we do not use LOForEach.getRequiredFields; instead, any flattened fields are required fields.
+            if (rlo instanceof LOForEach) {
+                List<Pair<Integer, Integer>> flattenedInputs = new ArrayList<Pair<Integer, Integer>>();
+                for (int i=0;i<rlo.getSchema().size();i++) {
+                    if (((LOForEach)rlo).isInputFlattened(i)) {
+                        flattenedInputs.add(new Pair<Integer, Integer>(0, i));
+                    }
+                }
+                if (!flattenedInputs.isEmpty()) {
+                    requiredFieldsListOfLOOp = new ArrayList<RequiredFields>();
+                    requiredFieldsListOfLOOp.add(new RequiredFields(flattenedInputs));
+                }
+                else
+                    requiredFieldsListOfLOOp = null;
+            }
             // For LOCross/LOUnion, actually we do not require any field here
-            if (rlo instanceof LOForEach || rlo instanceof LOCross || rlo instanceof LOUnion)
+            else if (rlo instanceof LOCross || rlo instanceof LOUnion)
                 requiredFieldsListOfLOOp = null;
             else
                 requiredFieldsListOfLOOp = rlo.getRequiredFields();
@@ -785,5 +806,5 @@
             String msg = "Unable to prune plan";
             throw new OptimizerException(msg, errCode, PigException.BUG, e);
         }
-    }
+    }    
 }
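
The net effect of the PruneColumns change above: any input column that is flattened by the foreach is reported as a required field, so it is never pruned even when nothing downstream references it, while non-flattened inputs remain prunable as before. A hypothetical sketch in the spirit of the example from the comment (not part of this commit):

    B = foreach A generate a0, flatten(a1), a2+a3;

Here a2 and a3 can still be pruned if a2+a3 is not needed downstream, but a1 is flattened, so the new code adds it to requiredFieldsListOfLOOp and the pruner keeps it.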

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=903027&r1=903026&r2=903027&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Tue Jan 26 00:30:40 2010
@@ -1767,4 +1767,36 @@
 
         assertTrue(emptyLogFileMessage());
     }
+    
+    // See PIG-1184
+    @Test
+    public void testForEachFlatten() throws Exception {
+        File inputFile = Util.createInputFile("table_testForEachFlatten", "", new String[]{"oiue\tM\t{(3),(4)}\t{(toronto),(montreal)}"});
+        
+        pigServer.registerQuery("A = load '"+inputFile.toString()+"' as 
(a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray)}, 
a3:bag{t:tuple(loc:chararray)});");
+        pigServer.registerQuery("B = foreach A generate a0, a1, flatten(a2), 
flatten(a3), 10;");
+        pigServer.registerQuery("C = foreach B generate a0, $4;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+
+        assertFalse(iter.hasNext());
+
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+                "No map keys pruned for A"}));
+    }
 }

