Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java Thu Nov 12 
18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the min of the Long values in the first field of a tuple.
  */
-public class LongMin extends EvalFunc<Long> implements Algebraic {
+public class LongMin extends EvalFunc<Long> implements Algebraic, 
Accumulator<Long> {
 
     @Override
     public Long exec(Tuple input) throws IOException {
@@ -152,4 +153,38 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
+    
+    /* Accumulator interface implementation */
+    private Long intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curMin = min(b);
+            if (curMin == null) {
+                return;
+            }
+            /* if bag is not null, initialize intermediateMax to negative 
infinity */
+            if (intermediateMin == null) {
+                intermediateMin = Long.MAX_VALUE;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + 
this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateMin;
+    }    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java Thu Nov 12 
18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
 /**
  * Generates the sum of the Long values in the first field of a tuple.
  */
-public class LongSum extends EvalFunc<Long> implements Algebraic {
+public class LongSum extends EvalFunc<Long> implements Algebraic, 
Accumulator<Long> {
 
     @Override
     public Long exec(Tuple input) throws IOException {
@@ -155,5 +156,35 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
+    
+    /* Accumulator interface implementation*/
+    private Long intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Long curSum = sum(b);
+            if (curSum == null) {
+                return;
+            }
+            intermediateSum = (intermediateSum == null ? 0L : intermediateSum) 
+ curSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + 
this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Long getValue() {
+        return intermediateSum;
+    }    
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java Thu Nov 12 18:33:15 
2009
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -40,7 +41,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class MAX extends EvalFunc<Double> implements Algebraic {
+public class MAX extends EvalFunc<Double> implements Algebraic, 
Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -216,5 +217,39 @@
         funcList.add(new FuncSpec(LongMax.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         funcList.add(new FuncSpec(StringMax.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
         return funcList;
+    }
+
+    /* Accumulator interface implementation */
+    private Double intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            /* if bag is not null, initialize intermediateMax to negative 
infinity */
+            if (intermediateMax == null) {
+                intermediateMax = Double.NEGATIVE_INFINITY;
+            }
+            intermediateMax = java.lang.Math.max(intermediateMax, curMax);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + 
this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMax;
     }    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java Thu Nov 12 18:33:15 
2009
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -40,7 +41,7 @@
 /**
  * Generates the min of the values of the first field of a tuple.
  */
-public class MIN extends EvalFunc<Double> implements Algebraic {
+public class MIN extends EvalFunc<Double> implements Algebraic, 
Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -217,5 +218,39 @@
         funcList.add(new FuncSpec(LongMin.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         funcList.add(new FuncSpec(StringMin.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY)));
         return funcList;
+    }
+
+    /* Accumulator interface implementation */
+    private Double intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curMin = min(b);
+            if (curMin == null) {
+                return;
+            }
+            /* if bag is not null, initialize intermediateMax to negative 
infinity */
+            if (intermediateMin == null) {
+                intermediateMin = Double.POSITIVE_INFINITY;
+            }
+            intermediateMin = java.lang.Math.min(intermediateMin, curMin);
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing min in " + 
this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateMin;
     }    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java Thu Nov 12 18:33:15 
2009
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -39,7 +40,7 @@
 /**
  * Generates the sum of the values of the first field of a tuple.
  */
-public class SUM extends EvalFunc<Double> implements Algebraic {
+public class SUM extends EvalFunc<Double> implements Algebraic, 
Accumulator<Double> {
 
     @Override
     public Double exec(Tuple input) throws IOException {
@@ -222,6 +223,36 @@
         funcList.add(new FuncSpec(IntSum.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER)));
         funcList.add(new FuncSpec(LongSum.class.getName(), 
Schema.generateNestedSchema(DataType.BAG, DataType.LONG)));
         return funcList;
+    }
+
+    /* Accumulator interface implementation*/
+    private Double intermediateSum = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            Double curSum = sum(b);
+            if (curSum == null) {
+                return;
+            }
+            intermediateSum = (intermediateSum == null ? 0.0 : 
intermediateSum) + curSum;
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing sum in " + 
this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateSum = null;
+    }
+
+    @Override
+    public Double getValue() {
+        return intermediateSum;
     }    
     
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java Thu Nov 12 
18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -33,7 +34,7 @@
 /**
  * Generates the max of the values of the first field of a tuple.
  */
-public class StringMax extends EvalFunc<String> implements Algebraic {
+public class StringMax extends EvalFunc<String> implements Algebraic, 
Accumulator<String> {
 
     @Override
     public String exec(Tuple input) throws IOException {
@@ -152,4 +153,39 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); 
     }
+
+
+    /* accumulator interface */
+    private String intermediateMax = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            String curMax = max(b);
+            if (curMax == null) {
+                return;
+            }
+            // check if it lexicographically follows curMax
+            if (intermediateMax == null || intermediateMax.compareTo(curMax) > 
0) {
+                intermediateMax = curMax;
+            }            
+
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + 
this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMax = null;
+    }
+
+    @Override
+    public String getValue() {
+        return intermediateMax;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java Thu Nov 12 
18:33:15 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigException;
@@ -35,7 +36,7 @@
 /**
  * Generates the min of the String values in the first field of a tuple.
  */
-public class StringMin extends EvalFunc<String> implements Algebraic {
+public class StringMin extends EvalFunc<String> implements Algebraic, 
Accumulator<String> {
 
     @Override
     public String exec(Tuple input) throws IOException {
@@ -154,5 +155,39 @@
     public Schema outputSchema(Schema input) {
         return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); 
     }
+    
+    /* accumulator interface */
+    private String intermediateMin = null;
+    
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try {
+            String curMin = min(b);
+            if (curMin == null) {
+                return;
+            }
+            // check if it lexicographically follows curMax
+            if (intermediateMin == null || intermediateMin.compareTo(curMin) < 
0) {
+                intermediateMin = curMin;
+            }            
+
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;
+            String msg = "Error while computing max in " + 
this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);        
   
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        intermediateMin = null;
+    }
+
+    @Override
+    public String getValue() {
+        return intermediateMin;
+    }
 
 }

Added: hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java Thu Nov 12 
18:33:15 2009
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+
+/*
+ * Read-only DataBag view backed by an AccumulativeTupleBuffer: tuples are
+ * streamed from the buffer on demand rather than materialized in this bag,
+ * so mutation, sizing, and serialization operations are unsupported and throw.
+ */
+public class AccumulativeBag implements DataBag {
+    private static final long serialVersionUID = 1L;    
+     
+    // transient: the buffer is runtime-only state and is never serialized with the bag
+    private transient AccumulativeTupleBuffer buffer;
+    // which input of the underlying buffer this bag exposes
+    private int index;
+    
+    public AccumulativeBag(AccumulativeTupleBuffer buffer, int index) {
+        this.buffer = buffer;
+        this.index = index;
+    }
+    
+    // mutation is unsupported: contents come solely from the buffer
+    public void add(Tuple t) {
+        throw new RuntimeException("AccumulativeBag does not support add 
operation");
+    }
+
+    public void addAll(DataBag b) {
+        throw new RuntimeException("AccumulativeBag does not support add 
operation");
+    }
+
+    public void clear() {
+        throw new RuntimeException("AccumulativeBag does not support clear 
operation");
+    }
+
+    public boolean isDistinct() {              
+        return false;
+    }
+
+    public boolean isSorted() {                
+        return false;
+    }  
+    
+    public AccumulativeTupleBuffer getTuplebuffer() {
+        return buffer;
+    }
+
+    // streams the tuples for this bag's input index straight from the buffer
+    public Iterator<Tuple> iterator() {                                
+        return buffer.getTuples(index);
+    }
+
+    // no-op: nothing is cached in this bag, so there is nothing to invalidate
+    public void markStale(boolean stale) {             
+
+    }
+
+    public long size() {               
+        throw new RuntimeException("AccumulativeBag does not support size() 
operation");
+    }
+
+    // nothing is held in memory by this view, so nothing to report or spill
+    public long getMemorySize() {      
+        return 0;
+    }
+
+    public long spill() {              
+        return 0;
+    }
+
+    public void readFields(DataInput datainput) throws IOException {
+        throw new IOException("AccumulativeBag does not support readFields 
operation");
+    }
+
+    public void write(DataOutput dataoutput) throws IOException {
+        throw new IOException("AccumulativeBag does not support write 
operation");
+    }
+
+    public int compareTo(Object other) {
+        throw new RuntimeException("AccumulativeBag does not support 
compareTo() operation");
+    }
+    
+    // identity equality only: two views are equal only if they are the same object
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        
+        return false;
+    }
+        
+    public int hashCode() {
+        assert false : "hashCode not designed";
+        return 42;
+    }
+
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java Thu Nov 12 
18:33:15 2009
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestAccumulator extends TestCase{
+    private static final String INPUT_FILE = "AccumulatorInput.txt";
+    private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
+    private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
+ 
+    private PigServer pigServer;
+    private MiniCluster cluster = MiniCluster.buildCluster();
+    
+    public TestAccumulator() throws ExecException, IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        // pigServer = new PigServer(ExecType.LOCAL);
+        
pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize",
 "2");     
+        
pigServer.getPigContext().getProperties().setProperty("pig.exec.batchsize", 
"2");
+    }
+    
+    @Before
+    public void setUp() throws Exception {
+        createFiles();
+    }
+
+    private void createFiles() throws IOException {
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+                
+        w.println("100\tapple");               
+        w.println("200\torange");      
+        w.println("300\tstrawberry");          
+        w.println("300\tpear");
+        w.println("100\tapple");
+        w.println("300\tpear");
+        w.println("400\tapple");    
+        w.close();   
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+        
+        w = new PrintWriter(new FileWriter(INPUT_FILE2));
+        
+        w.println("100\t");            
+        w.println("100\t");
+        w.println("200\t");            
+        w.println("200\t");            
+        w.println("300\tstrawberry");
+        w.close();   
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
+        
+        w = new PrintWriter(new FileWriter(INPUT_FILE3));
+        
+        w.println("100\t1.0");         
+        w.println("100\t2.0");
+        w.println("200\t1.1");         
+        w.println("200\t2.1");
+        w.println("100\t3.0");         
+        w.println("100\t4.0");
+        w.println("200\t3.1");
+        w.println("100\t5.0");
+        w.println("300\t3.3");
+        w.close();   
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        new File(INPUT_FILE).delete();
+        Util.deleteFile(cluster, INPUT_FILE);
+        new File(INPUT_FILE2).delete();
+        Util.deleteFile(cluster, INPUT_FILE2);
+        new File(INPUT_FILE3).delete();
+        Util.deleteFile(cluster, INPUT_FILE3);
+    }
+    
+    
+    public void testAccumBasic() throws IOException{
+        // test group by
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  
org.apache.pig.test.utils.AccumulatorBagCount(A);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 2);
+        expected.put(200, 1);
+        expected.put(300, 3);
+        expected.put(400, 1);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));  
              
+        }            
+        
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A), 
org.apache.pig.test.utils.BagCount(A);");                     
+        
+        try{
+            iter = pigServer.openIterator("C");
+        
+            while(iter.hasNext()) {
+                Tuple t = iter.next();
+                assertEquals(expected.get((Integer)t.get(0)), 
(Integer)t.get(1));                
+            }      
+            fail("accumulator should not be called.");
+        }catch(IOException e) {
+            // should throw exception from AccumulatorBagCount.
+        }
+        
+        // test cogroup
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("C = cogroup A by id, B by id;");
+        pigServer.registerQuery("D = foreach C generate group,  " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A), 
org.apache.pig.test.utils.AccumulatorBagCount(B);");                     
+
+        HashMap<Integer, String> expected2 = new HashMap<Integer, String>();
+        expected2.put(100, "2,2");
+        expected2.put(200, "1,1");
+        expected2.put(300, "3,3");
+        expected2.put(400, "1,1");
+        
+                  
+        iter = pigServer.openIterator("D");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected2.get((Integer)t.get(0)), 
t.get(1).toString()+","+t.get(2).toString());                
+        }            
+    }      
+    
+    public void testAccumWithNegative() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  
-org.apache.pig.test.utils.AccumulatorBagCount(A);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, -2);
+        expected.put(200, -1);
+        expected.put(300, -3);
+        expected.put(400, -1);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));  
              
+        }            
+    }
+    
+    public void testAccumWithAdd() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  
org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;");                     
+        
+        {
+            HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
+            expected.put(100, 3.0);
+            expected.put(200, 2.0);
+            expected.put(300, 4.0);
+            expected.put(400, 2.0);
+            
+                      
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            
+            while(iter.hasNext()) {
+                Tuple t = iter.next();
+                assertEquals(expected.get((Integer)t.get(0)), 
(Double)t.get(1));                
+            }                            
+        }
+        
+        {
+            pigServer.registerQuery("C = foreach B generate group,  " +
+            
"org.apache.pig.test.utils.AccumulatorBagCount(A)+org.apache.pig.test.utils.AccumulatorBagCount(A);");
                     
+
+            HashMap<Integer, Integer>expected = new HashMap<Integer, 
Integer>();
+            expected.put(100, 4);
+            expected.put(200, 2);
+            expected.put(300, 6);
+            expected.put(400, 2);
+    
+              
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+    
+            while(iter.hasNext()) {
+                Tuple t = iter.next();
+                assertEquals(expected.get((Integer)t.get(0)), 
(Integer)t.get(1));                
+            }
+        }
+    }      
+    
+    public void testAccumWithMinus() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group, " +
+                " 
org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);");
                     
+
+        HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
+        expected.put(100, 4.0);
+        expected.put(200, 2.0);
+        expected.put(300, 6.0);
+        expected.put(400, 2.0);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));   
             
+        }                                   
+    }              
+    
+    public void testAccumWithMod() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;");      
               
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 0);
+        expected.put(200, 1);
+        expected.put(300, 1);
+        expected.put(400, 1);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));  
              
+        }                                   
+    }             
+    
+    public void testAccumWithDivide() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;");        
             
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 1);
+        expected.put(200, 0);
+        expected.put(300, 1);
+        expected.put(400, 0);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));  
              
+        }                                   
+    }        
+    
+    public void testAccumWithAnd() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);");  
                   
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 0);
+        expected.put(200, 1);
+        expected.put(300, 1);
+        expected.put(400, 1);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));  
              
+        }                                   
+    }          
+    
+    public void testAccumWithOr() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
+                "org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);");  
                   
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 1);
+        expected.put(200, 0);
+        expected.put(300, 1);
+        expected.put(400, 0);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));  
              
+        }                                   
+    }  
+    
+    public void testAccumWithRegexp() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                
"(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' 
?0:1);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 1);
+        expected.put(200, 0);
+        expected.put(300, 1);
+        expected.put(400, 0);
+        
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));  
              
+        }                                   
+    }              
+    
+
+    public void testAccumWithIsNull() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, 
fruit);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B generate group,  " +
+                "((chararray)org.apache.pig.test.utils.AccumulativeSumBag(A) 
is null?0:1);");                     
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 0);
+        expected.put(200, 0);
+        expected.put(300, 1);                
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));  
              
+        }                                   
+    }              
+    
+    public void testAccumWithDistinct() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
f);");
+        pigServer.registerQuery("B = group A by id;");
+        pigServer.registerQuery("C = foreach B { D = distinct A; generate 
group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");                 
    
+
+        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+        expected.put(100, 2);
+        expected.put(200, 2);
+        expected.put(300, 3);
+        expected.put(400, 2);
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));  
              
+        }                                   
+    }             
+    
+    public void testAccumWithSort() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, 
f);");
+        pigServer.registerQuery("B = foreach A generate id, f, id as t;");
+        pigServer.registerQuery("C = group B by id;");
+        pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f; 
generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");           
          
+
+        HashMap<Integer, String> expected = new HashMap<Integer, String>();
+        expected.put(100, "(apple)(apple)");
+        expected.put(200, "(orange)");
+        expected.put(300, "(pear)(pear)(strawberry)");
+        expected.put(400, "(apple)");
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));   
             
+        }                                   
+    }             
+    
+    public void testAccumWithBuildin() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, 
v:double);");
+        pigServer.registerQuery("C = group A by id;");
+        pigServer.registerQuery("D = foreach C generate group, SUM(A.v), 
AVG(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);");                     
+
+        HashMap<Integer, Double[]> expected = new HashMap<Integer, Double[]>();
+        expected.put(100, new Double[]{15.0,3.0,5.0,1.0,5.0});
+        expected.put(200, new Double[]{6.3,2.1,3.0,1.1,3.1});
+        expected.put(300, new Double[]{3.3,3.3,1.0,3.3,3.3});
+                  
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        while(iter.hasNext()) {
+            Tuple t = iter.next();
+            Double[] v = expected.get((Integer)t.get(0));
+            for(int i=0; i<v.length; i++) {
+                assertEquals(v[i].doubleValue(), 
((Number)t.get(i+1)).doubleValue(), 0.0001);
+            }            
+        }    
+    }
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java 
(added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java Thu 
Nov 12 18:33:15 2009
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.Accumulator;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+/**
+ * This class is for testing of accumulator udfs
+ *
+ */
+public class AccumulativeSumBag extends EvalFunc<String> implements 
Accumulator<String>
+{
+
+    StringBuffer sb;
+
+    public AccumulativeSumBag() {
+    }
+
+    public void accumulate(Tuple tuple) throws IOException {
+        DataBag databag = (DataBag)tuple.get(0);
+        if(databag == null)
+            return;
+        
+        if (sb == null) {
+            sb = new StringBuffer();
+        }
+        
+        Iterator<Tuple> iterator = databag.iterator();
+        while(iterator.hasNext()) {
+            Tuple t = iterator.next();
+            if (t.size()>1 && t.get(1) == null) {
+                continue;
+            }
+                        
+               sb.append(t.toString());
+        }
+    }
+
+    public String getValue() {
+        if (sb != null && sb.length()>0) {
+            return sb.toString();
+        }
+        return null;
+    }
+    
+    public void cleanup() {
+        sb = null;
+    }
+
+    public String exec(Tuple tuple) throws IOException {
+        throw new IOException("exec() should not be called");
+    }
+}
+                                                                               
  
\ No newline at end of file

Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java 
(added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java 
Thu Nov 12 18:33:15 2009
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.Accumulator;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+public class AccumulatorBagCount extends EvalFunc<Integer> implements 
Accumulator<Integer> {
+
+    int count = 0;
+
+    public AccumulatorBagCount() {
+    }
+
+    public void accumulate(Tuple tuple) throws IOException {
+        DataBag databag = (DataBag)tuple.get(0);
+        if(databag == null)
+            return;
+        
+        Iterator<Tuple> iterator = databag.iterator();
+        while(iterator.hasNext()) {
+            iterator.next();
+            count++;
+        }
+    }
+
+    public Integer getValue() {
+        return new Integer(count);
+    }
+
+    public void cleanup() {
+        count = 0;
+    }
+
+    public Integer exec(Tuple tuple) throws IOException {
+        throw new IOException("exec() should not be called.");
+    }
+}
+                                                                               
  
\ No newline at end of file

Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java?rev=835487&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java Thu Nov 12 
18:33:15 2009
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+
+public class BagCount extends EvalFunc<Integer> { 
+
+    
+    public BagCount() {
+    }
+
+
+    public Integer exec(Tuple tuple) throws IOException {
+        DataBag databag = (DataBag)tuple.get(0);
+        if(databag == null) {
+            return new Integer(0);
+        }
+        
+        int count = 0;
+
+        Iterator<Tuple> iterator = databag.iterator();
+        while(iterator.hasNext()) {
+            iterator.next();
+            count++;
+        }
+        
+        return new Integer(count);
+    }
+}
+                                                                               
  
\ No newline at end of file

Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java?rev=835487&r1=835486&r2=835487&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java Thu Nov 12 
18:33:15 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.test.utils;
 
+import java.util.List;
+
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -96,7 +98,10 @@
         }
         return new Result();
     }
-    
-    
+
    @Override
    protected List<ExpressionOperator> getChildExpressions() {
        // Dummy cast operator used only in tests: it has no child
        // expressions, so report none.
        return null;
    }
 
 }


Reply via email to