Author: thejas Date: Mon Sep 20 20:38:23 2010 New Revision: 999106 URL: http://svn.apache.org/viewvc?rev=999106&view=rev Log: PIG-1616: 'union onschema' does not create output with the correct schema when UDFs are involved
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=999106&r1=999105&r2=999106&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Mon Sep 20 20:38:23 2010 @@ -204,6 +204,9 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1616: 'union onschema' does not use create output with correct schema +when udfs are involved (thejas) + PIG-1610: 'union onschema' does handle some cases involving 'namespaced' column names in schema (thejas) Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=999106&r1=999105&r2=999106&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Sep 20 20:38:23 2010 @@ -54,6 +54,8 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.streaming.StreamingCommand; import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec; +import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor; +import org.apache.pig.impl.plan.CompilationMessageCollector; import org.apache.pig.StreamToPig; import org.apache.pig.PigToStream; import org.apache.pig.builtin.PigStreaming; @@ -2390,13 +2392,29 @@ LogicalOperator UnionClause(LogicalPlan try{// this try-catch block will catch all exceptions and convert them // to ParseException. 
Otherwise, if any exception than ParseException // is thrown , the generated parse code tries to cast - //the exception to Error, resulting in a misleading error message + //the exception to Error, resulting in a misleading error message + + if(isOnSchema) { + // run through validator first on inputs so that the schemas have the right + //types for columns. It will run TypeCheckingValidator as well. + // The compilation messages will be logged when validation is + // done from PigServer, so not doing it here + CompilationMessageCollector collector = new CompilationMessageCollector() ; + boolean isBeforeOptimizer = true; + LogicalPlanValidationExecutor validator = + new LogicalPlanValidationExecutor(lp, pigContext, isBeforeOptimizer); + validator.validate(lp, collector); + } + LogicalOperator union = new LOUnion(lp, new OperatorKey(scope, getNextId())); lp.add(union); log.debug("Added operator " + union.getClass().getName() + " to the logical plan"); if(isOnSchema) - { // this is UNION ONSCHEMA, find merged schema. 
+ { // this is UNION ONSCHEMA, find merged schema + // and (if necessary) add foreach to align columns + + ArrayList<Schema> schemas = new ArrayList<Schema>(inputs.size()); for(LogicalOperator lop : inputs){ Schema sch = lop.getSchema(); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=999106&r1=999105&r2=999106&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Mon Sep 20 20:38:23 2010 @@ -605,6 +605,58 @@ public class TestUnionOnSchema { } + + /** + * Test UNION ONSCHEMA with udf whose default type is different from + * final type + * @throws IOException + * @throws ParseException + */ + @Test + public void testUnionOnSchemaUdfTypeEvolution() throws IOException, ParseException { + PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + //PigServer pig = new PigServer(ExecType.LOCAL); + String query_prefix = + " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " + + " (i : int, c : chararray, j : int " + + ", b : bag { t : tuple (c1 : int, c2 : chararray)}" + + ", t : tuple (tc1 : int, tc2 : chararray) );" + + " l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " + + " (i : int, c : chararray, j : int " + + ", b : bag { t : tuple (c1 : int, c2 : chararray)}" + + ", t : tuple (tc1 : int, tc2 : chararray) );" + + "f1 = foreach l1 generate i, MAX(b.c1) as mx;" + + "f2 = foreach l2 generate i, COUNT(b.c1) as mx;" + + ; + String query = query_prefix + "u = union onschema f1, f2;"; + Util.registerMultiLineQuery(pig, query); + Schema sch = pig.dumpSchema("u"); + Schema expectedSch = + Util.getSchemaFromString("i: int, mx: long"); + assertEquals("Checking expected schema",sch, expectedSch); + + // verify schema for reverse order of relations 
as well + query = query_prefix + "u = union onschema f2, f1;"; + Util.registerMultiLineQuery(pig, query); + sch = pig.dumpSchema("u"); + expectedSch = + Util.getSchemaFromString("i: int, mx: long"); + assertEquals("Checking expected schema",sch, expectedSch); + + + Iterator<Tuple> it = pig.openIterator("u"); + + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[] { + "(1,1L)", + "(5,2L)", + "(1,2L)", + "(5,2L)" + }); + Util.checkQueryOutputsAfterSort(it, expectedRes); + } /** * Udf that has schema of tuple column with no inner schema