Author: thejas
Date: Wed Aug  4 22:24:58 2010
New Revision: 982420

URL: http://svn.apache.org/viewvc?rev=982420&view=rev
Log:
PIG-1461: support union operation that merges based on column names

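For readers skimming the diffs below: the new syntax is "union onschema", which unions relations by column alias rather than by position. A minimal, hypothetical sketch of its use through the standard PigServer API follows (the relation names and input files are invented for illustration and are not part of this commit):

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class UnionOnSchemaExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            // two relations with overlapping but unequal schemas
            pig.registerQuery("a = load 'input1' as (x : int, y : chararray);");
            pig.registerQuery("b = load 'input2' as (y : chararray, z : long);");
            // union onschema merges by column alias: the result has schema
            // (x, y, z); columns missing from an input are padded with nulls,
            // and values are cast where the declared types differ
            pig.registerQuery("u = union onschema a, b;");
            pig.dumpSchema("u");
            Iterator<Tuple> it = pig.openIterator("u");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
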
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=982420&r1=982419&r2=982420&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Aug  4 22:24:58 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1461: support union operation that merges based on column names (thejas)
+
 PIG-1517: Pig needs to support keywords in the package name (aniket486 via olgan)
 
 PIG-928: UDFs in scripting languages (aniket486 via daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=982420&r1=982419&r2=982420&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Aug  4 22:24:58 2010
@@ -999,6 +999,7 @@ TOKEN : { <BY : "by">       }
 TOKEN : { <USING : "using"> }
 TOKEN : { <INNER : "inner"> }
 TOKEN : { <OUTER : "outer"> }
+TOKEN : { <ONSCHEMA : "ONSCHEMA"> }
 TOKEN : { <STAR : "*">                 }
 TOKEN : { <PARALLEL : "parallel"> }
 TOKEN : { <PARTITION : "partition"> }
@@ -2266,25 +2267,155 @@ LogicalOperator JoinClause(LogicalPlan l
 LogicalOperator UnionClause(LogicalPlan lp) : 
 {
        LogicalOperator op;
+       boolean isOnSchema = false;
        ArrayList<LogicalOperator> inputs = new ArrayList<LogicalOperator>(); 
        log.trace("Entering UnionClause");
 }
 {
-       (op = NestedExpr(lp){inputs.add(op);} 
+    [<ONSCHEMA> {isOnSchema = true;}]
+       (op = NestedExpr(lp){inputs.add(op);}
        ("," op = NestedExpr(lp) {inputs.add(op);})+)
        {
-               LogicalOperator union = new LOUnion(lp, new OperatorKey(scope, getNextId()));
-               lp.add(union);
-               log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
-               
-               for (LogicalOperator lop: inputs) {
-                       lp.connect(lop, union);
-                       log.debug("Connected union input operator " + lop.getClass().getName() + " to operator " + lop.getClass().getName() + " in the logical plan");
-               }               
-               
-               log.trace("Exiting UnionClause");
-               return union;
+            try{// this try-catch block will catch all exceptions and convert them
+                // to ParseException. Otherwise, if any exception other than ParseException
+                // is thrown, the generated parse code tries to cast
+                // the exception to Error, resulting in a misleading error message
+                LogicalOperator union = new LOUnion(lp, new OperatorKey(scope, getNextId()));
+                lp.add(union);
+                log.debug("Added operator " + union.getClass().getName() + " to the logical plan");
+
+                if(isOnSchema)
+                {   // this is UNION ONSCHEMA, find merged schema.
+                    ArrayList<Schema> schemas = new ArrayList<Schema>(inputs.size());
+                    for(LogicalOperator lop : inputs){
+                        Schema sch = lop.getSchema();
+                        if(sch == null)
+                        {
+                            String msg = "Schema of relation " + lop.getAlias()
+                                + " is null. UNION ONSCHEMA cannot be used"
+                                + " with relations that have null schema.";
+                            throw new ParseException(msg);
+                        }
+                        schemas.add(sch);
+                    }
+                    Schema mergedSchema;
+                    try {
+                        mergedSchema = Schema.mergeSchemasByAlias(schemas);
+                    }catch(SchemaMergeException e){
+                        String msg = "Error merging schemas for union operator";
+                        ParseException pe = new ParseException(msg);
+                        pe.initCause(e);
+                        throw pe;
+                    }
+
+                    // add a foreach for inputs that don't match mergedSchema,
+                    // projecting null for columns that don't exist in the input
+                    ArrayList<LogicalOperator> newInputs =
+                        new ArrayList<LogicalOperator>(inputs.size());
+                    for(LogicalOperator lop : inputs)
+                    {
+                        if(! lop.getSchema().equals(mergedSchema))
+                        {
+                            // the mergedSchema is different from this operator's schema,
+                            // so add a foreach to project columns appropriately
+                            int mergeSchSz = mergedSchema.size();
+                            ArrayList<LogicalPlan> generatePlans =
+                                new ArrayList<LogicalPlan>(mergeSchSz);
+                            ArrayList<Boolean> flattenList =
+                                new ArrayList<Boolean>(mergeSchSz);
+
+                            for(Schema.FieldSchema fs : mergedSchema.getFields()) {
+                                LogicalPlan projectPlan = new LogicalPlan();
+                                Schema inpSchema = lop.getSchema();
+                                flattenList.add(Boolean.FALSE);
+
+                                int inpPos = inpSchema.getPosition(fs.alias);
+
+                                LogicalOperator columnProj = null;
+                                boolean isCastNeeded = false;
+                                if(inpPos == -1){
+                                    // the column is not present in the schema of this input,
+                                    // so project null
+                                    columnProj =
+                                        new LOConst(lp,
+                                                new OperatorKey(scope, getNextId()),
+                                                null
+                                        );
+                                    // cast is necessary if the type in schema is
+                                    // not a BYTEARRAY
+                                    if(fs.type != DataType.BYTEARRAY){
+                                        isCastNeeded = true;
+                                    }
+                                }else {
+                                    //project the column from input
+                                    columnProj = 
+                                        new LOProject(projectPlan,
+                                                new OperatorKey(scope, getNextId()),
+                                                lop, inpPos
+                                        );
+
+                                    // cast is needed if types are different.
+                                    // compatibility of types has already been checked
+                                    // during creation of mergedSchema
+                                    Schema.FieldSchema inpFs = inpSchema.getField(fs.alias);
+                                    if(inpFs.type != fs.type)
+                                        isCastNeeded = true;
+                                }
+                                projectPlan.add(columnProj);
+
+                                //add a LOCast if necessary
+                                if(isCastNeeded){
+                                    LOCast loCast = new LOCast(projectPlan,
+                                            new OperatorKey(scope, getNextId()),
+                                            fs.type
+                                    );
+                                    loCast.setFieldSchema(fs);
+                                    projectPlan.add(loCast);
+                                    projectPlan.connect(columnProj, loCast);
+                                }
+                                generatePlans.add(projectPlan);
+
+                            }
+                            LogicalOperator foreach = new LOForEach(lp,
+                                    new OperatorKey(scope, getNextId()),
+                                    generatePlans, flattenList
+                            );
+                            lp.add(foreach);
+                            lp.connect(lop, foreach);
+                            newInputs.add(foreach);
+                        }else {
+                            // schema of input is the same as mergedSchema,
+                            // so no additional foreach is required
+                            newInputs.add(lop);
+                        }
+
+                    }
+                    // use newInputs as the inputs for union
+                    inputs = newInputs;     
+                }
+
+                for (LogicalOperator lop: inputs) {
+                    lp.connect(lop, union);
+                    log.debug("Connected union input operator " +
+                            lop.getClass().getName() + " to operator " +
+                            union.getClass().getName() + " in the logical plan");
+                }
+
+                log.trace("Exiting UnionClause");
+                return union;
+            }
+            catch(ParseException e){
+                // it's already a ParseException, just throw it.
+                throw e;
+            }
+            catch(Exception e){
+                ParseException pe = new ParseException();
+                pe.initCause(e);
+                throw pe;           
+            }
        }
+
 }
 
     LogicalOperator ForEachClause(LogicalPlan lp) : 

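Aside, to summarize the per-column decision made in the loop above: a cast is inserted either when a projected null placeholder must take on a real type, or when the input's declared type differs from the merged type. A minimal standalone sketch of that logic follows (the helper name castNeeded is hypothetical; in the commit the logic is inline in UnionClause):

    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class UnionCastSketch {
        // mirrors the isCastNeeded decision in UnionClause for one column
        // of the merged schema
        static boolean castNeeded(Schema.FieldSchema target, Schema inputSchema)
                throws FrontendException {
            int inpPos = inputSchema.getPosition(target.alias);
            if (inpPos == -1) {
                // column absent from this input: a null constant is projected,
                // which needs a cast unless the target type is bytearray
                return target.type != DataType.BYTEARRAY;
            }
            // column present: cast only if the declared types differ; type
            // compatibility was already verified while building mergedSchema
            return inputSchema.getField(target.alias).type != target.type;
        }
    }
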
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=982420&r1=982419&r2=982420&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Wed Aug  4 22:24:58 2010
@@ -1467,6 +1467,177 @@ public class Schema implements Serializa
     }
     
     /**
+     * Merges a collection of schemas using their column aliases
+     * (unlike the mergeSchema(..) functions, which merge using positions).
+     * Schemas will not be merged if types are incompatible,
+     * as per DataType.mergeType(..).
+     * For Tuples and Bags, sub-schemas have to be equal to be considered compatible.
+     * @param schemas - list of schemas to be merged using their column aliases
+     * @return merged schema
+     * @throws SchemaMergeException
+     */
+    public static Schema mergeSchemasByAlias(Collection<Schema> schemas)
+    throws SchemaMergeException{
+        Schema mergedSchema = null;
+
+        // list of schemas that have currently been merged, used in error message
+        ArrayList<Schema> mergedSchemas = new ArrayList<Schema>(schemas.size());
+        for(Schema sch : schemas){
+            if(mergedSchema == null){
+                mergedSchema = new Schema(sch);
+                mergedSchemas.add(sch);
+                continue;
+            }
+            try{
+                mergedSchema = mergeSchemaByAlias(mergedSchema, sch);
+                mergedSchemas.add(sch);
+            }catch(SchemaMergeException e){
+                String msg = "Error merging schema: (" + sch + ") with "
+                + "merged schema: (" + mergedSchema + ") of schemas: "
+                + mergedSchemas;
+                throw new SchemaMergeException(msg, e);
+            }
+        }
+        return mergedSchema;
+    }
+    
+    /**
+     * Merges two schemas using their column aliases
+     * (unlike the mergeSchema(..) functions, which merge using positions).
+     * Schemas will not be merged if types are incompatible,
+     * as per DataType.mergeType(..).
+     * For Tuples and Bags, sub-schemas have to be equal to be considered compatible.
+     * @param schema1
+     * @param schema2
+     * @return Merged Schema
+     * @throws SchemaMergeException if schemas cannot be merged
+     */
+    public static Schema mergeSchemaByAlias(Schema schema1,
+            Schema schema2)
+    throws SchemaMergeException{
+        Schema mergedSchema = new Schema();
+        // add/merge fields present in the first schema
+        for(FieldSchema fs1 : schema1.getFields()){
+            checkNullAlias(fs1, schema1);
+            
+            FieldSchema fs2 = getFieldThrowSchemaMergeException(schema2, fs1.alias);
+            FieldSchema mergedFs = mergeFieldSchemaFirstLevelSameAlias(fs1, fs2);
+            mergedSchema.add(mergedFs);
+        }
+        
+        // add fields from the 2nd schema that are not already present in
+        // the merged schema
+        for(FieldSchema fs2 : schema2.getFields()){
+            checkNullAlias(fs2, schema2);
+            if(getFieldThrowSchemaMergeException(mergedSchema, fs2.alias) == null){
+                    try {
+                        mergedSchema.add(fs2.clone());
+                    } catch (CloneNotSupportedException e) {
+                        throw new SchemaMergeException(
+                                "Error encountered while merging schemas", e);
+                    }
+            }
+        }
+        return mergedSchema;
+        
+    }
+    
+    private static void checkNullAlias(FieldSchema fs, Schema schema)
+    throws SchemaMergeException {
+        if(fs.alias == null){
+            throw new SchemaMergeException(
+                    "Schema having field with null alias cannot be merged " +
+                    "using alias. Schema :" + schema
+            );
+        }
+    }
+
+    /**
+     * Schemas will not be merged if types are incompatible,
+     * as per DataType.mergeType(..).
+     * For Tuples and Bags, sub-schemas have to be equal to be considered compatible.
+     * Aliases are assumed to be the same for both.
+     * @param fs1
+     * @param fs2
+     * @return
+     * @throws SchemaMergeException
+     */
+    private static FieldSchema mergeFieldSchemaFirstLevelSameAlias(FieldSchema fs1,
+            FieldSchema fs2)
+    throws SchemaMergeException {
+        if(fs1 == null)
+            return fs2;
+        if(fs2 == null)
+            return fs1;
+
+        Schema innerSchema = null;
+        
+        byte mergedType = DataType.mergeType(fs1.type, fs2.type) ;
+
+        // If the types cannot be merged
+        if (mergedType == DataType.ERROR) {
+                int errCode = 1031;
+                String msg = "Incompatible types for merging schemas. Field schema: "
+                    + fs1 + " Other field schema: " + fs2;
+                throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
+        }
+        if(DataType.isSchemaType(mergedType)) {
+            // if one of them is a bytearray, pick inner schema of other one
+            if( fs1.type == DataType.BYTEARRAY ){
+                innerSchema = fs2.schema;
+            }else if(fs2.type == DataType.BYTEARRAY){
+                innerSchema = fs1.schema;
+            }
+            else {
+                //in case of types with inner schema such as bags and tuples
+                // the inner schema has to be same
+                if(!equals(fs1.schema, fs2.schema, false, false)){
+                    int errCode = 1032;
+                    String msg = "Incompatible types for merging inner schemas of"
+                        + " Field schema type: " + fs1
+                        + " Other field schema type: " + fs2;
+                    throw new SchemaMergeException(msg, errCode, PigException.INPUT) ;
+                }
+                innerSchema = fs1.schema;
+            }
+        }
+        try {
+            return new FieldSchema(fs1.alias, innerSchema, mergedType) ;
+        } catch (FrontendException e) {
+            // this exception is not expected
+            int errCode = 2124;
+            throw new SchemaMergeException(
+                    "Error in creating fieldSchema",
+                    errCode,
+                    PigException.BUG
+            );
+        }
+    }
+    
+    
+    /**
+     * Utility function that calls schema.getField(alias), and converts
+     * {@link FrontendException} to {@link SchemaMergeException}
+     * @param schema
+     * @param alias
+     * @return FieldSchema
+     * @throws SchemaMergeException
+     */
+    private static FieldSchema getFieldThrowSchemaMergeException(
+            Schema schema, String alias) throws SchemaMergeException {
+        FieldSchema fs = null;
+        try {
+            fs = schema.getField(alias);
+        } catch (FrontendException e) {
+            String msg = "Caught exception finding FieldSchema for alias " + alias;
+            throw new SchemaMergeException(msg, e);
+        }
+        return fs;
+    }
+    
+    
+    
+    /**
      * 
      * @param topLevelType DataType type of the top level element
      * @param innerTypes DataType types of the inner level element

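A quick illustration of the new mergeSchemasByAlias API added above; a minimal sketch, with the aliases and types invented for the example:

    import java.util.Arrays;

    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;

    public class MergeByAliasExample {
        public static void main(String[] args) throws SchemaMergeException {
            Schema s1 = new Schema();
            s1.add(new Schema.FieldSchema("x", DataType.INTEGER));
            s1.add(new Schema.FieldSchema("y", DataType.CHARARRAY));

            Schema s2 = new Schema();
            s2.add(new Schema.FieldSchema("y", DataType.CHARARRAY));
            s2.add(new Schema.FieldSchema("z", DataType.LONG));

            // columns are matched by alias, not by position, so the merged
            // schema should contain x, y and z exactly once each
            Schema merged = Schema.mergeSchemasByAlias(Arrays.asList(s1, s2));
            System.out.println(merged);
        }
    }
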
Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=982420&r1=982419&r2=982420&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Wed Aug  4 22:24:58 2010
@@ -37,6 +37,8 @@ import java.io.PrintWriter;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -356,6 +358,27 @@ public class Util {
         }
     }
 
+    /**
+     * Helper function to check if the result of a Pig Query is in line with 
+     * expected results. It sorts actual and expected results before comparison
+     * 
+     * @param actualResultsIt Result of the executed Pig query
+     * @param expectedResList Expected results to validate against
+     */
+     static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt,
+                                     List<Tuple> expectedResList) {
+         List<Tuple> actualResList = new ArrayList<Tuple>();
+         while(actualResultsIt.hasNext()){
+             actualResList.add(actualResultsIt.next());
+         }
+         
+         Collections.sort(actualResList);
+         Collections.sort(expectedResList);
+
+         Assert.assertEquals("Comparing actual and expected results. ",
+                 expectedResList, actualResList);
+     }
+     
        /**
         * Utility method to copy a file form local filesystem to the dfs on
         * the minicluster for testing in mapreduce mode

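Finally, a hypothetical sketch of how a test might call the new helper (the tuples are invented; in practice actualResultsIt would come from PigServer.openIterator):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.test.Util;

    public class CheckOutputsExample {
        public static void main(String[] args) {
            TupleFactory tf = TupleFactory.getInstance();

            // expected results, deliberately out of order: the helper sorts
            // both sides before comparing, so ordering does not matter
            List<Tuple> expected = new ArrayList<Tuple>();
            expected.add(tf.newTuple(Arrays.<Object>asList(2, "b")));
            expected.add(tf.newTuple(Arrays.<Object>asList(1, "a")));

            List<Tuple> actual = new ArrayList<Tuple>();
            actual.add(tf.newTuple(Arrays.<Object>asList(1, "a")));
            actual.add(tf.newTuple(Arrays.<Object>asList(2, "b")));

            Util.checkQueryOutputsAfterSort(actual.iterator(), expected);
        }
    }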
