Author: pradeepkth Date: Thu Oct 29 23:36:48 2009 New Revision: 831153 URL: http://svn.apache.org/viewvc?rev=831153&view=rev Log: PIG-746: Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag should never be serialized (rding via pradeepkth)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831153&r1=831152&r2=831153&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Oct 29 23:36:48 2009 @@ -103,6 +103,9 @@ BUG FIXES +PIG-746: Works in --exectype local, fails on grid - ERROR 2113: SingleTupleBag +should never be serialized (rding via pradeepkth) + PIG-1027: Number of bytes written are always zero in local mode (zjffdu via gates) PIG-976: Multi-query optimization throws ClassCastException (rding via Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=831153&r1=831152&r2=831153&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Oct 29 23:36:48 2009 @@ -406,6 +406,11 @@ // the reduce side. if (pp.getPredecessors(proj) != null) return ExprType.NOT_ALGEBRAIC; + // Check if it's a projection of bag. Currently we can't use combiner + // for statement like c = foreach b generate group, SUM(a), a; + // where a is a bag. + if (proj.getResultType() == DataType.BAG) return ExprType.NOT_ALGEBRAIC; + // Check to see if this is a projection of the grouping column. 
// If so, it will be a projection of col 0 and will have no // predecessors (to avoid things like group.$0, which isn't what we Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=831153&r1=831152&r2=831153&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestCombiner.java Thu Oct 29 23:36:48 2009 @@ -29,6 +29,8 @@ import java.util.Properties; import org.junit.Test; + +import junit.framework.Assert; import junit.framework.TestCase; import org.apache.pig.ExecType; @@ -284,5 +286,59 @@ Util.deleteFile(cluster, "forEachNoCombinerInput.txt"); } + + @Test + public void testJiraPig746() { + // test that combiner is NOT invoked when + // one of the elements in the foreach generate + // is a projection of the grouped bag itself (here, a) + String input[] = { + "pig1\t18\t2.1", + "pig2\t24\t3.3", + "pig5\t45\t2.4", + "pig1\t18\t2.1", + "pig1\t19\t2.1", + "pig2\t24\t4.5", + "pig1\t20\t3.1" }; + + String expected[] = { + "(pig1,75L,{(pig1,18,2.1),(pig1,18,2.1),(pig1,19,2.1),(pig1,20,3.1)})", + "(pig2,48L,{(pig2,24,3.3),(pig2,24,4.5)})", + "(pig5,45L,{(pig5,45,2.4)})" + }; + + try { + Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input); + + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);"); + pigServer.registerQuery("b = group a by name;"); + pigServer.registerQuery("c = foreach b " + + " generate group, SUM(a.age), a;};"); + + // make sure there isn't a combine plan in the explain output + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + pigServer.explain("c", ps); +
assertFalse(baos.toString().matches("(?si).*combine plan.*")); + + Iterator<Tuple> it = pigServer.openIterator("c"); + int count = 0; + while (it.hasNext()) { + Tuple t = it.next(); + assertEquals(expected[count++], t.toString()); + } + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } finally { + try { + Util.deleteFile(cluster, "forEachNoCombinerInput.txt"); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } }