Author: olga
Date: Fri Nov 14 16:56:22 2008
New Revision: 714205

URL: http://svn.apache.org/viewvc?rev=714205&view=rev
Log:
PIG-528: Schema returned in UDF is not used by Pig

Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=714205&r1=714204&r2=714205&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Fri Nov 14 16:56:22 2008
@@ -314,3 +314,5 @@
     PIG-525: make sure cast for udf parameters works (olgan)
 
     PIG-512: Expressions in foreach lead to errors (sms via olgan)
+
+    PIG-528: use UDF return in schema computation (sms via olgan)

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=714205&r1=714204&r2=714205&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Fri Nov 14 16:56:22 2008
@@ -99,11 +99,18 @@
     
             EvalFunc<?> ef = (EvalFunc<?>) 
PigContext.instantiateFuncFromSpec(mFuncSpec);
             Schema udfSchema = ef.outputSchema(inputSchema);
-    
+            byte returnType = DataType.findType(ef.getReturnType());
+
             if (null != udfSchema) {
                 Schema.FieldSchema fs;
                 try {
-                    fs = new Schema.FieldSchema(udfSchema.getField(0));
+                    if(udfSchema.size() == 0) {
+                        fs = new Schema.FieldSchema(null, null, returnType);
+                    } else if(udfSchema.size() == 1) {
+                        fs = new Schema.FieldSchema(udfSchema.getField(0));
+                    } else {
+                        fs = new Schema.FieldSchema(null, udfSchema, 
DataType.TUPLE);
+                    }
                 } catch (ParseException pe) {
                     throw new FrontendException(pe.getMessage());
                 }
@@ -111,7 +118,6 @@
                 mFieldSchema = fs;
                 mIsFieldSchemaComputed = true;
             } else {
-                byte returnType = DataType.findType(ef.getReturnType());
                 setType(returnType);
                 mFieldSchema = new Schema.FieldSchema(null, null, returnType);
                 mIsFieldSchemaComputed = true;

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=714205&r1=714204&r2=714205&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Fri Nov 14 16:56:22 2008
@@ -50,6 +50,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.test.util.Identity;
 
 import junit.framework.TestCase;
 
@@ -250,18 +251,6 @@
         }
     }
 
-    
-    static public class Identity extends EvalFunc<Tuple> {
-        @Override
-        public Tuple exec(Tuple input) throws IOException {
-           return input; 
-        }
-
-        public Schema outputSchema(Schema input) {
-            return input;
-        }
-    }
-    
     static public class MapUDF extends EvalFunc<Map<Object, Object>> {
         @Override
         public Map<Object, Object> exec(Tuple input) throws IOException {
@@ -968,4 +957,42 @@
 
         assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
     }
+
+    @Test
+    public void testIdentity() throws Exception{
+        int LOOP_COUNT = 2;
+        File tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        Random r = new Random();
+        for(int i = 0; i < LOOP_COUNT; i++) {
+            for(int j=0;j<LOOP_COUNT;j+=2){
+                ps.println(i+"\t"+j);
+                ps.println(i+"\t"+j);
+            }
+        }
+        ps.close();
+
+        String tmpOutputFile = FileLocalizer.getTemporaryPath(null, 
+        pigServer.getPigContext()).toString();
+        pigServer.registerQuery("A = LOAD '" + 
Util.generateURI(tmpFile.toString()) + "';");
+        pigServer.registerQuery("B = distinct A ;"); //the argument does not 
matter
+        pigServer.registerQuery("C = foreach B generate FLATTEN(" + 
Identity.class.getName() + "($0, $1));"); //the argument does not matter
+
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        if(!iter.hasNext()) fail("No output found");
+        int numRows = 0;
+        for(int i = 0; i < LOOP_COUNT; i++) {
+            for(int j = 0; j < LOOP_COUNT; j+=2){
+                Tuple t = null;
+                if(iter.hasNext()) t = iter.next();
+                assertEquals(2, t.size());
+                assertEquals(new Double(i), new Double(t.get(0).toString()), 
0.01);
+                assertEquals(new Double(j), new Double(t.get(1).toString()), 
0.01);
+                ++numRows;
+            }
+        }
+
+        assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
+    }
+
 }

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=714205&r1=714204&r2=714205&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri Nov 14 16:56:22 2008
@@ -54,6 +54,7 @@
 import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
 import org.apache.pig.impl.logicalLayer.parser.ParseException ;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.test.util.Identity;
 
 
 public class TestLogicalPlanBuilder extends junit.framework.TestCase {
@@ -1684,7 +1685,6 @@
         foreach = (LOForEach) lp.getLeaves().get(0);
 
         for(LogicalPlan foreachPlan: foreach.getForEachPlans()) {
-            printPlan(foreachPlan);
             assertTrue(checkPlanForProjectStar(foreachPlan) == true);
         }
 
@@ -1716,6 +1716,84 @@
 
     }
 
+    @Test
+    public void testQuery114()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOForEach foreach;
+        LOSort sort;
+
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        lp = buildPlan("b = foreach a generate " + Identity.class.getName() + 
"(name, age);");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+
+        Schema s = new Schema();
+        s.add(new Schema.FieldSchema("name", DataType.BYTEARRAY));
+        s.add(new Schema.FieldSchema("age", DataType.BYTEARRAY));
+        Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, s, 
DataType.TUPLE);
+        Schema expectedSchema = new Schema(tupleFs);
+        assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false, 
true));
+
+    }
+
+    @Test
+    public void testQuery115()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOForEach foreach;
+        LOSort sort;
+
+        buildPlan("a = load 'one' as (name, age, gpa);");
+
+        lp = buildPlan("b = foreach a generate " + Identity.class.getName() + 
"(*);");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+
+        Schema s = new Schema();
+        s.add(new Schema.FieldSchema("name", DataType.BYTEARRAY));
+        s.add(new Schema.FieldSchema("age", DataType.BYTEARRAY));
+        s.add(new Schema.FieldSchema("gpa", DataType.BYTEARRAY));
+        Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, s, 
DataType.TUPLE);
+        Schema expectedSchema = new Schema(tupleFs);
+        assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false, 
true));
+
+    }
+
+    @Test
+    public void testQuery116()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOForEach foreach;
+        LOSort sort;
+
+        buildPlan("a = load 'one';");
+
+        lp = buildPlan("b = foreach a generate " + Identity.class.getName() + 
"($0, $1);");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+
+        Schema s = new Schema();
+        s.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+        s.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+        Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, s, 
DataType.TUPLE);
+        Schema expectedSchema = new Schema(tupleFs);
+        assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false, 
true));
+
+    }
+
+    @Test
+    public void testQuery117()  throws FrontendException, ParseException {
+        LogicalPlan lp;
+        LOForEach foreach;
+        LOSort sort;
+
+        buildPlan("a = load 'one';");
+
+        lp = buildPlan("b = foreach a generate " + Identity.class.getName() + 
"(*);");
+        foreach = (LOForEach) lp.getLeaves().get(0);
+
+        Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, null, 
DataType.TUPLE);
+        Schema expectedSchema = new Schema(tupleFs);
+        assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false, 
true));
+
+    }
+
     private Schema getSchemaFromString(String schemaString) throws 
ParseException {
         return getSchemaFromString(schemaString, DataType.BYTEARRAY);
     }


Reply via email to