Author: pradeepkth
Date: Thu Oct 29 23:36:48 2009
New Revision: 831153

URL: http://svn.apache.org/viewvc?rev=831153&view=rev
Log:
PIG-746:  Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag 
should never be serialized (rding via pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831153&r1=831152&r2=831153&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Oct 29 23:36:48 2009
@@ -103,6 +103,9 @@
 
 BUG FIXES
 
+PIG-746: Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag
+should never be serialized (rding via pradeepkth)
+
 PIG-1027: Number of bytes written are always zero in local mode (zjffdu via 
gates)
 
 PIG-976: Multi-query optimization throws ClassCastException (rding via

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=831153&r1=831152&r2=831153&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Oct 29 23:36:48 2009
@@ -406,6 +406,11 @@
             // the reduce side.
             if (pp.getPredecessors(proj) != null) return 
ExprType.NOT_ALGEBRAIC;
 
+            // Check if it's a projection of bag. Currently we can't use 
combiner 
+            // for statement like c = foreach b generate group, SUM(a), a; 
+            // where a is a bag.
+            if (proj.getResultType() == DataType.BAG) return 
ExprType.NOT_ALGEBRAIC;
+            
             // Check to see if this is a projection of the grouping column.
             // If so, it will be a projection of col 0 and will have no
             // predecessors (to avoid things like group.$0, which isn't what we

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=831153&r1=831152&r2=831153&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java Thu Oct 29 23:36:48 2009
@@ -29,6 +29,8 @@
 import java.util.Properties;
 
 import org.junit.Test;
+
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.pig.ExecType;
@@ -284,5 +286,59 @@
         Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
         
     }
+    
+    @Test
+    public void testJiraPig746() {
+        // test that combiner is NOT invoked when the foreach generate
+        // projects the grouped bag itself next to an algebraic agg,
+        // e.g. "generate group, SUM(a.age), a" (PIG-746)
+        String input[] = {
+                "pig1\t18\t2.1",
+                "pig2\t24\t3.3",
+                "pig5\t45\t2.4",
+                "pig1\t18\t2.1",
+                "pig1\t19\t2.1",
+                "pig2\t24\t4.5",
+                "pig1\t20\t3.1" };
+        // NOTE(review): expected[] is matched positionally below, so this presumes results come back ordered by name
+        String expected[] = {
+                
"(pig1,75L,{(pig1,18,2.1),(pig1,18,2.1),(pig1,19,2.1),(pig1,20,3.1)})",
+                "(pig2,48L,{(pig2,24,3.3),(pig2,24,4.5)})",
+                "(pig5,45L,{(pig5,45,2.4)})"
+        };
+
+        try {
+            Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
+ 
+            PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties());
+            pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as 
(name:chararray, age:int, gpa:double);");
+            pigServer.registerQuery("b = group a by name;");
+            pigServer.registerQuery("c = foreach b " +
+                    "        generate group, SUM(a.age), a;};");
+            // NOTE(review): the '}' in "a;};" above has no matching '{' -- confirm the parser tolerates it
+            // make sure there isn't a combine plan in the explain output
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            PrintStream ps = new PrintStream(baos);
+            pigServer.explain("c", ps);
+            assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+            // NOTE(review): the loop below never asserts count == expected.length, so missing result tuples would still pass
+            Iterator<Tuple> it = pigServer.openIterator("c");
+            int count = 0;
+            while (it.hasNext()) {
+                Tuple t = it.next();
+                assertEquals(expected[count++], t.toString());
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            try {
+                Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
 
 }


Reply via email to