Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java?rev=757598&r1=757597&r2=757598&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java Mon Mar 23 23:36:10 2009 @@ -17,8 +17,9 @@ */ package org.apache.pig.test; -import java.io.StringReader; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.StringReader; import java.util.ArrayList; import java.util.Collections; @@ -28,13 +29,11 @@ import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigServer; -import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.backend.executionengine.util.ExecTools; -import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.plan.Operator; @@ -55,6 +54,7 @@ @Before public void setUp() throws Exception { myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + deleteOutputFiles(); } @After @@ -63,9 +63,42 @@ } @Test - public void testMultiQueryWithTwoStores() { + public void testMultiQueryWithDemoCase() { + + System.out.println("===== multi-query with demo case 2 ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = foreach a generate uname, uid, gid;"); + myPig.registerQuery("c = filter b by uid < 5;"); + myPig.registerQuery("d = filter c by gid >= 5;"); + myPig.registerQuery("store d into '/tmp/output1';"); + myPig.registerQuery("e = filter b by uid >= 5;"); + myPig.registerQuery("store e into '/tmp/output2';"); + myPig.registerQuery("f = filter c by gid < 5;"); + myPig.registerQuery("g = group f by uname;"); + myPig.registerQuery("h = foreach g generate group, COUNT(f.uid);"); + myPig.registerQuery("store h into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 18); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 20); + + checkMRPlan(pp, 1, 1, 1); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithTwoStores2() { - System.out.println("===== test multi-query with 2 stores ====="); + System.out.println("===== multi-query with 2 stores (2) ====="); try { myPig.setBatchOn(); @@ -77,95 +110,597 @@ myPig.registerQuery("c = group b by gid;"); myPig.registerQuery("store c into '/tmp/output2';"); - LogicalPlan lp = checkLogicalPlan(1, 2, 9); + myPig.executeBatch(); - PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 11); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithTwoLoads2() { - checkMRPlan(pp, 1, 2, 3); + System.out.println("===== multi-query with two loads (2) ====="); - Assert.assertTrue(executePlan(pp)); + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); + myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd2' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); + myPig.registerQuery("c = filter a by uid > 5;"); + myPig.registerQuery("d = filter b by uid > 10;"); + myPig.registerQuery("store c into '/tmp/output1';"); + myPig.registerQuery("store d into '/tmp/output2';"); + myPig.registerQuery("e = cogroup c by uid, d by uid;"); + myPig.registerQuery("store e into '/tmp/output3';"); + + myPig.executeBatch(); + myPig.discardBatch(); } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } + } + + @Test + public void testMultiQueryWithSingleMapReduceSplittee() { + + System.out.println("===== multi-query with single map reduce splittee ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname, passwd, uid, gid);"); + myPig.registerQuery("b = foreach a generate uname, uid, gid;"); + myPig.registerQuery("split b into c1 if uid > 5, c2 if uid <= 5 ;"); + myPig.registerQuery("f = group c2 by uname;"); + myPig.registerQuery("f1 = foreach f generate group, SUM(c2.gid);"); + myPig.registerQuery("store f1 into '/tmp/output1';"); + + LogicalPlan lp = checkLogicalPlan(1, 1, 6); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 1, 9); + + checkMRPlan(pp, 1, 1, 1); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } } + + @Test + public void testMultiQueryWithPigMixL12() { + + System.out.println("===== multi-query with PigMix L12 ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname, passwd, uid, gid);"); + myPig.registerQuery("b = foreach a generate uname, passwd, uid, gid;"); + myPig.registerQuery("split b into c1 if uid > 5, c2 if uid <= 5 ;"); + myPig.registerQuery("split c1 into d1 if gid < 5, d2 if gid >= 5;"); + myPig.registerQuery("e = group d1 by uname;"); + myPig.registerQuery("e1 = foreach e generate group, MAX(d1.uid);"); + myPig.registerQuery("store e1 into '/tmp/output1';"); + myPig.registerQuery("f = group c2 by uname;"); + myPig.registerQuery("f1 = foreach f generate group, SUM(c2.gid);"); + myPig.registerQuery("store f1 into '/tmp/output2';"); + myPig.registerQuery("g = group d2 by uname;"); + myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);"); + myPig.registerQuery("store g1 into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 15); + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 23); + + checkMRPlan(pp, 1, 2, 3); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + @Test - public void testEmptyExecute() { - System.out.println("=== test empty execute ==="); - + public void testMultiQueryWithPigMixL12_2() { + + System.out.println("===== multi-query with PigMix L12 (2) ====="); + try { myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname, passwd, uid, gid);"); + myPig.registerQuery("b = foreach a generate uname, passwd, uid, gid;"); + myPig.registerQuery("split b into c1 if uid > 5, c2 if uid <= 5 ;"); + myPig.registerQuery("split c1 into d1 if gid < 5, d2 if gid >= 5;"); + myPig.registerQuery("e = group d1 by uname;"); + myPig.registerQuery("e1 = foreach e generate group, MAX(d1.uid);"); + myPig.registerQuery("store e1 into '/tmp/output1';"); + myPig.registerQuery("f = group c2 by uname;"); + myPig.registerQuery("f1 = foreach f generate group, SUM(c2.gid);"); + myPig.registerQuery("store f1 into '/tmp/output2';"); + myPig.registerQuery("g = group d2 by uname;"); + myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);"); + myPig.registerQuery("store g1 into '/tmp/output3';"); + myPig.executeBatch(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithCoGroup() { + + System.out.println("===== multi-query with CoGroup ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname, passwd, uid, gid);"); + myPig.registerQuery("store a into '/tmp/output1' using BinStorage();"); + myPig.registerQuery("b = load '/tmp/output1' using BinStorage() as (uname, passwd, uid, gid);"); + myPig.registerQuery("c = load 'file:test/org/apache/pig/test/data/passwd2' " + + "using PigStorage(':') as (uname, passwd, uid, gid);"); + myPig.registerQuery("d = cogroup b by (uname, uid) inner, c by (uname, uid) inner;"); + myPig.registerQuery("e = foreach d generate flatten(b), flatten(c);"); + myPig.registerQuery("store e into '/tmp/output2';"); + + LogicalPlan lp = checkLogicalPlan(2, 2, 9); + + PhysicalPlan pp = checkPhysicalPlan(lp, 2, 2, 12); + + checkMRPlan(pp, 1, 1, 2); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithCoGroup_2() { + + System.out.println("===== multi-query with CoGroup (2) ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname, passwd, uid, gid);"); + myPig.registerQuery("store a into '/tmp/output1' using BinStorage();"); + myPig.registerQuery("b = load '/tmp/output1' using BinStorage() as (uname, passwd, uid, gid);"); + myPig.registerQuery("c = load 'file:test/org/apache/pig/test/data/passwd2' " + + "using PigStorage(':') as (uname, passwd, uid, gid);"); + myPig.registerQuery("d = cogroup b by (uname, uid) inner, c by (uname, uid) inner;"); + myPig.registerQuery("e = foreach d generate flatten(b), flatten(c);"); + myPig.registerQuery("store e into '/tmp/output2';"); + myPig.executeBatch(); - myPig.discardBatch(); - } - catch (Exception e) { + + } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } + } } - + @Test - public void testMultiQueryWithTwoStores2() { + public void testMultiQueryWithFJ() { - System.out.println("===== test multi-query with 2 stores (2) ====="); + System.out.println("===== multi-query with FJ ====="); try { myPig.setBatchOn(); myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + - "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); - myPig.registerQuery("b = filter a by uid > 5;"); + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("c = filter a by uid > 5;"); + myPig.registerQuery("store c into '/tmp/output1';"); + myPig.registerQuery("d = filter b by gid > 10;"); + myPig.registerQuery("store d into '/tmp/output2';"); + myPig.registerQuery("e = join c by gid, d by gid using \"repl\";"); + myPig.registerQuery("store e into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(2, 3, 16); + + PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 16); + + checkMRPlan(pp, 1, 1, 2); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithFJ_2() { + + System.out.println("===== multi-query with FJ (2) ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("c = filter a by uid > 5;"); + myPig.registerQuery("store c into '/tmp/output1';"); + myPig.registerQuery("d = filter b by gid > 10;"); + myPig.registerQuery("store d into '/tmp/output2';"); + myPig.registerQuery("e = join c by gid, d by gid using \"repl\";"); + myPig.registerQuery("store e into '/tmp/output3';"); + + myPig.executeBatch(); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithExplicitSplitAndSideFiles() { + + System.out.println("===== multi-query with explicit split and side files ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("split a into b if uid > 500, c if uid <= 500;"); myPig.registerQuery("store b into '/tmp/output1';"); - myPig.registerQuery("c = group b by gid;"); myPig.registerQuery("store c into '/tmp/output2';"); + myPig.registerQuery("e = cogroup b by gid, c by gid;"); + myPig.registerQuery("d = foreach e generate flatten(c), flatten(b);"); + myPig.registerQuery("store d into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 15); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 19); + + checkMRPlan(pp, 1, 1, 2); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithExplicitSplitAndOrderByAndSideFiles() { + + System.out.println("===== multi-query with explicit split, orderby and side files ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("split a into a1 if uid > 500, a2 if gid > 500;"); + myPig.registerQuery("b1 = distinct a1;"); + myPig.registerQuery("b2 = order a2 by uname;"); + myPig.registerQuery("store b1 into '/tmp/output1';"); + myPig.registerQuery("store b2 into '/tmp/output2';"); + myPig.registerQuery("c = cogroup b1 by uname, b2 by uname;"); + myPig.registerQuery("d = foreach c generate flatten(group), flatten($1), flatten($2);"); + myPig.registerQuery("store d into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 17); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 21); + + checkMRPlan(pp, 1, 1, 4); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithIntermediateStores() { + + System.out.println("===== multi-query with intermediate stores ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("store a into '/tmp/output1';"); + myPig.registerQuery("b = load '/tmp/output1' using PigStorage(':'); "); + myPig.registerQuery("store b into '/tmp/output2';"); + + LogicalPlan lp = checkLogicalPlan(1, 2, 7); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 7); + + checkMRPlan(pp, 1, 1, 1); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithIntermediateStores_2() { + + System.out.println("===== multi-query with intermediate stores (2) ====="); + + try { + + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("store a into '/tmp/output1';"); + myPig.registerQuery("b = load '/tmp/output1' using PigStorage(':'); "); + myPig.registerQuery("store b into '/tmp/output2';"); myPig.executeBatch(); } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } - } + } + } @Test - public void testMultiQueryWithTwoStores2Execs() { + public void testMultiQueryWithImplicitSplitAndSideFiles() { + + System.out.println("===== multi-query with implicit split and side files ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid > 500;"); + myPig.registerQuery("c = filter a by gid > 500;"); + myPig.registerQuery("store c into '/tmp/output1';"); + myPig.registerQuery("d = cogroup b by uname, c by uname;"); + myPig.registerQuery("e = foreach d generate flatten(c), flatten(b);"); + myPig.registerQuery("store e into '/tmp/output2';"); + myPig.registerQuery("f = filter e by b::uid < 1000;"); + myPig.registerQuery("store f into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 19); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 22); + + checkMRPlan(pp, 1, 1, 2); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } - System.out.println("===== test multi-query with 2 stores (2) ====="); + @Test + public void testMultiQueryWithTwoLoadsAndTwoStores() { + + System.out.println("===== multi-query with two loads and two stores ====="); try { myPig.setBatchOn(); myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); + myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd2' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); + myPig.registerQuery("c = filter a by uid > 5;"); + myPig.registerQuery("d = filter b by uid > 10;"); + myPig.registerQuery("e = cogroup c by uid, d by uid;"); + myPig.registerQuery("f = foreach e generate flatten(c), flatten(d);"); + myPig.registerQuery("g = group f by d::gid;"); + myPig.registerQuery("h = filter f by c::gid > 5;"); + myPig.registerQuery("store g into '/tmp/output1';"); + myPig.registerQuery("store h into '/tmp/output2';"); + + LogicalPlan lp = checkLogicalPlan(2, 2, 15); + + PhysicalPlan pp = checkPhysicalPlan(lp, 2, 2, 20); + + checkMRPlan(pp, 1, 1, 2); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithSplitInReduce() { + + System.out.println("===== multi-query with split in reduce ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); myPig.registerQuery("b = filter a by uid > 5;"); - myPig.executeBatch(); - myPig.registerQuery("store b into '/tmp/output1';"); - myPig.executeBatch(); myPig.registerQuery("c = group b by gid;"); - myPig.registerQuery("store c into '/tmp/output2';"); + myPig.registerQuery("d = foreach c generate group, COUNT(b.uid);"); + myPig.registerQuery("store d into '/tmp/output1';"); + myPig.registerQuery("e = filter d by $1 > 5;"); + myPig.registerQuery("store e into '/tmp/output2';"); - myPig.executeBatch(); - myPig.discardBatch(); + LogicalPlan lp = checkLogicalPlan(1, 2, 11); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 13); + + checkMRPlan(pp, 1, 1, 1); } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } } + + @Test + public void testMultiQueryWithSplitInReduceAndReduceSplitee() { + + System.out.println("===== multi-query with split in reduce and reduce splitee ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid > 5;"); + myPig.registerQuery("c = group b by gid;"); + myPig.registerQuery("d = foreach c generate group, COUNT(b.uid);"); + myPig.registerQuery("store d into '/tmp/output1';"); + myPig.registerQuery("e = filter d by $1 > 5;"); + myPig.registerQuery("f = group e by $1;"); + myPig.registerQuery("g = foreach f generate group, SUM(e.$0);"); + myPig.registerQuery("store g into '/tmp/output2';"); + + LogicalPlan lp = checkLogicalPlan(1, 2, 13); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 17); + checkMRPlan(pp, 1, 1, 2); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + @Test - public void testMultiQueryWithThreeStores() { + public void testMultiQueryWithSplitInReduceAndReduceSplitees() { + + System.out.println("===== multi-query with split in reduce and reduce splitees ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid > 5;"); + myPig.registerQuery("c = group b by gid;"); + myPig.registerQuery("d = foreach c generate group, COUNT(b.uid);"); + myPig.registerQuery("e = filter d by $1 > 5;"); + myPig.registerQuery("e1 = group e by $1;"); + myPig.registerQuery("e2 = foreach e1 generate group, SUM(e.$0);"); + myPig.registerQuery("store e2 into '/tmp/output1';"); + myPig.registerQuery("f = filter d by $1 < 5;"); + myPig.registerQuery("f1 = group f by $1;"); + myPig.registerQuery("f2 = foreach f1 generate group, COUNT(f.$0);"); + myPig.registerQuery("store f2 into '/tmp/output2';"); + + LogicalPlan lp = checkLogicalPlan(1, 2, 16); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 22); + + checkMRPlan(pp, 1, 2, 3); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithSplitInReduceAndReduceSpliteesAndMore() { + + System.out.println("===== multi-query with split in reduce and reduce splitees and more ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid > 500;"); + myPig.registerQuery("c = group b by gid;"); + myPig.registerQuery("d = foreach c generate group, COUNT(b.uid);"); + myPig.registerQuery("e = filter d by $1 > 5;"); + myPig.registerQuery("e1 = group e by $1;"); + myPig.registerQuery("e2 = foreach e1 generate group, SUM(e.$0);"); + myPig.registerQuery("e3 = filter e2 by $1 > 10;"); + myPig.registerQuery("e4 = group e3 by $1;"); + myPig.registerQuery("e5 = foreach e4 generate group, SUM(e3.$0);"); + myPig.registerQuery("store e5 into '/tmp/output1';"); + myPig.registerQuery("f = filter d by $1 < 5;"); + myPig.registerQuery("f1 = group f by $1;"); + myPig.registerQuery("f2 = foreach f1 generate group, COUNT(f.$0);"); + myPig.registerQuery("f3 = filter f2 by $1 < 100;"); + myPig.registerQuery("f4 = group f3 by $1;"); + myPig.registerQuery("f5 = foreach f4 generate group, COUNT(f3.$0);"); + myPig.registerQuery("store f5 into '/tmp/output2';"); + + LogicalPlan lp = checkLogicalPlan(1, 2, 22); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 32); + + checkMRPlan(pp, 1, 2, 5); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithSplitInMapAndReduceSplitees() { - System.out.println("===== test multi-query with 3 stores ====="); + System.out.println("===== multi-query with split in map and reduce splitees ====="); + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = filter a by uid < 5;"); + myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;"); + myPig.registerQuery("d = filter a by uid >= 10;"); + myPig.registerQuery("b1 = group b by gid;"); + myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);"); + myPig.registerQuery("b3 = filter b2 by $1 > 5;"); + myPig.registerQuery("store b3 into '/tmp/output1';"); + myPig.registerQuery("c1 = group c by $1;"); + myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);"); + myPig.registerQuery("store c2 into '/tmp/output2';"); + myPig.registerQuery("d1 = group d by $1;"); + myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);"); + myPig.registerQuery("store d2 into '/tmp/output3';"); + + LogicalPlan lp = checkLogicalPlan(1, 3, 19); + + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25); + + checkMRPlan(pp, 1, 2, 3); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithTwoStores() { + + System.out.println("===== multi-query with 2 stores ====="); try { myPig.setBatchOn(); @@ -174,31 +709,42 @@ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); myPig.registerQuery("b = filter a by uid > 5;"); myPig.registerQuery("store b into '/tmp/output1';"); - myPig.registerQuery("c = filter b by uid > 10;"); + myPig.registerQuery("c = group b by gid;"); myPig.registerQuery("store c into '/tmp/output2';"); - myPig.registerQuery("d = filter c by uid > 15;"); - myPig.registerQuery("store d into '/tmp/output3';"); - - LogicalPlan lp = checkLogicalPlan(1, 3, 14); - PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14); + LogicalPlan lp = checkLogicalPlan(1, 2, 9); - checkMRPlan(pp, 1, 3, 5); + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 11); - Assert.assertTrue(executePlan(pp)); + checkMRPlan(pp, 1, 1, 1); } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } } @Test - public void testMultiQueryWithThreeStores2() { + public void testEmptyExecute() { + + System.out.println("==== empty execute ===="); + + try { + myPig.setBatchOn(); + myPig.executeBatch(); + myPig.executeBatch(); + myPig.discardBatch(); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryWithTwoStores2Execs() { - System.out.println("===== test multi-query with 3 stores (2) ====="); + System.out.println("===== multi-query with 2 stores execs ====="); try { myPig.setBatchOn(); @@ -206,11 +752,11 @@ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); myPig.registerQuery("b = filter a by uid > 5;"); + myPig.executeBatch(); myPig.registerQuery("store b into '/tmp/output1';"); - myPig.registerQuery("c = filter b by uid > 10;"); + myPig.executeBatch(); + myPig.registerQuery("c = group b by gid;"); myPig.registerQuery("store c into '/tmp/output2';"); - myPig.registerQuery("d = filter c by uid > 15;"); - myPig.registerQuery("store d into '/tmp/output3';"); myPig.executeBatch(); myPig.discardBatch(); @@ -218,64 +764,54 @@ } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } } @Test - public void testMultiQueryWithTwoLoads() { + public void testMultiQueryWithThreeStores() { - System.out.println("===== test multi-query with two loads ====="); + System.out.println("===== multi-query with 3 stores ====="); try { myPig.setBatchOn(); myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); - myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd2' " + - "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); - myPig.registerQuery("c = filter a by uid > 5;"); - myPig.registerQuery("d = filter b by uid > 10;"); - myPig.registerQuery("store c into '/tmp/output1';"); - myPig.registerQuery("store d into '/tmp/output2';"); - myPig.registerQuery("e = cogroup c by uid, d by uid;"); - myPig.registerQuery("store e into '/tmp/output3';"); - - LogicalPlan lp = checkLogicalPlan(2, 3, 16); + myPig.registerQuery("b = filter a by uid > 5;"); + myPig.registerQuery("store b into '/tmp/output1';"); + myPig.registerQuery("c = filter b by uid > 10;"); + myPig.registerQuery("store c into '/tmp/output2';"); + myPig.registerQuery("d = filter c by uid > 15;"); + myPig.registerQuery("store d into '/tmp/output3';"); - PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 19); + LogicalPlan lp = checkLogicalPlan(1, 3, 14); - checkMRPlan(pp, 2, 3, 5); + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14); - Assert.assertTrue(executePlan(pp)); + checkMRPlan(pp, 1, 1, 1); } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } } @Test - public void testMultiQueryWithTwoLoads2() { + public void testMultiQueryWithThreeStores2() { - System.out.println("===== test multi-query with two loads (2) ====="); + System.out.println("===== multi-query with 3 stores (2) ====="); try { myPig.setBatchOn(); myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); - myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd2' " + - "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); - myPig.registerQuery("c = filter a by uid > 5;"); - myPig.registerQuery("d = filter b by uid > 10;"); - myPig.registerQuery("store c into '/tmp/output1';"); - myPig.registerQuery("store d into '/tmp/output2';"); - myPig.registerQuery("e = cogroup c by uid, d by uid;"); - myPig.registerQuery("store e into '/tmp/output3';"); + myPig.registerQuery("b = filter a by uid > 5;"); + myPig.registerQuery("store b into '/tmp/output1';"); + myPig.registerQuery("c = filter b by uid > 10;"); + myPig.registerQuery("store c into '/tmp/output2';"); + myPig.registerQuery("d = filter c by uid > 15;"); + myPig.registerQuery("store d into '/tmp/output3';"); myPig.executeBatch(); myPig.discardBatch(); @@ -283,42 +819,44 @@ } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } } @Test - public void testMultiQueryWithNoStore() { + public void testMultiQueryWithTwoLoads() { - System.out.println("===== test multi-query with no store ====="); + System.out.println("===== multi-query with two loads ====="); try { myPig.setBatchOn(); myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); - myPig.registerQuery("b = filter a by uid > 5;"); - myPig.registerQuery("group b by gid;"); - - LogicalPlan lp = checkLogicalPlan(0, 0, 0); + myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd2' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"); + myPig.registerQuery("c = filter a by uid > 5;"); + myPig.registerQuery("d = filter b by uid > 10;"); + myPig.registerQuery("store c into '/tmp/output1';"); + myPig.registerQuery("store d into '/tmp/output2';"); + myPig.registerQuery("e = cogroup c by uid, d by uid;"); + myPig.registerQuery("store e into '/tmp/output3';"); - PhysicalPlan pp = checkPhysicalPlan(lp, 0, 0, 0); + LogicalPlan lp = checkLogicalPlan(2, 3, 16); - //checkMRPlan(pp, 1, 1, 1); + PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 19); - //Assert.assertTrue(executePlan(pp)); + checkMRPlan(pp, 2, 1, 3); } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } + } } @Test public void testMultiQueryWithNoStore2() { - System.out.println("===== test multi-query with no store (2) ====="); + System.out.println("===== multi-query with no store (2) ====="); try { myPig.setBatchOn(); @@ -341,7 +879,7 @@ @Test public void testMultiQueryWithExplain() { - System.out.println("===== test multi-query with explain ====="); + System.out.println("===== multi-query with explain ====="); try { String script = "a = load 'file:test/org/apache/pig/test/data/passwd' " @@ -358,15 +896,13 @@ } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } } @Test public void testMultiQueryWithDump() { - System.out.println("===== test multi-query with dump ====="); + System.out.println("===== multi-query with dump ====="); try { String script = "a = load 'file:test/org/apache/pig/test/data/passwd' " @@ -383,15 +919,13 @@ } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } } @Test public void testMultiQueryWithDescribe() { - System.out.println("===== test multi-query with describe ====="); + System.out.println("===== multi-query with describe ====="); try { String script = "a = load 'file:test/org/apache/pig/test/data/passwd' " @@ -408,15 +942,13 @@ } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } } @Test public void testMultiQueryWithIllustrate() { - System.out.println("===== test multi-query with illustrate ====="); + System.out.println("===== multi-query with illustrate ====="); try { String script = "a = load 'file:test/org/apache/pig/test/data/passwd' " @@ -433,9 +965,7 @@ } catch (Exception e) { e.printStackTrace(); Assert.fail(); - } finally { - deleteOutputFiles(); - } + } } // -------------------------------------------------------------------------- @@ -483,12 +1013,18 @@ } } + showPlanOperators(lp); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + lp.explain(out, System.out); + + System.out.println("===== Display Logical Plan ====="); + System.out.println(out.toString()); + Assert.assertEquals(expectedRoots, lp.getRoots().size()); Assert.assertEquals(expectedLeaves, lp.getLeaves().size()); Assert.assertEquals(expectedSize, lp.size()); - showPlanOperators(lp); - return lp; } @@ -500,12 +1036,18 @@ PhysicalPlan pp = myPig.getPigContext().getExecutionEngine().compile( lp, null); + showPlanOperators(pp); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + pp.explain(out); + + System.out.println("===== Display Physical Plan ====="); + System.out.println(out.toString()); + Assert.assertEquals(expectedRoots, pp.getRoots().size()); Assert.assertEquals(expectedLeaves, pp.getLeaves().size()); Assert.assertEquals(expectedSize, pp.size()); - showPlanOperators(pp); - return pp; } @@ -516,28 +1058,40 @@ ExecTools.checkLeafIsStore(pp, myPig.getPigContext()); - MRCompiler mrcomp = new MRCompiler(pp, myPig.getPigContext()); - MROperPlan mrp = mrcomp.compile(); + MapReduceLauncher launcher = new MapReduceLauncher(); + + MROperPlan mrp = null; + + try { + java.lang.reflect.Method compile = launcher.getClass() + .getDeclaredMethod("compile", + new Class[] { PhysicalPlan.class, PigContext.class }); + + compile.setAccessible(true); + + mrp = (MROperPlan) compile.invoke(launcher, new Object[] { pp, myPig.getPigContext() }); + + Assert.assertNotNull(mrp); + } catch (Exception e) { + PigException pe = Utils.getPigException(e); + if (pe != null) { + throw pe; + } else { + e.printStackTrace(); + Assert.fail(); + } + } + + showPlanOperators(mrp); + Assert.assertEquals(expectedRoots, mrp.getRoots().size()); Assert.assertEquals(expectedLeaves, mrp.getLeaves().size()); Assert.assertEquals(expectedSize, mrp.size()); - showPlanOperators(mrp); - return mrp; } - private boolean executePlan(PhysicalPlan pp) throws IOException { - FileLocalizer.clearDeleteOnFail(); - ExecJob job = myPig.getPigContext().getExecutionEngine().execute(pp, "execute"); - boolean failed = (job.getStatus() == ExecJob.JOB_STATUS.FAILED); - if (failed) { - FileLocalizer.triggerDeleteOnFail(); - } - return !failed; - } - private void deleteOutputFiles() { try { FileLocalizer.delete("/tmp/output1", myPig.getPigContext());