Author: pradeepkth
Date: Thu Oct 29 23:36:48 2009
New Revision: 831153
URL: http://svn.apache.org/viewvc?rev=831153&view=rev
Log:
PIG-746: Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag
should never be serialized (rding via pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831153&r1=831152&r2=831153&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Oct 29 23:36:48 2009
@@ -103,6 +103,9 @@
BUG FIXES
+PIG-746: Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag
+should never be serialized (rding via pradeepkth)
+
PIG-1027: Number of bytes written are always zero in local mode (zjffdu via
gates)
PIG-976: Multi-query optimization throws ClassCastException (rding via
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=831153&r1=831152&r2=831153&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Oct 29 23:36:48 2009
@@ -406,6 +406,11 @@
// the reduce side.
if (pp.getPredecessors(proj) != null) return ExprType.NOT_ALGEBRAIC;
+ // Check if it's a projection of bag. Currently we can't use combiner
+ // for statement like c = foreach b generate group, SUM(a), a;
+ // where a is a bag.
+ if (proj.getResultType() == DataType.BAG) return ExprType.NOT_ALGEBRAIC;
+
// Check to see if this is a projection of the grouping column.
// If so, it will be a projection of col 0 and will have no
// predecessors (to avoid things like group.$0, which isn't what we
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=831153&r1=831152&r2=831153&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java Thu Oct 29 23:36:48 2009
@@ -29,6 +29,8 @@
import java.util.Properties;
import org.junit.Test;
+
+import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.pig.ExecType;
@@ -284,5 +286,59 @@
Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
}
+
+ @Test
+ public void testJiraPig746() {
+ // test that combiner is NOT invoked when
+ // one of the elements in the foreach generate
+ // has a foreach in the plan without a distinct agg
+ String input[] = {
+ "pig1\t18\t2.1",
+ "pig2\t24\t3.3",
+ "pig5\t45\t2.4",
+ "pig1\t18\t2.1",
+ "pig1\t19\t2.1",
+ "pig2\t24\t4.5",
+ "pig1\t20\t3.1" };
+
+ String expected[] = {
+ "(pig1,75L,{(pig1,18,2.1),(pig1,18,2.1),(pig1,19,2.1),(pig1,20,3.1)})",
+ "(pig2,48L,{(pig2,24,3.3),(pig2,24,4.5)})",
+ "(pig5,45L,{(pig5,45,2.4)})"
+ };
+
+ try {
+ Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
+
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
+ pigServer.registerQuery("b = group a by name;");
+ pigServer.registerQuery("c = foreach b " +
+ " generate group, SUM(a.age), a;};");
+
+ // make sure there isn't a combine plan in the explain output
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ pigServer.explain("c", ps);
+ assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+
+ Iterator<Tuple> it = pigServer.openIterator("c");
+ int count = 0;
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ assertEquals(expected[count++], t.toString());
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ try {
+ Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
}