Author: gates
Date: Wed Jan 20 18:34:41 2010
New Revision: 901317

URL: http://svn.apache.org/viewvc?rev=901317&view=rev
Log:
PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER, 
FOREACH.  Checking in for Pradeep since he is out.


Modified:
    hadoop/pig/branches/branch-0.6/CHANGES.txt
    
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java
    
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
    
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=901317&r1=901316&r2=901317&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Wed Jan 20 18:34:41 2010
@@ -145,6 +145,9 @@
 
 BUG FIXES
 
+PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER,
+FORACH (pradeepkth via gates)
+
 PIG-1143: Poisson Sample Loader should compute the number of samples required
 only once (sriranjan via olgan)
 

Modified: 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java?rev=901317&r1=901316&r2=901317&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java 
(original)
+++ 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/LOCast.java 
Wed Jan 20 18:34:41 2010
@@ -26,6 +26,7 @@
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.data.DataType;
 
 public class LOCast extends ExpressionOperator {
@@ -34,6 +35,11 @@
 
     private static final long serialVersionUID = 2L;
     private FuncSpec mLoadFuncSpec = null;
+    // store field schema representing the schema 
+    // in user specified casts -this is so that if
+    // field schema is unset and then getFieldSchema is called we still 
+    // rebuild the fieldschema correctly as specified by the user in the script
+    private FieldSchema userSpecifiedFieldSchema;
 
     /**
      * 
@@ -65,11 +71,22 @@
     public Schema getSchema() {
         return mSchema;
     }
+    
+    
+    @Override
+    public void setFieldSchema(FieldSchema fs) throws FrontendException {
+        super.setFieldSchema(fs);
+        userSpecifiedFieldSchema = new Schema.FieldSchema(fs);
+    }
 
     @Override
     public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed) {
-            mFieldSchema = new Schema.FieldSchema(null, mType);
+            if(userSpecifiedFieldSchema != null) {
+                mFieldSchema = userSpecifiedFieldSchema;
+            } else {
+                mFieldSchema = new Schema.FieldSchema(null, mType);
+            }
             Schema.FieldSchema parFs  = getExpression().getFieldSchema();
             String canonicalName = (parFs != null ? parFs.canonicalName : 
null);
             mFieldSchema.setParent(canonicalName, getExpression());

Modified: 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=901317&r1=901316&r2=901317&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
 (original)
+++ 
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
 Wed Jan 20 18:34:41 2010
@@ -37,6 +37,7 @@
      *            the logical binary expression operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(BinaryExpressionOperator binOp)
             throws VisitorException {
         binOp.unsetFieldSchema();
@@ -49,6 +50,7 @@
      *            the logical unary operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(UnaryExpressionOperator uniOp) throws 
VisitorException {
         uniOp.unsetFieldSchema();
         super.visit(uniOp);
@@ -60,6 +62,7 @@
      *            the logical cogroup operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(LOCogroup cg) throws VisitorException {
         cg.unsetSchema();
         super.visit(cg);
@@ -71,6 +74,7 @@
      *            the logical sort operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(LOSort s) throws VisitorException {
         s.unsetSchema();
         super.visit(s);
@@ -82,6 +86,7 @@
      *            the logical limit operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(LOLimit limit) throws VisitorException {
         limit.unsetSchema();
         super.visit(limit);
@@ -94,6 +99,7 @@
      *            the logical filter operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(LOFilter filter) throws VisitorException {
         filter.unsetSchema();
         super.visit(filter);
@@ -105,6 +111,7 @@
      *            the logical split operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(LOSplit split) throws VisitorException {
         split.unsetSchema();
         super.visit(split);
@@ -116,6 +123,7 @@
      *            the logical foreach operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(LOForEach forEach) throws VisitorException {
         forEach.unsetSchema();
         super.visit(forEach);
@@ -128,6 +136,7 @@
      *            the user defined function
      * @throws VisitorException
      */
+    @Override
     protected void visit(LOUserFunc func) throws VisitorException {
         func.unsetFieldSchema();
         super.visit(func);
@@ -138,6 +147,7 @@
      *            the logical binCond operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(LOBinCond binCond) throws VisitorException {
         binCond.unsetFieldSchema();
         super.visit(binCond);
@@ -149,64 +159,185 @@
      *            the logical cast operator that has to be visited
      * @throws VisitorException
      */
+    @Override
     protected void visit(LOCast cast) throws VisitorException {
+        cast.unsetFieldSchema();
         super.visit(cast);
     }
     
+    
     /**
      * 
      * @param regexp
      *            the logical regexp operator that has to be visited
      * @throws ParseException
      */
+    @Override
     protected void visit(LORegexp regexp) throws VisitorException {
         regexp.unsetFieldSchema();
         super.visit(regexp);
     }
 
+    @Override
     protected void visit(LOLoad load) throws VisitorException{
         // Don't remove load's schema, it's not like it will change.  And we
         // don't have a way to recover it.
         super.visit(load);
     }
     
+    @Override
     protected void visit(LOStore store) throws VisitorException{
         store.unsetSchema();
         super.visit(store);
     }
     
+    @Override
     protected void visit(LOConst c) throws VisitorException{
         c.unsetSchema();
         super.visit(c);
     }
 
+    @Override
     protected void visit(LOUnion u) throws VisitorException {
         u.unsetSchema();
         super.visit(u);
     }
 
+    @Override
     protected void visit(LOSplitOutput sop) throws VisitorException {
         sop.unsetSchema();
         super.visit(sop);
     }
 
+    @Override
     protected void visit(LODistinct dt) throws VisitorException {
         dt.unsetSchema();
         super.visit(dt);
     }
 
+    @Override
     protected void visit(LOCross cs) throws VisitorException {
         cs.unsetSchema();
         super.visit(cs);
     }
 
+    @Override
     protected void visit(LOProject project) throws VisitorException {
         project.unsetFieldSchema();
         super.visit(project);
     }
 
+    @Override
     protected void visit(LOJoin join) throws VisitorException {
         join.unsetSchema();
         super.visit(join);
     }
+
+    @Override
+    protected void visit(ExpressionOperator op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOAdd op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOAnd binOp) throws VisitorException {
+        binOp.unsetFieldSchema();
+        super.visit(binOp);
+    }
+
+    @Override
+    public void visit(LODivide op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOEqual op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOGreaterThan op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOGreaterThanEqual op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOIsNull uniOp) throws VisitorException {
+        uniOp.unsetFieldSchema();
+        super.visit(uniOp);
+    }
+
+    @Override
+    public void visit(LOLesserThan op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOLesserThanEqual op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOMapLookup op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOMod op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOMultiply op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LONegative op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LONot uniOp) throws VisitorException {
+        uniOp.unsetFieldSchema();
+        super.visit(uniOp);
+    }
+
+    @Override
+    public void visit(LONotEqual op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
+
+    @Override
+    public void visit(LOOr binOp) throws VisitorException {
+        binOp.unsetFieldSchema();
+        super.visit(binOp);
+    }
+
+    @Override
+    public void visit(LOSubtract op) throws VisitorException {
+        op.unsetFieldSchema();
+        super.visit(op);
+    }
 }

Modified: 
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=901317&r1=901316&r2=901317&view=diff
==============================================================================
--- 
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java
 (original)
+++ 
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestTypeCheckingValidator.java
 Wed Jan 20 18:34:41 2010
@@ -5533,6 +5533,79 @@
       LogicalPlan plan = planTester.buildPlan("c = foreach b generate 
(chararray)viewinfo#'pos' as position;") ;
 
         // validate
+        runTypeCheckingValidator(plan);
+        
+        checkLoaderInCasts(plan, "org.apache.pig.builtin.PigStorage");
+    }
+    
+    /**
+     * test various scenarios with two level map lookup
+     */
+    @Test
+    public void testTwolevelMapLookupLineage() throws Exception {
+        List<String[]> queries = new ArrayList<String[]>();
+        // CASE 1: LOAD -> FILTER -> FOREACH -> LIMIT -> STORE
+        queries.add(new String[] {"sds = LOAD '/my/data/location' " +
+                       "AS (simpleFields:map[], mapFields:map[], 
listMapFields:map[]);",
+                       "queries = FILTER sds BY 
mapFields#'page_params'#'query' " +
+                       "is NOT NULL;",
+                       "queries_rand = FOREACH queries GENERATE " +
+                       "(CHARARRAY) (mapFields#'page_params'#'query') AS 
query_string;",
+                       "queries_limit = LIMIT queries_rand 100;",
+                       "STORE queries_limit INTO 'out';"});     
+        // CASE 2: LOAD -> FOREACH -> FILTER -> LIMIT -> STORE
+        queries.add(new String[]{"sds = LOAD '/my/data/location'  " +
+                       "AS (simpleFields:map[], mapFields:map[], 
listMapFields:map[]);",
+                       "queries_rand = FOREACH sds GENERATE " +
+                       "(CHARARRAY) (mapFields#'page_params'#'query') AS 
query_string;",
+                       "queries = FILTER queries_rand BY query_string IS NOT 
null;",
+                       "queries_limit = LIMIT queries 100;",
+                       "STORE queries_limit INTO 'out';"});
+        // CASE 3: LOAD -> FOREACH -> FOREACH -> FILTER -> LIMIT -> STORE
+        queries.add(new String[]{"sds = LOAD '/my/data/location'  " +
+                       "AS (simpleFields:map[], mapFields:map[], 
listMapFields:map[]);",
+                       "params = FOREACH sds GENERATE " +
+                       "(map[]) (mapFields#'page_params') AS params;",
+                       "queries = FOREACH params " +
+                       "GENERATE (CHARARRAY) (params#'query') AS 
query_string;",
+                       "queries_filtered = FILTER queries BY query_string IS 
NOT null;",
+                       "queries_limit = LIMIT queries_filtered 100;",
+                       "STORE queries_limit INTO 'out';"});
+        // CASE 4: LOAD -> FOREACH -> FOREACH -> LIMIT -> STORE
+        queries.add(new String[]{"sds = LOAD '/my/data/location'  " +
+                       "AS (simpleFields:map[], mapFields:map[], 
listMapFields:map[]);",
+                       "params = FOREACH sds GENERATE" +
+                       " (map[]) (mapFields#'page_params') AS params;",
+                       "queries = FOREACH params GENERATE " +
+                       "(CHARARRAY) (params#'query') AS query_string;",
+                       "queries_limit = LIMIT queries 100;",
+                       "STORE queries_limit INTO 'out';"});
+        // CASE 5: LOAD -> FOREACH -> FOREACH -> FOREACH -> LIMIT -> STORE
+        queries.add(new String[]{"sds = LOAD '/my/data/location'  " +
+                "AS (simpleFields:map[], mapFields:map[], 
listMapFields:map[]);",
+                "params = FOREACH sds GENERATE " +
+                "(map[]) (mapFields#'page_params') AS params;",
+                "queries = FOREACH params GENERATE " +
+                "(CHARARRAY) (params#'query') AS query_string;",
+                "rand_queries = FOREACH queries GENERATE query_string as 
query;",
+                "queries_limit = LIMIT rand_queries 100;",
+                "STORE rand_queries INTO 'out';"});
+        
+        for (String[] query: queries) {
+            LogicalPlan lp = null;
+            for (String queryLine : query) {
+                lp = planTester.buildPlan(queryLine);    
+            }
+            
+            // validate
+            runTypeCheckingValidator(lp);
+            checkLoaderInCasts(lp, "org.apache.pig.builtin.PigStorage");
+            
+        }
+    }
+    
+    private void runTypeCheckingValidator(LogicalPlan plan) throws 
+    PlanValidationException {
         CompilationMessageCollector collector = new 
CompilationMessageCollector() ;
         TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
         typeValidator.validate(plan, collector) ;
@@ -5544,12 +5617,16 @@
         if (collector.hasError()) {
             throw new AssertionError("Expect no  error") ;
         }
-
+    }
+    
+    private void checkLoaderInCasts(LogicalPlan plan, String loaderClassName) 
+    throws VisitorException {
         CastFinder cf = new CastFinder(plan);
         cf.visit();
         List<LOCast> casts = cf.casts;
         for (LOCast cast : casts) {
-            
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage"));
    
+            assertTrue(cast.getLoadFuncSpec().getClassName().startsWith(
+                    loaderClassName));    
         }
     }
     
@@ -5708,6 +5785,7 @@
      */
     public static class TestBinCondFieldSchema extends EvalFunc<DataBag> {
         //no-op exec method
+        @Override
         public DataBag exec(Tuple input) {
             return null;
         }


Reply via email to