Author: gates Date: Thu May 22 10:12:56 2008 New Revision: 659161 URL: http://svn.apache.org/viewvc?rev=659161&view=rev Log: PIG-143 Pi's latest type checker checkin.
Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java Modified: incubator/pig/branches/types/build.xml incubator/pig/branches/types/src/org/apache/pig/data/DataType.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java incubator/pig/branches/types/test/org/apache/pig/test/TestSchema.java incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Modified: incubator/pig/branches/types/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/build.xml (original) +++ incubator/pig/branches/types/build.xml Thu May 22 10:12:56 2008 @@ -277,6 +277,8 @@ <exclude name="**/TestPigSplit.java" /> <exclude name="**/TestStoreOld.java" /> <!-- Excluced because we don't want to run them --> + <exclude name="**/TypeCheckingTestUtil.java" /> + <exclude name="**/TypeGraphPrinter.java" /> <exclude name="**/TestHelper.java" /> <exclude name="**/TestLargeFile.java" /> <exclude name="**/TestOrderBy.java" /> Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Thu May 22 10:12:56 2008 @@ -424,10 +424,13 @@ * float, or boolean. */ public static boolean isAtomic(byte dataType) { - return ((dataType == BYTEARRAY) || (dataType == CHARARRAY) || - (dataType == INTEGER) || (dataType == LONG) || - (dataType == FLOAT) || (dataType == DOUBLE) || - (dataType == FLOAT)); + return ((dataType == BYTEARRAY) || + (dataType == CHARARRAY) || + (dataType == INTEGER) || + (dataType == LONG) || + (dataType == FLOAT) || + (dataType == DOUBLE) || + (dataType == BOOLEAN)); } /** @@ -822,4 +825,45 @@ default :return true ; } } + + /*** + * Merge types if possible + * @param type1 + * @param type2 + * @return the merged type, or DataType.ERROR if not successful + */ + public static byte mergeType(byte type1, byte type2) { + // Only legal types can be merged + if ( (!DataType.isUsableType(type1)) || + (!DataType.isUsableType(type2)) ) { + return DataType.ERROR ; + } + + // Same type is OK + if (type1==type2) { + return type1 ; + } + + // Both are number so we return the bigger type + if ( (DataType.isNumberType(type1)) && + (DataType.isNumberType(type2)) ) { + return type1>type2 ? type1:type2 ; + } + + // One is bytearray and the other is (number or chararray) + if ( (type1 == DataType.BYTEARRAY) && + ( (type2 == DataType.CHARARRAY) || (DataType.isNumberType(type2)) ) + ) { + return type2 ; + } + + if ( (type2 == DataType.BYTEARRAY) && + ( (type1 == DataType.CHARARRAY) || (DataType.isNumberType(type1)) ) + ) { + return type1 ; + } + + // else return just ERROR + return DataType.ERROR ; + } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Thu May 22 10:12:56 2008 @@ -29,7 +29,6 @@ import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.plan.PlanVisitor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.impl.plan.MultiMap; @@ -262,6 +261,14 @@ return mSchema; } + public boolean isTupleGroupCol() { + if (mInputs == null || mInputs.size() == 0) { + throw new AssertionError("COGroup.isTupleGroupCol() can be called " + + "after it has an input only") ; + } + return mGroupByPlans.get(mInputs.get(0)).size() > 1 ; + } + @Override public void visit(LOVisitor v) throws VisitorException { v.visit(this); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java Thu May 22 10:12:56 2008 @@ -183,4 +183,9 @@ v.visit(this); } + @Override + public byte getType() { + return DataType.BAG ; + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Thu May 22 10:12:56 2008 @@ -23,6 +23,7 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.PlanVisitor; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.data.DataType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -107,4 +108,7 @@ v.visit(this); } + public byte getType() { + return DataType.BAG ; + } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Thu May 22 10:12:56 2008 @@ -129,6 +129,10 @@ return mEnforcedSchema; } + /*** + * Set this when user enforces schema + * @param enforcedSchema + */ public void setEnforcedSchema(Schema enforcedSchema) { this.mEnforcedSchema = enforcedSchema; } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Thu May 22 10:12:56 2008 @@ -125,6 +125,7 @@ public int getCol() { if (mProjection.size() != 1) + throw new RuntimeException( "Internal error: improper use of getCol in " + LOProject.class.getName()); @@ -281,9 +282,67 @@ return mFieldSchema; } + public boolean isSingleProjection() { + return mProjection.size() == 1 ; + } + @Override public void visit(LOVisitor v) throws VisitorException { v.visit(this); } + @Override + public byte getType() { + // Called to make sure we've constructed the field schema before trying + // to read it. + try { + getFieldSchema(); + } catch (FrontendException fe) { + return DataType.UNKNOWN; + } + + if (mFieldSchema != null){ + return mFieldSchema.type ; + } + else { + return DataType.UNKNOWN ; + } + } + + @Override + public Schema getSchema() throws FrontendException{ + // Called to make sure we've constructed the field schema before trying + // to read it. + getFieldSchema(); + if (mFieldSchema != null){ + return mFieldSchema.schema ; + } + else { + return null ; + } + } + + /* For debugging only */ + public String toDetailString() { + StringBuilder sb = new StringBuilder() ; + sb.append("LOProject") ; + sb.append(" Id=" + this.mKey.id) ; + sb.append(" Projection=") ; + boolean isFirst = true ; + for(int i=0;i< mProjection.size();i++) { + if (isFirst) { + isFirst = false ; + } + else { + sb.append(",") ; + } + sb.append(mProjection.get(i)) ; + } + sb.append(" isStart=") ; + sb.append(mIsStar) ; + sb.append(" isSentinel=") ; + sb.append(mSentinel) ; + return sb.toString() ; + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Thu May 22 10:12:56 2008 @@ -26,6 +26,7 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.plan.PlanVisitor; +import org.apache.pig.data.DataType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -127,4 +128,8 @@ public void visit(LOVisitor v) throws VisitorException { v.visit(this); } + + public byte getType() { + return DataType.BAG ; + } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java Thu May 22 10:12:56 2008 @@ -27,6 +27,7 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.plan.PlanVisitor; +import org.apache.pig.data.DataType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -114,4 +115,10 @@ public void visit(LOVisitor v) throws VisitorException { v.visit(this); } + + @Override + public byte getType() { + return DataType.BAG; + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java Thu May 22 10:12:56 2008 @@ -26,6 +26,7 @@ import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.data.DataType; public class LOSplitOutput extends LogicalOperator { @@ -97,4 +98,8 @@ public int getReadFrom() { return mIndex; } + + public byte getType() { + return DataType.BAG ; + } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java Thu May 22 10:12:56 2008 @@ -24,6 +24,7 @@ import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.PlanVisitor; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.data.DataType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -105,4 +106,8 @@ v.visit(this); } + public byte getType() { + return DataType.BAG; + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Thu May 22 10:12:56 2008 @@ -140,6 +140,16 @@ } /** + * Directly force the schema without reconcilation + * Please use with great care + * @param schema + */ + public void forceSchema(Schema schema) { + this.mSchema = schema; + } + + + /** * Get a copy of the schema for the output of this operator. */ public abstract Schema getSchema() throws FrontendException; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java Thu May 22 10:12:56 2008 @@ -34,4 +34,16 @@ public LogicalPlan() { super(); } + public LogicalOperator getSingleLeafPlanOutputOp() { + List<LogicalOperator> list = this.getLeaves() ; + if (list.size() != 1) { + throw new AssertionError("The plan has more than one leaf node") ; + } + return list.get(0) ; + } + + public byte getSingleLeafPlanOutputType() { + return getSingleLeafPlanOutputOp().getType() ; + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=659161&r1=659160&r2=659161&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Thu May 22 10:12:56 2008 @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.Collection; -import java.io.IOException; import org.apache.pig.data.DataType; import org.apache.pig.impl.logicalLayer.parser.ParseException; @@ -289,49 +288,55 @@ * @throws ParseException if this cannot be reconciled. */ public void reconcile(Schema other) throws ParseException { - if (other.size() != size()) { - throw new ParseException("Cannot reconcile schemas with different " - + "sizes. This schema has size " + size() + " other has size " - + "of " + other.size()); - } - - Iterator<FieldSchema> i = other.mFields.iterator(); - for (int j = 0; i.hasNext(); j++) { - FieldSchema otherFs = i.next(); - FieldSchema ourFs = mFields.get(j); - log.debug("ourFs: " + ourFs + " otherFs: " + otherFs); - if (otherFs.alias != null) { - log.debug("otherFs.alias: " + otherFs.alias); - if (ourFs.alias != null) { - log.debug("Removing ourFs.alias: " + ourFs.alias); - mAliases.remove(ourFs.alias); - Collection<String> aliases = mFieldSchemas.get(ourFs); - List<String> listAliases = new ArrayList<String>(); - for(String alias: aliases) { - listAliases.add(new String(alias)); + + if (other != null) { + + if (other.size() != size()) { + throw new ParseException("Cannot reconcile schemas with different " + + "sizes. This schema has size " + size() + " other has size " + + "of " + other.size()); + } + + Iterator<FieldSchema> i = other.mFields.iterator(); + for (int j = 0; i.hasNext(); j++) { + FieldSchema otherFs = i.next(); + FieldSchema ourFs = mFields.get(j); + log.debug("ourFs: " + ourFs + " otherFs: " + otherFs); + if (otherFs.alias != null) { + log.debug("otherFs.alias: " + otherFs.alias); + if (ourFs.alias != null) { + log.debug("Removing ourFs.alias: " + ourFs.alias); + mAliases.remove(ourFs.alias); + Collection<String> aliases = mFieldSchemas.get(ourFs); + if (aliases != null) { + List<String> listAliases = new ArrayList<String>(); + for(String alias: aliases) { + listAliases.add(new String(alias)); + } + for(String alias: listAliases) { + log.debug("Removing alias " + alias + " from multimap"); + mFieldSchemas.remove(ourFs, alias); + } + } } - for(String alias: listAliases) { - log.debug("Removing alias " + alias + " from multimap"); - mFieldSchemas.remove(ourFs, alias); + ourFs.alias = otherFs.alias; + log.debug("Setting alias to: " + otherFs.alias); + mAliases.put(ourFs.alias, ourFs); + if(null != ourFs.alias) { + mFieldSchemas.put(ourFs, ourFs.alias); } } - ourFs.alias = otherFs.alias; - log.debug("Setting alias to: " + otherFs.alias); - mAliases.put(ourFs.alias, ourFs); - if(null != ourFs.alias) { - mFieldSchemas.put(ourFs, ourFs.alias); + if (otherFs.type != DataType.UNKNOWN) { + ourFs.type = otherFs.type; + log.debug("Setting type to: " + + DataType.findTypeName(otherFs.type)); + } + if (otherFs.schema != null) { + ourFs.schema = otherFs.schema; + log.debug("Setting schema to: " + otherFs.schema); } - } - if (otherFs.type != DataType.UNKNOWN) { - ourFs.type = otherFs.type; - log.debug("Setting type to: " - + DataType.findTypeName(otherFs.type)); - } - if (otherFs.schema != null) { - ourFs.schema = otherFs.schema; - log.debug("Setting schema to: " + otherFs.schema); - } + } } } @@ -369,17 +374,80 @@ @Override public String toString() { - StringBuilder sb = new StringBuilder("("); - boolean first = true; - for (FieldSchema fs : mFields) { - if (first) first = false; - else sb.append(", "); - sb.append(fs.toString()); + StringBuilder sb = new StringBuilder(); + try { + stringifySchema(sb, this, DataType.BAG) ; + } + catch (ParseException pe) { + throw new RuntimeException("PROBLEM PRINTING SCHEMA") ; } - sb.append(")"); return sb.toString(); } + + // This is used for building up output string + // type can only be BAG or TUPLE + public static void stringifySchema(StringBuilder sb, + Schema schema, + byte type) + throws ParseException{ + + if (type == DataType.TUPLE) { + sb.append("(") ; + } + else if (type == DataType.BAG) { + sb.append("{") ; + } + // TODO: Map Support + + if (schema == null) { + sb.append("null") ; + } + else { + boolean isFirst = true ; + for (int i=0; i< schema.size() ;i++) { + + if (!isFirst) { + sb.append(",") ; + } + else { + isFirst = false ; + } + + FieldSchema fs = schema.getField(i) ; + + if (fs.alias != null) { + sb.append(fs.alias); + sb.append(": "); + } + + if (DataType.isAtomic(fs.type)) { + sb.append(DataType.findTypeName(fs.type)) ; + } + else if ( (fs.type == DataType.TUPLE) || + (fs.type == DataType.BAG) ) { + // safety net + if (schema != fs.schema) { + stringifySchema(sb, fs.schema, fs.type) ; + } + else { + throw new AssertionError("Schema refers to itself " + + "as inner schema") ; + } + } + // TODO: Support Map + } + } + + if (type == DataType.TUPLE) { + sb.append(")") ; + } + else if (type == DataType.BAG) { + sb.append("}") ; + } + + } + public void add(FieldSchema f) { mFields.add(f); if (null != f.alias) { @@ -506,31 +574,99 @@ * schema take precedence * @return the merged schema, null if they are not compatible */ - private Schema mergeSchema(Schema schema, Schema other, + public static Schema mergeSchema(Schema schema, Schema other, boolean otherTakesAliasPrecedence) { + try { + Schema newSchema = mergeSchema(schema, + other, + otherTakesAliasPrecedence, + false, + false) ; + return newSchema; + } + catch(SchemaMergeException sme) { + // just mean they are not compatible + } + return null ; + } + + /*** + * Recursively merge two schemas + * @param schema the initial schema + * @param other the other schema to be merged with + * @param otherTakesAliasPrecedence true if aliases from the other + * schema take precedence + * @param allowDifferentSizeMerge allow merging of schemas of different types + * @param allowIncompatibleTypes 1) if types in schemas are not compatible + * they will be treated as ByteArray (untyped) + * 2) if schemas in schemas are not compatible + * and allowIncompatibleTypes is true + * those inner schemas in the output + * will be null. + * @return the merged schema this can be null if one schema is null and + * allowIncompatibleTypes is true + * + * @throws SchemaMergeException if they cannot be merged + */ + + public static Schema mergeSchema(Schema schema, + Schema other, + boolean otherTakesAliasPrecedence, + boolean allowDifferentSizeMerge, + boolean allowIncompatibleTypes) + throws SchemaMergeException { + if (schema == null) { + if (allowIncompatibleTypes) { + return null ; + } + else { + throw new SchemaMergeException("One schema is null") ; + } + } if (other == null) { - return null ; + if (allowIncompatibleTypes) { + return null ; + } + else { + throw new SchemaMergeException("One schema is null") ; + } } - if (schema.size() != other.size()) { - return null ; + if ( (schema.size() != other.size()) && + (!allowDifferentSizeMerge) ) { + throw new SchemaMergeException("Different schema size") ; } List<FieldSchema> outputList = new ArrayList<FieldSchema>() ; - Iterator<FieldSchema> mylist = schema.mFields.iterator() ; - Iterator<FieldSchema> otherlist = other.mFields.iterator() ; + List<FieldSchema> mylist = schema.mFields ; + List<FieldSchema> otherlist = other.mFields ; + + // We iterate up to the smaller one's size + int iterateLimit = schema.mFields.size() > other.mFields.size()? + other.mFields.size() : schema.mFields.size() ; - while (mylist.hasNext()) { + int idx = 0; + for (; idx< iterateLimit ; idx ++) { - FieldSchema myFs = mylist.next() ; - FieldSchema otherFs = otherlist.next() ; + // Just for readability + FieldSchema myFs = mylist.get(idx) ; + FieldSchema otherFs = otherlist.get(idx) ; - byte mergedType = mergeType(myFs.type, otherFs.type) ; - // if the types cannot be merged, the schemas cannot be merged + byte mergedType = DataType.mergeType(myFs.type, otherFs.type) ; + + // If the types cannot be merged if (mergedType == DataType.ERROR) { - return null ; + // If treatIncompatibleAsByteArray is true, + // we will treat it as bytearray + if (allowIncompatibleTypes) { + mergedType = DataType.BYTEARRAY ; + } + // otherwise the schemas cannot be merged + else { + throw new SchemaMergeException("Incompatible types") ; + } } String mergedAlias = mergeAlias(myFs.alias, @@ -546,18 +682,63 @@ // merge inner tuple because both sides are tuples Schema mergedSubSchema = mergeSchema(myFs.schema, otherFs.schema, - otherTakesAliasPrecedence) ; - // return null if they cannot be merged - if (mergedSubSchema == null) { - return null ; + otherTakesAliasPrecedence, + allowDifferentSizeMerge, + allowIncompatibleTypes) ; + // if they cannot be merged and we don't allow incompatible + // types, just return null meaning cannot merge + if ( (mergedSubSchema == null) && + (!allowIncompatibleTypes) ) { + throw new SchemaMergeException("Incompatible inner schemas") ; } + // create the merged field + // the mergedSubSchema can be true if allowIncompatibleTypes mergedFs = new FieldSchema(mergedAlias, mergedSubSchema) ; } outputList.add(mergedFs) ; } + // Handle different schema size + if (allowDifferentSizeMerge) { + + // if the first schema has leftover, then append the rest + for(int i=idx; i < mylist.size(); i++) { + + FieldSchema fs = mylist.get(i) ; + + // for non-schema types + if (!DataType.isSchemaType(fs.type)) { + outputList.add(new FieldSchema(fs.alias, fs.type)) ; + } + // for TUPLE & BAG + else { + FieldSchema tmp = new FieldSchema(fs.alias, fs.schema) ; + tmp.type = fs.type ; + outputList.add(tmp) ; + } + } + + // if the second schema has leftover, then append the rest + for(int i=idx; i < otherlist.size(); i++) { + + FieldSchema fs = otherlist.get(i) ; + + // for non-schema types + if (!DataType.isSchemaType(fs.type)) { + outputList.add(new FieldSchema(fs.alias, fs.type)) ; + } + // for TUPLE & BAG + else { + FieldSchema tmp = new FieldSchema(fs.alias, fs.schema) ; + tmp.type = fs.type ; + outputList.add(tmp) ; + } + } + + } + return new Schema(outputList) ; } @@ -569,7 +750,7 @@ * @param otherTakesPrecedence * @return */ - private String mergeAlias(String alias, String other + private static String mergeAlias(String alias, String other ,boolean otherTakesPrecedence) { if (alias == null) { return other ; @@ -585,47 +766,6 @@ } } - /*** - * Merge types if possible - * @param type1 - * @param type2 - * @return the merged type, or DataType.ERROR if not successful - */ - private byte mergeType(byte type1, byte type2) { - // Only legal types can be merged - if ( (!DataType.isUsableType(type1)) || - (!DataType.isUsableType(type2)) ) { - return DataType.ERROR ; - } - - // Same type is OK - if (type1==type2) { - return type1 ; - } - - // Both are number so we return the bigger type - if ( (DataType.isNumberType(type1)) && - (DataType.isNumberType(type2)) ) { - return type1>type2 ? type1:type2 ; - } - - // One is bytearray and the other is (number or chararray) - if ( (type1 == DataType.BYTEARRAY) && - ( (type2 == DataType.CHARARRAY) || (DataType.isNumberType(type2)) ) - ) { - return type2 ; - } - - if ( (type2 == DataType.BYTEARRAY) && - ( (type1 == DataType.CHARARRAY) || (DataType.isNumberType(type1)) ) - ) { - return type1 ; - } - - // else return just ERROR - return DataType.ERROR ; - } - } Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java?rev=659161&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/SchemaMergeException.java Thu May 22 10:12:56 2008 @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.logicalLayer.schema ; + +import org.apache.pig.impl.logicalLayer.FrontendException; + +public class SchemaMergeException extends FrontendException { + + public SchemaMergeException (String message, Throwable cause) { + super(message, cause); + } + + public SchemaMergeException() { + this(null, null); + } + + public SchemaMergeException(String message) { + this(message, null); + } + + public SchemaMergeException(Throwable cause) { + this(null, cause); + } +}