Author: gates
Date: Mon Jul 28 14:04:53 2008
New Revision: 680494

URL: http://svn.apache.org/viewvc?rev=680494&view=rev
Log:
PIG-320 Santhosh's patch to make the type checker use the schema of the
UDF return type when it is provided.


Modified:
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=680494&r1=680493&r2=680494&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
 Mon Jul 28 14:04:53 2008
@@ -30,6 +30,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -280,4 +281,14 @@
         mGroupByPlans.put(newOp, innerPlans);
     }
 
+    public void resetSchema() throws VisitorException{
+        for(LogicalOperator input: getInputs()) {
+            for(LogicalPlan plan: mGroupByPlans.get(input)) {
+                SchemaRemover sr = new SchemaRemover(plan);
+                sr.visit();
+            }
+        }
+        unsetSchema();
+    }
+
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=680494&r1=680493&r2=680494&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
 Mon Jul 28 14:04:53 2008
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
@@ -247,4 +248,13 @@
         log.debug("Exiting getSchema");
         return mSchema;
     }
+
+    public void resetSchema() throws VisitorException{
+        for(LogicalPlan plan: mForEachPlans) {
+            SchemaRemover sr = new SchemaRemover(plan);
+            sr.visit();
+        }
+        unsetSchema();
+    }
+
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=680494&r1=680493&r2=680494&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
 Mon Jul 28 14:04:53 2008
@@ -1370,9 +1370,10 @@
             } else {
                 func.setFuncSpec(matchingSpec);
                 ef = (EvalFunc<?>) 
PigContext.instantiateFuncFromSpec(matchingSpec);
-                func.setType(DataType.findType(ef.getReturnType()));
+                setUdfSchema(func, ef, s);
             }
-            
+        } else {
+            setUdfSchema(func, ef, s);
         }
         /*
         while (iterator.hasNext()) {
@@ -1381,6 +1382,26 @@
         */
     }
 
+    private void setUdfSchema(LOUserFunc func, EvalFunc ef, Schema 
inputSchema) throws VisitorException {
+        Schema udfSchema = ef.outputSchema(inputSchema);
+        if (null != udfSchema) {
+            Schema.FieldSchema fs;
+            try {
+                fs = udfSchema.getField(0);
+            } catch (ParseException pe) {
+                throw new VisitorException(pe.getMessage());
+            }
+            func.setType(fs.type);
+            try {
+                func.setFieldSchema(fs);
+            } catch (FrontendException fe) {
+                throw new VisitorException(fe.getMessage());
+            }
+        } else {
+            func.setType(DataType.findType(ef.getReturnType()));
+        }
+    }
+
     /**
      * For Bincond, lhsOp and rhsOp must have the same output type
      * or both sides have to be number
@@ -1570,6 +1591,7 @@
 
     @Override
     protected void visit(LOUnion u) throws VisitorException {
+        u.unsetSchema();
         // Have to make a copy, because as we insert operators, this list will
         // change under us.
         List<LogicalOperator> inputs = 
@@ -1690,6 +1712,7 @@
     
     @Override
     protected void visit(LODistinct op) throws VisitorException {
+        op.unsetSchema();
         LogicalPlan currentPlan = mCurrentWalker.getPlan() ;
         List<LogicalOperator> list = currentPlan.getPredecessors(op) ;
 
@@ -1716,6 +1739,7 @@
      * @throws VisitorException
      */
     protected void visit(LOCross cs) throws VisitorException {
+        cs.unsetSchema();
         List<LogicalOperator> inputs = cs.getInputs() ;
         List<FieldSchema> fsList = new ArrayList<FieldSchema>() ;
 
@@ -1753,6 +1777,7 @@
      */
 
     protected void visit(LOSort s) throws VisitorException {
+        s.unsetSchema();
         LogicalOperator input = s.getInput() ;
         
         // Type checking internal plans.
@@ -1794,6 +1819,7 @@
 
     @Override
     protected void visit(LOFilter filter) throws VisitorException {
+        filter.unsetSchema();
         LogicalOperator input = filter.getInput() ;
         LogicalPlan comparisonPlan = filter.getComparisonPlan() ;
         
@@ -1860,6 +1886,16 @@
      * same type
      */
     protected void visit(LOCogroup cg) throws VisitorException {
+        cg.resetSchema();
+        try {
+            cg.getSchema();
+        } catch (FrontendException fe) {
+            String msg = "Cannot resolve COGroup output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(fe) ;
+            throw vse ;
+        }
         MultiMap<LogicalOperator, LogicalPlan> groupByPlans
                                                     = cg.getGroupByPlans() ;
         List<LogicalOperator> inputs = cg.getInputs() ;
@@ -2156,6 +2192,7 @@
         List<LogicalPlan> plans = f.getForEachPlans() ;
         List<Boolean> flattens = f.getFlatten() ;
 
+        f.resetSchema();
         try {
 
             // Have to resolve all inner plans before calling getSchema
@@ -2193,7 +2230,6 @@
 
             }
 
-            f.setSchemaComputed(false);
             f.getSchema();
 
         }

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=680494&r1=680493&r2=680494&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java 
Mon Jul 28 14:04:53 2008
@@ -44,6 +44,8 @@
 import org.apache.pig.data.*;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.PigFile;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 
 import junit.framework.TestCase;
 
@@ -173,7 +175,6 @@
         assertFalse(iter.hasNext());
     }
     
-    
     static public class TitleNGrams extends EvalFunc<DataBag> {
         
         @Override
@@ -232,6 +233,16 @@
             }
             return sb.toString();
         }
+
+        public Schema outputSchema(Schema input) {
+            try {
+            Schema stringSchema = new Schema(new Schema.FieldSchema(null, 
DataType.CHARARRAY));
+            Schema.FieldSchema fs = new Schema.FieldSchema(null, stringSchema, 
DataType.BAG);
+            return new Schema(fs);
+            } catch (Exception e) {
+                return null;
+            }
+        }
     }
 
     
@@ -240,6 +251,10 @@
         public Tuple exec(Tuple input) throws IOException {
            return input; 
         }
+
+        public Schema outputSchema(Schema input) {
+            return input;
+        }
     }
     
     


Reply via email to