Author: olga
Date: Sun Dec 6 23:30:18 2009
New Revision: 887797
URL: http://svn.apache.org/viewvc?rev=887797&view=rev
Log:
PIG-1105: COUNT_STAR accumulate interface implementation causes failure
(sriranjan via olgan)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java
hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=887797&r1=887796&r2=887797&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Sun Dec 6 23:30:18 2009
@@ -47,6 +47,9 @@
BUG FIXES
+PIG-1105: COUNT_STAR accumulate interface implementation cases failure
+(sriranjan via olgan)
+
PIG-1118: expression with aggregate functions returning null, with accumulate
interface (yinghe via olgan)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=887797&r1=887796&r2=887797&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
Sun Dec 6 23:30:18 2009
@@ -368,7 +368,8 @@
pc.getProperties().getProperty(
"last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
- String prop = System.getProperty("pig.exec.nocombiner");
+ //String prop = System.getProperty("pig.exec.nocombiner");
+ String prop = pc.getProperties().getProperty("pig.exec.nocombiner");
if (!("true".equals(prop))) {
CombinerOptimizer co = new CombinerOptimizer(plan,
lastInputChunkSize);
co.visit();
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java?rev=887797&r1=887796&r2=887797&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java Sun Dec 6
23:30:18 2009
@@ -135,7 +135,11 @@
@Override
public void accumulate(Tuple b) throws IOException {
try {
- intermediateCount += sum(b);
+ DataBag values = (DataBag)b.get(0);
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ it.next();
+ intermediateCount++;
+ }
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
@@ -143,7 +147,7 @@
String msg = "Error while computing min in " +
this.getClass().getSimpleName();
throw new ExecException(msg, errCode, PigException.BUG, e);
}
- }
+ }
@Override
public void cleanup() {
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=887797&r1=887796&r2=887797&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java Sun Dec 6
23:30:18 2009
@@ -42,6 +42,7 @@
// pigServer = new PigServer(ExecType.LOCAL);
pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize",
"2");
pigServer.getPigContext().getProperties().setProperty("pig.exec.batchsize",
"2");
+
pigServer.getPigContext().getProperties().setProperty("pig.exec.nocombiner",
"true");
}
@Before
@@ -100,7 +101,6 @@
Util.deleteFile(cluster, INPUT_FILE3);
}
-
public void testAccumBasic() throws IOException{
// test group by
pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int,
fruit);");
@@ -405,12 +405,12 @@
Tuple t = iter.next();
assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));
}
- }
+ }
public void testAccumWithBuildin() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int,
v:double);");
pigServer.registerQuery("C = group A by id;");
- pigServer.registerQuery("D = foreach C generate group, SUM(A.v),
AVG(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);");
+ pigServer.registerQuery("D = foreach C generate group, SUM(A.v),
AVG(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);");
HashMap<Integer, Double[]> expected = new HashMap<Integer, Double[]>();
expected.put(100, new Double[]{15.0,3.0,5.0,1.0,5.0});
@@ -441,4 +441,18 @@
t.get(1).toString().equals("3401");
}
}
+
+ // Pig 1105
+ public void testAccumCountStar() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int,
v:double);");
+ pigServer.registerQuery("C = group A by id;");
+ pigServer.registerQuery("D = foreach C generate group,
COUNT_STAR(A.id);");
+
+ try {
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+ } catch (Exception e) {
+ fail("COUNT_STAR should be supported by accumulator
interface");
+ }
+ }
+
}