Author: daijy
Date: Fri Jan 22 08:00:38 2010
New Revision: 902027

URL: http://svn.apache.org/viewvc?rev=902027&view=rev
Log:
PIG-1184: PruneColumns optimization does not handle the case of foreach flatten 
correctly if flattened bag is not used later

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=902027&r1=902026&r2=902027&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jan 22 08:00:38 2010
@@ -144,6 +144,9 @@
 PIG-1176: Column Pruner issues in union of loader with and without schema
 (daijy)
 
+PIG-1184: PruneColumns optimization does not handle the case of foreach
+flatten correctly if flattened bag is not used later (daijy)
+
 Release 0.6.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=902027&r1=902026&r2=902027&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
 Fri Jan 22 08:00:38 2010
@@ -53,6 +53,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.RelationalOperator;
 import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.MapKeysInfo;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -361,20 +362,40 @@
                     }
                 }
             }
-
-                
+            
             // Merge with required input fields of this logical operator.
             // RequiredInputFields come from two sources, one is mapping from 
required output to input, 
             // the other is from the operator itself. Here we use 
getRequiredFields to get the second part,
             // and merge with the first part
             List<RequiredFields> requiredFieldsListOfLOOp;
             
-            // For LOForEach, requiredFields is not really required fields. 
Here required fields means the input
-            // fields required by the entire output columns, such as filter 
condition in LOFilter, group columns in LOCoGroup.
-            // For LOForEach, output columns are generated by the foreach plan 
it belongs to, there is nothing globally required.
-            // So we need to fix the semantic gap here. If the operator is 
LOForEach, requiredFields is null.
+            // For LOForEach, the required fields are all flattened fields. Even if a 
flattened field is not used later, 
+            // flattening it may expand the number of rows in the result. So flattened 
fields shall not be pruned.
+            // LOForEach.getRequiredFields does not give the required fields. 
RequiredFields means a field that
+            // is required by all the outputs; the pipeline does not work 
correctly without that field. 
+            // LOForEach.getRequiredFields gives all the input fields referred to 
in the LOForEach statement, but those
+            // fields can still be pruned (which means they are not required).
+            // Eg:
+            // B = foreach A generate a0, a1, a2+a3;
+            // LOForEach.getRequiredFields gives (a0, a1, a2, a3);
+            // However, input column a2 and a3 can be pruned if we do not need 
output a2+a3 for LOForEach.
+            // So here, we do not use LOForEach.getRequiredFields, instead, 
any flattened fields are required fields
+            if (rlo instanceof LOForEach) {
+                List<Pair<Integer, Integer>> flattenedInputs = new 
ArrayList<Pair<Integer, Integer>>();
+                for (int i=0;i<rlo.getSchema().size();i++) {
+                    if (((LOForEach)rlo).isInputFlattened(i)) {
+                        flattenedInputs.add(new Pair<Integer, Integer>(0, i));
+                    }
+                }
+                if (!flattenedInputs.isEmpty()) {
+                    requiredFieldsListOfLOOp = new ArrayList<RequiredFields>();
+                    requiredFieldsListOfLOOp.add(new 
RequiredFields(flattenedInputs));
+                }
+                else
+                    requiredFieldsListOfLOOp = null;
+            }
             // For LOCross/LOUnion, actually we do not require any field here
-            if (rlo instanceof LOForEach || rlo instanceof LOCross || rlo 
instanceof LOUnion)
+            else if (rlo instanceof LOCross || rlo instanceof LOUnion)
                 requiredFieldsListOfLOOp = null;
             else
                 requiredFieldsListOfLOOp = rlo.getRequiredFields();
@@ -785,5 +806,5 @@
             String msg = "Unable to prune plan";
             throw new OptimizerException(msg, errCode, PigException.BUG, e);
         }
-    }
+    }    
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=902027&r1=902026&r2=902027&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Fri Jan 22 
08:00:38 2010
@@ -1767,4 +1767,36 @@
 
         assertTrue(emptyLogFileMessage());
     }
+    
+    // See PIG-1184
+    @Test
+    public void testForEachFlatten() throws Exception {
+        File inputFile = Util.createInputFile("table_testForEachFlatten", "", 
new String[]{"oiue\tM\t{(3),(4)}\t{(toronto),(montreal)}"});
+        
+        pigServer.registerQuery("A = load '"+inputFile.toString()+"' as 
(a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray)}, 
a3:bag{t:tuple(loc:chararray)});");
+        pigServer.registerQuery("B = foreach A generate a0, a1, flatten(a2), 
flatten(a3), 10;");
+        pigServer.registerQuery("C = foreach B generate a0, $4;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(oiue,10)"));
+
+        assertFalse(iter.hasNext());
+
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+                "No map keys pruned for A"}));
+    }
 }


Reply via email to