Author: gates
Date: Thu May 22 10:12:56 2008
New Revision: 659161

URL: http://svn.apache.org/viewvc?rev=659161&view=rev
Log:
PIG-143 Pi's latest type checker checkin.


Added:
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java
    
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
    
incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java
    
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java

Modified: incubator/pig/branches/types/build.xml
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu May 22 10:12:56 2008
@@ -277,6 +277,8 @@
                     <exclude name="**/TestPigSplit.java" />
                     <exclude name="**/TestStoreOld.java" />
                     <!-- Excluced because we don't want to run them -->
+                    <exclude name="**/TypeCheckingTestUtil.java" />
+                    <exclude name="**/TypeGraphPrinter.java" />
                     <exclude name="**/TestHelper.java" />
                     <exclude name="**/TestLargeFile.java" />
                     <exclude name="**/TestOrderBy.java" />

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Thu May 
22 10:12:56 2008
@@ -424,10 +424,13 @@
      * float, or boolean.
      */
     public static boolean isAtomic(byte dataType) {
-        return ((dataType == BYTEARRAY) || (dataType == CHARARRAY) ||
-            (dataType == INTEGER) || (dataType == LONG) || 
-            (dataType == FLOAT) || (dataType == DOUBLE) ||
-            (dataType == FLOAT));
+        return ((dataType == BYTEARRAY) ||
+                (dataType == CHARARRAY) ||
+                (dataType == INTEGER) ||
+                (dataType == LONG) ||
+                (dataType == FLOAT) ||
+                (dataType == DOUBLE) ||
+                (dataType == BOOLEAN));
     }
 
     /**
@@ -822,4 +825,45 @@
             default :return true ;
         }
     }
+
+        /***
+     * Merge types if possible
+     * @param type1
+     * @param type2
+     * @return the merged type, or DataType.ERROR if not successful
+     */
+    public static byte mergeType(byte type1, byte type2) {
+        // Only legal types can be merged
+        if ( (!DataType.isUsableType(type1)) ||
+             (!DataType.isUsableType(type2)) ) {
+            return DataType.ERROR ;
+        }
+
+        // Same type is OK
+        if (type1==type2) {
+            return type1 ;
+        }
+
+        // Both are number so we return the bigger type
+        if ( (DataType.isNumberType(type1)) &&
+             (DataType.isNumberType(type2)) ) {
+            return type1>type2 ? type1:type2 ;
+        }
+
+        // One is bytearray and the other is (number or chararray)
+        if ( (type1 == DataType.BYTEARRAY) &&
+                ( (type2 == DataType.CHARARRAY) || 
(DataType.isNumberType(type2)) )
+              ) {
+            return type2 ;
+        }
+
+        if ( (type2 == DataType.BYTEARRAY) &&
+                ( (type1 == DataType.CHARARRAY) || 
(DataType.isNumberType(type1)) )
+              ) {
+            return type1 ;
+        }
+
+        // else return just ERROR
+        return DataType.ERROR ;
+    }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
 Thu May 22 10:12:56 2008
@@ -29,7 +29,6 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.plan.MultiMap;
@@ -262,6 +261,14 @@
         return mSchema;
     }
 
+    public boolean isTupleGroupCol() {
+        if (mInputs == null || mInputs.size() == 0) {
+            throw new AssertionError("COGroup.isTupleGroupCol() can be called "
+                                     + "after it has an input only") ;
+        }
+        return mGroupByPlans.get(mInputs.get(0)).size() > 1 ;
+    }
+
     @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java 
Thu May 22 10:12:56 2008
@@ -183,4 +183,9 @@
         v.visit(this);
     }
 
+    @Override
+    public byte getType() {
+        return DataType.BAG ;
+    }
+
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
 Thu May 22 10:12:56 2008
@@ -23,6 +23,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -107,4 +108,7 @@
         v.visit(this);
     }
 
+    public byte getType() {
+        return DataType.BAG ;
+    }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java 
Thu May 22 10:12:56 2008
@@ -129,6 +129,10 @@
         return mEnforcedSchema;
     }
 
+    /***
+     * Set this when user enforces schema
+     * @param enforcedSchema
+     */
     public void setEnforcedSchema(Schema enforcedSchema) {
         this.mEnforcedSchema = enforcedSchema;
     }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
 Thu May 22 10:12:56 2008
@@ -125,6 +125,7 @@
 
     public int getCol() {
         if (mProjection.size() != 1)
+            
             throw new RuntimeException(
                     "Internal error: improper use of getCol in "
                             + LOProject.class.getName());
@@ -281,9 +282,67 @@
         return mFieldSchema;
     }
 
+    public boolean isSingleProjection()  {
+        return mProjection.size() == 1 ;
+    }
+
     @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }
 
+    @Override
+    public byte getType() {                         
+        // Called to make sure we've constructed the field schema before trying
+        // to read it.
+        try {
+            getFieldSchema();
+        } catch (FrontendException fe) {
+            return DataType.UNKNOWN;
+        }
+
+        if (mFieldSchema != null){
+            return mFieldSchema.type ;
+        }
+        else {
+            return DataType.UNKNOWN ;
+        }
+    }
+
+    @Override
+    public Schema getSchema() throws FrontendException{
+        // Called to make sure we've constructed the field schema before trying
+        // to read it.
+        getFieldSchema();
+        if (mFieldSchema != null){
+            return mFieldSchema.schema ;
+        }
+        else {
+            return null ;
+        }
+    }
+
+    /* For debugging only */
+    public String toDetailString() {
+        StringBuilder sb = new StringBuilder() ;
+        sb.append("LOProject") ;
+        sb.append(" Id=" + this.mKey.id) ;
+        sb.append(" Projection=") ;
+        boolean isFirst = true ;
+        for(int i=0;i< mProjection.size();i++) {
+            if (isFirst) {
+                isFirst = false ;
+            }
+            else {
+                sb.append(",") ;
+            }
+            sb.append(mProjection.get(i)) ;
+        }
+        sb.append(" isStart=") ;
+        sb.append(mIsStar) ;
+        sb.append(" isSentinel=") ;
+        sb.append(mSentinel) ;
+        return sb.toString() ;
+    }
+
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java 
Thu May 22 10:12:56 2008
@@ -26,6 +26,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -127,4 +128,8 @@
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }
+
+    public byte getType() {
+        return DataType.BAG ;
+    }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java 
Thu May 22 10:12:56 2008
@@ -27,6 +27,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -114,4 +115,10 @@
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }
+
+    @Override
+    public byte getType() {
+        return DataType.BAG;
+    }
+    
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
 Thu May 22 10:12:56 2008
@@ -26,6 +26,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.data.DataType;
 
 
 public class LOSplitOutput extends LogicalOperator {
@@ -97,4 +98,8 @@
     public int getReadFrom() {
         return mIndex;
     }
+
+    public byte getType() {
+        return DataType.BAG ;
+    }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java 
Thu May 22 10:12:56 2008
@@ -24,6 +24,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -105,4 +106,8 @@
         v.visit(this);
     }
 
+    public byte getType() {
+        return DataType.BAG;
+    }
+
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
 Thu May 22 10:12:56 2008
@@ -140,6 +140,16 @@
     }
 
     /**
+     * Directly force the schema without reconcilation
+     * Please use with great care
+     * @param schema
+     */
+    public void forceSchema(Schema schema) {
+        this.mSchema = schema;
+    }
+
+
+    /**
      * Get a copy of the schema for the output of this operator.
      */
     public abstract Schema getSchema() throws FrontendException;

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
 Thu May 22 10:12:56 2008
@@ -34,4 +34,16 @@
     public LogicalPlan() {
         super();
     }
+    public LogicalOperator getSingleLeafPlanOutputOp()  {
+        List<LogicalOperator> list = this.getLeaves() ;
+        if (list.size() != 1) {
+            throw new AssertionError("The plan has more than one leaf node") ;
+        }
+        return list.get(0) ;
+    }
+
+    public byte getSingleLeafPlanOutputType()  {
+        return getSingleLeafPlanOutputOp().getType() ;
+    }
+
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=659161&r1=659160&r2=659161&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
 Thu May 22 10:12:56 2008
@@ -24,7 +24,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Collection;
-import java.io.IOException;
 
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -289,49 +288,55 @@
      * @throws ParseException if this cannot be reconciled.
      */
     public void reconcile(Schema other) throws ParseException {
-        if (other.size() != size()) {
-            throw new ParseException("Cannot reconcile schemas with different "
-                + "sizes.  This schema has size " + size() + " other has size "
-                + "of " + other.size());
-        }
-
-        Iterator<FieldSchema> i = other.mFields.iterator();
-        for (int j = 0; i.hasNext(); j++) {
-            FieldSchema otherFs = i.next();
-            FieldSchema ourFs = mFields.get(j);
-            log.debug("ourFs: " + ourFs + " otherFs: " + otherFs);
-            if (otherFs.alias != null) {
-                log.debug("otherFs.alias: " + otherFs.alias);
-                if (ourFs.alias != null) {
-                    log.debug("Removing ourFs.alias: " + ourFs.alias);
-                    mAliases.remove(ourFs.alias);
-                    Collection<String> aliases = mFieldSchemas.get(ourFs);
-                    List<String> listAliases = new ArrayList<String>();
-                    for(String alias: aliases) {
-                        listAliases.add(new String(alias));
+
+        if (other != null) {
+        
+            if (other.size() != size()) {
+                throw new ParseException("Cannot reconcile schemas with 
different "
+                    + "sizes.  This schema has size " + size() + " other has 
size "
+                    + "of " + other.size());
+            }
+
+            Iterator<FieldSchema> i = other.mFields.iterator();
+            for (int j = 0; i.hasNext(); j++) {
+                FieldSchema otherFs = i.next();
+                FieldSchema ourFs = mFields.get(j);
+                log.debug("ourFs: " + ourFs + " otherFs: " + otherFs);
+                if (otherFs.alias != null) {
+                    log.debug("otherFs.alias: " + otherFs.alias);
+                    if (ourFs.alias != null) {
+                        log.debug("Removing ourFs.alias: " + ourFs.alias);
+                        mAliases.remove(ourFs.alias);
+                        Collection<String> aliases = mFieldSchemas.get(ourFs);
+                        if (aliases != null) {
+                            List<String> listAliases = new ArrayList<String>();
+                            for(String alias: aliases) {
+                                listAliases.add(new String(alias));
+                            }
+                            for(String alias: listAliases) {
+                                log.debug("Removing alias " + alias + " from 
multimap");
+                                mFieldSchemas.remove(ourFs, alias);
+                            }
+                        }
                     }
-                    for(String alias: listAliases) {
-                        log.debug("Removing alias " + alias + " from 
multimap");
-                        mFieldSchemas.remove(ourFs, alias);
+                    ourFs.alias = otherFs.alias;
+                    log.debug("Setting alias to: " + otherFs.alias);
+                    mAliases.put(ourFs.alias, ourFs);
+                    if(null != ourFs.alias) {
+                        mFieldSchemas.put(ourFs, ourFs.alias);
                     }
                 }
-                ourFs.alias = otherFs.alias;
-                log.debug("Setting alias to: " + otherFs.alias);
-                mAliases.put(ourFs.alias, ourFs);
-                if(null != ourFs.alias) {
-                    mFieldSchemas.put(ourFs, ourFs.alias);
+                if (otherFs.type != DataType.UNKNOWN) {
+                    ourFs.type = otherFs.type;
+                    log.debug("Setting type to: "
+                            + DataType.findTypeName(otherFs.type));
+                }
+                if (otherFs.schema != null) {
+                    ourFs.schema = otherFs.schema;
+                    log.debug("Setting schema to: " + otherFs.schema);
                 }
-            }
-            if (otherFs.type != DataType.UNKNOWN) {
-                ourFs.type = otherFs.type;
-                log.debug("Setting type to: "
-                        + DataType.findTypeName(otherFs.type));
-            }
-            if (otherFs.schema != null) {
-                ourFs.schema = otherFs.schema;
-                log.debug("Setting schema to: " + otherFs.schema);
-            }
 
+            }
         }
     }
 
@@ -369,17 +374,80 @@
 
     @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder("(");
-        boolean first = true;
-        for (FieldSchema fs : mFields) {
-            if (first) first = false;
-            else sb.append(", ");
-            sb.append(fs.toString());
+        StringBuilder sb = new StringBuilder();
+        try {
+            stringifySchema(sb, this, DataType.BAG) ;
+        }
+        catch (ParseException pe) {
+            throw new RuntimeException("PROBLEM PRINTING SCHEMA")  ;
         }
-        sb.append(")");
         return sb.toString();
     }
 
+
+    // This is used for building up output string
+    // type can only be BAG or TUPLE
+    public static void stringifySchema(StringBuilder sb,
+                                       Schema schema,
+                                       byte type)
+                                            throws ParseException{
+
+        if (type == DataType.TUPLE) {
+            sb.append("(") ;
+        }
+        else if (type == DataType.BAG) {
+            sb.append("{") ;
+        }
+        // TODO: Map Support
+
+        if (schema == null) {
+            sb.append("null") ;
+        }
+        else {
+            boolean isFirst = true ;
+            for (int i=0; i< schema.size() ;i++) {
+
+                if (!isFirst) {
+                    sb.append(",") ;
+                }
+                else {
+                    isFirst = false ;
+                }
+
+                FieldSchema fs = schema.getField(i) ;
+
+                if (fs.alias != null) {
+                    sb.append(fs.alias);
+                    sb.append(": ");
+                }
+
+                if (DataType.isAtomic(fs.type)) {
+                    sb.append(DataType.findTypeName(fs.type)) ;
+                }
+                else if ( (fs.type == DataType.TUPLE) ||
+                          (fs.type == DataType.BAG) ) {
+                    // safety net
+                    if (schema != fs.schema) {
+                        stringifySchema(sb, fs.schema, fs.type) ;
+                    }
+                    else {
+                        throw new AssertionError("Schema refers to itself "
+                                                 + "as inner schema") ;
+                    }
+                }
+                // TODO: Support Map
+            }
+        }
+
+        if (type == DataType.TUPLE) {
+            sb.append(")") ;
+        }
+        else if (type == DataType.BAG) {
+            sb.append("}") ;
+        }
+
+    }
+
     public void add(FieldSchema f) {
         mFields.add(f);
         if (null != f.alias) {
@@ -506,31 +574,99 @@
      *                                  schema take precedence
      * @return the merged schema, null if they are not compatible
      */
-    private Schema mergeSchema(Schema schema, Schema other,
+    public static Schema mergeSchema(Schema schema, Schema other,
                                boolean otherTakesAliasPrecedence) {
+        try {
+            Schema newSchema = mergeSchema(schema,
+                                        other,
+                                        otherTakesAliasPrecedence,
+                                        false,
+                                        false) ;
+            return newSchema;
+        }
+        catch(SchemaMergeException sme) {
+            // just mean they are not compatible
+        }
+        return null ;
+    }
+
+    /***
+     * Recursively merge two schemas
+     * @param schema the initial schema
+     * @param other the other schema to be merged with
+     * @param otherTakesAliasPrecedence true if aliases from the other
+     *                                  schema take precedence
+     * @param allowDifferentSizeMerge allow merging of schemas of different 
types
+     * @param allowIncompatibleTypes 1) if types in schemas are not compatible
+     *                               they will be treated as ByteArray 
(untyped)
+     *                               2) if schemas in schemas are not 
compatible
+     *                               and allowIncompatibleTypes is true
+     *                               those inner schemas in the output
+     *                               will be null.
+     * @return the merged schema this can be null if one schema is null and
+     *         allowIncompatibleTypes is true
+     *
+     * @throws SchemaMergeException if they cannot be merged
+     */
+
+    public static Schema mergeSchema(Schema schema,
+                               Schema other,
+                               boolean otherTakesAliasPrecedence,
+                               boolean allowDifferentSizeMerge,
+                               boolean allowIncompatibleTypes)
+                                    throws SchemaMergeException {
+        if (schema == null) {
+            if (allowIncompatibleTypes) {
+                return null ;
+            }
+            else {
+                throw new SchemaMergeException("One schema is null") ;
+            }
+        }
 
         if (other == null) {
-            return null ;
+            if (allowIncompatibleTypes) {
+                return null ;
+            }
+            else {
+                throw new SchemaMergeException("One schema is null") ;
+            }
         }
 
-        if (schema.size() != other.size()) {
-            return null ;
+        if ( (schema.size() != other.size()) &&
+             (!allowDifferentSizeMerge) ) {
+            throw new SchemaMergeException("Different schema size") ;
         }
 
         List<FieldSchema> outputList = new ArrayList<FieldSchema>() ;
 
-        Iterator<FieldSchema> mylist = schema.mFields.iterator() ;
-        Iterator<FieldSchema> otherlist = other.mFields.iterator() ;
+        List<FieldSchema> mylist = schema.mFields ;
+        List<FieldSchema> otherlist = other.mFields ;
+
+        // We iterate up to the smaller one's size
+        int iterateLimit = schema.mFields.size() > other.mFields.size()?
+                            other.mFields.size() : schema.mFields.size() ;
 
-        while (mylist.hasNext()) {
+        int idx = 0;
+        for (; idx< iterateLimit ; idx ++) {
 
-            FieldSchema myFs = mylist.next() ;
-            FieldSchema otherFs = otherlist.next() ;
+            // Just for readability
+            FieldSchema myFs = mylist.get(idx) ;
+            FieldSchema otherFs = otherlist.get(idx) ;
 
-            byte mergedType = mergeType(myFs.type, otherFs.type) ;
-            // if the types cannot be merged, the schemas cannot be merged
+            byte mergedType = DataType.mergeType(myFs.type, otherFs.type) ;
+
+            // If the types cannot be merged
             if (mergedType == DataType.ERROR) {
-                return null ;
+                // If  treatIncompatibleAsByteArray is true,
+                // we will treat it as bytearray
+                if (allowIncompatibleTypes) {
+                    mergedType = DataType.BYTEARRAY ;
+                }
+                // otherwise the schemas cannot be merged
+                else {
+                    throw new SchemaMergeException("Incompatible types") ;
+                }
             }
 
             String mergedAlias = mergeAlias(myFs.alias,
@@ -546,18 +682,63 @@
                 // merge inner tuple because both sides are tuples
                 Schema mergedSubSchema = mergeSchema(myFs.schema,
                                                      otherFs.schema,
-                                                     
otherTakesAliasPrecedence) ;
-                // return null if they cannot be merged
-                if (mergedSubSchema == null) {
-                    return null ;
+                                                     otherTakesAliasPrecedence,
+                                                     allowDifferentSizeMerge,
+                                                     allowIncompatibleTypes) ;
+                // if they cannot be merged and we don't allow incompatible
+                // types, just return null meaning cannot merge
+                if ( (mergedSubSchema == null) &&
+                     (!allowIncompatibleTypes) ) {
+                    throw new SchemaMergeException("Incompatible inner 
schemas") ;
                 }
 
+                // create the merged field
+                // the mergedSubSchema can be true if allowIncompatibleTypes
                 mergedFs = new FieldSchema(mergedAlias, mergedSubSchema) ;
 
             }
             outputList.add(mergedFs) ;
         }
 
+        // Handle different schema size
+        if (allowDifferentSizeMerge) {
+            
+            // if the first schema has leftover, then append the rest
+            for(int i=idx; i < mylist.size(); i++) {
+
+                FieldSchema fs = mylist.get(i) ;
+
+                // for non-schema types
+                if (!DataType.isSchemaType(fs.type)) {
+                    outputList.add(new FieldSchema(fs.alias, fs.type)) ;
+                }
+                // for TUPLE & BAG
+                else {
+                    FieldSchema tmp = new FieldSchema(fs.alias, fs.schema) ;
+                    tmp.type = fs.type ;
+                    outputList.add(tmp) ;
+                }
+            }
+
+             // if the second schema has leftover, then append the rest
+            for(int i=idx; i < otherlist.size(); i++) {
+
+                FieldSchema fs = otherlist.get(i) ;
+
+                // for non-schema types
+                if (!DataType.isSchemaType(fs.type)) {
+                    outputList.add(new FieldSchema(fs.alias, fs.type)) ;
+                }
+                // for TUPLE & BAG
+                else {
+                    FieldSchema tmp = new FieldSchema(fs.alias, fs.schema) ;
+                    tmp.type = fs.type ;
+                    outputList.add(tmp) ;
+                }
+            }
+
+        }
+
         return new Schema(outputList) ;
     }
 
@@ -569,7 +750,7 @@
      * @param otherTakesPrecedence
      * @return
      */
-    private String mergeAlias(String alias, String other
+    private static String mergeAlias(String alias, String other
                               ,boolean otherTakesPrecedence) {
         if (alias == null) {
             return other ;
@@ -585,47 +766,6 @@
         }
     }
 
-    /***
-     * Merge types if possible
-     * @param type1
-     * @param type2
-     * @return the merged type, or DataType.ERROR if not successful
-     */
-    private byte mergeType(byte type1, byte type2) {
-        // Only legal types can be merged
-        if ( (!DataType.isUsableType(type1)) ||
-             (!DataType.isUsableType(type2)) ) {
-            return DataType.ERROR ;
-        }
-
-        // Same type is OK
-        if (type1==type2) {
-            return type1 ;
-        }
-
-        // Both are number so we return the bigger type
-        if ( (DataType.isNumberType(type1)) &&
-             (DataType.isNumberType(type2)) ) {
-            return type1>type2 ? type1:type2 ;
-        }
-
-        // One is bytearray and the other is (number or chararray)
-        if ( (type1 == DataType.BYTEARRAY) &&
-                ( (type2 == DataType.CHARARRAY) || 
(DataType.isNumberType(type2)) )
-              ) {
-            return type2 ;
-        }
-
-        if ( (type2 == DataType.BYTEARRAY) &&
-                ( (type1 == DataType.CHARARRAY) || 
(DataType.isNumberType(type1)) )
-              ) {
-            return type1 ;
-        }
-
-        // else return just ERROR
-        return DataType.ERROR ;
-    }
-
 
 }
 

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java?rev=659161&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java
 Thu May 22 10:12:56 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.schema ;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+public class SchemaMergeException extends FrontendException {
+
+    public SchemaMergeException (String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SchemaMergeException() {
+        this(null, null);
+    }
+
+    public SchemaMergeException(String message) {
+        this(message, null);
+    }
+
+    public SchemaMergeException(Throwable cause) {
+        this(null, cause);
+    }
+}


Reply via email to