Author: thejas Date: Fri Oct 1 23:51:22 2010 New Revision: 1003713 URL: http://svn.apache.org/viewvc?rev=1003713&view=rev Log: PIG-1656: TOBAG udfs ignores columns with null value; it does not use input type to determine output schema
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=1003713&r1=1003712&r2=1003713&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Oct 1 23:51:22 2010 @@ -209,6 +209,9 @@ PIG-1309: Map-side Cogroup (ashutoshc) BUG FIXES +PIG-1656: TOBAG udfs ignores columns with null value; it does not use input type + to determine output schema (thejas) + PIG-1658: ORDER BY does not work properly on integer/short keys that are -1 (yanz) PIG-1638: sh output gets mixed up with the grunt prompt (nrai via daijy) Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java?rev=1003713&r1=1003712&r2=1003713&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/TOBAG.java Fri Oct 1 23:51:22 2010 @@ -23,14 +23,54 @@ import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; /** * This class takes a list of items and puts them into a bag * T = foreach U generate TOBAG($0, $1, $2); * It's like saying this: * T = foreach U generate {($0), ($1), ($2)} + * + * Output schema: + * The output schema for this udf depends on the schema of its arguments. 
+ * If all the arguments have the same type and the same inner + * schema (for bags/tuple columns), then the udf output schema will be a bag + * of tuples having a column of the type and inner schema (if any) of the + * arguments. + * If the arguments are of type tuple/bag, then their inner schemas, including + * the alias names, should match. + * If these conditions are not met, the output schema will be a bag with null + * inner schema. + * + * example 1 + * grunt> describe a; + * a: {a0: int,a1: int} + * grunt> b = foreach a generate TOBAG(a0,a1); + * grunt> describe b; + * b: {{int}} + * + * example 2 + * grunt> describe a; + * a: {a0: (x: int),a1: (x: int)} + * grunt> b = foreach a generate TOBAG(a0,a1); + * grunt> describe b; + * b: {{(x: int)}} + * + * example 3 + * grunt> describe a; + * a: {a0: (x: int),a1: (y: int)} + * -- note that the inner schema is different because the aliases (x & y) are different + * grunt> b = foreach a generate TOBAG(a0,a1); + * grunt> describe b; + * b: {{NULL}} + * + * + * */ public class TOBAG extends EvalFunc<DataBag> { @@ -41,11 +81,9 @@ public class TOBAG extends EvalFunc<Data for (int i = 0; i < input.size(); ++i) { final Object object = input.get(i); - if (object != null) { - Tuple tp2 = TupleFactory.getInstance().newTuple(1); - tp2.set(0, object); - bag.add(tp2); - } + Tuple tp2 = TupleFactory.getInstance().newTuple(1); + tp2.set(0, object); + bag.add(tp2); } return bag; @@ -53,4 +91,56 @@ public class TOBAG extends EvalFunc<Data throw new RuntimeException("Error while creating a bag", ee); } } + + /* (non-Javadoc) + * @see org.apache.pig.EvalFunc#outputSchema(org.apache.pig.impl.logicalLayer.schema.Schema) + * If all the columns in the tuple are of the same type (and have the same inner schema), then set the bag schema + * to a bag of tuples with a column of this type + * + */ + @Override + public Schema outputSchema(Schema input) { + byte type = DataType.ERROR; + Schema innerSchema = null; + + for(FieldSchema fs : input.getFields()){ + if(type == DataType.ERROR){ + type = 
fs.type; + innerSchema = fs.schema; + }else{ + if( type != fs.type || !nullEquals(innerSchema, fs.schema)){ + // invalidate the type + type = DataType.ERROR; + break; + } + } + } + try { + if(type == DataType.ERROR){ + return Schema.generateNestedSchema(DataType.BAG, DataType.NULL); + } + FieldSchema innerFs = new Schema.FieldSchema(null, innerSchema, type); + Schema innerSch = new Schema(innerFs); + Schema bagSchema = new Schema(new FieldSchema(null, innerSch, DataType.BAG)); + return bagSchema; + } catch (FrontendException e) { + //This should not happen + throw new RuntimeException("Bug : exception thrown while " + + "creating output schema for TOBAG udf", e); + } + + } + + private boolean nullEquals(Schema currentSchema, Schema newSchema) { + if(currentSchema == null){ + if(newSchema != null){ + return false; + } + return true; + } + return currentSchema.equals(newSchema); + } + + } + Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1003713&r1=1003712&r2=1003713&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Oct 1 23:51:22 2010 @@ -79,6 +79,8 @@ import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.ReadToEndLoader; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -1459,13 +1461,66 @@ public class TestBuiltin { @Test public void testMiscFunc() throws Exception { + + //TEST TOBAG TOBAG tb = new TOBAG(); + //test output schema of udf + Schema expectedSch = + Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER); + + //check 
schema of TOBAG when given input tuple having only integers + Schema inputSch = new Schema(); + inputSch.add(new FieldSchema(null, DataType.INTEGER)); + assertEquals("schema of tobag when input has only ints", + expectedSch, tb.outputSchema(inputSch)); + + //add another int column + inputSch.add(new FieldSchema(null, DataType.INTEGER)); + assertEquals("schema of tobag when input has only ints", + expectedSch, tb.outputSchema(inputSch)); + + //add a long column + inputSch.add(new FieldSchema(null, DataType.LONG)); + //expect null inner schema + expectedSch = + Schema.generateNestedSchema(DataType.BAG, DataType.NULL); + assertEquals("schema of tobag when input has ints and long", + expectedSch, tb.outputSchema(inputSch)); + + + //test schema when input is a tuple with inner schema + Schema tupInSchema = new Schema(new FieldSchema("x", DataType.CHARARRAY)); + inputSch = new Schema(); + inputSch.add(new FieldSchema("a", tupInSchema, DataType.TUPLE)); + Schema inputSchCp = new Schema(inputSch); + inputSchCp.getField(0).alias = null; + expectedSch = new Schema(new FieldSchema(null, inputSchCp, DataType.BAG)); + assertEquals("schema of tobag when input has cols of type tuple ", + expectedSch, tb.outputSchema(inputSch)); + + inputSch.add(new FieldSchema("b", tupInSchema, DataType.TUPLE)); + assertEquals("schema of tobag when input has cols of type tuple ", + expectedSch, tb.outputSchema(inputSch)); + + //add a column of type tuple with different inner schema + tupInSchema = new Schema(new FieldSchema("x", DataType.BYTEARRAY)); + inputSch.add(new FieldSchema("c", tupInSchema, DataType.TUPLE)); + //expect null inner schema + expectedSch = + Schema.generateNestedSchema(DataType.BAG, DataType.NULL); + assertEquals("schema of tobag when input has cols of type tuple with diff inner schema", + expectedSch, tb.outputSchema(inputSch)); + + + Tuple input = TupleFactory.getInstance().newTuple(); for (int i = 0; i < 100; ++i) { input.append(i); - } - + } + //test null value in input + 
input.append(null); + Set<Integer> s = new HashSet<Integer>(); DataBag db = tb.exec(input); for (Tuple t : db) { @@ -1473,10 +1528,11 @@ public class TestBuiltin { } // finally check the bag had everything we put in the tuple. - assertEquals(100, s.size()); + assertEquals(101, s.size()); for (int i = 0; i < 100; ++i) { assertTrue(s.contains(i)); } + assertTrue("null in tobag result", s.contains(null)); TOTUPLE tt = new TOTUPLE();