Author: olga
Date: Fri Nov 14 16:56:22 2008
New Revision: 714205
URL: http://svn.apache.org/viewvc?rev=714205&view=rev
Log:
PIG-528: Schema returned in UDF is not used by Pig
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=714205&r1=714204&r2=714205&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Fri Nov 14 16:56:22 2008
@@ -314,3 +314,5 @@
PIG-525: make sure cast for udf parameters works (olgan)
PIG-512: Expressions in foreach lead to errors (sms via olgan)
+
+ PIG-528: use UDF return in schema computation (sms via olgan)
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=714205&r1=714204&r2=714205&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Fri Nov 14 16:56:22 2008
@@ -99,11 +99,18 @@
EvalFunc<?> ef = (EvalFunc<?>)
PigContext.instantiateFuncFromSpec(mFuncSpec);
Schema udfSchema = ef.outputSchema(inputSchema);
-
+ byte returnType = DataType.findType(ef.getReturnType());
+
if (null != udfSchema) {
Schema.FieldSchema fs;
try {
- fs = new Schema.FieldSchema(udfSchema.getField(0));
+ if(udfSchema.size() == 0) {
+ fs = new Schema.FieldSchema(null, null, returnType);
+ } else if(udfSchema.size() == 1) {
+ fs = new Schema.FieldSchema(udfSchema.getField(0));
+ } else {
+ fs = new Schema.FieldSchema(null, udfSchema,
DataType.TUPLE);
+ }
} catch (ParseException pe) {
throw new FrontendException(pe.getMessage());
}
@@ -111,7 +118,6 @@
mFieldSchema = fs;
mIsFieldSchemaComputed = true;
} else {
- byte returnType = DataType.findType(ef.getReturnType());
setType(returnType);
mFieldSchema = new Schema.FieldSchema(null, null, returnType);
mIsFieldSchemaComputed = true;
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=714205&r1=714204&r2=714205&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Fri Nov 14 16:56:22 2008
@@ -50,6 +50,7 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.test.util.Identity;
import junit.framework.TestCase;
@@ -250,18 +251,6 @@
}
}
-
- static public class Identity extends EvalFunc<Tuple> {
- @Override
- public Tuple exec(Tuple input) throws IOException {
- return input;
- }
-
- public Schema outputSchema(Schema input) {
- return input;
- }
- }
-
static public class MapUDF extends EvalFunc<Map<Object, Object>> {
@Override
public Map<Object, Object> exec(Tuple input) throws IOException {
@@ -968,4 +957,42 @@
assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
}
+
+ @Test
+ public void testIdentity() throws Exception{
+ int LOOP_COUNT = 2;
+ File tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ Random r = new Random();
+ for(int i = 0; i < LOOP_COUNT; i++) {
+ for(int j=0;j<LOOP_COUNT;j+=2){
+ ps.println(i+"\t"+j);
+ ps.println(i+"\t"+j);
+ }
+ }
+ ps.close();
+
+ String tmpOutputFile = FileLocalizer.getTemporaryPath(null,
+ pigServer.getPigContext()).toString();
+ pigServer.registerQuery("A = LOAD '" +
Util.generateURI(tmpFile.toString()) + "';");
+ pigServer.registerQuery("B = distinct A ;"); //the argument does not
matter
+ pigServer.registerQuery("C = foreach B generate FLATTEN(" +
Identity.class.getName() + "($0, $1));"); //the argument does not matter
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ if(!iter.hasNext()) fail("No output found");
+ int numRows = 0;
+ for(int i = 0; i < LOOP_COUNT; i++) {
+ for(int j = 0; j < LOOP_COUNT; j+=2){
+ Tuple t = null;
+ if(iter.hasNext()) t = iter.next();
+ assertEquals(2, t.size());
+ assertEquals(new Double(i), new Double(t.get(0).toString()),
0.01);
+ assertEquals(new Double(j), new Double(t.get(1).toString()),
0.01);
+ ++numRows;
+ }
+ }
+
+ assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
+ }
+
}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=714205&r1=714204&r2=714205&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri Nov 14 16:56:22 2008
@@ -54,6 +54,7 @@
import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
import org.apache.pig.impl.logicalLayer.parser.ParseException ;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.test.util.Identity;
public class TestLogicalPlanBuilder extends junit.framework.TestCase {
@@ -1684,7 +1685,6 @@
foreach = (LOForEach) lp.getLeaves().get(0);
for(LogicalPlan foreachPlan: foreach.getForEachPlans()) {
- printPlan(foreachPlan);
assertTrue(checkPlanForProjectStar(foreachPlan) == true);
}
@@ -1716,6 +1716,84 @@
}
+ @Test
+ public void testQuery114() throws FrontendException, ParseException {
+ LogicalPlan lp;
+ LOForEach foreach;
+ LOSort sort;
+
+ buildPlan("a = load 'one' as (name, age, gpa);");
+
+ lp = buildPlan("b = foreach a generate " + Identity.class.getName() +
"(name, age);");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+
+ Schema s = new Schema();
+ s.add(new Schema.FieldSchema("name", DataType.BYTEARRAY));
+ s.add(new Schema.FieldSchema("age", DataType.BYTEARRAY));
+ Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, s,
DataType.TUPLE);
+ Schema expectedSchema = new Schema(tupleFs);
+ assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false,
true));
+
+ }
+
+ @Test
+ public void testQuery115() throws FrontendException, ParseException {
+ LogicalPlan lp;
+ LOForEach foreach;
+ LOSort sort;
+
+ buildPlan("a = load 'one' as (name, age, gpa);");
+
+ lp = buildPlan("b = foreach a generate " + Identity.class.getName() +
"(*);");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+
+ Schema s = new Schema();
+ s.add(new Schema.FieldSchema("name", DataType.BYTEARRAY));
+ s.add(new Schema.FieldSchema("age", DataType.BYTEARRAY));
+ s.add(new Schema.FieldSchema("gpa", DataType.BYTEARRAY));
+ Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, s,
DataType.TUPLE);
+ Schema expectedSchema = new Schema(tupleFs);
+ assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false,
true));
+
+ }
+
+ @Test
+ public void testQuery116() throws FrontendException, ParseException {
+ LogicalPlan lp;
+ LOForEach foreach;
+ LOSort sort;
+
+ buildPlan("a = load 'one';");
+
+ lp = buildPlan("b = foreach a generate " + Identity.class.getName() +
"($0, $1);");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+
+ Schema s = new Schema();
+ s.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ s.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+ Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, s,
DataType.TUPLE);
+ Schema expectedSchema = new Schema(tupleFs);
+ assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false,
true));
+
+ }
+
+ @Test
+ public void testQuery117() throws FrontendException, ParseException {
+ LogicalPlan lp;
+ LOForEach foreach;
+ LOSort sort;
+
+ buildPlan("a = load 'one';");
+
+ lp = buildPlan("b = foreach a generate " + Identity.class.getName() +
"(*);");
+ foreach = (LOForEach) lp.getLeaves().get(0);
+
+ Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, null,
DataType.TUPLE);
+ Schema expectedSchema = new Schema(tupleFs);
+ assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false,
true));
+
+ }
+
private Schema getSchemaFromString(String schemaString) throws
ParseException {
return getSchemaFromString(schemaString, DataType.BYTEARRAY);
}