Author: daijy
Date: Mon Sep 6 21:16:18 2010
New Revision: 993156

URL: http://svn.apache.org/viewvc?rev=993156&view=rev
Log:
PIG-1178: LogicalPlan and Optimizer are too complex and hard to work with (PIG-1178-10.patch)
Modified:
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
    hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java

Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=993156&r1=993155&r2=993156&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Mon Sep 6 21:16:18 2010
@@ -45,6 +45,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.LOCogroup.GROUPTYPE; import org.apache.pig.impl.logicalLayer.LOJoin.JOINTYPE; +import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.PlanWalker; import org.apache.pig.impl.plan.VisitorException; @@ -209,6 +210,13 @@ public class LogicalPlanMigrationVistor org.apache.pig.newplan.logical.relational.LOGenerate gen = new org.apache.pig.newplan.logical.relational.LOGenerate(innerPlan, expPlans, flat); + if (forEach.getUserDefinedSchema()!=null) { + List<LogicalSchema> userDefinedSchema = new ArrayList<LogicalSchema>(); + for (Schema schema : forEach.getUserDefinedSchema()) { + userDefinedSchema.add(Util.translateSchema(schema)); + } + gen.setUserDefinedSchema(userDefinedSchema); + } innerPlan.add(gen); List<LogicalPlan> ll = forEach.getForEachPlans(); Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=993156&r1=993155&r2=993156&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Mon Sep 6 21:16:18 2010 @@ -33,6 +33,12 @@ import org.apache.pig.newplan.logical.re public class LOGenerate extends LogicalRelationalOperator { private List<LogicalExpressionPlan> outputPlans; private boolean[] flattenFlags; + private List<LogicalSchema> mUserDefinedSchema = null; + private List<LogicalSchema> outputPlanSchemas = null; + // If LOGenerate generate new uid, cache it here. 
+ // This happens when expression plan does not have complete schema, however, + // user give complete schema in ForEach statement in script + private List<LogicalSchema> uidOnlySchemas = null; public LOGenerate(OperatorPlan plan, List<LogicalExpressionPlan> ps, boolean[] flatten) { super("LOGenerate", plan); @@ -46,60 +52,130 @@ public class LOGenerate extends LogicalR return schema; } + if (uidOnlySchemas == null) { + uidOnlySchemas = new ArrayList<LogicalSchema>(); + for (int i=0;i<outputPlans.size();i++) + uidOnlySchemas.add(null); + } + schema = new LogicalSchema(); + outputPlanSchemas = new ArrayList<LogicalSchema>(); for(int i=0; i<outputPlans.size(); i++) { LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0); - LogicalFieldSchema fieldSchema = null; - if (exp.getFieldSchema()==null) { - schema = null; - break; + LogicalSchema mUserDefinedSchemaCopy = null; + if (mUserDefinedSchema!=null && mUserDefinedSchema.get(i)!=null) { + mUserDefinedSchemaCopy = new LogicalSchema(); + for (LogicalSchema.LogicalFieldSchema fs : mUserDefinedSchema.get(i).getFields()) { + mUserDefinedSchemaCopy.addField(fs.deepCopy()); + } } - fieldSchema = exp.getFieldSchema().deepCopy(); - if (fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG) { - // if type is primitive, just add to schema - schema.addField(fieldSchema); - continue; - } else { - // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator - if (fieldSchema.schema==null) { - schema=null; - break; - } - // if flatten is set, set schema of tuple field to this schema - List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>(); - if (flattenFlags[i]) { - if (fieldSchema.type == DataType.BAG) { - // if it is bag of tuples, get the schema of tuples - if (fieldSchema.schema!=null) { - if (fieldSchema.schema.isTwoLevelAccessRequired()) { - // assert(fieldSchema.schema.size() == 1 && 
fieldSchema.schema.getField(0).type == DataType.TUPLE) - innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields(); - } else { + LogicalFieldSchema fieldSchema = null; + + LogicalSchema expSchema = null; + + if (exp.getFieldSchema()!=null) { + + fieldSchema = exp.getFieldSchema().deepCopy(); + + expSchema = new LogicalSchema(); + if (fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG) { + // if type is primitive, just add to schema + if (fieldSchema!=null) + expSchema.addField(fieldSchema); + else + expSchema = null; + } else { + // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator + if (fieldSchema.schema==null) { + expSchema = null; + } + else { + // if flatten is set, set schema of tuple field to this schema + List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>(); + if (flattenFlags[i]) { + if (fieldSchema.type == DataType.BAG) { + // if it is bag of tuples, get the schema of tuples + if (fieldSchema.schema!=null) { + if (fieldSchema.schema.isTwoLevelAccessRequired()) { + // assert(fieldSchema.schema.size() == 1 && fieldSchema.schema.getField(0).type == DataType.TUPLE) + innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields(); + } else { + innerFieldSchemas = fieldSchema.schema.getFields(); + } + for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) { + fs.alias = fieldSchema.alias + "::" + fs.alias; + } + } + } else { // DataType.TUPLE innerFieldSchemas = fieldSchema.schema.getFields(); + for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) { + fs.alias = fieldSchema.alias + "::" + fs.alias; + } } - for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) { - fs.alias = fieldSchema.alias + "::" + fs.alias; - } - } - } else { // DataType.TUPLE - innerFieldSchemas = fieldSchema.schema.getFields(); - for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) { - fs.alias = fieldSchema.alias + 
"::" + fs.alias; + + for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) + expSchema.addField(fs); } + else + expSchema.addField(fieldSchema); } - - for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) - schema.addField(fs); } - else - schema.addField(fieldSchema); } + + // Merge with user defined schema + if (expSchema!=null && expSchema.size()==0) + expSchema = null; + LogicalSchema planSchema = new LogicalSchema(); + if (mUserDefinedSchemaCopy!=null) { + LogicalSchema mergedSchema = new LogicalSchema(); + // merge with userDefinedSchema + if (expSchema==null) { + // Use user defined schema + for (LogicalFieldSchema fs : mUserDefinedSchemaCopy.getFields()) { + fs.stampFieldSchema(); + mergedSchema.addField(fs); + } + } + else { + // Merge uid with the exp field schema + mergedSchema = LogicalSchema.merge(mUserDefinedSchemaCopy, expSchema); + mergedSchema.mergeUid(expSchema); + } + for (LogicalFieldSchema fs : mergedSchema.getFields()) + planSchema.addField(fs); + } else { + // if any plan do not have schema, the whole LOGenerate do not have schema + if (expSchema==null) { + planSchema = null; + } + else { + // Merge schema for the plan + for (LogicalFieldSchema fs : expSchema.getFields()) + planSchema.addField(fs); + } + } + + if (planSchema==null) { + schema = null; + break; + } + for (LogicalFieldSchema fs : planSchema.getFields()) + schema.addField(fs); + + // If the schema is generated by user defined schema, keep uid + if (expSchema==null) { + LogicalSchema uidOnlySchema = schema.mergeUid(uidOnlySchemas.get(i)); + uidOnlySchemas.set(i, uidOnlySchema); + } + outputPlanSchemas.add(planSchema); } - if (schema!=null && schema.size()==0) + if (schema==null || schema.size()==0) { schema = null; + outputPlanSchemas = null; + } return schema; } @@ -175,4 +251,28 @@ public class LOGenerate extends LogicalR } return msg.toString(); } + + public List<LogicalSchema> getUserDefinedSchema() { + return mUserDefinedSchema; + } + + public void 
setUserDefinedSchema(List<LogicalSchema> userDefinedSchema) { + mUserDefinedSchema = userDefinedSchema; + } + + public List<LogicalSchema> getOutputPlanSchemas() { + return outputPlanSchemas; + } + + public void setOutputPlanSchemas(List<LogicalSchema> outputPlanSchemas) { + this.outputPlanSchemas = outputPlanSchemas; + } + + public List<LogicalSchema> getUidOnlySchemas() { + return uidOnlySchemas; + } + + public void setUidOnlySchemas(List<LogicalSchema> uidOnlySchemas) { + this.uidOnlySchemas = uidOnlySchemas; + } } Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=993156&r1=993155&r2=993156&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LOUnion.java Mon Sep 6 21:16:18 2010 @@ -62,6 +62,8 @@ public class LOUnion extends LogicalRela // Merge schema for (int i=2;i<inputs.size();i++) { LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema(); + if (mergedSchema==null || otherSchema==null) + return null; mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema); if (mergedSchema == null) return null; Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=993156&r1=993155&r2=993156&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original) +++ 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Mon Sep 6 21:16:18 2010 @@ -142,14 +142,14 @@ public class LogicalSchema { public LogicalFieldSchema cloneUid() { LogicalFieldSchema resultFs = null; if (schema==null) { - resultFs = new LogicalFieldSchema(null, null, (byte)-1, uid); + resultFs = new LogicalFieldSchema(null, null, type, uid); } else { - LogicalSchema schema = new LogicalSchema(); - resultFs = new LogicalFieldSchema(null, schema, (byte)-1, uid); + LogicalSchema newSchema = new LogicalSchema(); + resultFs = new LogicalFieldSchema(null, newSchema, type, uid); for (int i=0;i<schema.size();i++) { LogicalFieldSchema fs = schema.getField(i).cloneUid(); - schema.addField(fs); + newSchema.addField(fs); } } return resultFs; @@ -297,6 +297,13 @@ public class LogicalSchema { * @return a merged schema, or null if the merge fails */ public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2) throws FrontendException { + // If any of the schema is null, take the other party + if (s1==null || s2==null) { + if (s1!=null) return s1.deepCopy(); + else if (s2!=null) return s2.deepCopy(); + else return null; + } + if (s1.size()!=s2.size()) return null; LogicalSchema mergedSchema = new LogicalSchema(); for (int i=0;i<s1.size();i++) { @@ -311,7 +318,11 @@ public class LogicalSchema { else { mergedAlias = fs1.alias; // If both schema have alias, the first one win } - mergedType = fs1.type; + if (fs1.type==DataType.NULL) + mergedType = fs2.type; + else + mergedType = fs1.type; + if (DataType.isSchemaType(mergedType)) { mergedSubSchema = merge(fs1.schema, fs2.schema); if (mergedSubSchema==null) { Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java?rev=993156&r1=993155&r2=993156&view=diff 
============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java Mon Sep 6 21:16:18 2010 @@ -443,28 +443,13 @@ public class ColumnPruneHelper { while(iter.hasNext()) { long uid = iter.next(); for(int i=0; i<ll.size(); i++) { - boolean found = false; LogicalExpressionPlan exp = ll.get(i); - LogicalExpression op = (LogicalExpression)exp.getSources().get(0); - - if (gen.getFlattenFlags()[i] && (op.getFieldSchema().type==DataType.TUPLE || - op.getFieldSchema().type== DataType.BAG)) { - // if uid equal to the expression, get all uids of original projections - LogicalSchema schema; - - schema = op.getFieldSchema().schema; - for (LogicalSchema.LogicalFieldSchema fs : schema.getFields()) - { - if (fs.uid==uid) { - found = true; - break; - } - } - } - else { - // No flatten, collect outer uid - if (op.getFieldSchema().uid == uid) { + boolean found = false; + LogicalSchema planSchema = gen.getOutputPlanSchemas().get(i); + for (LogicalFieldSchema fs : planSchema.getFields()) { + if (fs.uid == uid) { found = true; + break; } } @@ -506,6 +491,8 @@ public class ColumnPruneHelper { continue; List<Operator> srcs = exp.getSinks(); for (Operator src : srcs) { + if (!(src instanceof ProjectExpression)) + continue; List<LOInnerLoad> innerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject((ProjectExpression)src); for (LOInnerLoad innerLoad : innerLoads) { ProjectExpression prj = innerLoad.getProjection(); Modified: hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=993156&r1=993155&r2=993156&view=diff ============================================================================== 
--- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Mon Sep 6 21:16:18 2010 @@ -335,6 +335,8 @@ public class ColumnPruneVisitor extends // Build the temporary structure based on genPlansToRemove, which include: // * flattenList + // * outputPlanSchemas + // * uidOnlySchemas // * inputsRemoved // We first construct inputsNeeded, and inputsRemoved = (all inputs) - inputsNeeded. // We cannot figure out inputsRemoved directly since the inputs may be used by other output plan. @@ -342,11 +344,15 @@ public class ColumnPruneVisitor extends List<Boolean> flattenList = new ArrayList<Boolean>(); Set<Integer> inputsNeeded = new HashSet<Integer>(); Set<Integer> inputsRemoved = new HashSet<Integer>(); + List<LogicalSchema> outputPlanSchemas = new ArrayList<LogicalSchema>(); + List<LogicalSchema> uidOnlySchemas = new ArrayList<LogicalSchema>(); for (int i=0;i<genPlans.size();i++) { LogicalExpressionPlan genPlan = genPlans.get(i); if (!genPlansToRemove.contains(genPlan)) { flattenList.add(gen.getFlattenFlags()[i]); + outputPlanSchemas.add(gen.getOutputPlanSchemas().get(i)); + uidOnlySchemas.add(gen.getUidOnlySchemas().get(i)); List<Operator> sinks = genPlan.getSinks(); for(Operator s: sinks) { if (s instanceof ProjectExpression) { @@ -367,12 +373,14 @@ public class ColumnPruneVisitor extends // Change LOGenerate: remove unneeded output expression plan - // change flatten flag + // change flatten flag, outputPlanSchema, uidOnlySchemas boolean[] flatten = new boolean[flattenList.size()]; for (int i=0;i<flattenList.size();i++) flatten[i] = flattenList.get(i); gen.setFlattenFlags(flatten); + gen.setOutputPlanSchemas(outputPlanSchemas); + gen.setUidOnlySchemas(uidOnlySchemas); for (LogicalExpressionPlan genPlanToRemove : genPlansToRemove) { genPlans.remove(genPlanToRemove); Modified: 
hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java?rev=993156&r1=993155&r2=993156&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java (original) +++ hadoop/pig/branches/branch-0.8/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java Mon Sep 6 21:16:18 2010 @@ -104,12 +104,20 @@ public abstract class PlanOptimizer { if (matches != null) { Transformer transformer = rule.getNewTransformer(); for (OperatorPlan m : matches) { - if (transformer.check(m)) { - sawMatch = true; - transformer.transform(m); - for(PlanTransformListener l: listeners) { - l.transformed(plan, transformer.reportChanges()); + try { + if (transformer.check(m)) { + sawMatch = true; + transformer.transform(m); + for(PlanTransformListener l: listeners) { + l.transformed(plan, transformer.reportChanges()); + } } + } catch (Exception e) { + StringBuffer message = new StringBuffer("Error processing rule " + rule.name); + if (!rule.isMandatory()) { + message.append(". 
Try -t " + rule.name); + } + throw new FrontendException(message.toString(), 2000, e); } } } Modified: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java?rev=993156&r1=993155&r2=993156&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java (original) +++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java Mon Sep 6 21:16:18 2010 @@ -51,6 +51,7 @@ import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.newplan.Operator; import org.apache.pig.newplan.OperatorPlan; import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor; import org.apache.pig.newplan.logical.expression.AddExpression; @@ -1338,4 +1339,36 @@ public class TestNewPlanLogToPhyTranslat POPackage pack = (POPackage)phyPlan.getSuccessors(globalR).get(0); assertEquals( DataType.TUPLE, pack.getResultType() ); } + + public void testUserDefinedForEachSchema1() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(pc); + lpt.buildPlan("a = load 'a.txt';"); + lpt.buildPlan("b = foreach a generate $0 as a0, $1 as a1;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + Operator store = newLogicalPlan.getSinks().get(0); + LOForEach foreach = (LOForEach)newLogicalPlan.getPredecessors(store).get(0); + foreach.getSchema(); + + assertTrue(foreach.getSchema().size()==2); + assertTrue(foreach.getSchema().getField(0).alias.equals("a0")); + 
assertTrue(foreach.getSchema().getField(1).alias.equals("a1")); + } + + public void testUserDefinedForEachSchema2() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(pc); + lpt.buildPlan("a = load 'a.txt' as (b:bag{});"); + lpt.buildPlan("b = foreach a generate flatten($0) as (a0, a1);"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + Operator store = newLogicalPlan.getSinks().get(0); + LOForEach foreach = (LOForEach)newLogicalPlan.getPredecessors(store).get(0); + foreach.getSchema(); + + assertTrue(foreach.getSchema().size()==2); + assertTrue(foreach.getSchema().getField(0).alias.equals("a0")); + assertTrue(foreach.getSchema().getField(1).alias.equals("a1")); + } }