Author: olga
Date: Fri Dec 18 22:26:00 2009
New Revision: 892388

URL: http://svn.apache.org/viewvc?rev=892388&view=rev
Log:
PIG-1157: Successive replicated joins do not generate Map Reduce plan and fail
due to OOM (rding via olgan)

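For context, the pattern that triggers this is two replicated joins in a row where the first input is also reused on another branch; the new TestMultiQuery case below exercises exactly that. A minimal sketch of the same script through the Java API follows (a hypothetical standalone class; the input paths and Hadoop setup are illustrative, not part of this patch):

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class SuccessiveReplicatedJoinsSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative setup only: assumes a Hadoop configuration is available
            // and that 'abc' and 'xyz' are the same tab-separated inputs the new
            // test case writes out.
            PigServer pig = new PigServer(ExecType.MAPREDUCE);

            pig.registerQuery("A = load 'abc' as (a:long, b, c);");
            pig.registerQuery("A1 = FOREACH A GENERATE a;");
            pig.registerQuery("B = GROUP A1 BY a;");
            pig.registerQuery("C = load 'xyz' as (x:long, y);");
            // First replicated join: B is the small, in-memory side.
            pig.registerQuery("D = JOIN C BY x, B BY group USING \"replicated\";");
            // Second replicated join reuses A, the relation the first branch was split from.
            pig.registerQuery("E = JOIN A BY a, D by x USING \"replicated\";");

            // Before this fix, building the Map-Reduce plan for E could fail with an OOM.
            Iterator<Tuple> it = pig.openIterator("E");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
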
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=892388&r1=892387&r2=892388&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Dec 18 22:26:00 2009
@@ -59,6 +59,9 @@
 
 BUG FIXES
 
+PIG-1157: Successive replicated joins do not generate Map Reduce plan and fail
+due to OOM (rding via olgan)
+
 PIG-1075: Error in Cogroup when key fields types don't match (rding via olgan)
 
 PIG-973: type resolution inconsistency (rding via olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=892388&r1=892387&r2=892388&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Fri Dec 18 22:26:00 2009
@@ -113,13 +113,15 @@
                 continue;
             }
             if (isMapOnly(successor)) {
-                if (isSingleLoadMapperPlan(successor.mapPlan)) {
+                if (isSingleLoadMapperPlan(successor.mapPlan)
+                        && isSinglePredecessor(successor)) {
                     mappers.add(successor);                
                 } else {                    
                     multiLoadMROpers.add(successor);
                 }
             } else {
-                if (isSingleLoadMapperPlan(successor.mapPlan)) {
+                if (isSingleLoadMapperPlan(successor.mapPlan)
+                        && isSinglePredecessor(successor)) {
                     mapReducers.add(successor);                  
                 } else {                    
                     multiLoadMROpers.add(successor);                      
@@ -1121,6 +1123,10 @@
         return (pl.getRoots().size() == 1);
     }
     
+    private boolean isSinglePredecessor(MapReduceOper mr) {
+        return (getPlan().getPredecessors(mr).size() == 1);
+    }
+    
     private POSplit getSplit(){
         return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
     } 

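The fix is the additional isSinglePredecessor() guard above: a successor is treated as a merge candidate (added to mappers or mapReducers) only when its map plan has a single load and it has exactly one predecessor in the MR plan; otherwise it goes to multiLoadMROpers. With successive replicated joins, the operator for the second join also depends on the first join's output, so it keeps more than one predecessor and is no longer merged into the splitter. A toy, self-contained sketch of that predecessor-count criterion (the plan shape and operator names below are hypothetical, not the actual Pig classes):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SinglePredecessorSketch {
        public static void main(String[] args) {
            // Hypothetical MR-plan edges, roughly mirroring the new test case:
            // "groupB" consumes the split branch A1, "joinD" loads C and reads
            // groupB's output as its replicated side, and "joinE" depends on both
            // the splitter (the load of A) and joinD's output.
            Map<String, List<String>> predecessors = new HashMap<String, List<String>>();
            predecessors.put("groupB", Arrays.asList("splitterA"));
            predecessors.put("joinD", Arrays.asList("groupB"));
            predecessors.put("joinE", Arrays.asList("splitterA", "joinD"));

            for (Map.Entry<String, List<String>> e : predecessors.entrySet()) {
                // Mirrors the spirit of isSinglePredecessor(): only operators with
                // exactly one predecessor remain candidates for merging.
                boolean mergeCandidate = e.getValue().size() == 1;
                System.out.println(e.getKey() + " merge candidate: " + mergeCandidate);
            }
        }
    }

In the patched optimizer itself the check is simply getPlan().getPredecessors(mr).size() == 1, as the diff above shows.
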
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=892388&r1=892387&r2=892388&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Fri Dec 18 22:26:00 2009
@@ -88,6 +88,66 @@
         myPig = null;
     }
     
+    public void testMultiQueryJiraPig1157() {
+
+        // test case: Successive replicated joins do not generate Map Reduce plan and fail due to OOM
+        
+        String INPUT_FILE = "abc";
+        String INPUT_FILE_1 = "xyz";
+        
+        try {
+    
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("1\tapple\t3");
+            w.println("2\torange\t4");
+            w.println("3\tpersimmon\t5");
+            w.close();
+    
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE_1);
+    
+            myPig.setBatchOn();
+    
+            myPig.registerQuery("A = load '" + INPUT_FILE 
+                    + "' as (a:long, b, c);");
+            myPig.registerQuery("A1 = FOREACH A GENERATE a;");
+            myPig.registerQuery("B = GROUP A1 BY a;");
+            myPig.registerQuery("C = load '" + INPUT_FILE_1 
+                    + "' as (x:long, y);");
+            myPig.registerQuery("D = JOIN C BY x, B BY group USING 
\"replicated\";");  
+            myPig.registerQuery("E = JOIN A BY a, D by x USING 
\"replicated\";");  
+            
+            Iterator<Tuple> iter = myPig.openIterator("E");
+
+            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "(1L,1L,'apple',1L,{(1L)})",
+                            "(2L,2L,'orange',2L,{(2L)})",
+                            "(3L,3L,'persimmon',3L,{(3L)})"
+                    });
+            
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+            }
+
+            assertEquals(expectedResults.size(), counter);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+                Util.deleteFile(cluster, INPUT_FILE_1);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+    
     public void testMultiQueryJiraPig1068() {
 
         // test case: COGROUP fails with 'Type mismatch in key from map: 
@@ -1524,7 +1584,7 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 16);
 
-            checkMRPlan(pp, 1, 1, 2);            
+            checkMRPlan(pp, 2, 1, 3);            
             
         } catch (Exception e) {
             e.printStackTrace();

