Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=771437&r1=771436&r2=771437&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Mon May 4 20:50:31 2009 @@ -181,6 +181,342 @@ } @Test + public void testMultiQueryPhase3BaseCase() { + + System.out.println("===== multi-query phase 3 base case ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); + myPig.registerQuery("d = filter a by uid >= 10;"); + myPig.registerQuery("b1 = group b by gid;"); + myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); + myPig.registerQuery("b3 = filter b2 by $1 > 5;"); + myPig.registerQuery("store b3 into '/tmp/output1';"); + myPig.registerQuery("c1 = group c by gid;"); + myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); + myPig.registerQuery("store c2 into '/tmp/output2';"); + myPig.registerQuery("d1 = group d by gid;"); + myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);"); + myPig.registerQuery("store d2 into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 19); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25); + + checkMRPlan(pp, 1, 1, 1); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryPhase3BaseCase2() { + + System.out.println("===== multi-query phase 3 base case (2) ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); + myPig.registerQuery("d = filter a by uid >= 10;"); + myPig.registerQuery("b1 = group b by gid;"); + myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); + myPig.registerQuery("b3 = filter b2 by $1 > 5;"); + myPig.registerQuery("store b3 into '/tmp/output1';"); + myPig.registerQuery("c1 = group c by gid;"); + myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); + myPig.registerQuery("store c2 into '/tmp/output2';"); + myPig.registerQuery("d1 = group d by gid;"); + myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);"); + myPig.registerQuery("store d2 into '/tmp/output3';"); + + myPig.executeBatch(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryPhase3WithoutCombiner() { + + System.out.println("===== multi-query phase 3 without combiner ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); + myPig.registerQuery("d = filter a by uid >= 10;"); + myPig.registerQuery("b1 = group b by gid;"); + myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);"); + myPig.registerQuery("b3 = filter b2 by $1 > 5;"); + myPig.registerQuery("store b3 into '/tmp/output1';"); + myPig.registerQuery("c1 = group c by gid;"); + myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);"); + myPig.registerQuery("store c2 into '/tmp/output2';"); + myPig.registerQuery("d1 = group d by gid;"); + myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);"); + myPig.registerQuery("store d2 into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 19); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25); + + checkMRPlan(pp, 1, 1, 1); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryPhase3WithoutCombiner2() { + + System.out.println("===== multi-query phase 3 without combiner (2) ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); + myPig.registerQuery("d = filter a by uid >= 10;"); + myPig.registerQuery("b1 = group b by gid;"); + myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);"); + myPig.registerQuery("b3 = filter b2 by $1 > 5;"); + myPig.registerQuery("store b3 into '/tmp/output1';"); + myPig.registerQuery("c1 = group c by gid;"); + myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);"); + myPig.registerQuery("store c2 into '/tmp/output2';"); + myPig.registerQuery("d1 = group d by gid;"); + myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);"); + myPig.registerQuery("store d2 into '/tmp/output3';"); + + myPig.executeBatch(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryPhase3WithMixedCombiner() { + + System.out.println("===== multi-query phase 3 with mixed combiner ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); + myPig.registerQuery("d = filter a by uid >= 10;"); + myPig.registerQuery("b1 = group b by gid;"); + myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); + myPig.registerQuery("b3 = filter b2 by $1 > 5;"); + myPig.registerQuery("store b3 into '/tmp/output1';"); + myPig.registerQuery("c1 = group c by gid;"); + myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); + myPig.registerQuery("store c2 into '/tmp/output2';"); + myPig.registerQuery("d1 = group d by gid;"); + myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);"); + myPig.registerQuery("store d2 into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 19); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25); + + checkMRPlan(pp, 1, 1, 2); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryPhase3WithMixedCombiner2() { + + System.out.println("===== multi-query phase 3 with mixed combiner (2) ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); + myPig.registerQuery("d = filter a by uid >= 10;"); + myPig.registerQuery("b1 = group b by gid;"); + myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); + myPig.registerQuery("b3 = filter b2 by $1 > 5;"); + myPig.registerQuery("store b3 into '/tmp/output1';"); + myPig.registerQuery("c1 = group c by gid;"); + myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); + myPig.registerQuery("store c2 into '/tmp/output2';"); + myPig.registerQuery("d1 = group d by gid;"); + myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);"); + myPig.registerQuery("store d2 into '/tmp/output3';"); + + myPig.executeBatch(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryPhase3WithDifferentMapDataTypes() { + + System.out.println("===== multi-query phase 3 with different map datatypes ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); + myPig.registerQuery("d = filter a by uid >= 10;"); + myPig.registerQuery("b1 = group b by gid parallel 2;"); + myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); + myPig.registerQuery("b3 = filter b2 by $1 > 5;"); + myPig.registerQuery("store b3 into '/tmp/output1';"); + myPig.registerQuery("c1 = group c by $1 parallel 3;"); + myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); + myPig.registerQuery("store c2 into '/tmp/output2';"); + myPig.registerQuery("d1 = group d by $1 parallel 4;"); + myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);"); + myPig.registerQuery("store d2 into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 19); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25); + + checkMRPlan(pp, 1, 1, 1); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryPhase3WithDifferentMapDataTypes2() { + + System.out.println("===== multi-query phase 3 with different map datatypes (2) ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); + myPig.registerQuery("d = filter a by uid >= 10;"); + myPig.registerQuery("b1 = group b by gid;"); + myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); + myPig.registerQuery("b3 = filter b2 by $1 > 5;"); + myPig.registerQuery("store b3 into '/tmp/output1';"); + myPig.registerQuery("c1 = group c by $1;"); + myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); + myPig.registerQuery("store c2 into '/tmp/output2';"); + myPig.registerQuery("d1 = group d by $1;"); + myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);"); + myPig.registerQuery("store d2 into '/tmp/output3';"); + + myPig.executeBatch(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryPhase3StreamingInReducer() { + + System.out.println("===== multi-query phase 3 with streaming in reducer ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';"); + myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;"); + myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';"); + myPig.registerQuery("B = group A3 by $2;"); + myPig.registerQuery("C = foreach B generate flatten(A3);"); + myPig.registerQuery("D = stream B through `cat`;"); + myPig.registerQuery("store D into '/tmp/output1';"); + myPig.registerQuery("E = group A4 by $2;"); + myPig.registerQuery("F = foreach E generate group, COUNT(A4);"); + myPig.registerQuery("store F into '/tmp/output2';"); + myPig.registerQuery("G = group A1 by $2;"); + myPig.registerQuery("H = foreach G generate group, COUNT(A1);"); + myPig.registerQuery("store H into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 16); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 24); + + checkMRPlan(pp, 1, 1, 2); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryPhase3StreamingInReducer2() { + + System.out.println("===== multi-query phase 3 with streaming in reducer (2) ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';"); + myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;"); + myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';"); + myPig.registerQuery("B = group A3 by $2;"); + myPig.registerQuery("C = foreach B generate flatten(A3);"); + myPig.registerQuery("D = stream B through `cat`;"); + myPig.registerQuery("store D into '/tmp/output1';"); + myPig.registerQuery("E = group A4 by $2;"); + myPig.registerQuery("F = foreach E generate group, COUNT(A4);"); + myPig.registerQuery("store F into '/tmp/output2';"); + myPig.registerQuery("G = group A1 by $2;"); + myPig.registerQuery("H = foreach G generate group, COUNT(A1);"); + myPig.registerQuery("store H into '/tmp/output3';"); + + myPig.executeBatch(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test public void testMultiQueryWithPigMixL12() { System.out.println("===== multi-query with PigMix L12 ====="); @@ -207,7 +543,7 @@ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 23); - checkMRPlan(pp, 1, 2, 3); + checkMRPlan(pp, 1, 1, 1); } catch (Exception e) { e.printStackTrace(); @@ -695,7 +1031,7 @@ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25); - checkMRPlan(pp, 1, 2, 3); + checkMRPlan(pp, 1, 1, 1); } catch (Exception e) { e.printStackTrace();
Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=771437&r1=771436&r2=771437&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Mon May 4 20:50:31 2009 @@ -26,6 +26,7 @@ import org.apache.pig.FuncSpec; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.builtin.BinStorage; +import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; @@ -37,9 +38,6 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*; import org.apache.pig.impl.plan.PlanException; @@ -755,6 +753,12 @@ PORead ret = new PORead(new OperatorKey("", r.nextLong()), bag); return ret; } + + public static POStore dummyPigStorageOp() { + POStore ret = new POStore(new OperatorKey("", r.nextLong())); + ret.setSFile(new FileSpec("DummyFil", new FuncSpec(PigStorage.class.getName() + "()"))); + return ret; + } public static POStore topStoreOp() { POStore ret = new POStore(new OperatorKey("", r.nextLong()));