Author: thejas
Date: Mon Sep 20 20:39:01 2010
New Revision: 999107

URL: http://svn.apache.org/viewvc?rev=999107&view=rev
Log:
PIG-1616: 'union onschema' does not create output with the correct schema
when UDFs are involved

Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=999107&r1=999106&r2=999107&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Mon Sep 20 20:39:01 2010
@@ -192,6 +192,8 @@ PIG-1353: Map-side joins (ashutoshc)
 PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
+PIG-1616: 'union onschema' does not use create output with correct schema
+when udfs are involved (thejas)
 
 PIG-1610:  'union onschema' does handle some cases involving 'namespaced' 
 column names in schema (thejas)

Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=999107&r1=999106&r2=999107&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
 (original)
+++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
 Mon Sep 20 20:39:01 2010
@@ -54,6 +54,8 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+import 
org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.StreamToPig;
 import org.apache.pig.PigToStream;
 import org.apache.pig.builtin.PigStreaming;
@@ -2390,13 +2392,29 @@ LogicalOperator UnionClause(LogicalPlan 
             try{// this try-catch block will catch all exceptions and convert 
them
                 // to ParseException. Otherwise, if any exception than 
ParseException
                 // is thrown , the generated parse code tries to cast
-                //the exception to Error, resulting in a misleading error 
message 
+                //the exception to Error, resulting in a misleading error 
message
+                
+                if(isOnSchema) {
+                    // run through validator first on inputs so that the 
schemas have the right
+                    //types for columns. It will run TypeCheckingValidator as 
well.
+                    // The compilation messages will be logged when validation 
is
+                    // done from PigServer, so not doing it here
+                    CompilationMessageCollector collector = new 
CompilationMessageCollector() ;
+                    boolean isBeforeOptimizer = true;
+                    LogicalPlanValidationExecutor validator = 
+                        new LogicalPlanValidationExecutor(lp, pigContext, 
isBeforeOptimizer);
+                    validator.validate(lp, collector);
+                }
+                
                 LogicalOperator union = new LOUnion(lp, new OperatorKey(scope, 
getNextId()));
                 lp.add(union);
                 log.debug("Added operator " + union.getClass().getName() + " 
to the logical plan");
 
                 if(isOnSchema)             
-                {  // this is UNION ONSCHEMA, find merged schema.
+                {  // this is UNION ONSCHEMA, find merged schema 
+                    // and  (if necessary) add foreach to align columns
+                    
+                    
                     ArrayList<Schema> schemas = new 
ArrayList<Schema>(inputs.size());
                     for(LogicalOperator lop : inputs){
                         Schema sch = lop.getSchema();

Modified: 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java?rev=999107&r1=999106&r2=999107&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java 
(original)
+++ 
hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java 
Mon Sep 20 20:39:01 2010
@@ -605,6 +605,58 @@ public class TestUnionOnSchema  {
 
     }
     
+    
+    /**
+     * Test UNION ONSCHEMA on udf outputs whose type is only final after type
+     * checking (MAX over an int column vs COUNT); merged mx must be long.
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaUdfTypeEvolution() throws IOException, ParseException {
+        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        // f1.mx comes from MAX over an int bag column, f2.mx from COUNT
+        String query_prefix =
+            "  l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : int, c : chararray, j : int "
+            +       ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
+            +       ", t : tuple (tc1 : int, tc2 : chararray) );"
+            + " l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : int, c : chararray, j : int "
+            +       ", b : bag { t : tuple (c1 : int, c2 : chararray)}"
+            +       ", t : tuple (tc1 : int, tc2 : chararray) );"
+            + "f1 = foreach l1 generate i, MAX(b.c1) as mx;"
+            + "f2 = foreach l2 generate i, COUNT(b.c1) as mx;"
+
+        ;
+        String query = query_prefix  + "u = union onschema f1, f2;";
+        Util.registerMultiLineQuery(pig, query);
+        Schema sch = pig.dumpSchema("u");
+        Schema expectedSch =
+            Util.getSchemaFromString("i: int, mx: long");
+        // JUnit order is (message, expected, actual)
+        assertEquals("Checking expected schema", expectedSch, sch);
+        // verify schema for reverse order of relations as well
+        query = query_prefix  + "u = union onschema f2, f1;";
+        Util.registerMultiLineQuery(pig, query);
+        sch = pig.dumpSchema("u");
+        expectedSch =
+            Util.getSchemaFromString("i: int, mx: long");
+        assertEquals("Checking expected schema", expectedSch, sch);
+
+        // the data check below runs on the last registered "u" (f2, f1)
+        Iterator<Tuple> it = pig.openIterator("u");
+
+        List<Tuple> expectedRes =
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(1,1L)",
+                            "(5,2L)",
+                            "(1,2L)",
+                            "(5,2L)"
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+    }
 
     /**
      * Udf that has schema of tuple column with no inner schema 


Reply via email to