Author: daijy
Date: Mon Sep  6 21:16:18 2010
New Revision: 993156

URL: http://svn.apache.org/viewvc?rev=993156&view=rev
Log:
PIG-1178: LogicalPlan and Optimizer are too complex and hard to work with (PIG-1178-10.patch)

Modified:
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=993156&r1=993155&r2=993156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Mon Sep  6 21:16:18 2010
@@ -45,6 +45,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LOCogroup.GROUPTYPE;
 import org.apache.pig.impl.logicalLayer.LOJoin.JOINTYPE;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -209,6 +210,13 @@ public class LogicalPlanMigrationVistor 
         org.apache.pig.newplan.logical.relational.LOGenerate gen = 
             new 
org.apache.pig.newplan.logical.relational.LOGenerate(innerPlan, expPlans, flat);
         
+        if (forEach.getUserDefinedSchema()!=null) {
+            List<LogicalSchema> userDefinedSchema = new 
ArrayList<LogicalSchema>();
+            for (Schema schema : forEach.getUserDefinedSchema()) {
+                userDefinedSchema.add(Util.translateSchema(schema));
+            }
+            gen.setUserDefinedSchema(userDefinedSchema);
+        }
         innerPlan.add(gen);                
         
         List<LogicalPlan> ll = forEach.getForEachPlans();

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=993156&r1=993155&r2=993156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Mon Sep  6 21:16:18 2010
@@ -33,6 +33,12 @@ import org.apache.pig.newplan.logical.re
 public class LOGenerate extends LogicalRelationalOperator {
      private List<LogicalExpressionPlan> outputPlans;
      private boolean[] flattenFlags;
+     private List<LogicalSchema> mUserDefinedSchema = null;
+     private List<LogicalSchema> outputPlanSchemas = null;
+     // If LOGenerate generate new uid, cache it here.
+     // This happens when expression plan does not have complete schema, 
however,
+     // user give complete schema in ForEach statement in script
+     private List<LogicalSchema> uidOnlySchemas = null;
 
     public LOGenerate(OperatorPlan plan, List<LogicalExpressionPlan> ps, 
boolean[] flatten) {
         super("LOGenerate", plan);
@@ -46,60 +52,130 @@ public class LOGenerate extends LogicalR
             return schema;
         }
         
+        if (uidOnlySchemas == null) {
+            uidOnlySchemas = new ArrayList<LogicalSchema>();
+            for (int i=0;i<outputPlans.size();i++)
+                uidOnlySchemas.add(null);
+        }
+        
         schema = new LogicalSchema();
+        outputPlanSchemas = new ArrayList<LogicalSchema>();
         
         for(int i=0; i<outputPlans.size(); i++) {
             LogicalExpression exp = 
(LogicalExpression)outputPlans.get(i).getSources().get(0);
             
-            LogicalFieldSchema fieldSchema = null;
-            if (exp.getFieldSchema()==null) {
-                schema = null;
-                break;
+            LogicalSchema mUserDefinedSchemaCopy = null;
+            if (mUserDefinedSchema!=null && mUserDefinedSchema.get(i)!=null) {
+                mUserDefinedSchemaCopy = new LogicalSchema();
+                for (LogicalSchema.LogicalFieldSchema fs : 
mUserDefinedSchema.get(i).getFields()) {
+                    mUserDefinedSchemaCopy.addField(fs.deepCopy());
+                }
             }
-            fieldSchema = exp.getFieldSchema().deepCopy();
             
-            if (fieldSchema.type != DataType.TUPLE && fieldSchema.type != 
DataType.BAG) {
-                // if type is primitive, just add to schema
-                schema.addField(fieldSchema);
-                continue;
-            } else {
-                // if bag/tuple don't have inner schema, after flatten, we 
don't have schema for the entire operator
-                if (fieldSchema.schema==null) {
-                    schema=null;
-                    break;
-                }
-                // if flatten is set, set schema of tuple field to this schema
-                List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new 
ArrayList<LogicalSchema.LogicalFieldSchema>();
-                if (flattenFlags[i]) {
-                    if (fieldSchema.type == DataType.BAG) {
-                        // if it is bag of tuples, get the schema of tuples
-                        if (fieldSchema.schema!=null) {
-                            if (fieldSchema.schema.isTwoLevelAccessRequired()) 
{
-                                //  assert(fieldSchema.schema.size() == 1 && 
fieldSchema.schema.getField(0).type == DataType.TUPLE)
-                                innerFieldSchemas = 
fieldSchema.schema.getField(0).schema.getFields();
-                            } else {
+            LogicalFieldSchema fieldSchema = null;
+            
+            LogicalSchema expSchema = null;
+            
+            if (exp.getFieldSchema()!=null) {
+            
+                fieldSchema = exp.getFieldSchema().deepCopy();
+                
+                expSchema = new LogicalSchema();
+                if (fieldSchema.type != DataType.TUPLE && fieldSchema.type != 
DataType.BAG) {
+                    // if type is primitive, just add to schema
+                    if (fieldSchema!=null)
+                        expSchema.addField(fieldSchema);
+                    else
+                        expSchema = null;
+                } else {
+                    // if bag/tuple don't have inner schema, after flatten, we 
don't have schema for the entire operator
+                    if (fieldSchema.schema==null) {
+                        expSchema = null;
+                    }
+                    else {
+                        // if flatten is set, set schema of tuple field to 
this schema
+                        List<LogicalSchema.LogicalFieldSchema> 
innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
+                        if (flattenFlags[i]) {
+                            if (fieldSchema.type == DataType.BAG) {
+                                // if it is bag of tuples, get the schema of 
tuples
+                                if (fieldSchema.schema!=null) {
+                                    if 
(fieldSchema.schema.isTwoLevelAccessRequired()) {
+                                        //  assert(fieldSchema.schema.size() 
== 1 && fieldSchema.schema.getField(0).type == DataType.TUPLE)
+                                        innerFieldSchemas = 
fieldSchema.schema.getField(0).schema.getFields();
+                                    } else {
+                                        innerFieldSchemas = 
fieldSchema.schema.getFields();
+                                    }
+                                    for (LogicalSchema.LogicalFieldSchema fs : 
innerFieldSchemas) {
+                                        fs.alias = fieldSchema.alias + "::" + 
fs.alias;
+                                    }
+                                }
+                            } else { // DataType.TUPLE
                                 innerFieldSchemas = 
fieldSchema.schema.getFields();
+                                for (LogicalSchema.LogicalFieldSchema fs : 
innerFieldSchemas) {
+                                    fs.alias = fieldSchema.alias + "::" + 
fs.alias;
+                                }
                             }
-                            for (LogicalSchema.LogicalFieldSchema fs : 
innerFieldSchemas) {
-                                fs.alias = fieldSchema.alias + "::" + fs.alias;
-                            }
-                        }
-                    } else { // DataType.TUPLE
-                        innerFieldSchemas = fieldSchema.schema.getFields();
-                        for (LogicalSchema.LogicalFieldSchema fs : 
innerFieldSchemas) {
-                            fs.alias = fieldSchema.alias + "::" + fs.alias;
+                            
+                            for (LogicalSchema.LogicalFieldSchema fs : 
innerFieldSchemas)
+                                expSchema.addField(fs);
                         }
+                        else
+                            expSchema.addField(fieldSchema);
                     }
-                    
-                    for (LogicalSchema.LogicalFieldSchema fs : 
innerFieldSchemas)
-                        schema.addField(fs);
                 }
-                else
-                    schema.addField(fieldSchema);
             }
+            
+            // Merge with user defined schema
+            if (expSchema!=null && expSchema.size()==0)
+                expSchema = null;
+            LogicalSchema planSchema = new LogicalSchema();
+            if (mUserDefinedSchemaCopy!=null) {
+                LogicalSchema mergedSchema = new LogicalSchema();
+                // merge with userDefinedSchema
+                if (expSchema==null) {
+                    // Use user defined schema
+                    for (LogicalFieldSchema fs : 
mUserDefinedSchemaCopy.getFields()) {
+                        fs.stampFieldSchema();
+                        mergedSchema.addField(fs);
+                    }
+                }
+                else {
+                    // Merge uid with the exp field schema
+                    mergedSchema = LogicalSchema.merge(mUserDefinedSchemaCopy, 
expSchema);
+                    mergedSchema.mergeUid(expSchema);
+                }
+                for (LogicalFieldSchema fs : mergedSchema.getFields())
+                    planSchema.addField(fs);
+            } else {
+                // if any plan do not have schema, the whole LOGenerate do not 
have schema
+                if (expSchema==null) {
+                    planSchema = null;
+                }
+                else {
+                    // Merge schema for the plan
+                    for (LogicalFieldSchema fs : expSchema.getFields())
+                        planSchema.addField(fs);
+                }
+            }
+            
+            if (planSchema==null) {
+                schema = null;
+                break;
+            }
+            for (LogicalFieldSchema fs : planSchema.getFields())
+                schema.addField(fs);
+            
+            // If the schema is generated by user defined schema, keep uid
+            if (expSchema==null) {
+                LogicalSchema uidOnlySchema = 
schema.mergeUid(uidOnlySchemas.get(i));
+                uidOnlySchemas.set(i, uidOnlySchema);
+            }
+            outputPlanSchemas.add(planSchema);
         }
-        if (schema!=null && schema.size()==0)
+        if (schema==null || schema.size()==0) {
             schema = null;
+            outputPlanSchemas = null;
+        }
         return schema;
     }
 
@@ -175,4 +251,28 @@ public class LOGenerate extends LogicalR
         }
         return msg.toString();
     }
+    
+    public List<LogicalSchema> getUserDefinedSchema() {
+        return mUserDefinedSchema;
+    }
+
+    public void setUserDefinedSchema(List<LogicalSchema> userDefinedSchema) {
+        mUserDefinedSchema = userDefinedSchema;
+    }
+    
+    public List<LogicalSchema> getOutputPlanSchemas() {
+        return outputPlanSchemas;
+    }
+    
+    public void setOutputPlanSchemas(List<LogicalSchema> outputPlanSchemas) {
+        this.outputPlanSchemas = outputPlanSchemas;
+    }
+    
+    public List<LogicalSchema> getUidOnlySchemas() {
+        return uidOnlySchemas;
+    }
+    
+    public void setUidOnlySchemas(List<LogicalSchema> uidOnlySchemas) {
+        this.uidOnlySchemas = uidOnlySchemas;
+    }
 }

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=993156&r1=993155&r2=993156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java Mon Sep  6 21:16:18 2010
@@ -62,6 +62,8 @@ public class LOUnion extends LogicalRela
         // Merge schema
         for (int i=2;i<inputs.size();i++) {
             LogicalSchema otherSchema = 
((LogicalRelationalOperator)inputs.get(i)).getSchema();
+            if (mergedSchema==null || otherSchema==null)
+                return null;
             mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema);
             if (mergedSchema == null)
                 return null;

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=993156&r1=993155&r2=993156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Mon Sep  6 21:16:18 2010
@@ -142,14 +142,14 @@ public class LogicalSchema {
         public LogicalFieldSchema cloneUid() {
             LogicalFieldSchema resultFs = null;
             if (schema==null) {
-                resultFs = new LogicalFieldSchema(null, null, (byte)-1, uid);
+                resultFs = new LogicalFieldSchema(null, null, type, uid);
             }
             else {
-                LogicalSchema schema = new LogicalSchema();
-                resultFs = new LogicalFieldSchema(null, schema, (byte)-1, uid);
+                LogicalSchema newSchema = new LogicalSchema();
+                resultFs = new LogicalFieldSchema(null, newSchema, type, uid);
                 for (int i=0;i<schema.size();i++) {
                     LogicalFieldSchema fs = schema.getField(i).cloneUid();
-                    schema.addField(fs);
+                    newSchema.addField(fs);
                 }
             }
             return resultFs;
@@ -297,6 +297,13 @@ public class LogicalSchema {
      * @return a merged schema, or null if the merge fails
      */
     public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2) 
throws FrontendException {
+        // If any of the schema is null, take the other party
+        if (s1==null || s2==null) {
+            if (s1!=null) return s1.deepCopy();
+            else if (s2!=null) return s2.deepCopy();
+            else return null;
+        }
+        
         if (s1.size()!=s2.size()) return null;
         LogicalSchema mergedSchema = new LogicalSchema();
         for (int i=0;i<s1.size();i++) {
@@ -311,7 +318,11 @@ public class LogicalSchema {
             else {
                 mergedAlias = fs1.alias; // If both schema have alias, the 
first one win
             }
-            mergedType = fs1.type;
+            if (fs1.type==DataType.NULL)
+                mergedType = fs2.type;
+            else
+                mergedType = fs1.type;
+            
             if (DataType.isSchemaType(mergedType)) {
                 mergedSubSchema = merge(fs1.schema, fs2.schema);
                 if (mergedSubSchema==null) {

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java?rev=993156&r1=993155&r2=993156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java Mon Sep  6 21:16:18 2010
@@ -443,28 +443,13 @@ public class ColumnPruneHelper {
              while(iter.hasNext()) {
                  long uid = iter.next();
                  for(int i=0; i<ll.size(); i++) {
-                     boolean found = false;
                      LogicalExpressionPlan exp = ll.get(i);
-                     LogicalExpression op = 
(LogicalExpression)exp.getSources().get(0);
-                     
-                     if (gen.getFlattenFlags()[i] && 
(op.getFieldSchema().type==DataType.TUPLE ||
-                             op.getFieldSchema().type== DataType.BAG)) {
-                         // if uid equal to the expression, get all uids of 
original projections
-                         LogicalSchema schema;
-
-                         schema = op.getFieldSchema().schema;
-                         for (LogicalSchema.LogicalFieldSchema fs : 
schema.getFields())
-                         {
-                             if (fs.uid==uid) {
-                                 found = true;
-                                 break;
-                             }
-                         }
-                     }
-                     else {
-                         // No flatten, collect outer uid
-                         if (op.getFieldSchema().uid == uid) {                 
        
+                     boolean found = false;
+                     LogicalSchema planSchema = 
gen.getOutputPlanSchemas().get(i);
+                     for (LogicalFieldSchema fs : planSchema.getFields()) {
+                         if (fs.uid == uid) {
                              found = true;
+                             break;
                          }
                      }
                      
@@ -506,6 +491,8 @@ public class ColumnPruneHelper {
                      continue;
                  List<Operator> srcs = exp.getSinks();
                  for (Operator src : srcs) {
+                     if (!(src instanceof ProjectExpression))
+                         continue;
                      List<LOInnerLoad> innerLoads = 
LOForEach.findReacheableInnerLoadFromBoundaryProject((ProjectExpression)src);
                      for (LOInnerLoad innerLoad : innerLoads) {
                          ProjectExpression prj = innerLoad.getProjection();

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=993156&r1=993155&r2=993156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Mon Sep  6 21:16:18 2010
@@ -335,6 +335,8 @@ public class ColumnPruneVisitor extends 
         
         // Build the temporary structure based on genPlansToRemove, which 
include:
         // * flattenList
+        // * outputPlanSchemas
+        // * uidOnlySchemas
         // * inputsRemoved
         //     We first construct inputsNeeded, and inputsRemoved = (all 
inputs) - inputsNeeded.
         //     We cannot figure out inputsRemoved directly since the inputs 
may be used by other output plan.
@@ -342,11 +344,15 @@ public class ColumnPruneVisitor extends 
         List<Boolean> flattenList = new ArrayList<Boolean>();
         Set<Integer> inputsNeeded = new HashSet<Integer>();
         Set<Integer> inputsRemoved = new HashSet<Integer>();
+        List<LogicalSchema> outputPlanSchemas = new ArrayList<LogicalSchema>();
+        List<LogicalSchema> uidOnlySchemas = new ArrayList<LogicalSchema>();
         
         for (int i=0;i<genPlans.size();i++) {
             LogicalExpressionPlan genPlan = genPlans.get(i);
             if (!genPlansToRemove.contains(genPlan)) {
                 flattenList.add(gen.getFlattenFlags()[i]);
+                outputPlanSchemas.add(gen.getOutputPlanSchemas().get(i));
+                uidOnlySchemas.add(gen.getUidOnlySchemas().get(i));
                 List<Operator> sinks = genPlan.getSinks();
                 for(Operator s: sinks) {
                     if (s instanceof ProjectExpression) {
@@ -367,12 +373,14 @@ public class ColumnPruneVisitor extends 
         
         
         // Change LOGenerate: remove unneeded output expression plan
-        // change flatten flag
+        // change flatten flag, outputPlanSchema, uidOnlySchemas
         boolean[] flatten = new boolean[flattenList.size()];
         for (int i=0;i<flattenList.size();i++)
             flatten[i] = flattenList.get(i);
 
         gen.setFlattenFlags(flatten);
+        gen.setOutputPlanSchemas(outputPlanSchemas);
+        gen.setUidOnlySchemas(uidOnlySchemas);
         
         for (LogicalExpressionPlan genPlanToRemove : genPlansToRemove) {
             genPlans.remove(genPlanToRemove);

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java?rev=993156&r1=993155&r2=993156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java Mon Sep  6 21:16:18 2010
@@ -104,12 +104,20 @@ public abstract class PlanOptimizer {
                     if (matches != null) {
                         Transformer transformer = rule.getNewTransformer();
                         for (OperatorPlan m : matches) {
-                            if (transformer.check(m)) {
-                                sawMatch = true;
-                                transformer.transform(m);
-                                for(PlanTransformListener l: listeners) {
-                                    l.transformed(plan, 
transformer.reportChanges());
+                            try {
+                                if (transformer.check(m)) {
+                                    sawMatch = true;
+                                    transformer.transform(m);
+                                    for(PlanTransformListener l: listeners) {
+                                        l.transformed(plan, 
transformer.reportChanges());
+                                    }
                                 }
+                            } catch (Exception e) {
+                                StringBuffer message = new StringBuffer("Error 
processing rule " + rule.name);
+                                if (!rule.isMandatory()) {
+                                    message.append(". Try -t " + rule.name);
+                                }
+                                throw new 
FrontendException(message.toString(), 2000, e);
                             }
                         }
                     }

Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java?rev=993156&r1=993155&r2=993156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java Mon Sep  6 21:16:18 2010
@@ -51,6 +51,7 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.expression.AddExpression;
@@ -1338,4 +1339,36 @@ public class TestNewPlanLogToPhyTranslat
         POPackage pack = (POPackage)phyPlan.getSuccessors(globalR).get(0);
         assertEquals( DataType.TUPLE, pack.getResultType() );
     }
+    
+    public void testUserDefinedForEachSchema1() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'a.txt';");
+        lpt.buildPlan("b = foreach a generate $0 as a0, $1 as a1;");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = 
migratePlan(plan);
+        Operator store = newLogicalPlan.getSinks().get(0);
+        LOForEach foreach = 
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
+        foreach.getSchema();
+        
+        assertTrue(foreach.getSchema().size()==2);
+        assertTrue(foreach.getSchema().getField(0).alias.equals("a0"));
+        assertTrue(foreach.getSchema().getField(1).alias.equals("a1"));
+    }
+
+    public void testUserDefinedForEachSchema2() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'a.txt' as (b:bag{});");
+        lpt.buildPlan("b = foreach a generate flatten($0) as (a0, a1);");
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = 
migratePlan(plan);
+        Operator store = newLogicalPlan.getSinks().get(0);
+        LOForEach foreach = 
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
+        foreach.getSchema();
+        
+        assertTrue(foreach.getSchema().size()==2);
+        assertTrue(foreach.getSchema().getField(0).alias.equals("a0"));
+        assertTrue(foreach.getSchema().getField(1).alias.equals("a1"));
+    }
 }


Reply via email to