Modified: 
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java?rev=757598&r1=757597&r2=757598&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java 
(original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java 
Mon Mar 23 23:36:10 2009
@@ -17,8 +17,9 @@
  */
 package org.apache.pig.test;
 
-import java.io.StringReader;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collections;
 
@@ -28,13 +29,11 @@
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.util.ExecTools;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.Operator;
@@ -55,6 +54,7 @@
     @Before
     public void setUp() throws Exception {
         myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        deleteOutputFiles();
     }
 
     @After
@@ -63,9 +63,42 @@
     }
 
     @Test
-    public void testMultiQueryWithTwoStores() {
+    public void testMultiQueryWithDemoCase() {
+
+        System.out.println("===== multi-query with demo case 2 =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = foreach a generate uname, uid, gid;");
+            myPig.registerQuery("c = filter b by uid < 5;");
+            myPig.registerQuery("d = filter c by gid >= 5;");
+            myPig.registerQuery("store d into '/tmp/output1';");
+            myPig.registerQuery("e = filter b by uid >= 5;");
+            myPig.registerQuery("store e into '/tmp/output2';");
+            myPig.registerQuery("f = filter c by gid < 5;");
+            myPig.registerQuery("g = group f by uname;");
+            myPig.registerQuery("h = foreach g generate group, COUNT(f.uid);");
+            myPig.registerQuery("store h into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 18);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 20);
+
+            checkMRPlan(pp, 1, 1, 1);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryWithTwoStores2() {
 
-        System.out.println("===== test multi-query with 2 stores =====");
+        System.out.println("===== multi-query with 2 stores (2) =====");
 
         try {
             myPig.setBatchOn();
@@ -77,95 +110,597 @@
             myPig.registerQuery("c = group b by gid;");
             myPig.registerQuery("store c into '/tmp/output2';");
 
-            LogicalPlan lp = checkLogicalPlan(1, 2, 9);
+            myPig.executeBatch();
 
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 11);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+    
+    @Test
+    public void testMultiQueryWithTwoLoads2() {
 
-            checkMRPlan(pp, 1, 2, 3);
+        System.out.println("===== multi-query with two loads (2) =====");
 
-            Assert.assertTrue(executePlan(pp));
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
+            myPig.registerQuery("b = load 
'file:test/org/apache/pig/test/data/passwd2' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("d = filter b by uid > 10;");
+            myPig.registerQuery("store c into '/tmp/output1';");
+            myPig.registerQuery("store d into '/tmp/output2';");
+            myPig.registerQuery("e = cogroup c by uid, d by uid;");
+            myPig.registerQuery("store e into '/tmp/output3';");
+
+            myPig.executeBatch();
+            myPig.discardBatch();
 
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
+    }    
+    
+    @Test
+    public void testMultiQueryWithSingleMapReduceSplittee() {
+
+        System.out.println("===== multi-query with single map reduce splittee 
=====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname, passwd, uid, 
gid);");
+            myPig.registerQuery("b = foreach a generate uname, uid, gid;");
+            myPig.registerQuery("split b into c1 if uid > 5, c2 if uid <= 5 
;"); 
+            myPig.registerQuery("f = group c2 by uname;");
+            myPig.registerQuery("f1 = foreach f generate group, SUM(c2.gid);");
+            myPig.registerQuery("store f1 into '/tmp/output1';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 1, 6);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 1, 9);
+
+            checkMRPlan(pp, 1, 1, 1); 
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
     }
+    
+    @Test
+    public void testMultiQueryWithPigMixL12() {
+
+        System.out.println("===== multi-query with PigMix L12 =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname, passwd, uid, 
gid);");
+            myPig.registerQuery("b = foreach a generate uname, passwd, uid, 
gid;");
+            myPig.registerQuery("split b into c1 if uid > 5, c2 if uid <= 5 
;"); 
+            myPig.registerQuery("split c1 into d1 if gid < 5, d2 if gid >= 
5;");
+            myPig.registerQuery("e = group d1 by uname;");
+            myPig.registerQuery("e1 = foreach e generate group, MAX(d1.uid);");
+            myPig.registerQuery("store e1 into '/tmp/output1';");
+            myPig.registerQuery("f = group c2 by uname;");
+            myPig.registerQuery("f1 = foreach f generate group, SUM(c2.gid);");
+            myPig.registerQuery("store f1 into '/tmp/output2';");
+            myPig.registerQuery("g = group d2 by uname;");
+            myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);");
+            myPig.registerQuery("store g1 into '/tmp/output3';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 3, 15);
 
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 23);
+
+            checkMRPlan(pp, 1, 2, 3); 
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+    
     @Test
-    public void testEmptyExecute() {
-        System.out.println("=== test empty execute ===");
-        
+    public void testMultiQueryWithPigMixL12_2() {
+
+        System.out.println("===== multi-query with PigMix L12 (2) =====");
+
         try {
             myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname, passwd, uid, 
gid);");
+            myPig.registerQuery("b = foreach a generate uname, passwd, uid, 
gid;");
+            myPig.registerQuery("split b into c1 if uid > 5, c2 if uid <= 5 
;"); 
+            myPig.registerQuery("split c1 into d1 if gid < 5, d2 if gid >= 
5;");
+            myPig.registerQuery("e = group d1 by uname;");
+            myPig.registerQuery("e1 = foreach e generate group, MAX(d1.uid);");
+            myPig.registerQuery("store e1 into '/tmp/output1';");
+            myPig.registerQuery("f = group c2 by uname;");
+            myPig.registerQuery("f1 = foreach f generate group, SUM(c2.gid);");
+            myPig.registerQuery("store f1 into '/tmp/output2';");
+            myPig.registerQuery("g = group d2 by uname;");
+            myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);");
+            myPig.registerQuery("store g1 into '/tmp/output3';");
+
             myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+
+    @Test
+    public void testMultiQueryWithCoGroup() {
+
+        System.out.println("===== multi-query with CoGroup =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname, passwd, uid, 
gid);");
+            myPig.registerQuery("store a into '/tmp/output1' using 
BinStorage();");
+            myPig.registerQuery("b = load '/tmp/output1' using BinStorage() as 
(uname, passwd, uid, gid);"); 
+            myPig.registerQuery("c = load 
'file:test/org/apache/pig/test/data/passwd2' " +
+                                "using PigStorage(':') as (uname, passwd, uid, 
gid);");
+            myPig.registerQuery("d = cogroup b by (uname, uid) inner, c by 
(uname, uid) inner;");
+            myPig.registerQuery("e = foreach d generate flatten(b), 
flatten(c);");
+            myPig.registerQuery("store e into '/tmp/output2';");
+
+            LogicalPlan lp = checkLogicalPlan(2, 2, 9);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 2, 12);
+
+            checkMRPlan(pp, 1, 1, 2); 
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+    
+    @Test
+    public void testMultiQueryWithCoGroup_2() {
+
+        System.out.println("===== multi-query with CoGroup (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname, passwd, uid, 
gid);");
+            myPig.registerQuery("store a into '/tmp/output1' using 
BinStorage();");
+            myPig.registerQuery("b = load '/tmp/output1' using BinStorage() as 
(uname, passwd, uid, gid);"); 
+            myPig.registerQuery("c = load 
'file:test/org/apache/pig/test/data/passwd2' " +
+                                "using PigStorage(':') as (uname, passwd, uid, 
gid);");
+            myPig.registerQuery("d = cogroup b by (uname, uid) inner, c by 
(uname, uid) inner;");
+            myPig.registerQuery("e = foreach d generate flatten(b), 
flatten(c);");
+            myPig.registerQuery("store e into '/tmp/output2';");
+
             myPig.executeBatch();
-            myPig.discardBatch();
-        }
-        catch (Exception e) {
+
+        } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        }
+        } 
     }
-        
+    
     @Test
-    public void testMultiQueryWithTwoStores2() {
+    public void testMultiQueryWithFJ() {
 
-        System.out.println("===== test multi-query with 2 stores (2) =====");
+        System.out.println("===== multi-query with FJ =====");
 
         try {
             myPig.setBatchOn();
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
-                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 5;");
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("store c into '/tmp/output1';");
+            myPig.registerQuery("d = filter b by gid > 10;");
+            myPig.registerQuery("store d into '/tmp/output2';");
+            myPig.registerQuery("e = join c by gid, d by gid using \"repl\";");
+            myPig.registerQuery("store e into '/tmp/output3';");
+
+            LogicalPlan lp = checkLogicalPlan(2, 3, 16);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 16);
+
+            checkMRPlan(pp, 1, 1, 2);            
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+   
+    @Test
+    public void testMultiQueryWithFJ_2() {
+
+        System.out.println("===== multi-query with FJ (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("store c into '/tmp/output1';");
+            myPig.registerQuery("d = filter b by gid > 10;");
+            myPig.registerQuery("store d into '/tmp/output2';");
+            myPig.registerQuery("e = join c by gid, d by gid using \"repl\";");
+            myPig.registerQuery("store e into '/tmp/output3';");
+
+            myPig.executeBatch();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+
+    @Test
+    public void testMultiQueryWithExplicitSplitAndSideFiles() {
+
+        System.out.println("===== multi-query with explicit split and side 
files =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("split a into b if uid > 500, c if uid <= 
500;");
             myPig.registerQuery("store b into '/tmp/output1';");
-            myPig.registerQuery("c = group b by gid;");
             myPig.registerQuery("store c into '/tmp/output2';");
+            myPig.registerQuery("e = cogroup b by gid, c by gid;");
+            myPig.registerQuery("d = foreach e generate flatten(c), 
flatten(b);");
+            myPig.registerQuery("store d into '/tmp/output3';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 3, 15);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 19);
+
+            checkMRPlan(pp, 1, 1, 2); 
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }        
+
+    @Test
+    public void testMultiQueryWithExplicitSplitAndOrderByAndSideFiles() {
+
+        System.out.println("===== multi-query with explicit split, orderby and 
side files  =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("split a into a1 if uid > 500, a2 if gid > 
500;");
+            myPig.registerQuery("b1 = distinct a1;");
+            myPig.registerQuery("b2 = order a2 by uname;");
+            myPig.registerQuery("store b1 into '/tmp/output1';");
+            myPig.registerQuery("store b2 into '/tmp/output2';");
+            myPig.registerQuery("c = cogroup b1 by uname, b2 by uname;");
+            myPig.registerQuery("d = foreach c generate flatten(group), 
flatten($1), flatten($2);");
+            myPig.registerQuery("store d into '/tmp/output3';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 3, 17);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 21);
+
+            checkMRPlan(pp, 1, 1, 4); 
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }     
+    
+    @Test
+    public void testMultiQueryWithIntermediateStores() {
+
+        System.out.println("===== multi-query with intermediate stores =====");
+
+        try {            
+            myPig.setBatchOn();
+            
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("store a into '/tmp/output1';");
+            myPig.registerQuery("b = load '/tmp/output1' using 
PigStorage(':'); ");
+            myPig.registerQuery("store b into '/tmp/output2';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 2, 7);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 7);
+
+            checkMRPlan(pp, 1, 1, 1); 
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }     
+    
+    @Test
+    public void testMultiQueryWithIntermediateStores_2() {
+
+        System.out.println("===== multi-query with intermediate stores (2) 
=====");
+
+        try {
+            
+            myPig.setBatchOn();
+            
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("store a into '/tmp/output1';");
+            myPig.registerQuery("b = load '/tmp/output1' using 
PigStorage(':'); ");
+            myPig.registerQuery("store b into '/tmp/output2';");
 
             myPig.executeBatch();
 
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
-    }
+        } 
+    }      
 
     @Test
-    public void testMultiQueryWithTwoStores2Execs() {
+    public void testMultiQueryWithImplicitSplitAndSideFiles() {
+
+        System.out.println("===== multi-query with implicit split and side 
files  =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("c = filter a by gid > 500;");
+            myPig.registerQuery("store c into '/tmp/output1';");
+            myPig.registerQuery("d = cogroup b by uname, c by uname;");
+            myPig.registerQuery("e = foreach d generate flatten(c), 
flatten(b);");
+            myPig.registerQuery("store e into '/tmp/output2';");
+            myPig.registerQuery("f = filter e by b::uid < 1000;");
+            myPig.registerQuery("store f into '/tmp/output3';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 22);
+
+            checkMRPlan(pp, 1, 1, 2); 
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }        
 
-        System.out.println("===== test multi-query with 2 stores (2) =====");
+    @Test
+    public void testMultiQueryWithTwoLoadsAndTwoStores() {
+
+        System.out.println("===== multi-query with two loads and two stores 
=====");
 
         try {
             myPig.setBatchOn();
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
+            myPig.registerQuery("b = load 
'file:test/org/apache/pig/test/data/passwd2' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("d = filter b by uid > 10;");
+            myPig.registerQuery("e = cogroup c by uid, d by uid;");
+            myPig.registerQuery("f = foreach e generate flatten(c), 
flatten(d);");
+            myPig.registerQuery("g = group f by d::gid;");
+            myPig.registerQuery("h = filter f by c::gid > 5;");
+            myPig.registerQuery("store g into '/tmp/output1';");
+            myPig.registerQuery("store h into '/tmp/output2';");
+            
+            LogicalPlan lp = checkLogicalPlan(2, 2, 15);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 2, 20);
+
+            checkMRPlan(pp, 1, 1, 2); 
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+ 
+    @Test
+    public void testMultiQueryWithSplitInReduce() {
+
+        System.out.println("===== multi-query with split in reduce =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
             myPig.registerQuery("b = filter a by uid > 5;");
-            myPig.executeBatch();
-            myPig.registerQuery("store b into '/tmp/output1';");
-            myPig.executeBatch();
             myPig.registerQuery("c = group b by gid;");
-            myPig.registerQuery("store c into '/tmp/output2';");
+            myPig.registerQuery("d = foreach c generate group, COUNT(b.uid);");
+            myPig.registerQuery("store d into '/tmp/output1';");
+            myPig.registerQuery("e = filter d by $1 > 5;");
+            myPig.registerQuery("store e into '/tmp/output2';");
 
-            myPig.executeBatch();
-            myPig.discardBatch();
+            LogicalPlan lp = checkLogicalPlan(1, 2, 11);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 13);
+
+            checkMRPlan(pp, 1, 1, 1); 
 
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
     }
+   
+    @Test
+    public void testMultiQueryWithSplitInReduceAndReduceSplitee() {
+
+        System.out.println("===== multi-query with split in reduce and reduce 
splitee =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid > 5;");
+            myPig.registerQuery("c = group b by gid;");
+            myPig.registerQuery("d = foreach c generate group, COUNT(b.uid);");
+            myPig.registerQuery("store d into '/tmp/output1';");
+            myPig.registerQuery("e = filter d by $1 > 5;");
+            myPig.registerQuery("f = group e by $1;");
+            myPig.registerQuery("g = foreach f generate group, SUM(e.$0);");
+            myPig.registerQuery("store g into '/tmp/output2';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 2, 13);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 17);
 
+            checkMRPlan(pp, 1, 1, 2); 
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }    
+  
     @Test
-    public void testMultiQueryWithThreeStores() {
+    public void testMultiQueryWithSplitInReduceAndReduceSplitees() {
+
+        System.out.println("===== multi-query with split in reduce and reduce 
splitees =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid > 5;");
+            myPig.registerQuery("c = group b by gid;");
+            myPig.registerQuery("d = foreach c generate group, COUNT(b.uid);");
+            myPig.registerQuery("e = filter d by $1 > 5;");
+            myPig.registerQuery("e1 = group e by $1;");
+            myPig.registerQuery("e2 = foreach e1 generate group, SUM(e.$0);");
+            myPig.registerQuery("store e2 into '/tmp/output1';");
+            myPig.registerQuery("f = filter d by $1 < 5;");
+            myPig.registerQuery("f1 = group f by $1;");
+            myPig.registerQuery("f2 = foreach f1 generate group, 
COUNT(f.$0);");
+            myPig.registerQuery("store f2 into '/tmp/output2';");
+            
+            LogicalPlan lp = checkLogicalPlan(1, 2, 16);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 22);
+
+            checkMRPlan(pp, 1, 2, 3); 
+        
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }    
+
+    @Test
+    public void testMultiQueryWithSplitInReduceAndReduceSpliteesAndMore() {
+
+        System.out.println("===== multi-query with split in reduce and reduce 
splitees and more =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("c = group b by gid;");
+            myPig.registerQuery("d = foreach c generate group, COUNT(b.uid);");
+            myPig.registerQuery("e = filter d by $1 > 5;");
+            myPig.registerQuery("e1 = group e by $1;");
+            myPig.registerQuery("e2 = foreach e1 generate group, SUM(e.$0);");
+            myPig.registerQuery("e3 = filter e2 by $1 > 10;");
+            myPig.registerQuery("e4 = group e3 by $1;");
+            myPig.registerQuery("e5 = foreach e4 generate group, SUM(e3.$0);");
+            myPig.registerQuery("store e5 into '/tmp/output1';");
+            myPig.registerQuery("f = filter d by $1 < 5;");
+            myPig.registerQuery("f1 = group f by $1;");
+            myPig.registerQuery("f2 = foreach f1 generate group, 
COUNT(f.$0);");
+            myPig.registerQuery("f3 = filter f2 by $1 < 100;");
+            myPig.registerQuery("f4 = group f3 by $1;");
+            myPig.registerQuery("f5 = foreach f4 generate group, 
COUNT(f3.$0);");
+            myPig.registerQuery("store f5 into '/tmp/output2';");
+            
+            LogicalPlan lp = checkLogicalPlan(1, 2, 22);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 32);
+
+            checkMRPlan(pp, 1, 2, 5);            
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }    
+    
+    @Test
+    public void testMultiQueryWithSplitInMapAndReduceSplitees() {
 
-        System.out.println("===== test multi-query with 3 stores =====");
+        System.out.println("===== multi-query with split in map and reduce 
splitees =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, 
COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by $1;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by $1;");
+            myPig.registerQuery("d2 = foreach d1 generate group, 
COUNT(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+            checkMRPlan(pp, 1, 2, 3);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+
+    @Test
+    public void testMultiQueryWithTwoStores() {
+
+        System.out.println("===== multi-query with 2 stores =====");
 
         try {
             myPig.setBatchOn();
@@ -174,31 +709,42 @@
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
             myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("store b into '/tmp/output1';");
-            myPig.registerQuery("c = filter b by uid > 10;");
+            myPig.registerQuery("c = group b by gid;");
             myPig.registerQuery("store c into '/tmp/output2';");
-            myPig.registerQuery("d = filter c by uid > 15;");
-            myPig.registerQuery("store d into '/tmp/output3';");
-
-            LogicalPlan lp = checkLogicalPlan(1, 3, 14);
 
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
+            LogicalPlan lp = checkLogicalPlan(1, 2, 9);
 
-            checkMRPlan(pp, 1, 3, 5);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 11);
 
-            Assert.assertTrue(executePlan(pp));
+            checkMRPlan(pp, 1, 1, 1);
 
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
     }
 
     @Test
-    public void testMultiQueryWithThreeStores2() {
+    public void testEmptyExecute() {
+        
+        System.out.println("==== empty execute ====");
+        
+        try {
+            myPig.setBatchOn();
+            myPig.executeBatch();
+            myPig.executeBatch();
+            myPig.discardBatch();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+        
+    @Test
+    public void testMultiQueryWithTwoStores2Execs() {
 
-        System.out.println("===== test multi-query with 3 stores (2) =====");
+        System.out.println("===== multi-query with 2 stores execs =====");
 
         try {
             myPig.setBatchOn();
@@ -206,11 +752,11 @@
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
             myPig.registerQuery("b = filter a by uid > 5;");
+            myPig.executeBatch();
             myPig.registerQuery("store b into '/tmp/output1';");
-            myPig.registerQuery("c = filter b by uid > 10;");
+            myPig.executeBatch();
+            myPig.registerQuery("c = group b by gid;");
             myPig.registerQuery("store c into '/tmp/output2';");
-            myPig.registerQuery("d = filter c by uid > 15;");
-            myPig.registerQuery("store d into '/tmp/output3';");
 
             myPig.executeBatch();
             myPig.discardBatch();
@@ -218,64 +764,54 @@
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
     }
 
     @Test
-    public void testMultiQueryWithTwoLoads() {
+    public void testMultiQueryWithThreeStores() {
 
-        System.out.println("===== test multi-query with two loads =====");
+        System.out.println("===== multi-query with 3 stores =====");
 
         try {
             myPig.setBatchOn();
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = load 
'file:test/org/apache/pig/test/data/passwd2' " +
-                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("c = filter a by uid > 5;");
-            myPig.registerQuery("d = filter b by uid > 10;");
-            myPig.registerQuery("store c into '/tmp/output1';");
-            myPig.registerQuery("store d into '/tmp/output2';");
-            myPig.registerQuery("e = cogroup c by uid, d by uid;");
-            myPig.registerQuery("store e into '/tmp/output3';");
-
-            LogicalPlan lp = checkLogicalPlan(2, 3, 16);
+            myPig.registerQuery("b = filter a by uid > 5;");
+            myPig.registerQuery("store b into '/tmp/output1';");
+            myPig.registerQuery("c = filter b by uid > 10;");
+            myPig.registerQuery("store c into '/tmp/output2';");
+            myPig.registerQuery("d = filter c by uid > 15;");
+            myPig.registerQuery("store d into '/tmp/output3';");
 
-            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 19);
+            LogicalPlan lp = checkLogicalPlan(1, 3, 14);
 
-            checkMRPlan(pp, 2, 3, 5);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
 
-            Assert.assertTrue(executePlan(pp));
+            checkMRPlan(pp, 1, 1, 1);            
 
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
     }
 
     @Test
-    public void testMultiQueryWithTwoLoads2() {
+    public void testMultiQueryWithThreeStores2() {
 
-        System.out.println("===== test multi-query with two loads (2) =====");
+        System.out.println("===== multi-query with 3 stores (2) =====");
 
         try {
             myPig.setBatchOn();
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = load 
'file:test/org/apache/pig/test/data/passwd2' " +
-                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("c = filter a by uid > 5;");
-            myPig.registerQuery("d = filter b by uid > 10;");
-            myPig.registerQuery("store c into '/tmp/output1';");
-            myPig.registerQuery("store d into '/tmp/output2';");
-            myPig.registerQuery("e = cogroup c by uid, d by uid;");
-            myPig.registerQuery("store e into '/tmp/output3';");
+            myPig.registerQuery("b = filter a by uid > 5;");
+            myPig.registerQuery("store b into '/tmp/output1';");
+            myPig.registerQuery("c = filter b by uid > 10;");
+            myPig.registerQuery("store c into '/tmp/output2';");
+            myPig.registerQuery("d = filter c by uid > 15;");
+            myPig.registerQuery("store d into '/tmp/output3';");
 
             myPig.executeBatch();
             myPig.discardBatch();
@@ -283,42 +819,44 @@
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
     }
 
     @Test
-    public void testMultiQueryWithNoStore() {
+    public void testMultiQueryWithTwoLoads() {
 
-        System.out.println("===== test multi-query with no store =====");
+        System.out.println("===== multi-query with two loads =====");
 
         try {
             myPig.setBatchOn();
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 5;");
-            myPig.registerQuery("group b by gid;");
-
-            LogicalPlan lp = checkLogicalPlan(0, 0, 0);
+            myPig.registerQuery("b = load 
'file:test/org/apache/pig/test/data/passwd2' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("d = filter b by uid > 10;");
+            myPig.registerQuery("store c into '/tmp/output1';");
+            myPig.registerQuery("store d into '/tmp/output2';");
+            myPig.registerQuery("e = cogroup c by uid, d by uid;");
+            myPig.registerQuery("store e into '/tmp/output3';");
 
-            PhysicalPlan pp = checkPhysicalPlan(lp, 0, 0, 0);
+            LogicalPlan lp = checkLogicalPlan(2, 3, 16);
 
-            //checkMRPlan(pp, 1, 1, 1);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 19);
 
-            //Assert.assertTrue(executePlan(pp));
+            checkMRPlan(pp, 2, 1, 3);            
 
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        }
+        } 
     }
 
     @Test
     public void testMultiQueryWithNoStore2() {
 
-        System.out.println("===== test multi-query with no store (2) =====");
+        System.out.println("===== multi-query with no store (2) =====");
 
         try {
             myPig.setBatchOn();
@@ -341,7 +879,7 @@
     @Test
     public void testMultiQueryWithExplain() {
 
-        System.out.println("===== test multi-query with explain =====");
+        System.out.println("===== multi-query with explain =====");
 
         try {
             String script = "a = load 
'file:test/org/apache/pig/test/data/passwd' "
@@ -358,15 +896,13 @@
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
     }
 
     @Test
     public void testMultiQueryWithDump() {
 
-        System.out.println("===== test multi-query with dump =====");
+        System.out.println("===== multi-query with dump =====");
 
         try {
             String script = "a = load 
'file:test/org/apache/pig/test/data/passwd' "
@@ -383,15 +919,13 @@
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
     }
 
     @Test
     public void testMultiQueryWithDescribe() {
 
-        System.out.println("===== test multi-query with describe =====");
+        System.out.println("===== multi-query with describe =====");
 
         try {
             String script = "a = load 
'file:test/org/apache/pig/test/data/passwd' "
@@ -408,15 +942,13 @@
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
     }
 
     @Test
     public void testMultiQueryWithIllustrate() {
 
-        System.out.println("===== test multi-query with illustrate =====");
+        System.out.println("===== multi-query with illustrate =====");
 
         try {
             String script = "a = load 
'file:test/org/apache/pig/test/data/passwd' "
@@ -433,9 +965,7 @@
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
-        } finally {
-            deleteOutputFiles();
-        }
+        } 
     }
 
     // 
--------------------------------------------------------------------------
@@ -483,12 +1013,18 @@
             }
         }
 
+        showPlanOperators(lp);
+       
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        lp.explain(out, System.out);
+
+        System.out.println("===== Display Logical Plan =====");
+        System.out.println(out.toString());
+
         Assert.assertEquals(expectedRoots, lp.getRoots().size());
         Assert.assertEquals(expectedLeaves, lp.getLeaves().size());
         Assert.assertEquals(expectedSize, lp.size());
 
-        showPlanOperators(lp);
-
         return lp;
     }
 
@@ -500,12 +1036,18 @@
         PhysicalPlan pp = myPig.getPigContext().getExecutionEngine().compile(
                 lp, null);
 
+        showPlanOperators(pp);
+       
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        pp.explain(out);
+
+        System.out.println("===== Display Physical Plan =====");
+        System.out.println(out.toString());
+
         Assert.assertEquals(expectedRoots, pp.getRoots().size());
         Assert.assertEquals(expectedLeaves, pp.getLeaves().size());
         Assert.assertEquals(expectedSize, pp.size());
 
-        showPlanOperators(pp);
-
         return pp;
     }
 
@@ -516,28 +1058,40 @@
 
         ExecTools.checkLeafIsStore(pp, myPig.getPigContext());
         
-        MRCompiler mrcomp = new MRCompiler(pp, myPig.getPigContext());
-        MROperPlan mrp = mrcomp.compile();
+        MapReduceLauncher launcher = new MapReduceLauncher();
+
+        MROperPlan mrp = null;
+
+        try {
+            java.lang.reflect.Method compile = launcher.getClass()
+                    .getDeclaredMethod("compile",
+                            new Class[] { PhysicalPlan.class, PigContext.class 
});
+
+            compile.setAccessible(true);
+
+            mrp = (MROperPlan) compile.invoke(launcher, new Object[] { pp, 
myPig.getPigContext() });
+
+            Assert.assertNotNull(mrp);
 
+        } catch (Exception e) {
+            PigException pe = Utils.getPigException(e);
+            if (pe != null) {
+                throw pe;
+            } else {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }        
+
+        showPlanOperators(mrp);
+        
         Assert.assertEquals(expectedRoots, mrp.getRoots().size());
         Assert.assertEquals(expectedLeaves, mrp.getLeaves().size());
         Assert.assertEquals(expectedSize, mrp.size());
 
-        showPlanOperators(mrp);
-
         return mrp;
     }
 
-    private boolean executePlan(PhysicalPlan pp) throws IOException {
-        FileLocalizer.clearDeleteOnFail();
-        ExecJob job = myPig.getPigContext().getExecutionEngine().execute(pp, 
"execute");
-        boolean failed = (job.getStatus() == ExecJob.JOB_STATUS.FAILED);
-        if (failed) {
-            FileLocalizer.triggerDeleteOnFail();
-        }
-        return !failed;
-    }
-
     private void deleteOutputFiles() {
         try {
             FileLocalizer.delete("/tmp/output1", myPig.getPigContext());


Reply via email to