Author: olga Date: Sat Feb 20 02:26:04 2010 New Revision: 912064 URL: http://svn.apache.org/viewvc?rev=912064&view=rev Log: PIG-1241: Accumulator is turned on when a map is used with a non-accumulative UDF (yinghe via olgan)
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=912064&r1=912063&r2=912064&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Sat Feb 20 02:26:04 2010 @@ -130,6 +130,9 @@ BUG FIXES +PIG-1241: Accumulator is turned on when a map is used with a non-accumulative +UDF (yinghe vi olgan) + PIG-1215: Make Hadoop jobId more prominent in the client log (ashutoshc) PIG-1216: New load store design does not allow Pig to validate inputs and Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=912064&r1=912063&r2=912064&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Sat Feb 20 02:26:04 2010 @@ -155,7 +155,7 @@ } if (po instanceof POMapLookUp) { - return true; + return check(po.getInputs().get(0)); } if (po instanceof POProject) { Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=912064&r1=912063&r2=912064&view=diff 
============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Sat Feb 20 02:26:04 2010 @@ -457,9 +457,12 @@ EndOfAllInputSetter checker = new EndOfAllInputSetter(plan); checker.visit(); - AccumulatorOptimizer accum = new AccumulatorOptimizer(plan); - accum.visit(); - + boolean isAccum = + "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.accumulator","true")); + if (isAccum) { + AccumulatorOptimizer accum = new AccumulatorOptimizer(plan); + accum.visit(); + } return plan; } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=912064&r1=912063&r2=912064&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java Sat Feb 20 02:26:04 2010 @@ -33,6 +33,7 @@ private static final String INPUT_FILE = "AccumulatorInput.txt"; private static final String INPUT_FILE2 = "AccumulatorInput2.txt"; private static final String INPUT_FILE3 = "AccumulatorInput3.txt"; + private static final String INPUT_FILE4 = "AccumulatorInput4.txt"; private PigServer pigServer; private MiniCluster cluster = MiniCluster.buildCluster(); @@ -50,6 +51,7 @@ @Before public void setUp() throws Exception { + pigServer.getPigContext().getProperties().remove("opt.accumulator"); createFiles(); } @@ -94,6 +96,16 @@ w.close(); Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3); + + w = new PrintWriter(new FileWriter(INPUT_FILE4)); + + w.println("100\thttp://ibm.com,ibm"); + w.println("100\thttp://ibm.com,ibm"); + w.println("200\thttp://yahoo.com,yahoo"); + 
w.println("300\thttp://sun.com,sun"); + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4); } @After @@ -103,7 +115,9 @@ new File(INPUT_FILE2).delete(); Util.deleteFile(cluster, INPUT_FILE2); new File(INPUT_FILE3).delete(); - Util.deleteFile(cluster, INPUT_FILE3); + Util.deleteFile(cluster, INPUT_FILE3); + new File(INPUT_FILE4).delete(); + Util.deleteFile(cluster, INPUT_FILE4); } public void testAccumBasic() throws IOException{ @@ -486,17 +500,58 @@ } } - // Pig 1105 + // Pig 1105 public void testAccumCountStar() throws IOException{ pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);"); pigServer.registerQuery("C = group A by id;"); pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);"); - try { - Iterator<Tuple> iter = pigServer.openIterator("D"); - } catch (Exception e) { - fail("COUNT_STAR should be supported by accumulator interface"); - } - } - + try { + Iterator<Tuple> iter = pigServer.openIterator("D"); + } catch (Exception e) { + fail("COUNT_STAR should be supported by accumulator interface"); + } + } + + + public void testAccumulatorOff() throws IOException{ + pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "false"); + + pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulativeSumBag(A);"); + + try { + Iterator<Tuple> iter = pigServer.openIterator("C"); + int c = 0; + while(iter.hasNext()) { + iter.next(); + c++; + } + fail("Accumulator should be off."); + }catch(Exception e) { + // we should get exception + } + + } + + public void testAccumWithMap() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);"); + pigServer.registerQuery("B = group A by (id, url);"); + pigServer.registerQuery("C = foreach B generate COUNT(A), 
org.apache.pig.test.utils.URLPARSE(group.url)#'url';"); + + HashMap<Integer, String> expected = new HashMap<Integer, String>(); + expected.put(2, "http://ibm.com"); + expected.put(1, "http://yahoo.com"); + expected.put(1, "http://sun.com"); + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Long)t.get(0)), (String)t.get(1)); + } + } + + }