Author: daijy Date: Fri Jan 22 08:00:38 2010 New Revision: 902027 URL: http://svn.apache.org/viewvc?rev=902027&view=rev Log: PIG-1184: PruneColumns optimization does not handle the case of foreach flatten correctly if flattened bag is not used later
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=902027&r1=902026&r2=902027&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Jan 22 08:00:38 2010 @@ -144,6 +144,9 @@ PIG-1176: Column Pruner issues in union of loader with and without schema (daijy) +PIG-1184: PruneColumns optimization does not handle the case of foreach +flatten correctly if flattened bag is not used later (daijy) + Release 0.6.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=902027&r1=902026&r2=902027&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Fri Jan 22 08:00:38 2010 @@ -53,6 +53,7 @@ import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.RelationalOperator; import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder; +import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.MapKeysInfo; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; @@ -361,20 +362,40 @@ } } } - - + // Merge with required input fields of this logical operator. // RequiredInputFields come from two sources, one is mapping from required output to input, // the other is from the operator itself. 
Here we use getRequiredFields to get the second part, // and merge with the first part List<RequiredFields> requiredFieldsListOfLOOp; - // For LOForEach, requiredFields is not really required fields. Here required fields means the input - // fields required by the entire output columns, such as filter condition in LOFilter, group columns in LOCoGroup. - // For LOForEach, output columns are generated by the foreach plan it belongs to, there is nothing globally required. - // So we need to fix the semantic gap here. If the operator is LOForEach, requiredFields is null. + // For LOForEach, requiredFields are all flattened fields. Even if a flattened field is not used later, + // flattening it may still expand the number of rows in the result. So flattened fields shall not be pruned. + // LOForEach.getRequiredFields does not give the required fields. RequiredFields means that a field + // is required by all the outputs; the pipeline does not work correctly without that field. + // LOForEach.getRequiredFields gives all the input fields referred to in the LOForEach statement, but those + // fields can still be pruned (which means they are not actually required). + // Eg: + // B = foreach A generate a0, a1, a2+a3; + // LOForEach.getRequiredFields gives (a0, a1, a2, a3); + // However, input columns a2 and a3 can be pruned if we do not need output a2+a3 for LOForEach.
+ // So here, we do not use LOForEach.getRequiredFields, instead, any flattened fields are required fields + if (rlo instanceof LOForEach) { + List<Pair<Integer, Integer>> flattenedInputs = new ArrayList<Pair<Integer, Integer>>(); + for (int i=0;i<rlo.getSchema().size();i++) { + if (((LOForEach)rlo).isInputFlattened(i)) { + flattenedInputs.add(new Pair<Integer, Integer>(0, i)); + } + } + if (!flattenedInputs.isEmpty()) { + requiredFieldsListOfLOOp = new ArrayList<RequiredFields>(); + requiredFieldsListOfLOOp.add(new RequiredFields(flattenedInputs)); + } + else + requiredFieldsListOfLOOp = null; + } // For LOCross/LOUnion, actually we do not require any field here - if (rlo instanceof LOForEach || rlo instanceof LOCross || rlo instanceof LOUnion) + else if (rlo instanceof LOCross || rlo instanceof LOUnion) requiredFieldsListOfLOOp = null; else requiredFieldsListOfLOOp = rlo.getRequiredFields(); @@ -785,5 +806,5 @@ String msg = "Unable to prune plan"; throw new OptimizerException(msg, errCode, PigException.BUG, e); } - } + } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=902027&r1=902026&r2=902027&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Fri Jan 22 08:00:38 2010 @@ -1767,4 +1767,36 @@ assertTrue(emptyLogFileMessage()); } + + // See PIG-1184 + @Test + public void testForEachFlatten() throws Exception { + File inputFile = Util.createInputFile("table_testForEachFlatten", "", new String[]{"oiue\tM\t{(3),(4)}\t{(toronto),(montreal)}"}); + + pigServer.registerQuery("A = load '"+inputFile.toString()+"' as (a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray)}, a3:bag{t:tuple(loc:chararray)});"); + pigServer.registerQuery("B = foreach A generate a0, a1, 
flatten(a2), flatten(a3), 10;"); + pigServer.registerQuery("C = foreach B generate a0, $4;"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + assertTrue(t.toString().equals("(oiue,10)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + assertTrue(t.toString().equals("(oiue,10)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + assertTrue(t.toString().equals("(oiue,10)")); + + assertTrue(iter.hasNext()); + t = iter.next(); + assertTrue(t.toString().equals("(oiue,10)")); + + assertFalse(iter.hasNext()); + + assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1", + "No map keys pruned for A"})); + } }