Author: pradeepkth
Date: Mon Nov  2 20:04:45 2009
New Revision: 832084

URL: http://svn.apache.org/viewvc?rev=832084&view=rev
Log:
PIG-1030: explain and dump not working with two UDFs inside inner plan of 
foreach (rding via pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=832084&r1=832083&r2=832084&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov  2 20:04:45 2009
@@ -109,6 +109,9 @@
 
 BUG FIXES
 
+PIG-1030: explain and dump not working with two UDFs inside inner plan of
+foreach (rding via pradeepkth)
+
 PIG-1048: inner join using 'skewed' produces multiple rows for keys with
           single row in both input relations (sriranjan via gates)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=832084&r1=832083&r2=832084&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Mon Nov  2 20:04:45 2009
@@ -712,6 +712,7 @@
                 // we apparently have seen a PODistinct before, so lets not
                 // combine.
                 sawNonAlgebraic = true;
+                return;
             }
             // check that this distinct is the only input to an agg
             // We could have the following two cases
@@ -748,6 +749,16 @@
             PhysicalOperator leaf = mPlan.getLeaves().get(0);
             // the leaf has to be a POUserFunc (need not be algebraic)
             if(leaf instanceof POUserFunc) {
+                
+                // we want to combine only in the case where there is only
+                // one PODistinct which is the only input to an agg.
+                // Do not combine if there are additional inputs.
+                List<PhysicalOperator> preds = mPlan.getPredecessors(leaf);
+                if (preds.size() > 1) {
+                    sawNonAlgebraic = true;
+                    return;
+                }
+                
                 List<PhysicalOperator> immediateSuccs = mPlan.getSuccessors(distinct);
                 if(immediateSuccs.size() == 1 && immediateSuccs.get(0) instanceof POProject) {
                     if(checkSuccessorIsLeaf(leaf, immediateSuccs.get(0))) { // script 1 above

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=832084&r1=832083&r2=832084&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java Mon Nov  2 20:04:45 2009
@@ -33,6 +33,7 @@
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
@@ -341,4 +342,53 @@
         }
     }
 
+    public static class JiraPig1030 extends EvalFunc<String> {
+        
+        public String exec(Tuple input) throws IOException {
+            return "";
+        }
+    }
+    
+    @Test
+    public void testJiraPig1030() {
+        // test that combiner is NOT invoked when
+        // one of the elements in the foreach generate
+            // has a non-algebraic UDF that has multiple inputs
+        // (one of them is distinct).
+        
+        String input[] = {
+                "pig1\t18\t2.1",
+                "pig2\t24\t3.3",
+                "pig5\t45\t2.4",
+                "pig1\t18\t2.1",
+                "pig1\t19\t2.1",
+                "pig2\t24\t4.5",
+                "pig1\t20\t3.1" };
+ 
+        try {
+            Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
+            PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+            pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
+            pigServer.registerQuery("b = group a all;");
+            pigServer.registerQuery("c = foreach b  {" +
+                    "        d = distinct a.age;" +
+                    "        generate group, " + JiraPig1030.class.getName() + "(d, 0);};");
+            
+            // make sure there isn't a combine plan in the explain output
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            PrintStream ps = new PrintStream(baos);
+            pigServer.explain("c", ps);
+            assertFalse(baos.toString().matches("(?si).*combine plan.*"));    
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            try {
+                Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
 }


Reply via email to