Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java?rev=919634&r1=919633&r2=919634&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java Fri Mar 5 21:55:19 2010 @@ -17,14 +17,22 @@ */ package org.apache.pig.test; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PrintStream; import java.util.List; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; @@ -36,15 +44,29 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; import org.apache.pig.data.DataType; import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor; +import org.apache.pig.experimental.logical.expression.AddExpression; +import org.apache.pig.experimental.logical.expression.DivideExpression; +import org.apache.pig.experimental.logical.expression.IsNullExpression; import org.apache.pig.experimental.logical.expression.LogicalExpression; +import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan; +import org.apache.pig.experimental.logical.expression.ModExpression; +import org.apache.pig.experimental.logical.expression.MultiplyExpression; +import org.apache.pig.experimental.logical.expression.NegativeExpression; +import org.apache.pig.experimental.logical.expression.NotExpression; +import org.apache.pig.experimental.logical.expression.ProjectExpression; +import org.apache.pig.experimental.logical.expression.SubtractExpression; +import org.apache.pig.experimental.logical.optimizer.PlanPrinter; import org.apache.pig.experimental.logical.optimizer.UidStamper; +import org.apache.pig.experimental.logical.relational.LOFilter; +import org.apache.pig.experimental.logical.relational.LOForEach; +import org.apache.pig.experimental.logical.relational.LOGenerate; +import org.apache.pig.experimental.logical.relational.LOLoad; import org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor; import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator; import org.apache.pig.experimental.logical.relational.LogicalSchema; import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema; -import org.apache.pig.experimental.plan.Operator; import org.apache.pig.experimental.plan.OperatorPlan; -import org.apache.pig.experimental.plan.PlanVisitor; +import org.apache.pig.impl.logicalLayer.LOIsNull; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.test.utils.LogicalPlanTester; @@ -466,7 +488,6 @@ PhysicalPlan phyPlan = translatePlan(newLogicalPlan); assertEquals(phyPlan.size(), 3); - POLoad load = (POLoad)phyPlan.getRoots().get(0); assertEquals(phyPlan.getLeaves().get(0).getClass(), POStore.class); POForEach foreach = (POForEach)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0); @@ -476,13 +497,13 @@ assertEquals(inner.size(), 1); POProject prj = (POProject)inner.getRoots().get(0); assertEquals(prj.getColumn(), 0); - assertEquals(prj.getInputs().get(0), load); - + assertEquals(prj.getInputs(), null); + inner = foreach.getInputPlans().get(1); assertEquals(inner.size(), 1); prj = (POProject)inner.getRoots().get(0); assertEquals(prj.getColumn(), 1); - assertEquals(prj.getInputs().get(0), load); + assertEquals(prj.getInputs(), null); Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]); assertFalse(flat[0]); assertFalse(flat[1]); @@ -512,7 +533,6 @@ PhysicalPlan phyPlan = translatePlan(newLogicalPlan); assertEquals(phyPlan.size(), 3); - POLoad load = (POLoad)phyPlan.getRoots().get(0); assertEquals(phyPlan.getLeaves().get(0).getClass(), POStore.class); POForEach foreach = (POForEach)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0); @@ -522,16 +542,449 @@ assertEquals(inner.size(), 1); POProject prj = (POProject)inner.getRoots().get(0); assertEquals(prj.getColumn(), 0); - assertEquals(prj.getInputs().get(0), load); + assertEquals(prj.getInputs(), null); inner = foreach.getInputPlans().get(1); assertEquals(inner.size(), 1); prj = (POProject)inner.getRoots().get(0); assertEquals(prj.getColumn(), 1); - assertEquals(prj.getInputs().get(0), load); + assertEquals(prj.getInputs(), null); Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]); assertFalse(flat[0]); assertTrue(flat[1]); } + public void testPlanwithPlus() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt' as (a:int, b:int);"); + lpt.buildPlan("b = foreach a generate a+b;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0); + assertEquals( LOLoad.class, ld.getClass() ); + LOLoad load = (LOLoad)ld; + LogicalSchema ls = load.getSchema(); + + PhysicalPlan phyPlan = translatePlan(newLogicalPlan); + + PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0); + assertEquals( POForEach.class, pFE.getClass() ); + POForEach pForEach = (POForEach)pFE; + PhysicalPlan inputPln = pForEach.getInputPlans().get(0); + + assertEquals(1, ls.getField(0).uid); + assertEquals(2, ls.getField(1).uid); + + LogicalRelationalOperator fe = + (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0); + assertEquals( LOForEach.class, fe.getClass() ); + LOForEach forEach = (LOForEach)fe; + + org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = + forEach.getInnerPlan(); + + assertEquals( 1, innerPlan.getSinks().size() ); + assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() ); + LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0); + assertEquals( 1, gen.getOutputPlans().size() ); + LogicalExpressionPlan genExp = gen.getOutputPlans().get(0); + + assertEquals( 1, genExp.getSources().size() ); + + + // Main Tests start here + assertEquals( AddExpression.class, genExp.getSources().get(0).getClass() ); + AddExpression add = (AddExpression) genExp.getSources().get(0); + assertEquals( ls.getField(0).uid, add.getLhs().getUid() ); + assertEquals( ls.getField(1).uid, add.getRhs().getUid() ); + assertTrue( ls.getField(0).uid != add.getUid() ); + assertTrue( ls.getField(1).uid != add.getUid() ); + + assertEquals( 1, inputPln.getLeaves().size() ); + assertEquals( Add.class, inputPln.getLeaves().get(0).getClass() ); + Add pAdd = (Add) inputPln.getLeaves().get(0); + assertEquals( 2, inputPln.getRoots().size() ); + assertEquals( POProject.class, pAdd.getLhs().getClass() ); + assertEquals( POProject.class, pAdd.getRhs().getClass() ); + } + + public void testPlanwithSubtract() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt' as (a:int, b:int);"); + lpt.buildPlan("b = foreach a generate a-b;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0); + assertEquals( LOLoad.class, ld.getClass() ); + LOLoad load = (LOLoad)ld; + LogicalSchema ls = load.getSchema(); + + PhysicalPlan phyPlan = translatePlan(newLogicalPlan); + + PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0); + assertEquals( POForEach.class, pFE.getClass() ); + POForEach pForEach = (POForEach)pFE; + PhysicalPlan inputPln = pForEach.getInputPlans().get(0); + + assertEquals(1, ls.getField(0).uid); + assertEquals(2, ls.getField(1).uid); + + LogicalRelationalOperator fe = + (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0); + assertEquals( LOForEach.class, fe.getClass() ); + LOForEach forEach = (LOForEach)fe; + + org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = + forEach.getInnerPlan(); + + assertEquals( 1, innerPlan.getSinks().size() ); + assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() ); + LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0); + assertEquals( 1, gen.getOutputPlans().size() ); + LogicalExpressionPlan genExp = gen.getOutputPlans().get(0); + + assertEquals( 1, genExp.getSources().size() ); + + // Main Tests start here + assertEquals( SubtractExpression.class, genExp.getSources().get(0).getClass() ); + SubtractExpression add = (SubtractExpression) genExp.getSources().get(0); + assertEquals( ls.getField(0).uid, add.getLhs().getUid() ); + assertEquals( ls.getField(1).uid, add.getRhs().getUid() ); + assertTrue( ls.getField(0).uid != add.getUid() ); + assertTrue( ls.getField(1).uid != add.getUid() ); + + assertEquals( 1, inputPln.getLeaves().size() ); + assertEquals( Subtract.class, inputPln.getLeaves().get(0).getClass() ); + Subtract pSubtract = (Subtract) inputPln.getLeaves().get(0); + assertEquals( 2, inputPln.getRoots().size() ); + assertEquals( POProject.class, pSubtract.getLhs().getClass() ); + assertEquals( POProject.class, pSubtract.getRhs().getClass() ); + } + + public void testPlanwithMultiply() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt' as (a:int, b:int);"); + lpt.buildPlan("b = foreach a generate a*b;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0); + assertEquals( LOLoad.class, ld.getClass() ); + LOLoad load = (LOLoad)ld; + LogicalSchema ls = load.getSchema(); + + PhysicalPlan phyPlan = translatePlan(newLogicalPlan); + + PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0); + assertEquals( POForEach.class, pFE.getClass() ); + POForEach pForEach = (POForEach)pFE; + PhysicalPlan inputPln = pForEach.getInputPlans().get(0); + + assertEquals(1, ls.getField(0).uid); + assertEquals(2, ls.getField(1).uid); + + LogicalRelationalOperator fe = + (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0); + assertEquals( LOForEach.class, fe.getClass() ); + LOForEach forEach = (LOForEach)fe; + + org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = + forEach.getInnerPlan(); + + assertEquals( 1, innerPlan.getSinks().size() ); + assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() ); + LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0); + assertEquals( 1, gen.getOutputPlans().size() ); + LogicalExpressionPlan genExp = gen.getOutputPlans().get(0); + + assertEquals( 1, genExp.getSources().size() ); + + // Main Tests start here + assertEquals( MultiplyExpression.class, genExp.getSources().get(0).getClass() ); + MultiplyExpression add = (MultiplyExpression) genExp.getSources().get(0); + assertEquals( ls.getField(0).uid, add.getLhs().getUid() ); + assertEquals( ls.getField(1).uid, add.getRhs().getUid() ); + assertTrue( ls.getField(0).uid != add.getUid() ); + assertTrue( ls.getField(1).uid != add.getUid() ); + + assertEquals( 1, inputPln.getLeaves().size() ); + assertEquals( Multiply.class, inputPln.getLeaves().get(0).getClass() ); + Multiply pMultiply = (Multiply) inputPln.getLeaves().get(0); + assertEquals( 2, inputPln.getRoots().size() ); + assertEquals( POProject.class, pMultiply.getLhs().getClass() ); + assertEquals( POProject.class, pMultiply.getRhs().getClass() ); + } + + public void testPlanwithDivide() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt' as (a:int, b:int);"); + lpt.buildPlan("b = foreach a generate a/b;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0); + assertEquals( LOLoad.class, ld.getClass() ); + LOLoad load = (LOLoad)ld; + LogicalSchema ls = load.getSchema(); + + PhysicalPlan phyPlan = translatePlan(newLogicalPlan); + + PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0); + assertEquals( POForEach.class, pFE.getClass() ); + POForEach pForEach = (POForEach)pFE; + PhysicalPlan inputPln = pForEach.getInputPlans().get(0); + + assertEquals(1, ls.getField(0).uid); + assertEquals(2, ls.getField(1).uid); + + LogicalRelationalOperator fe = + (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0); + assertEquals( LOForEach.class, fe.getClass() ); + LOForEach forEach = (LOForEach)fe; + + org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = + forEach.getInnerPlan(); + + assertEquals( 1, innerPlan.getSinks().size() ); + assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() ); + LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0); + assertEquals( 1, gen.getOutputPlans().size() ); + LogicalExpressionPlan genExp = gen.getOutputPlans().get(0); + + assertEquals( 1, genExp.getSources().size() ); + + // Main Tests start here + assertEquals( DivideExpression.class, genExp.getSources().get(0).getClass() ); + DivideExpression add = (DivideExpression) genExp.getSources().get(0); + assertEquals( ls.getField(0).uid, add.getLhs().getUid() ); + assertEquals( ls.getField(1).uid, add.getRhs().getUid() ); + assertTrue( ls.getField(0).uid != add.getUid() ); + assertTrue( ls.getField(1).uid != add.getUid() ); + + assertEquals( 1, inputPln.getLeaves().size() ); + assertEquals( Divide.class, inputPln.getLeaves().get(0).getClass() ); + Divide pDivide = (Divide) inputPln.getLeaves().get(0); + assertEquals( 2, inputPln.getRoots().size() ); + assertEquals( POProject.class, pDivide.getLhs().getClass() ); + assertEquals( POProject.class, pDivide.getRhs().getClass() ); + } + + public void testPlanwithMod() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt' as (a:int, b:int);"); + lpt.buildPlan("b = foreach a generate a%b;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0); + assertEquals( LOLoad.class, ld.getClass() ); + LOLoad load = (LOLoad)ld; + LogicalSchema ls = load.getSchema(); + + PhysicalPlan phyPlan = translatePlan(newLogicalPlan); + + PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0); + assertEquals( POForEach.class, pFE.getClass() ); + POForEach pForEach = (POForEach)pFE; + PhysicalPlan inputPln = pForEach.getInputPlans().get(0); + + assertEquals(1, ls.getField(0).uid); + assertEquals(2, ls.getField(1).uid); + + LogicalRelationalOperator fe = + (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0); + assertEquals( LOForEach.class, fe.getClass() ); + LOForEach forEach = (LOForEach)fe; + + org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = + forEach.getInnerPlan(); + + assertEquals( 1, innerPlan.getSinks().size() ); + assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() ); + LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0); + assertEquals( 1, gen.getOutputPlans().size() ); + LogicalExpressionPlan genExp = gen.getOutputPlans().get(0); + + assertEquals( 1, genExp.getSources().size() ); + + // Main Tests start here + assertEquals( ModExpression.class, genExp.getSources().get(0).getClass() ); + ModExpression add = (ModExpression) genExp.getSources().get(0); + assertEquals( ls.getField(0).uid, add.getLhs().getUid() ); + assertEquals( ls.getField(1).uid, add.getRhs().getUid() ); + assertTrue( ls.getField(0).uid != add.getUid() ); + assertTrue( ls.getField(1).uid != add.getUid() ); + + assertEquals( 1, inputPln.getLeaves().size() ); + assertEquals( Mod.class, inputPln.getLeaves().get(0).getClass() ); + Mod pMod = (Mod) inputPln.getLeaves().get(0); + assertEquals( 2, inputPln.getRoots().size() ); + assertEquals( POProject.class, pMod.getLhs().getClass() ); + assertEquals( POProject.class, pMod.getRhs().getClass() ); + } + + public void testPlanwithNegative() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt' as (a:int, b:int);"); + lpt.buildPlan("b = foreach a generate -a;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0); + assertEquals( LOLoad.class, ld.getClass() ); + LOLoad load = (LOLoad)ld; + LogicalSchema ls = load.getSchema(); + + PhysicalPlan phyPlan = translatePlan(newLogicalPlan); + + PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0); + assertEquals( POForEach.class, pFE.getClass() ); + POForEach pForEach = (POForEach)pFE; + PhysicalPlan inputPln = pForEach.getInputPlans().get(0); + + assertEquals(1, ls.getField(0).uid); + assertEquals(2, ls.getField(1).uid); + + LogicalRelationalOperator fe = + (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0); + assertEquals( LOForEach.class, fe.getClass() ); + LOForEach forEach = (LOForEach)fe; + + org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = + forEach.getInnerPlan(); + + assertEquals( 1, innerPlan.getSinks().size() ); + assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() ); + LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0); + assertEquals( 1, gen.getOutputPlans().size() ); + LogicalExpressionPlan genExp = gen.getOutputPlans().get(0); + + assertEquals( 1, genExp.getSources().size() ); + + // Main Tests start here + assertEquals( NegativeExpression.class, genExp.getSources().get(0).getClass() ); + NegativeExpression add = (NegativeExpression) genExp.getSources().get(0); + assertEquals( ls.getField(0).uid, add.getExpression().getUid() ); + assertTrue( ls.getField(0).uid != add.getUid() ); + assertTrue( ls.getField(1).uid != add.getUid() ); + + assertEquals( 1, inputPln.getLeaves().size() ); + assertEquals( PONegative.class, inputPln.getLeaves().get(0).getClass() ); + PONegative pNegative = (PONegative) inputPln.getLeaves().get(0); + assertEquals( 1, inputPln.getRoots().size() ); + assertEquals( POProject.class, pNegative.getInputs().get(0).getClass() ); + } + + public void testPlanwithisNull() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt' as (a:int, b:int);"); + lpt.buildPlan("b = filter a by a is null;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0); + assertEquals( LOLoad.class, ld.getClass() ); + LOLoad load = (LOLoad)ld; + LogicalSchema ls = load.getSchema(); + + PhysicalPlan phyPlan = translatePlan(newLogicalPlan); + + printPlan(plan); + printPlan(newLogicalPlan); + printPlan(phyPlan); + + assertEquals(1, ls.getField(0).uid); + assertEquals(2, ls.getField(1).uid); + + LogicalRelationalOperator fil = (LogicalRelationalOperator) + newLogicalPlan.getSuccessors( newLogicalPlan.getSources().get(0) ).get(0); + assertEquals( LOFilter.class, + fil.getClass() ); + LOFilter filter = (LOFilter)fil; + + LogicalExpressionPlan filPlan = filter.getFilterPlan(); + + assertEquals( 1, filPlan.getSources().size() ); + assertEquals( 2, filPlan.size() ); + assertEquals( 1, filPlan.getSinks().size() ); + assertEquals( IsNullExpression.class, filPlan.getSources().get(0).getClass() ); + IsNullExpression isNull = (IsNullExpression)filPlan.getSources().get(0); + assertTrue( ls.getField(0).uid != isNull.getUid() ); + assertTrue( ls.getField(1).uid != isNull.getUid() ); + + assertEquals( ProjectExpression.class, isNull.getExpression().getClass() ); + ProjectExpression prj = (ProjectExpression) isNull.getExpression(); + assertEquals( ls.getField(0).uid, prj.getUid() ); + } + + public void testPlanwithisNotNull() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt' as (a:int, b:int);"); + lpt.buildPlan("b = filter a by a is not null;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan); + LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0); + assertEquals( LOLoad.class, ld.getClass() ); + LOLoad load = (LOLoad)ld; + LogicalSchema ls = load.getSchema(); + + PhysicalPlan phyPlan = translatePlan(newLogicalPlan); + + printPlan(plan); + printPlan(newLogicalPlan); + printPlan(phyPlan); + + assertEquals(1, ls.getField(0).uid); + assertEquals(2, ls.getField(1).uid); + + LogicalRelationalOperator fil = (LogicalRelationalOperator) + newLogicalPlan.getSuccessors( newLogicalPlan.getSources().get(0) ).get(0); + assertEquals( LOFilter.class, + fil.getClass() ); + LOFilter filter = (LOFilter)fil; + + LogicalExpressionPlan filPlan = filter.getFilterPlan(); + + assertEquals( 1, filPlan.getSources().size() ); + assertEquals( 3, filPlan.size() ); + assertEquals( 1, filPlan.getSinks().size() ); + assertEquals( NotExpression.class, filPlan.getSources().get(0).getClass() ); + NotExpression notExp = (NotExpression)filPlan.getSources().get(0); + assertTrue( ls.getField(0).uid != notExp.getUid() ); + assertTrue( ls.getField(1).uid != notExp.getUid() ); + assertEquals( IsNullExpression.class, notExp.getExpression().getClass() ); + IsNullExpression isNull = (IsNullExpression)notExp.getExpression(); + assertTrue( ls.getField(0).uid != isNull.getUid() ); + assertTrue( ls.getField(1).uid != isNull.getUid() ); + + assertEquals( ProjectExpression.class, isNull.getExpression().getClass() ); + ProjectExpression prj = (ProjectExpression) isNull.getExpression(); + assertEquals( ls.getField(0).uid, prj.getUid() ); + } + + public void printPlan(org.apache.pig.experimental.logical.relational.LogicalPlan logicalPlan ) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(out); + PlanPrinter pp = new PlanPrinter(logicalPlan,ps); + pp.visit(); + System.err.println(out.toString()); + } + + public void printPlan(LogicalPlan logicalPlan) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(out); + logicalPlan.explain(ps, "text", true); + System.err.println(out.toString()); + } + + public void printPlan(PhysicalPlan physicalPlan) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(out); + physicalPlan.explain(ps, "text", true); + System.err.println(out.toString()); + } + }
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java?rev=919634&r1=919633&r2=919634&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java Fri Mar 5 21:55:19 2010 @@ -31,7 +31,6 @@ import org.apache.pig.experimental.logical.relational.LOJoin; import org.apache.pig.experimental.logical.relational.LOLoad; import org.apache.pig.experimental.logical.relational.LogicalPlan; -import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator; import org.apache.pig.experimental.logical.relational.LogicalSchema; import org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE; import org.apache.pig.impl.io.FileSpec; @@ -104,7 +103,9 @@ mm.put(1, bprojplan); LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true}); C.neverUseForRealSetSchema(cschema); - lp.add(new LogicalRelationalOperator[] {A, B}, C, null); + lp.add(C); + lp.connect(A, C); + lp.connect(B, C); // D = filter LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); @@ -133,7 +134,8 @@ LOFilter D = new LOFilter(lp, filterPlan); D.neverUseForRealSetSchema(cschema); // Connect D to B, since the transform has happened. - lp.add(C, D, (LogicalRelationalOperator)null); + lp.add(D); + lp.connect(C, D); } LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(lp, 500); @@ -161,7 +163,8 @@ LOFilter DA = new LOFilter(expected, DAfilterPlan); DA.neverUseForRealSetSchema(aschema); - expected.add(A, DA, (LogicalRelationalOperator)null); + expected.add(DA); + expected.connect(A, DA); // B = load LogicalSchema bschema = new LogicalSchema(); @@ -183,7 +186,8 @@ LOFilter DB = new LOFilter(expected, DBfilterPlan); DB.neverUseForRealSetSchema(bschema); - expected.add(B, DB, (LogicalRelationalOperator)null); + expected.add(DB); + expected.connect(B, DB); // C = join LogicalSchema cschema = new LogicalSchema(); @@ -211,7 +215,9 @@ mm.put(1, bprojplan); LOJoin C = new LOJoin(expected, mm, JOINTYPE.HASH, new boolean[] {true, true}); C.neverUseForRealSetSchema(cschema); - expected.add(new LogicalRelationalOperator[] {DA, DB}, C, null); + expected.add(C); + expected.connect(DA, C); + expected.connect(DB, C); // D = filter LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); @@ -229,7 +235,8 @@ LOFilter D = new LOFilter(expected, filterPlan); D.neverUseForRealSetSchema(cschema); - expected.add(C, D, (LogicalRelationalOperator)null); + expected.add(D); + expected.connect(C, D); } assertTrue( lp.isEqual(expected) ); Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java?rev=919634&r1=919633&r2=919634&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java Fri Mar 5 21:55:19 2010 @@ -35,7 +35,6 @@ import org.apache.pig.experimental.logical.relational.LOLoad; import org.apache.pig.experimental.logical.relational.LogicalPlan; import org.apache.pig.experimental.logical.relational.LogicalPlanVisitor; -import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator; import org.apache.pig.experimental.logical.relational.LogicalSchema; import org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE; import org.apache.pig.experimental.plan.BaseOperatorPlan; @@ -684,8 +683,9 @@ public void testLogicalPlanVisitor() throws IOException { LogicalPlan lp = new LogicalPlan(); LOLoad load = new LOLoad(null, null, lp); - lp.add((LogicalRelationalOperator)null, load, - (LogicalRelationalOperator)null); + /*lp.add((LogicalRelationalOperator)null, load, + (LogicalRelationalOperator)null);*/ + lp.add(load); TestLogicalVisitor v = new TestLogicalVisitor(lp); v.visit(); @@ -838,7 +838,9 @@ mm.put(1, bprojplan); LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true}); C.neverUseForRealSetSchema(cschema); - lp.add(new LogicalRelationalOperator[] {A, B}, C, null); + lp.add(C); + lp.connect(A, C); + lp.connect(B, C); // D = filter LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); @@ -847,7 +849,8 @@ new EqualExpression(filterPlan, fy, fc); LOFilter D = new LOFilter(lp, filterPlan); D.neverUseForRealSetSchema(cschema); - lp.add(C, D, (LogicalRelationalOperator)null); + lp.add(D); + lp.connect(C, D); } // Build a second similar plan to test equality @@ -885,7 +888,10 @@ mm.put(1, bprojplan); LOJoin C = new LOJoin(lp1, mm, JOINTYPE.HASH, new boolean[] {true, true}); C.neverUseForRealSetSchema(cschema); - lp1.add(new LogicalRelationalOperator[] {A, B}, C, null); + lp1.add(C); + lp1.connect(A, C); + lp1.connect(B, C); + // D = filter LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); @@ -894,7 +900,9 @@ new EqualExpression(filterPlan, fy, fc); LOFilter D = new LOFilter(lp1, filterPlan); D.neverUseForRealSetSchema(cschema); - lp1.add(C, D, (LogicalRelationalOperator)null); + lp1.add(D); + lp1.connect(C, D); + } assertTrue( lp.isEqual(lp1)); @@ -1095,7 +1103,9 @@ mm1.put(1, bprojplan1); LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true}); C1.neverUseForRealSetSchema(jcschema1); - lp.add(new LogicalRelationalOperator[] {A1, B1}, C1, null); + lp.add(C1); + lp.connect(A1, C1); + lp.connect(B1, C1); // A = load LogicalSchema jaschema2 = new LogicalSchema(); @@ -1129,7 +1139,9 @@ mm2.put(1, bprojplan2); LOJoin C2 = new LOJoin(lp, mm2, JOINTYPE.SKEWED, new boolean[] {true, true}); C2.neverUseForRealSetSchema(jcschema2); - lp.add(new LogicalRelationalOperator[] {A2, B2}, C2, null); + lp.add(C2); + lp.connect(A2, C2); + lp.connect(B2, C2); assertFalse(C1.isEqual(C2)); } @@ -1168,7 +1180,10 @@ mm1.put(1, bprojplan1); LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true}); C1.neverUseForRealSetSchema(jcschema1); - lp.add(new LogicalRelationalOperator[] {A1, B1}, C1, null); + lp.add(C1); + lp.connect(A1, C1); + lp.connect(B1, C1); + // Test different inner status // A = load @@ -1203,7 +1218,10 @@ mm3.put(1, bprojplan3); LOJoin C3 = new LOJoin(lp, mm3, JOINTYPE.HASH, new boolean[] {true, false}); C3.neverUseForRealSetSchema(jcschema3); - lp.add(new LogicalRelationalOperator[] {A3, B3}, C3, null); + lp.add(C3); + lp.connect(A3, C3); + lp.connect(B3, C3); + assertFalse(C1.isEqual(C3)); } @@ -1242,7 +1260,9 @@ mm1.put(1, bprojplan1); LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true}); C1.neverUseForRealSetSchema(jcschema1); - lp.add(new LogicalRelationalOperator[] {A1, B1}, C1, null); + lp.add(C1); + lp.connect(A1, C1); + lp.connect(B1, C1); // A = load LogicalSchema jaschema5 = new LogicalSchema(); @@ -1287,7 +1307,10 @@ mm5.put(2, betaprojplan5); LOJoin C5 = new LOJoin(lp, mm5, JOINTYPE.HASH, new boolean[] {true, true}); C5.neverUseForRealSetSchema(jcschema5); - lp.add(new LogicalRelationalOperator[] {A5, B5, Beta5}, C5, null); + lp.add(C5); + lp.connect(A5, C5); + lp.connect(B5, C5); + lp.connect(Beta5, C5); assertFalse(C1.isEqual(C5)); } @@ -1333,7 +1356,10 @@ mm6.put(1, b2projplan6); LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] {true, true}); C6.neverUseForRealSetSchema(jcschema6); - lp.add(new LogicalRelationalOperator[] {A6, B6}, C6, null); + lp.add(C6); + lp.connect(A6, C6); + lp.connect(B6, C6); + LogicalSchema jaschema7 = new LogicalSchema(); jaschema7.addField(new LogicalSchema.LogicalFieldSchema( @@ -1371,7 +1397,9 @@ mm7.put(1, b2projplan7); LOJoin C7 = new LOJoin(lp, mm7, JOINTYPE.HASH, new boolean[] {true, true}); C7.neverUseForRealSetSchema(jcschema7); - lp.add(new LogicalRelationalOperator[] {A7, B7}, C7, null); + lp.add(C7); + lp.connect(A7, C7); + lp.connect(B7, C7); assertFalse(C6.isEqual(C7)); } @@ -1417,7 +1445,9 @@ mm6.put(1, b2projplan6); LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] {true, true}); C6.neverUseForRealSetSchema(jcschema6); - lp.add(new LogicalRelationalOperator[] {A6, B6}, C6, null); + lp.add(C6); + lp.connect(A6, C6); + lp.connect(B6, C6); // Test different different number of join keys LogicalSchema jaschema8 = new LogicalSchema(); @@ -1453,7 +1483,9 @@ mm8.put(1, bprojplan8); LOJoin C8 = new LOJoin(lp, mm8, JOINTYPE.HASH, new boolean[] {true, true}); C8.neverUseForRealSetSchema(jcschema8); - lp.add(new LogicalRelationalOperator[] {A8, B8}, C8, null); + lp.add(C8); + lp.connect(A8, C8); + lp.connect(B8, C8); assertFalse(C6.isEqual(C8)); } @@ -1478,7 +1510,8 @@ cschema.addField(new LogicalSchema.LogicalFieldSchema( "x", null, DataType.INTEGER)); D1.neverUseForRealSetSchema(cschema); - lp1.add(A1, D1, (LogicalRelationalOperator)null); + lp1.add(D1); + lp1.connect(A1, D1); LogicalPlan lp2 = new LogicalPlan(); LOLoad A2 = new LOLoad(new FileSpec("/abc", @@ -1492,7 +1525,8 @@ new EqualExpression(fp2, fy2, fc2); LOFilter D2 = new LOFilter(lp2, fp2); D2.neverUseForRealSetSchema(cschema); - lp2.add(A2, D2, (LogicalRelationalOperator)null); + lp2.add(D2); + lp2.connect(A2, D2); assertFalse(D1.isEqual(D2)); } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java?rev=919634&r1=919633&r2=919634&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java Fri Mar 5 21:55:19 2010 @@ -284,11 +284,45 @@ aschema.addField(new LogicalSchema.LogicalFieldSchema("d", aschema2, DataType.BAG)); assertTrue(aschema.isEqual(op.getSchema())); - } + + // check with defined data type + lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load '/test/d.txt' as (id, d:bag{t:(v:int, s)});"); + lpt.buildPlan("b = foreach a generate id, FLATTEN(d);"); + plan = lpt.buildPlan("store b into '/test/empty';"); + + newPlan = migratePlan(plan); + op = (LogicalRelationalOperator)newPlan.getSinks().get(0); + op = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(0); + LogicalSchema schema = op.getSchema(); + + aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); + aschema.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.INTEGER)); + aschema.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY)); + assertTrue(schema.isEqual(aschema)); + + + // test with add + lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load '/test/d.txt' as (id, v:int, s:int);"); + lpt.buildPlan("b = foreach a generate id, v+s;"); + plan = lpt.buildPlan("store b into '/test/empty';"); + + newPlan = migratePlan(plan); + op = (LogicalRelationalOperator)newPlan.getSinks().get(0); + op = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(0); + schema = op.getSchema(); + + aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); + aschema.addField(new LogicalSchema.LogicalFieldSchema(null, null, DataType.INTEGER)); + assertTrue(schema.isEqual(aschema)); + } public void testForeachPlan2() throws Exception { LogicalPlanTester lpt = new LogicalPlanTester(); - lpt.buildPlan("a = load '/test/d.txt' as (id, d:bag{t:(v, s)});"); + lpt.buildPlan("a = load '/test/d.txt' as (id, d:bag{t:(id:int, s)});"); lpt.buildPlan("b = foreach a generate id, FLATTEN(d);"); LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';"); @@ -302,7 +336,7 @@ aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); LogicalSchema aschema2 = new LogicalSchema(); LogicalSchema aschema3 = new LogicalSchema(); - aschema3.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY)); + aschema3.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER)); aschema3.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY)); aschema2.addField(new LogicalSchema.LogicalFieldSchema("t", aschema3, DataType.TUPLE)); aschema.addField(new LogicalSchema.LogicalFieldSchema("d", aschema2, DataType.BAG)); @@ -351,9 +385,11 @@ LogicalSchema schema = foreach2.getSchema(); aschema = new LogicalSchema(); aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); - aschema.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY)); + aschema.addField(new LogicalSchema.LogicalFieldSchema("d::id", null, DataType.INTEGER)); aschema.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY)); assertTrue(schema.isEqual(aschema)); + assertTrue(schema.getField("id")==schema.getField(0)); + assertTrue(schema.getField("d::id")==schema.getField(1)); } private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{