Dear Alan,
While I wait for more logical operators, I have implemented some more helper
methods for schema merging. There is no spec for this, so I'm not sure whether
this is what you want.
The attached patch contains:
*Schema.merge(Schema other, boolean otherTakesAliasPrecedence)*
is used for recursively merging schemas.
*Schema.equals(Schema schema, Schema other, boolean relaxInner, boolean
relaxAlias)* is used for recursively comparing schemas.
*Schema.mergeType(byte type1, byte type2)* is where the type promotion
logic lives.
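I also added a new equals() method for FieldSchema, because I think this line in
the existing equals(), "if (schema != fs.schema) return false;", looks wrong:
it compares the nested schemas by reference rather than by content, so two
field schemas with identical but distinct nested Schema objects compare unequal.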
I'm willing to add a lot more unit tests if this is what you want.
Pi
Index: test/org/apache/pig/test/TestSchema.java
===================================================================
--- test/org/apache/pig/test/TestSchema.java (revision 0)
+++ test/org/apache/pig/test/TestSchema.java (revision 0)
@@ -0,0 +1,113 @@
+package org.apache.pig.test;
+
+import java.util.* ;
+
+import org.apache.pig.data.* ;
+import org.apache.pig.impl.logicalLayer.schema.* ;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import org.junit.* ;
+
+import junit.framework.Assert;
+import junit.framework.TestCase ;
+
+public class TestSchema extends TestCase {
+
+    /** Wraps the given field schemas, in order, into a new Schema. */
+    private static Schema schemaOf(FieldSchema... fields) {
+        List<FieldSchema> fieldList = new ArrayList<FieldSchema>() ;
+        for (FieldSchema field : fields) {
+            fieldList.add(field) ;
+        }
+        return new Schema(fieldList) ;
+    }
+
+    @Test
+    public void testSchemaEqual1() {
+        // schema1 and schema2 start out identical:
+        //   (1a: bytearray, 1b: (11a: int, 11b: long), 1c: int)
+        Schema schema1 = schemaOf(
+                new FieldSchema("1a", DataType.BYTEARRAY),
+                new FieldSchema("1b", schemaOf(
+                        new FieldSchema("11a", DataType.INTEGER),
+                        new FieldSchema("11b", DataType.LONG))),
+                new FieldSchema("1c", DataType.INTEGER)) ;
+
+        // Keep handles on the fields of schema2 that are mutated below
+        FieldSchema outerFirst = new FieldSchema("1a", DataType.BYTEARRAY) ;
+        FieldSchema innerSecond = new FieldSchema("11b", DataType.LONG) ;
+        Schema schema2 = schemaOf(
+                outerFirst,
+                new FieldSchema("1b", schemaOf(
+                        new FieldSchema("11a", DataType.INTEGER),
+                        innerSecond)),
+                new FieldSchema("1c", DataType.INTEGER)) ;
+
+        Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ;
+
+        // A differing inner alias only matters when aliases are checked
+        innerSecond.alias = "pi" ;
+        Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+        Assert.assertTrue(Schema.equals(schema1, schema2, false, true)) ;
+
+        // A differing inner type only matters when inner schemas are checked
+        innerSecond.alias = "11b" ;
+        innerSecond.type = DataType.BYTEARRAY ;
+        Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+        Assert.assertTrue(Schema.equals(schema1, schema2, true, false)) ;
+
+        // Restoring the inner type makes the schemas equal again
+        innerSecond.type = DataType.LONG ;
+        Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ;
+
+        // A differing top-level type breaks equality
+        outerFirst.type = DataType.CHARARRAY ;
+        Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+    }
+
+    @Test
+    public void testMerge1() {
+        // (1a: bytearray, 1b: (11a: int, 11b: float), 1c: long)
+        Schema schema1 = schemaOf(
+                new FieldSchema("1a", DataType.BYTEARRAY),
+                new FieldSchema("1b", schemaOf(
+                        new FieldSchema("11a", DataType.INTEGER),
+                        new FieldSchema("11b", DataType.FLOAT))),
+                new FieldSchema("1c", DataType.LONG)) ;
+
+        // (2a: bytearray, 2b: (22a: double, <no alias>: long), 2c: int)
+        Schema schema2 = schemaOf(
+                new FieldSchema("2a", DataType.BYTEARRAY),
+                new FieldSchema("2b", schemaOf(
+                        new FieldSchema("22a", DataType.DOUBLE),
+                        new FieldSchema(null, DataType.LONG))),
+                new FieldSchema("2c", DataType.INTEGER)) ;
+
+        Schema mergedSchema = schema1.merge(schema2, true) ;
+
+        // schema2's aliases win where present; numeric types are widened
+        Schema expected = schemaOf(
+                new FieldSchema("2a", DataType.BYTEARRAY),
+                new FieldSchema("2b", schemaOf(
+                        new FieldSchema("22a", DataType.DOUBLE),
+                        new FieldSchema("11b", DataType.FLOAT))),
+                new FieldSchema("2c", DataType.LONG)) ;
+
+        Assert.assertTrue(Schema.equals(mergedSchema, expected, false, false)) ;
+    }
+}
Index: src/org/apache/pig/impl/logicalLayer/schema/Schema.java
===================================================================
--- src/org/apache/pig/impl/logicalLayer/schema/Schema.java (revision 649870)
+++ src/org/apache/pig/impl/logicalLayer/schema/Schema.java (working copy)
@@ -78,6 +78,44 @@
return true;
}
+
+        /**
+         * Compares two field schemas for equality.
+         *
+         * @param fschema the first field schema
+         * @param fother the second field schema
+         * @param relaxInner if true, inner tuple schemas are not compared
+         * @param relaxAlias if true, aliases are not compared
+         * @return true if the field schemas are equal under the given
+         *         relaxations; false if they differ or if either is null
+         */
+        public static boolean equals(FieldSchema fschema,
+                                     FieldSchema fother,
+                                     boolean relaxInner,
+                                     boolean relaxAlias) {
+            if (fschema == null || fother == null) {
+                return false ;
+            }
+
+            if (fschema.type != fother.type) {
+                return false ;
+            }
+
+            // Aliases are Strings: compare by value (null-safe), not by
+            // reference, so aliases built at runtime compare correctly.
+            if (!relaxAlias) {
+                boolean sameAlias = (fschema.alias == null)
+                        ? (fother.alias == null)
+                        : fschema.alias.equals(fother.alias) ;
+                if (!sameAlias) {
+                    return false ;
+                }
+            }
+
+            if (!relaxInner && fschema.type == DataType.TUPLE) {
+                // Two tuples whose inner schemas are both unknown (null) are
+                // treated as equal; Schema.equals returns false for null
+                // arguments, so that case is handled here.
+                boolean bothUnknown = (fschema.schema == null) && (fother.schema == null) ;
+                if (!bothUnknown &&
+                    !Schema.equals(fschema.schema, fother.schema, false, relaxAlias)) {
+                    return false ;
+                }
+            }
+
+            return true ;
+        }
}
private List<FieldSchema> mFields;
@@ -181,6 +219,196 @@
}
return true;
}
+
+    /**
+     * Recursively compares two schemas for equality, field by field.
+     *
+     * @param schema the first schema
+     * @param other the second schema
+     * @param relaxInner if true, inner tuple schemas are not compared
+     * @param relaxAlias if true, field aliases are not compared
+     * @return true if both schemas have the same number of fields and each
+     *         pair of corresponding fields matches; false otherwise,
+     *         including when either schema is null
+     */
+    public static boolean equals(Schema schema,
+                                 Schema other,
+                                 boolean relaxInner,
+                                 boolean relaxAlias) {
+        if (schema == null || other == null) {
+            return false ;
+        }
+
+        if (schema.size() != other.size()) {
+            return false ;
+        }
+
+        Iterator<FieldSchema> i = schema.mFields.iterator() ;
+        Iterator<FieldSchema> j = other.mFields.iterator() ;
+
+        while (i.hasNext()) {
+            FieldSchema myFs = i.next() ;
+            FieldSchema otherFs = j.next() ;
+
+            if (myFs.type != otherFs.type) {
+                return false ;
+            }
+
+            // Aliases are Strings: compare by value (null-safe), not by
+            // reference.
+            if (!relaxAlias) {
+                boolean sameAlias = (myFs.alias == null)
+                        ? (otherFs.alias == null)
+                        : myFs.alias.equals(otherFs.alias) ;
+                if (!sameAlias) {
+                    return false ;
+                }
+            }
+
+            if (!relaxInner && myFs.type == DataType.TUPLE) {
+                // Two tuples whose inner schemas are both unknown (null) are
+                // treated as equal; otherwise recurse into the inner schemas.
+                boolean bothUnknown = (myFs.schema == null) && (otherFs.schema == null) ;
+                if (!bothUnknown &&
+                    !Schema.equals(myFs.schema, otherFs.schema, false, relaxAlias)) {
+                    return false ;
+                }
+            }
+        }
+        return true ;
+    }
+
+
+ /***
+ * Merge this schema with the other schema
+ * @param other the other schema to be merged with
+ * @param otherTakesAliasPrecedence true if aliases from the other
+ * schema take precedence
+ * @return the merged schema, null if they are not compatible
+ */
+ public Schema merge(Schema other, boolean otherTakesAliasPrecedence) {
+ return mergeSchema(this, other, otherTakesAliasPrecedence) ;
+ }
+
+    /**
+     * Recursively merges two schemas.
+     *
+     * The schemas must have the same number of fields. Corresponding fields
+     * are merged pairwise: types via mergeType, aliases via mergeAlias, and
+     * inner tuple schemas recursively.
+     *
+     * @param schema the initial schema (may be null)
+     * @param other the other schema to be merged with (may be null)
+     * @param otherTakesAliasPrecedence true if aliases from the other
+     *        schema take precedence
+     * @return the merged schema, or null if either input is null or the
+     *         schemas are not compatible
+     */
+    private Schema mergeSchema(Schema schema, Schema other,
+                               boolean otherTakesAliasPrecedence) {
+
+        // A null schema on either side (e.g. a tuple field whose inner
+        // schema is unknown) is reported as "cannot merge" rather than
+        // dereferenced.
+        if (schema == null || other == null) {
+            return null ;
+        }
+
+        if (schema.size() != other.size()) {
+            return null ;
+        }
+
+        List<FieldSchema> outputList = new ArrayList<FieldSchema>() ;
+
+        Iterator<FieldSchema> mylist = schema.mFields.iterator() ;
+        Iterator<FieldSchema> otherlist = other.mFields.iterator() ;
+
+        while (mylist.hasNext()) {
+            FieldSchema myFs = mylist.next() ;
+            FieldSchema otherFs = otherlist.next() ;
+
+            byte mergedType = mergeType(myFs.type, otherFs.type) ;
+            // if the types cannot be merged, the schemas cannot be merged
+            if (mergedType == DataType.ERROR) {
+                return null ;
+            }
+
+            String mergedAlias = mergeAlias(myFs.alias,
+                                            otherFs.alias,
+                                            otherTakesAliasPrecedence) ;
+
+            FieldSchema mergedFs ;
+            if (mergedType == DataType.TUPLE) {
+                // mergeType yields TUPLE only when both sides are tuples,
+                // so merge their inner schemas recursively
+                Schema mergedSubSchema = mergeSchema(myFs.schema,
+                                                     otherFs.schema,
+                                                     otherTakesAliasPrecedence) ;
+                if (mergedSubSchema == null) {
+                    return null ;
+                }
+                mergedFs = new FieldSchema(mergedAlias, mergedSubSchema) ;
+            }
+            else {
+                // Only the merged type is kept; any inner 'schema' of the
+                // input fields is not copied.
+                // NOTE(review): confirm this is intended for bags.
+                mergedFs = new FieldSchema(mergedAlias, mergedType) ;
+            }
+            outputList.add(mergedFs) ;
+        }
+
+        return new Schema(outputList) ;
+    }
+
+    /**
+     * Chooses the alias for a merged field.
+     *
+     * A null alias on one side yields the alias from the other side. When
+     * both aliases are non-null, {@code otherTakesPrecedence} decides.
+     *
+     * @param alias the alias from the initial schema (may be null)
+     * @param other the alias from the other schema (may be null)
+     * @param otherTakesPrecedence pick {@code other} when both are non-null
+     * @return the chosen alias, or null if both inputs are null
+     */
+    private String mergeAlias(String alias, String other,
+                              boolean otherTakesPrecedence) {
+        boolean preferOther = (alias == null)
+                || (other != null && otherTakesPrecedence) ;
+        return preferOther ? other : alias ;
+    }
+
+    /**
+     * Merges two field types, applying type promotion where possible.
+     *
+     * Rules, applied in order:
+     *  - UNKNOWN, NULL and ERROR cannot be merged;
+     *  - identical types merge to themselves;
+     *  - two numeric types merge to the wider one
+     *    (int, then long, then float, then double);
+     *  - bytearray merged with chararray or a numeric type yields that type;
+     *  - anything else cannot be merged.
+     *
+     * @param type1 the first type
+     * @param type2 the second type
+     * @return the merged type, or DataType.ERROR if the types cannot be merged
+     */
+    private byte mergeType(byte type1, byte type2) {
+        // Only legal types can be merged
+        if (!DataType.isUsableType(type1) || !DataType.isUsableType(type2)) {
+            return DataType.ERROR ;
+        }
+
+        // Same type is OK
+        if (type1 == type2) {
+            return type1 ;
+        }
+
+        // Both are numbers: return the wider type. Use an explicit widening
+        // order rather than the raw byte values of the DataType constants.
+        if (DataType.isNumberType(type1) && DataType.isNumberType(type2)) {
+            return numericRank(type1) >= numericRank(type2) ? type1 : type2 ;
+        }
+
+        // bytearray is promoted to chararray or to a numeric type
+        if (type1 == DataType.BYTEARRAY &&
+            (type2 == DataType.CHARARRAY || DataType.isNumberType(type2))) {
+            return type2 ;
+        }
+
+        if (type2 == DataType.BYTEARRAY &&
+            (type1 == DataType.CHARARRAY || DataType.isNumberType(type1))) {
+            return type1 ;
+        }
+
+        // anything else cannot be merged
+        return DataType.ERROR ;
+    }
+
+    /**
+     * Returns the position of a numeric type in the widening order
+     * int, then long, then float, then double. Non-numeric types return 0.
+     */
+    private static int numericRank(byte type) {
+        switch (type) {
+            case DataType.INTEGER: return 1 ;
+            case DataType.LONG:    return 2 ;
+            case DataType.FLOAT:   return 3 ;
+            case DataType.DOUBLE:  return 4 ;
+            default:               return 0 ;
+        }
+    }
+
+
}
Index: src/org/apache/pig/data/DataType.java
===================================================================
--- src/org/apache/pig/data/DataType.java (revision 649870)
+++ src/org/apache/pig/data/DataType.java (working copy)
@@ -552,4 +552,23 @@
}
System.out.println(t.toString());
}
+
+    /**
+     * Tells whether a type is numeric.
+     *
+     * @param t the type to test
+     * @return true for INTEGER, LONG, FLOAT and DOUBLE; false otherwise
+     */
+    public static boolean isNumberType(byte t) {
+        return t == INTEGER
+            || t == LONG
+            || t == FLOAT
+            || t == DOUBLE ;
+    }
+
+ public static boolean isUsableType(byte t) {
+ switch (t) {
+ case UNKNOWN: return false ;
+ case NULL: return false ;
+ case ERROR: return false ;
+ default :return true ;
+ }
+ }
}