Dear Alan,

While waiting for more logical operators, I have implemented some additional
helper methods for schema merging. There is no spec for this, so I'm not sure
whether this is what you want.

In the attached patch:

* Schema.merge(Schema other, boolean otherTakesAliasPrecedence)
  is used for recursively merging schemas.
* Schema.equals(Schema schema, Schema other, boolean relaxInner, boolean relaxAlias)
  is used for recursively comparing schemas.
* Schema.mergeType(byte type1, byte type2)
  is where the type promotion logic lives.

I also added a new equals method for FieldSchema, because I think this line in
the existing equals() looks wrong: "if (schema != fs.schema) return false;" --
it compares the inner schemas by reference rather than by value.
I'm willing to add many more unit tests if this is the direction you want.

Pi
Index: test/org/apache/pig/test/TestSchema.java
===================================================================
--- test/org/apache/pig/test/TestSchema.java	(revision 0)
+++ test/org/apache/pig/test/TestSchema.java	(revision 0)
@@ -0,0 +1,113 @@
+package org.apache.pig.test;
+
+import java.util.* ;
+
+import org.apache.pig.data.* ;
+import org.apache.pig.impl.logicalLayer.schema.* ;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import org.junit.* ;
+
+import junit.framework.Assert;
+import junit.framework.TestCase ;
+
+public class TestSchema extends TestCase {
+    // Unit tests for Schema.equals(Schema, Schema, boolean, boolean) and Schema.merge(Schema, boolean)
+    @Test
+    public void testSchemaEqual1() {
+        // Two structurally identical schemas: (1a:bytearray, 1b:(11a:int, 11b:long), 1c:int)
+        List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ;
+        innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ;
+        innerList1.add(new FieldSchema("11b", DataType.LONG)) ;
+        
+        List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ;
+        innerList2.add(new FieldSchema("11a", DataType.INTEGER)) ;
+        innerList2.add(new FieldSchema("11b", DataType.LONG)) ;
+        
+        Schema innerSchema1 = new Schema(innerList1) ;
+        Schema innerSchema2 = new Schema(innerList2) ;
+        // Outer schemas hold the inner ones as the tuple field "1b"
+        List<FieldSchema> list1 = new ArrayList<FieldSchema>() ;
+        list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+        list1.add(new FieldSchema("1b", innerSchema1)) ;
+        list1.add(new FieldSchema("1c", DataType.INTEGER)) ;
+        
+        List<FieldSchema> list2 = new ArrayList<FieldSchema>() ;
+        list2.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+        list2.add(new FieldSchema("1b", innerSchema2)) ;
+        list2.add(new FieldSchema("1c", DataType.INTEGER)) ;
+        
+        Schema schema1 = new Schema(list1) ;
+        Schema schema2 = new Schema(list2) ;
+        // Same structure and aliases -> equal under the strictest comparison
+        Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ;
+        // Rename an inner field of schema2 (assumes Schema keeps the FieldSchema refs, not copies)
+        innerList2.get(1).alias = "pi" ;
+        // Alias mismatch: strict comparison fails, alias-relaxed comparison passes
+        Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+        Assert.assertTrue(Schema.equals(schema1, schema2, false, true)) ;
+        // Restore the alias, then give the same inner field a different type
+        innerList2.get(1).alias = "11b" ;
+        innerList2.get(1).type = DataType.BYTEARRAY ;
+        // Inner type mismatch: strict fails, relaxInner=true skips inner schemas
+        Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+        Assert.assertTrue(Schema.equals(schema1, schema2, true, false)) ;
+        // Restore the inner type: the schemas are equal again
+        innerList2.get(1).type = DataType.LONG ;
+        
+        Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ;
+        // A top-level type mismatch makes the schemas unequal
+        list2.get(0).type = DataType.CHARARRAY ;
+        Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
+    }
+    
+    @Test
+    public void testMerge1() {
+        // schema1.merge(schema2, true): schema2's non-null aliases take precedence
+        // Generate two schemas
+        List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ;
+        innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ; 
+        innerList1.add(new FieldSchema("11b", DataType.FLOAT)) ;
+        
+        List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ;
+        innerList2.add(new FieldSchema("22a", DataType.DOUBLE)) ;
+        innerList2.add(new FieldSchema(null, DataType.LONG)) ;
+        
+        Schema innerSchema1 = new Schema(innerList1) ;
+        Schema innerSchema2 = new Schema(innerList2) ;
+        // Outer schemas hold the inner ones as the tuple fields "1b" / "2b"
+        List<FieldSchema> list1 = new ArrayList<FieldSchema>() ;
+        list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+        list1.add(new FieldSchema("1b", innerSchema1)) ;
+        list1.add(new FieldSchema("1c", DataType.LONG)) ;
+        
+        List<FieldSchema> list2 = new ArrayList<FieldSchema>() ;
+        list2.add(new FieldSchema("2a", DataType.BYTEARRAY)) ;
+        list2.add(new FieldSchema("2b", innerSchema2)) ;
+        list2.add(new FieldSchema("2c", DataType.INTEGER)) ;
+        
+        Schema schema1 = new Schema(list1) ;
+        Schema schema2 = new Schema(list2) ;
+        
+        // Merge
+        Schema mergedSchema = schema1.merge(schema2, true) ;
+        // Expected inner: 22a:double (int+double), 11b:float (the null alias falls back to "11b";
+        // float wins over long, assuming FLOAT has the larger DataType code); outer 2c:long
+        // Generate expected schema
+        List<FieldSchema> expectedInnerList = new ArrayList<FieldSchema>() ;
+        expectedInnerList.add(new FieldSchema("22a", DataType.DOUBLE)) ;
+        expectedInnerList.add(new FieldSchema("11b", DataType.FLOAT)) ;
+        
+        Schema expectedInner = new Schema(expectedInnerList) ;
+        
+        List<FieldSchema> expectedList = new ArrayList<FieldSchema>() ;
+        expectedList.add(new FieldSchema("2a", DataType.BYTEARRAY)) ;
+        expectedList.add(new FieldSchema("2b", expectedInner)) ;
+        expectedList.add(new FieldSchema("2c", DataType.LONG)) ;
+        
+        Schema expected = new Schema(expectedList) ;
+        
+        // Compare
+        Assert.assertTrue(Schema.equals(mergedSchema, expected, false, false)) ;
+    }
+}
Index: src/org/apache/pig/impl/logicalLayer/schema/Schema.java
===================================================================
--- src/org/apache/pig/impl/logicalLayer/schema/Schema.java	(revision 649870)
+++ src/org/apache/pig/impl/logicalLayer/schema/Schema.java	(working copy)
@@ -78,6 +78,44 @@
 
             return true;
         }
+        
+        /**
+         * Compares two field schemas for equality.
+         * @param fschema the first field schema (may be null)
+         * @param fother the second field schema (may be null)
+         * @param relaxInner if true, inner tuple schemas are not compared
+         * @param relaxAlias if true, aliases are not compared (at any depth)
+         * @return true if equal under the requested relaxations (two nulls are equal)
+         */
+        public static boolean equals(FieldSchema fschema,
+                                     FieldSchema fother,
+                                     boolean relaxInner,
+                                     boolean relaxAlias) {
+            if (fschema == fother) {
+                return true ;      // same object, or both null
+            }
+            if (fschema == null || fother == null) {
+                return false ;
+            }
+            if (fschema.type != fother.type) {
+                return false ;
+            }
+            if (!relaxAlias) {
+                String a = fschema.alias ;
+                String b = fother.alias ;
+                // Strings must be compared by value, not by reference (!=)
+                if ( (a == null) ? (b != null) : !a.equals(b) ) {
+                    return false ;
+                }
+            }
+            // Recurse into tuples unless both sides share the same inner
+            // schema reference (this also covers both being null)
+            if ( (!relaxInner) && (fschema.type == DataType.TUPLE)
+                    && (fschema.schema != fother.schema) ) {
+                return Schema.equals(fschema.schema, fother.schema, false, relaxAlias) ;
+            }
+            return true ;
+        }
     }
 
     private List<FieldSchema> mFields;
@@ -181,6 +219,196 @@
         }
         return true;
     }
+    
+    /**
+     * Recursively compares two schemas for equality, field by field.
+     * @param schema the first schema (may be null)
+     * @param other the second schema (may be null)
+     * @param relaxInner if true, inner tuple schemas are not compared
+     * @param relaxAlias if true, aliases are not compared (at any depth)
+     * @return true if both are null, the same object, or have the same
+     *         number of fields and every pair of fields matches under the
+     *         requested relaxations; false otherwise
+     */
+    public static boolean equals(Schema schema,
+                                 Schema other,
+                                 boolean relaxInner,
+                                 boolean relaxAlias) {
+        if (schema == other) {
+            return true ;          // same object, or both null
+        }
+        if (schema == null || other == null) {
+            return false ;
+        }
+        if (schema.size() != other.size()) return false;
+
+        Iterator<FieldSchema> i = schema.mFields.iterator();
+        Iterator<FieldSchema> j = other.mFields.iterator();
+        while (i.hasNext()) {
+            FieldSchema myFs = i.next() ;
+            FieldSchema otherFs = j.next() ;
+            if (myFs.type != otherFs.type) {
+                return false ;
+            }
+            // Aliases are Strings: compare by value, not by reference (!=)
+            String myAlias = myFs.alias ;
+            String otherAlias = otherFs.alias ;
+            if ( (!relaxAlias) && ( (myAlias == null) ? (otherAlias != null)
+                                                      : !myAlias.equals(otherAlias) ) ) {
+                return false ;
+            }
+            // Recurse into tuple fields unless both share the same inner
+            // schema reference (this also covers both being null)
+            if ( (!relaxInner) && (myFs.type == DataType.TUPLE)
+                    && (myFs.schema != otherFs.schema)
+                    && !Schema.equals(myFs.schema, otherFs.schema, false, relaxAlias) ) {
+                return false ;
+            }
+        }
+        return true;
+    }
+    
+    
+    /***
+     * Merge this schema with the other schema
+     * @param other the other schema to be merged with
+     * @param otherTakesAliasPrecedence true if aliases from the other
+     *                                  schema take precedence
+     * @return the merged schema, null if they are not compatible
+     */
+    public Schema merge(Schema other, boolean otherTakesAliasPrecedence) {
+        return mergeSchema(this, other, otherTakesAliasPrecedence) ;
+    }
+    
+    /**
+     * Recursively merges two schemas. Both must have the same number of
+     * fields; fields are merged pairwise by position.
+     * @param schema the initial schema
+     * @param other the other schema to be merged with
+     * @param otherTakesAliasPrecedence true if aliases from the other
+     *                                  schema take precedence
+     * @return a new merged schema, or null if either schema is null, the
+     *         field counts differ, or any pair of fields cannot be merged
+     */
+    private Schema mergeSchema(Schema schema, Schema other,
+                               boolean otherTakesAliasPrecedence) {
+        // A missing schema on either side (e.g. a tuple field without an
+        // inner schema) cannot be merged; checking both sides avoids an NPE
+        if (schema == null || other == null) {
+            return null ;
+        }
+        if (schema.size() != other.size()) {
+            return null ;
+        }
+
+        List<FieldSchema> outputList = new ArrayList<FieldSchema>() ;
+        Iterator<FieldSchema> mylist = schema.mFields.iterator() ;
+        Iterator<FieldSchema> otherlist = other.mFields.iterator() ;
+
+        while (mylist.hasNext()) {
+            FieldSchema myFs = mylist.next() ;
+            FieldSchema otherFs = otherlist.next() ;
+
+            byte mergedType = mergeType(myFs.type, otherFs.type) ;
+            // if the types cannot be merged, the schemas cannot be merged
+            if (mergedType == DataType.ERROR) {
+                return null ;
+            }
+
+            String mergedAlias = mergeAlias(myFs.alias,
+                                            otherFs.alias,
+                                            otherTakesAliasPrecedence) ;
+
+            FieldSchema mergedFs ;
+            if (mergedType != DataType.TUPLE) {
+                // Non-tuple field: only the type and alias are merged.
+                // NOTE(review): any inner schema carried by other complex
+                // types (e.g. bags) is not copied over -- confirm intended
+                mergedFs = new FieldSchema(mergedAlias, mergedType) ;
+            }
+            else {
+                // mergeType only yields TUPLE when both sides are tuples,
+                // so their inner schemas are merged recursively
+                Schema mergedSubSchema = mergeSchema(myFs.schema,
+                                                     otherFs.schema,
+                                                     otherTakesAliasPrecedence) ;
+                if (mergedSubSchema == null) {
+                    return null ;   // inner schemas missing or incompatible
+                }
+                mergedFs = new FieldSchema(mergedAlias, mergedSubSchema) ;
+            }
+            outputList.add(mergedFs) ;
+        }
+
+        return new Schema(outputList) ;
+    }
+    
+    /**
+     * Picks the alias of a merged field.
+     * A null alias never wins over a non-null one; when both are
+     * non-null, otherTakesPrecedence decides which one is kept.
+     * @param alias alias from this side (may be null)
+     * @param other alias from the other side (may be null)
+     * @param otherTakesPrecedence true to prefer {@code other} when both
+     *                             aliases are non-null
+     * @return the chosen alias, or null if both aliases are null
+     */
+    private String mergeAlias(String alias, String other,
+                              boolean otherTakesPrecedence) {
+        boolean bothPresent = (alias != null) && (other != null) ;
+        if (bothPresent) {
+            return otherTakesPrecedence ? other : alias ;
+        }
+        // At most one alias is set: prefer whichever is non-null.
+        // When both are null this returns null.
+        if (alias != null) {
+            return alias ;
+        }
+        return other ;
+    }
+    
+    /**
+     * Computes the type that two field types can both be promoted to:
+     * <ul>
+     *   <li>identical usable types merge to themselves;</li>
+     *   <li>two numeric types merge to the wider one;</li>
+     *   <li>bytearray with chararray or a numeric type yields the other type;</li>
+     *   <li>anything else, including an unusable type, is an error.</li>
+     * </ul>
+     * @param type1 the first DataType code
+     * @param type2 the second DataType code
+     * @return the merged type, or DataType.ERROR if not successful
+     */
+    private byte mergeType(byte type1, byte type2) {
+        boolean usable = DataType.isUsableType(type1)
+                         && DataType.isUsableType(type2) ;
+        if (!usable) {
+            return DataType.ERROR ;
+        }
+        if (type1 == type2) {
+            return type1 ;
+        }
+
+        boolean num1 = DataType.isNumberType(type1) ;
+        boolean num2 = DataType.isNumberType(type2) ;
+        if (num1 && num2) {
+            // wider type = larger DataType code (assumes int < long < float < double)
+            return (type2 > type1) ? type2 : type1 ;
+        }
+
+        // bytearray is promoted to the other side if it is chararray or numeric
+        if ( (type1 == DataType.BYTEARRAY)
+                && (num2 || type2 == DataType.CHARARRAY) ) {
+            return type2 ;
+        }
+        if ( (type2 == DataType.BYTEARRAY)
+                && (num1 || type1 == DataType.CHARARRAY) ) {
+            return type1 ;
+        }
+        return DataType.ERROR ;
+    }
+    
+    
 }
 
 
Index: src/org/apache/pig/data/DataType.java
===================================================================
--- src/org/apache/pig/data/DataType.java	(revision 649870)
+++ src/org/apache/pig/data/DataType.java	(working copy)
@@ -552,4 +552,23 @@
         }
         System.out.println(t.toString());
     }
+    
+    /**
+     * Tells whether a type code denotes one of the numeric types.
+     * @param t a DataType code
+     * @return true for INTEGER, LONG, FLOAT and DOUBLE; false otherwise
+     */
+    public static boolean isNumberType(byte t) {
+        return t == INTEGER || t == LONG
+            || t == FLOAT || t == DOUBLE ;
+    }
+    
+    public static boolean isUsableType(byte t) {
+        switch (t) {
+            case UNKNOWN:    return false ;
+            case NULL:       return false ;
+            case ERROR:      return false ;
+            default :return true ;
+        }
+    }
 }

Reply via email to