Author: thejas
Date: Fri Oct  1 23:51:22 2010
New Revision: 1003713

URL: http://svn.apache.org/viewvc?rev=1003713&view=rev
Log:
PIG-1656: TOBAG udfs ignores columns with null value; it does not use input type
 to determine output schema

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=1003713&r1=1003712&r2=1003713&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct  1 23:51:22 2010
@@ -209,6 +209,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1656: TOBAG udfs ignores columns with null value; it does not use input type
+ to determine output schema (thejas)
+
 PIG-1658: ORDER BY does not work properly on integer/short keys that are -1 (yanz)
 
 PIG-1638: sh output gets mixed up with the grunt prompt (nrai via daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java?rev=1003713&r1=1003712&r2=1003713&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java Fri Oct  1 23:51:22 2010
@@ -23,14 +23,54 @@ import java.io.IOException;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
 /**
  * This class takes a list of items and puts them into a bag
  * T = foreach U generate TOBAG($0, $1, $2);
  * It's like saying this:
  * T = foreach U generate {($0), ($1), ($2)}
+ * 
+ * Output schema:
+ * The output schema for this udf depends on the schema of its arguments.
+ * If all the arguments have same type and same inner 
+ * schema (for bags/tuple columns), then the udf output schema would be a bag 
+ * of tuples having a column of the type and inner-schema (if any) of the 
+ * arguments. 
+ * If the arguments are of type tuple/bag, then their innerschmea, including
+ * the alias names should match.
+ * If these conditions are not met the output schema will be a bag with null 
+ * inner schema.
+ *  
+ *  example 1 
+ *  grunt> describe a;
+ *  a: {a0: int,a1: int}
+ *  grunt> b = foreach a generate TOBAG(a0,a1);
+ *  grunt> describe b;
+ *  b: {{int}}
+ *  
+ *  example 2
+ *  grunt> describe a;
+ *  a: {a0: (x: int),a1: (x: int)}
+ *  grunt> b = foreach a generate TOBAG(a0,a1);
+ *  grunt> describe b;
+ *  b: {{(x: int)}}
+ *  
+ *  example 3
+ *  grunt> describe a;
+ *  a: {a0: (x: int),a1: (y: int)}
+ * -- note that the inner schema is different because the alises (x & y) are different
+ *  grunt> b = foreach a generate TOBAG(a0,a1);
+ *  grunt> describe b;
+ *  b: {{NULL}}
+ *  
+ *  
+ * 
  */
 public class TOBAG extends EvalFunc<DataBag> {
 
@@ -41,11 +81,9 @@ public class TOBAG extends EvalFunc<Data
 
             for (int i = 0; i < input.size(); ++i) {
                 final Object object = input.get(i);
-                if (object != null) {
-                    Tuple tp2 = TupleFactory.getInstance().newTuple(1);
-                    tp2.set(0, object);
-                    bag.add(tp2);
-                }
+                Tuple tp2 = TupleFactory.getInstance().newTuple(1);
+                tp2.set(0, object);
+                bag.add(tp2);
             }
 
             return bag;
@@ -53,4 +91,56 @@ public class TOBAG extends EvalFunc<Data
             throw new RuntimeException("Error while creating a bag", ee);
         }
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.EvalFunc#outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema)
+     * If all the columns in the tuple are of same type, then set the bag schema
+     * to bag of tuple with column of this type
+     * 
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        byte type = DataType.ERROR;
+        Schema innerSchema = null;
+        
+        for(FieldSchema fs : input.getFields()){
+         if(type == DataType.ERROR){
+             type = fs.type;
+             innerSchema = fs.schema;
+         }else{
+             if( type != fs.type || !nullEquals(innerSchema, fs.schema)){
+                 // invalidate the type
+                 type = DataType.ERROR;
+                 break;
+             }
+         }
+        }
+        try {
+            if(type == DataType.ERROR){
+                return Schema.generateNestedSchema(DataType.BAG, DataType.NULL);
+            }
+            FieldSchema innerFs = new Schema.FieldSchema(null, innerSchema, type);
+            Schema innerSch = new Schema(innerFs);
+            Schema bagSchema = new Schema(new FieldSchema(null, innerSch, DataType.BAG));
+            return bagSchema;
+        } catch (FrontendException e) {
+            //This should not happen
+            throw new RuntimeException("Bug : exception thrown while " +
+                    "creating output schema for TOBAG udf", e);
+        }
+
+    }
+
+    private boolean nullEquals(Schema currentSchema, Schema newSchema) {
+        if(currentSchema == null){
+            if(newSchema != null){
+                return false;
+            }
+            return true;
+        }
+        return currentSchema.equals(newSchema);
+    }
+    
+
 }
+

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1003713&r1=1003712&r2=1003713&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Oct  1 23:51:22 2010
@@ -79,6 +79,8 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.ReadToEndLoader;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
@@ -1459,13 +1461,66 @@ public class TestBuiltin {
 
     @Test
     public void testMiscFunc() throws Exception {
+        
+        //TEST TOBAG
         TOBAG tb = new TOBAG();
 
+        //test output schema of udf
+        Schema expectedSch =
+            Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+
+        //check schema of TOBAG when given input tuple having only integers
+        Schema inputSch = new Schema();
+        inputSch.add(new FieldSchema(null, DataType.INTEGER));
+        assertEquals("schema of tobag when input has only ints",
+                expectedSch, tb.outputSchema(inputSch));
+
+        //add another int column
+        inputSch.add(new FieldSchema(null, DataType.INTEGER));
+        assertEquals("schema of tobag when input has only ints",
+                expectedSch, tb.outputSchema(inputSch));
+
+        //add a long column
+        inputSch.add(new FieldSchema(null, DataType.LONG));
+        //expect null inner schema
+        expectedSch =
+            Schema.generateNestedSchema(DataType.BAG, DataType.NULL);
+        assertEquals("schema of tobag when input has ints and long",
+                expectedSch, tb.outputSchema(inputSch));
+
+        
+        //test schema when input is a tuple with inner schema
+        Schema tupInSchema = new Schema(new FieldSchema("x", DataType.CHARARRAY));
+        inputSch = new Schema();
+        inputSch.add(new FieldSchema("a", tupInSchema, DataType.TUPLE));
+        Schema inputSchCp = new Schema(inputSch);
+        inputSchCp.getField(0).alias = null;
+        expectedSch = new Schema(new FieldSchema(null, inputSchCp, DataType.BAG));
+        assertEquals("schema of tobag when input has cols of type tuple ",
+                expectedSch, tb.outputSchema(inputSch));
+        
+        inputSch.add(new FieldSchema("b", tupInSchema, DataType.TUPLE));
+        assertEquals("schema of tobag when input has cols of type tuple ",
+                expectedSch, tb.outputSchema(inputSch));
+        
+        //add a column of type tuple with different inner schema
+        tupInSchema = new Schema(new FieldSchema("x", DataType.BYTEARRAY));
+        inputSch.add(new FieldSchema("c", tupInSchema, DataType.TUPLE));
+        //expect null inner schema
+        expectedSch =
+            Schema.generateNestedSchema(DataType.BAG, DataType.NULL);
+        assertEquals("schema of tobag when input has cols of type tuple with diff inner schema",
+                expectedSch, tb.outputSchema(inputSch));
+
+        
+        
         Tuple input = TupleFactory.getInstance().newTuple();
         for (int i = 0; i < 100; ++i) {
             input.append(i);
-        }
-
+        }      
+        //test null value in input
+        input.append(null);
+        
         Set<Integer> s = new HashSet<Integer>();
         DataBag db = tb.exec(input);
         for (Tuple t : db) {
@@ -1473,10 +1528,11 @@ public class TestBuiltin {
         }
 
         // finally check the bag had everything we put in the tuple.
-        assertEquals(100, s.size());
+        assertEquals(101, s.size());
         for (int i = 0; i < 100; ++i) {
             assertTrue(s.contains(i));
         }
+        assertTrue("null in tobag result", s.contains(null));
         
         TOTUPLE tt = new TOTUPLE();
 


Reply via email to