Author: thejas
Date: Fri Sep 17 00:35:44 2010
New Revision: 997957

URL: http://svn.apache.org/viewvc?rev=997957&view=rev
Log:
PIG-1610: 'union onschema' does not handle some cases involving 'namespaced'
column names in schema

Modified:
    hadoop/pig/branches/branch-0.8/CHANGES.txt
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java

Modified: hadoop/pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/CHANGES.txt?rev=997957&r1=997956&r2=997957&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.8/CHANGES.txt Fri Sep 17 00:35:44 2010
@@ -192,6 +192,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1610: 'union onschema' does not handle some cases involving 'namespaced'
+column names in schema (thejas)
+
 PIG-1609: 'union onschema' should give a more useful error message when 
 schema of one of the relations has null column name(thejas)
 

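For context, the core of this fix is a new 'sub-name' aware lookup on Schema (see the
Schema.java hunk below), so that a namespaced column such as l1::i can be lined up with
a plain column i during 'union onschema'. The following is a minimal illustrative sketch,
not part of this commit, of the behaviour documented in the new getFieldSubNameMatch()
and getPositionSubName() javadoc; the class name SubNameMatchSketch is made up for the
example.

    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    // Illustrative only: exercises the sub-name lookup added by this commit.
    public class SubNameMatchSketch {
        public static void main(String[] args) throws Exception {
            // schema (a, b): the namespaced alias nm::a resolves to field a
            Schema s1 = new Schema();
            s1.add(new Schema.FieldSchema("a", DataType.INTEGER));
            s1.add(new Schema.FieldSchema("b", DataType.INTEGER));
            System.out.println(s1.getFieldSubNameMatch("nm::a").alias); // a
            System.out.println(s1.getPositionSubName("nm::a"));         // 0

            // schema (nm2::a, b): nm::a does not match nm2::a, so the lookup is null
            Schema s2 = new Schema();
            s2.add(new Schema.FieldSchema("nm2::a", DataType.INTEGER));
            s2.add(new Schema.FieldSchema("b", DataType.INTEGER));
            System.out.println(s2.getFieldSubNameMatch("nm::a"));       // null
        }
    }
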
Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=997957&r1=997956&r2=997957&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Sep 17 00:35:44 2010
@@ -2433,6 +2433,15 @@ LogicalOperator UnionClause(LogicalPlan 
                     // null for columns that don't exist in the input
                     ArrayList<LogicalOperator> newInputs =
                         new ArrayList<LogicalOperator>(inputs.size());
+                    
+                    //create a user defined schema list for use in LOForeach
+                    // using merged schema
+                    ArrayList<Schema> mergedSchemaList = new ArrayList<Schema>();
+                    for(Schema.FieldSchema fs : mergedSchema.getFields()){
+                        mergedSchemaList.add(new Schema(new Schema.FieldSchema(fs.alias, DataType.NULL)));
+                    }
+                    
+                    
                     for(LogicalOperator lop : inputs)                 
                     {                     
                         if(! lop.getSchema().equals(mergedSchema))
@@ -2450,7 +2459,7 @@ LogicalOperator UnionClause(LogicalPlan 
                                 Schema inpSchema = lop.getSchema();
                                 flattenList.add(Boolean.FALSE);
 
-                                int inpPos = inpSchema.getPosition(fs.alias);
+                                int inpPos = inpSchema.getPositionSubName(fs.alias);
 
                                 LogicalOperator columnProj = null;
                                 boolean isCastNeeded = false;
@@ -2478,7 +2487,7 @@ LogicalOperator UnionClause(LogicalPlan 
                                     //cast is needed if types are different.   
 
                                     //compatibility of types has already been checked
                                     //during creation of mergedSchema
-                                    Schema.FieldSchema inpFs = inpSchema.getField(fs.alias);
+                                    Schema.FieldSchema inpFs = inpSchema.getFieldSubNameMatch(fs.alias);
                                     if(inpFs.type != fs.type)
                                         isCastNeeded = true;
                                 }
@@ -2499,7 +2508,8 @@ LogicalOperator UnionClause(LogicalPlan 
                             }
                             LogicalOperator foreach = new LOForEach(lp,
                                     new OperatorKey(scope, getNextId()),
-                                    generatePlans, flattenList
+                                    generatePlans, flattenList,
+                                    mergedSchemaList
                             );
                             lp.add(foreach);
                             lp.connect(lop, foreach);
@@ -2514,7 +2524,9 @@ LogicalOperator UnionClause(LogicalPlan 
                     // use newInputs as the inputs for union
                     inputs = newInputs;     
                 }
+                
 
+                
                 for (LogicalOperator lop: inputs) {
                     lp.connect(lop, union);
                     log.debug("Connected union input operator " +

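The QueryParser.jjt change above does two things: column lookups against each union input
now go through the sub-name aware getPositionSubName()/getFieldSubNameMatch(), and the
generated LOForEach is handed a user-specified schema list built from the merged schema,
so the projected columns carry the merged column names. Below is a minimal sketch of that
list construction, mirroring the hunk above; the merged schema (i, j) and the class name
are assumptions made up for the example, and the DataType.NULL type presumably means only
the alias, not a type, is imposed on the foreach output.

    import java.util.ArrayList;
    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    // Illustrative only: mirrors the mergedSchemaList loop added in UnionClause().
    public class MergedSchemaListSketch {
        public static void main(String[] args) throws Exception {
            // hypothetical merged schema for "u = union onschema f, l2;"
            Schema mergedSchema = new Schema();
            mergedSchema.add(new Schema.FieldSchema("i", DataType.INTEGER));
            mergedSchema.add(new Schema.FieldSchema("j", DataType.INTEGER));

            // one single-column schema per merged field, typed NULL
            ArrayList<Schema> mergedSchemaList = new ArrayList<Schema>();
            for (Schema.FieldSchema fs : mergedSchema.getFields()) {
                mergedSchemaList.add(new Schema(
                        new Schema.FieldSchema(fs.alias, DataType.NULL)));
            }
            System.out.println(mergedSchemaList.size()); // 2
        }
    }
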
Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=997957&r1=997956&r2=997957&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Fri Sep 17 00:35:44 2010
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Collection;
+import java.util.Map.Entry;
 
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
@@ -822,6 +823,54 @@ public class Schema implements Serializa
         }
     }
 
+    
+    /**
+     * Given an alias name, find the associated FieldSchema. If the exact name is
+     * not found, see if any field matches the trailing part of the 'namespaced' alias.
+     * e.g. if the given alias is nm::a and the schema is (a,b), it will return the
+     * FieldSchema of a.
+     * If the given alias is nm::a and the schema is (nm2::a, b), it will return null.
+     * @param alias Alias to look up.
+     * @return FieldSchema, or null if no such alias is in this tuple.
+     */
+    public FieldSchema getFieldSubNameMatch(String alias) throws FrontendException {
+        if(alias == null)
+            return null;
+        FieldSchema fs = getField(alias);
+        if(fs != null){
+            return fs;
+        }
+        //fs is null
+        final String sep = "::";
+        ArrayList<FieldSchema> matchedFieldSchemas = new ArrayList<FieldSchema>();
+        if(alias.contains(sep)){
+            for(FieldSchema field : mFields) {
+                if(alias.endsWith(sep + field.alias)){
+                    matchedFieldSchemas.add(field);
+                }
+            }
+        }
+        if(matchedFieldSchemas.size() > 1){
+            boolean hasNext = false;
+            StringBuilder sb = new StringBuilder("Found more than one " +
+            "sub alias name match: ");
+            for (FieldSchema matchFs : matchedFieldSchemas) {
+                if(hasNext) {
+                    sb.append(", ");
+                } else {
+                    hasNext = true;
+                }
+                sb.append(matchFs.alias);
+            }
+            int errCode = 1116;
+            throw new FrontendException(sb.toString(), errCode, PigException.INPUT);
+        }else if(matchedFieldSchemas.size() == 1){
+            fs = matchedFieldSchemas.get(0);
+        }
+
+        return fs;
+    }
+    
     /**
      * Given a field number, find the associated FieldSchema.
      *
@@ -1096,6 +1145,31 @@ public class Schema implements Serializa
      * @return position of the FieldSchema.
      */
     public int getPosition(String alias) throws FrontendException{
+        return getPosition(alias, false);
+    }
+
+
+    /**
+     * Given an alias, find the associated position of the field schema.
+     * It uses getFieldSubNameMatch to look for subName matches as well.
+     * @param alias
+     *            alias of the FieldSchema.
+     * @return position of the FieldSchema.
+     */
+    public int getPositionSubName(String alias) throws FrontendException{
+        return getPosition(alias, true);
+    }
+    
+    
+    private int getPosition(String alias, boolean isSubNameMatch)
+    throws FrontendException {
+        if(isSubNameMatch && twoLevelAccessRequired){
+            // should not happen
+            int errCode = 2248;
+            String msg = "twoLevelAccessRequired==true is not supported with" +
+            "and isSubNameMatch==true ";
+            throw new FrontendException(msg, errCode, PigException.BUG);
+        }
         if(twoLevelAccessRequired) {
             // this is the case where "this" schema is that of
             // a bag which has just one tuple fieldschema which
@@ -1106,7 +1180,7 @@ public class Schema implements Serializa
             // which is that of a tuple
             if(mFields.size() != 1) {
                 int errCode = 1008;
-               String msg = "Expected a bag schema with a single " +
+                String msg = "Expected a bag schema with a single " +
                 "element of type "+ DataType.findTypeName(DataType.TUPLE) +
                 " but got a bag schema with multiple elements.";
                 throw new FrontendException(msg, errCode, PigException.INPUT);
@@ -1114,10 +1188,10 @@ public class Schema implements Serializa
             Schema.FieldSchema tupleFS = mFields.get(0);
             if(tupleFS.type != DataType.TUPLE) {
                 int errCode = 1009;
-               String msg = "Expected a bag schema with a single " +
-                       "element of type "+ DataType.findTypeName(DataType.TUPLE) +
-                       " but got an element of type " +
-                       DataType.findTypeName(tupleFS.type);
+                String msg = "Expected a bag schema with a single " +
+                        "element of type "+ DataType.findTypeName(DataType.TUPLE) +
+                        " but got an element of type " +
+                        DataType.findTypeName(tupleFS.type);
                 throw new FrontendException(msg, errCode, PigException.INPUT);
             }
             
@@ -1127,16 +1201,16 @@ public class Schema implements Serializa
             // in the tuple
             if(alias.equals(tupleFS.alias)) {
                 int errCode = 1028;
-               String msg = "Access to the tuple ("+ alias + ") of " +
-                       "the bag is disallowed. Only access to the elements of " +
-                       "the tuple in the bag is allowed.";
+                String msg = "Access to the tuple ("+ alias + ") of " +
+                        "the bag is disallowed. Only access to the elements of " +
+                        "the tuple in the bag is allowed.";
                 throw new FrontendException(msg, errCode, PigException.INPUT);
             }
             
             // all is good - get the position from the tuple's schema
             return tupleFS.schema.getPosition(alias);
         } else {
-            FieldSchema fs = getField(alias);
+            FieldSchema fs = isSubNameMatch ? getFieldSubNameMatch(alias) : getField(alias);
     
             if (null == fs) {
                 return -1;
@@ -1379,6 +1453,10 @@ public class Schema implements Serializa
                                boolean allowDifferentSizeMerge,
                                boolean allowIncompatibleTypes)
                                     throws SchemaMergeException {
+        if(schema == null && other == null){
+            //if both are null, they are not incompatible
+            return null;
+        }
         if (schema == null) {
             if (allowIncompatibleTypes) {
                 return null ;
@@ -1454,20 +1532,13 @@ public class Schema implements Serializa
             }
             else {
                 // merge inner tuple because both sides are tuples
+                //if inner schema are incompatible and allowIncompatibleTypes==true
+                // an exception is thrown by mergeSchema
                 Schema mergedSubSchema = mergeSchema(myFs.schema,
                                                      otherFs.schema,
                                                      otherTakesAliasPrecedence,
                                                      allowDifferentSizeMerge,
                                                      allowIncompatibleTypes) ;
-                // if they cannot be merged and we don't allow incompatible
-                // types, just return null meaning cannot merge
-                if ( (mergedSubSchema == null) &&
-                     (!allowIncompatibleTypes) ) {
-                    int errCode = 1032;
-                    String msg = "Incompatible inner schemas for merging schemas. "
-                        + " Field schema: " + myFs.schema + " Other field schema: " + otherFs.schema;
-                    throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
-                }
 
                 // create the merged field
                 // the mergedSubSchema can be true if allowIncompatibleTypes
@@ -1599,32 +1670,35 @@ public class Schema implements Serializa
             Schema schema2)
     throws SchemaMergeException{
         Schema mergedSchema = new Schema();
+        HashSet<FieldSchema> schema2colsAdded = new HashSet<FieldSchema>();
         // add/merge fields present in first schema 
         for(FieldSchema fs1 : schema1.getFields()){
             checkNullAlias(fs1, schema1);
-            
-            FieldSchema fs2 = getFieldThrowSchemaMergeException(schema2,fs1.alias);
+            FieldSchema fs2 = getFieldSubNameMatchThrowSchemaMergeException(schema2,fs1.alias);
+            if(fs2 != null){
+                schema2colsAdded.add(fs2);
+            }
             FieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1,fs2);
             mergedSchema.add(mergedFs);
         }
-        
+
         //add schemas from 2nd schema, that are not already present in
         // merged schema
         for(FieldSchema fs2 : schema2.getFields()){
             checkNullAlias(fs2, schema2);
-            if(getFieldThrowSchemaMergeException(mergedSchema, fs2.alias) == null){
-                    try {
-                        mergedSchema.add(fs2.clone());
-                    } catch (CloneNotSupportedException e) {
-                        throw new SchemaMergeException(
-                                "Error encountered while merging schemas", e);
-                    }
+            if(! schema2colsAdded.contains(fs2)){
+                try {
+                    mergedSchema.add(fs2.clone());
+                } catch (CloneNotSupportedException e) {
+                    throw new SchemaMergeException(
+                            "Error encountered while merging schemas", e);
+                }
             }
         }
         return mergedSchema;
-        
+
     }
-    
+
     private static void checkNullAlias(FieldSchema fs, Schema schema)
     throws SchemaMergeException {
         if(fs.alias == null){
@@ -1655,6 +1729,8 @@ public class Schema implements Serializa
 
         Schema innerSchema = null;
         
+        String alias = mergeNameSpacedAlias(fs1.alias, fs2.alias);
+        
         byte mergedType = DataType.mergeType(fs1.type, fs2.type) ;
 
         // If the types cannot be merged
@@ -1684,7 +1760,7 @@ public class Schema implements Serializa
             }
         }
         try {
-            return new FieldSchema(fs1.alias, innerSchema, mergedType) ;
+            return new FieldSchema(alias, innerSchema, mergedType) ;
         } catch (FrontendException e) {
             // this exception is not expected
             int errCode = 2124;
@@ -1698,6 +1774,29 @@ public class Schema implements Serializa
     
     
     /**
+     * If one of the aliases is of form 'nm::str1', and other is of the form
+     * 'str1', this returns str1
+     * @param alias1
+     * @param alias2
+     * @return merged alias
+     * @throws SchemaMergeException
+     */
+    private static String mergeNameSpacedAlias(String alias1, String alias2)
+    throws SchemaMergeException {
+        if(alias1.equals(alias2)){
+            return alias1;
+        }
+        if(alias1.endsWith("::" + alias2)){
+            return alias2;
+        }
+        if(alias2.endsWith("::" + alias1)){
+            return alias1;
+        }
+        //the aliases are different, alias cannot be merged
+        return null;
+    }
+
+    /**
      * Utility function that calls schema.getField(alias), and converts
      * {@link FrontendException} to {@link SchemaMergeException}
      * @param schema
@@ -1705,13 +1804,13 @@ public class Schema implements Serializa
      * @return FieldSchema
      * @throws SchemaMergeException
      */
-    private static FieldSchema getFieldThrowSchemaMergeException(
+    private static FieldSchema getFieldSubNameMatchThrowSchemaMergeException(
             Schema schema, String alias) throws SchemaMergeException {
         FieldSchema fs = null;
         try {
-            fs = schema.getField(alias);
+            fs = schema.getFieldSubNameMatch(alias);
         } catch (FrontendException e) {
-            String msg = "Caught exception finding FieldSchema for alias" +
+            String msg = "Caught exception finding FieldSchema for alias " +
             alias;
             throw new SchemaMergeException(msg, e);
         }

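The Schema.java changes above cover the other half of the fix: mergeSchemaByAlias() now
matches columns across the two inputs with the sub-name lookup (tracking the already-merged
fields of the second schema in a HashSet instead of re-probing the merged schema), and the
first-level field merge picks the output alias via the new private mergeNameSpacedAlias().
Since that helper is private to Schema, here is a standalone restatement of its rule, for
illustration only; the class name is made up.

    // Illustrative restatement of Schema.mergeNameSpacedAlias() (private in Schema):
    // the shorter, un-namespaced form wins when one alias is the other with a
    // namespace prefix, and otherwise-different aliases do not merge at all.
    public class AliasMergeSketch {
        static String mergeNameSpacedAlias(String alias1, String alias2) {
            if (alias1.equals(alias2))          return alias1; // identical names
            if (alias1.endsWith("::" + alias2)) return alias2; // nm::x vs x -> x
            if (alias2.endsWith("::" + alias1)) return alias1; // x vs nm::x -> x
            return null;                                       // cannot be merged
        }

        public static void main(String[] args) {
            System.out.println(mergeNameSpacedAlias("l1::i", "i"));     // i
            System.out.println(mergeNameSpacedAlias("i", "l2::i"));     // i
            System.out.println(mergeNameSpacedAlias("l1::i", "l2::i")); // null
        }
    }
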
Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java?rev=997957&r1=997956&r2=997957&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestUnionOnSchema.java Fri Sep 17 00:35:44 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.File;
@@ -28,13 +29,17 @@ import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -127,7 +132,148 @@ public class TestUnionOnSchema  {
 
     }
     
+    /**
+     * Test UNION ONSCHEMA where a common column has additional 'namespace' part
+     *  in the column name in one of the inputs
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaScopedColumnName() throws IOException, ParseException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        String query_prefix = 
+        "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " 
+        + "g = group l1 by i; "
+        + "f = foreach g generate flatten(l1); "
+        + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); ";
+
+        String query = query_prefix + "u = union onschema f, l2; " ; 
+        Util.registerMultiLineQuery(pig, query);
+        Schema sch = pig.dumpSchema("u");
+        Schema expectedSch = Util.getSchemaFromString("i: int, j: int");
+        assertEquals("Checking expected schema",sch, expectedSch);
+        Iterator<Tuple> it = pig.openIterator("u");
+
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(1,2)",
+                            "(5,3)",
+                            "(1,2)",
+                            "(5,3)"
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+        
+        // now try reversing the order of relation
+        query = query_prefix + "u = union onschema l2, f; " ; 
+        Util.registerMultiLineQuery(pig, query);
+        sch = pig.dumpSchema("u");
+        expectedSch = Util.getSchemaFromString("i: int, j: int");
+        assertEquals("Checking expected schema",sch, expectedSch);
+        it = pig.openIterator("u");
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
     
+    /**
+     * Test UNION ONSCHEMA where a common column has additional 'namespace' part
+     *  in the column name in both the inputs
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaScopedColumnNameBothInp1() throws IOException, ParseException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        String query = 
+        "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " 
+        + "g1 = group l1 by i; "
+        + "f1 = foreach g1 generate group as gkey, flatten(l1); "
+        + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, x : chararray); " 
+        + "g2 = group l2 by i; "
+        + "f2 = foreach g2 generate group as gkey, flatten(l2); "
+        + "u = union onschema f1, f2; " ; 
+        Util.registerMultiLineQuery(pig, query);
+        
+        Schema sch = pig.dumpSchema("u");
+        Schema expectedSch = 
+            Util.getSchemaFromString("gkey: int, l1::i: int, l1::j: int, l2::i: int, l2::x: chararray");
+        assertEquals("Checking expected schema",sch, expectedSch);
+
+        Iterator<Tuple> it = pig.openIterator("u");
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(1,1,2,null,null)",
+                            "(5,5,3,null,null)",
+                            "(1,null,null,1,'2')",
+                            "(5,null,null,5,'3')"
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+    }
+    
+    /**
+     * Test UNION ONSCHEMA where a common column has additional 'namespace' 
part
+     *  in the column name in both the inputs
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaScopedColumnNameBothInp2() throws IOException, ParseException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        String query =
+            "   l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " 
+            + " l2 = load '" + INP_FILE_2NUMS + "' as (i : int, x : chararray); " 
+            + " cg1 = cogroup l1 by i, l2 by i; "
+            + " f1 = foreach cg1 generate group as gkey, flatten(l1), flatten(l2); "
+            + " cg2 = cogroup l2 by i, l1 by i; "
+            + " f2 = foreach cg1 generate group as gkey, flatten(l2), flatten(l1); "
+            + "u = union onschema f1, f2; " ; 
+        Util.registerMultiLineQuery(pig, query);
+                
+        Schema sch = pig.dumpSchema("u");
+        Schema expectedSch = 
+            Util.getSchemaFromString("gkey: int, l1::i: int, l1::j: int, l2::i: int, l2::x: chararray");
+        assertEquals("Checking expected schema",sch, expectedSch);
+
+        Iterator<Tuple> it = pig.openIterator("u");
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(1,1,2,1,'2')",
+                            "(5,5,3,5,'3')",
+                            "(1,1,2,1,'2')",
+                            "(5,5,3,5,'3')",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+        
+    }
+    
+    /**
+     * Test UNION ONSCHEMA where a common column has additional 'namespace' part
+     *  in the column name in one of the inputs.
+     *  Negative test case
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaScopedColumnNameNeg() throws IOException, ParseException {
+        
+        String expectedErr = "Found more than one match: l1::i, l2::i";
+        String query_prefix =
+            "  l1 = load '/tmp/fn' as (i : int, j : long); "
+            + "l2 = load '/tmp/fn' as (i : int, j : long); "
+            + "cg = cogroup l1 by i, l2 by i;"
+            + "f = foreach cg generate flatten(l1), flatten(l2); "
+            + "l3 = load '/tmp/fn2' as (i : int, j : long); "
+            ;
+        String query = query_prefix +  "u = union onschema f, l3; ";
+        checkSchemaEx(query, expectedErr);
+
+        // now try reversing the order of relation
+        query = query_prefix +  "u = union onschema l3, f; ";
+        checkSchemaEx(query, expectedErr);
+
+    }
 
     /**
      * Test UNION ONSCHEMA on two inputs with same column names, but different
@@ -423,9 +569,108 @@ public class TestUnionOnSchema  {
     }
     
     
+    /**
+     * Test UNION ONSCHEMA with input relation having udfs
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaInputUdfs() throws IOException, ParseException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        String query =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);"
+            + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);"
+            + "f1 = foreach l1 generate i, CONCAT(j,j) as cj, " +
+                       "org.apache.pig.test.TestUnionOnSchema\\$UDFTupleNullSchema(i,j) as uo;"
+            + "u = union onschema f1, l2;"
+        ; 
+        Util.registerMultiLineQuery(pig, query);
+
+        Schema sch = pig.dumpSchema("u");
+        Schema expectedSch = 
+            Util.getSchemaFromString("i: int, cj: chararray, uo: Tuple(), j: chararray");
+        assertEquals("Checking expected schema",sch, expectedSch);
+        
 
+        Iterator<Tuple> it = pig.openIterator("u");
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(1,null,null,'2')",
+                            "(5,null,null,'3')",
+                            "(1,'22',(1,'2'),null)",
+                            "(5,'33',(5,'3'),null)"
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
     
+
+    /**
+     * Udf that has schema of tuple column with no inner schema 
+     */
+    public static class UDFTupleNullSchema extends EvalFunc <Tuple> {
+        public Tuple exec(Tuple input) {
+            return input;
+        }
+        
+        @Override
+        public Schema outputSchema(Schema input) {
+            FieldSchema fs =
+                new Schema.FieldSchema(getSchemaName("UDFTupleNullSchema", input),
+                        DataType.TUPLE);
+                return new Schema(fs);
+        }
+
+    }
     
+    /**
+     * Test UNION ONSCHEMA with input relation having column names with multiple
+     * level of namespace in their names
+     * @throws IOException
+     * @throws ParseException
+     */
+    @Test
+    public void testUnionOnSchemaScopeMulti() throws IOException, ParseException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        String query_prefix =
+            "  a = load '" + INP_FILE_2NUMS+ "' as (i:int, j:int); "
+            + "b = group a by i; "
+            + "c = foreach b generate group as gp, flatten(a); "
+            + "d = group c by $0; "
+            + "e = foreach d generate group as gp, flatten(c); "
+            + "f = load  '" + INP_FILE_2NUMS + "' as (i, j); "
+            ;
+        String query = query_prefix + "u = union onschema e,f;";
+        Util.registerMultiLineQuery(pig, query);
+
+        Schema sch = pig.dumpSchema("u");
+        Schema expectedSch = 
+            Util.getSchemaFromString("gp: int,c::gp: int,i: int,j: int");
+        assertEquals("Checking expected schema",sch, expectedSch);
+        
+        
+        query = query_prefix + "u = union onschema f,e;";
+        Util.registerMultiLineQuery(pig, query);
+
+        sch = pig.dumpSchema("u");
+        expectedSch = 
+            Util.getSchemaFromString("i: int,j: int, gp: int,c::gp: int");
+        assertEquals("Checking expected schema",sch, expectedSch);
+        
+        
+        Iterator<Tuple> it = pig.openIterator("u");
+        List<Tuple> expectedRes = 
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                            "(1,2,null,null)",
+                            "(5,3,null,null)",
+                            "(1,2,1,1)",
+                            "(5,3,5,5)",
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+    }
     
     
 }

