Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java?rev=835487&r1=835486&r2=835487&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java Thu Nov 12 18:33:15 2009 @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Iterator; +import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.PigException; @@ -33,7 +34,7 @@ /** * Generates the min of the Long values in the first field of a tuple. */ -public class LongMin extends EvalFunc<Long> implements Algebraic { +public class LongMin extends EvalFunc<Long> implements Algebraic, Accumulator<Long> { @Override public Long exec(Tuple input) throws IOException { @@ -152,4 +153,38 @@ public Schema outputSchema(Schema input) { return new Schema(new Schema.FieldSchema(null, DataType.LONG)); } + + /* Accumulator interface implementation */ + private Long intermediateMin = null; + + @Override + public void accumulate(Tuple b) throws IOException { + try { + Long curMin = min(b); + if (curMin == null) { + return; + } + /* first non-null value seen: initialize intermediateMin to Long.MAX_VALUE */ + if (intermediateMin == null) { + intermediateMin = Long.MAX_VALUE; + } + intermediateMin = java.lang.Math.min(intermediateMin, curMin); + } catch (ExecException ee) { + throw ee; + } catch (Exception e) { + int errCode = 2106; + String msg = "Error while computing min in " + this.getClass().getSimpleName(); + throw new ExecException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void cleanup() { + intermediateMin = null; + } + + @Override + public Long getValue() { + return intermediateMin; + } }
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java?rev=835487&r1=835486&r2=835487&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java Thu Nov 12 18:33:15 2009 @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Iterator; +import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.PigException; @@ -35,7 +36,7 @@ /** * Generates the sum of the Long values in the first field of a tuple. */ -public class LongSum extends EvalFunc<Long> implements Algebraic { +public class LongSum extends EvalFunc<Long> implements Algebraic, Accumulator<Long> { @Override public Long exec(Tuple input) throws IOException { @@ -155,5 +156,35 @@ public Schema outputSchema(Schema input) { return new Schema(new Schema.FieldSchema(null, DataType.LONG)); } + + /* Accumulator interface implementation*/ + private Long intermediateSum = null; + + @Override + public void accumulate(Tuple b) throws IOException { + try { + Long curSum = sum(b); + if (curSum == null) { + return; + } + intermediateSum = (intermediateSum == null ? 
0L : intermediateSum) + curSum; + } catch (ExecException ee) { + throw ee; + } catch (Exception e) { + int errCode = 2106; + String msg = "Error while computing min in " + this.getClass().getSimpleName(); + throw new ExecException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void cleanup() { + intermediateSum = null; + } + + @Override + public Long getValue() { + return intermediateSum; + } } Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java?rev=835487&r1=835486&r2=835487&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java Thu Nov 12 18:33:15 2009 @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; +import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; @@ -40,7 +41,7 @@ /** * Generates the max of the values of the first field of a tuple. 
*/ -public class MAX extends EvalFunc<Double> implements Algebraic { +public class MAX extends EvalFunc<Double> implements Algebraic, Accumulator<Double> { @Override public Double exec(Tuple input) throws IOException { @@ -216,5 +217,39 @@ funcList.add(new FuncSpec(LongMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG))); funcList.add(new FuncSpec(StringMax.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY))); return funcList; + } + + /* Accumulator interface implementation */ + private Double intermediateMax = null; + + @Override + public void accumulate(Tuple b) throws IOException { + try { + Double curMax = max(b); + if (curMax == null) { + return; + } + /* if bag is not null, initialize intermediateMax to negative infinity */ + if (intermediateMax == null) { + intermediateMax = Double.NEGATIVE_INFINITY; + } + intermediateMax = java.lang.Math.max(intermediateMax, curMax); + } catch (ExecException ee) { + throw ee; + } catch (Exception e) { + int errCode = 2106; + String msg = "Error while computing min in " + this.getClass().getSimpleName(); + throw new ExecException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void cleanup() { + intermediateMax = null; + } + + @Override + public Double getValue() { + return intermediateMax; } } Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java?rev=835487&r1=835486&r2=835487&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java Thu Nov 12 18:33:15 2009 @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; +import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; @@ -40,7 +41,7 @@ /** * Generates the min of the 
values of the first field of a tuple. */ -public class MIN extends EvalFunc<Double> implements Algebraic { +public class MIN extends EvalFunc<Double> implements Algebraic, Accumulator<Double> { @Override public Double exec(Tuple input) throws IOException { @@ -217,5 +218,39 @@ funcList.add(new FuncSpec(LongMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG))); funcList.add(new FuncSpec(StringMin.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.CHARARRAY))); return funcList; + } + + /* Accumulator interface implementation */ + private Double intermediateMin = null; + + @Override + public void accumulate(Tuple b) throws IOException { + try { + Double curMin = min(b); + if (curMin == null) { + return; + } + /* first non-null value seen: initialize intermediateMin to positive infinity */ + if (intermediateMin == null) { + intermediateMin = Double.POSITIVE_INFINITY; + } + intermediateMin = java.lang.Math.min(intermediateMin, curMin); + } catch (ExecException ee) { + throw ee; + } catch (Exception e) { + int errCode = 2106; + String msg = "Error while computing min in " + this.getClass().getSimpleName(); + throw new ExecException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void cleanup() { + intermediateMin = null; + } + + @Override + public Double getValue() { + return intermediateMin; } } Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java?rev=835487&r1=835486&r2=835487&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java Thu Nov 12 18:33:15 2009 @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; +import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; @@ -39,7 
+40,7 @@ /** * Generates the sum of the values of the first field of a tuple. */ -public class SUM extends EvalFunc<Double> implements Algebraic { +public class SUM extends EvalFunc<Double> implements Algebraic, Accumulator<Double> { @Override public Double exec(Tuple input) throws IOException { @@ -222,6 +223,36 @@ funcList.add(new FuncSpec(IntSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER))); funcList.add(new FuncSpec(LongSum.class.getName(), Schema.generateNestedSchema(DataType.BAG, DataType.LONG))); return funcList; + } + + /* Accumulator interface implementation*/ + private Double intermediateSum = null; + + @Override + public void accumulate(Tuple b) throws IOException { + try { + Double curSum = sum(b); + if (curSum == null) { + return; + } + intermediateSum = (intermediateSum == null ? 0.0 : intermediateSum) + curSum; + } catch (ExecException ee) { + throw ee; + } catch (Exception e) { + int errCode = 2106; + String msg = "Error while computing sum in " + this.getClass().getSimpleName(); + throw new ExecException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void cleanup() { + intermediateSum = null; + } + + @Override + public Double getValue() { + return intermediateSum; } } Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java?rev=835487&r1=835486&r2=835487&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java Thu Nov 12 18:33:15 2009 @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Iterator; +import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.PigException; @@ -33,7 +34,7 @@ /** * Generates the max of the values of the first field of a tuple. 
*/ -public class StringMax extends EvalFunc<String> implements Algebraic { +public class StringMax extends EvalFunc<String> implements Algebraic, Accumulator<String> { @Override public String exec(Tuple input) throws IOException { @@ -152,4 +153,39 @@ public Schema outputSchema(Schema input) { return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); } + + + /* accumulator interface */ + private String intermediateMax = null; + + @Override + public void accumulate(Tuple b) throws IOException { + try { + String curMax = max(b); + if (curMax == null) { + return; + } + // NOTE(review): comparison looks inverted for a max — this retains the lexicographically smaller string; expected intermediateMax.compareTo(curMax) < 0. Verify. + if (intermediateMax == null || intermediateMax.compareTo(curMax) > 0) { + intermediateMax = curMax; + } + + } catch (ExecException ee) { + throw ee; + } catch (Exception e) { + int errCode = 2106; + String msg = "Error while computing max in " + this.getClass().getSimpleName(); + throw new ExecException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void cleanup() { + intermediateMax = null; + } + + @Override + public String getValue() { + return intermediateMax; + } } Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java?rev=835487&r1=835486&r2=835487&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java Thu Nov 12 18:33:15 2009 @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Iterator; +import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.PigException; @@ -35,7 +36,7 @@ /** * Generates the min of the String values in the first field of a tuple. 
*/ -public class StringMin extends EvalFunc<String> implements Algebraic { +public class StringMin extends EvalFunc<String> implements Algebraic, Accumulator<String> { @Override public String exec(Tuple input) throws IOException { @@ -154,5 +155,39 @@ public Schema outputSchema(Schema input) { return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); } + + /* accumulator interface */ + private String intermediateMin = null; + + @Override + public void accumulate(Tuple b) throws IOException { + try { + String curMin = min(b); + if (curMin == null) { + return; + } + // NOTE(review): comparison looks inverted for a min — this retains the lexicographically larger string; expected intermediateMin.compareTo(curMin) > 0. Verify. + if (intermediateMin == null || intermediateMin.compareTo(curMin) < 0) { + intermediateMin = curMin; + } + + } catch (ExecException ee) { + throw ee; + } catch (Exception e) { + int errCode = 2106; + String msg = "Error while computing max in " + this.getClass().getSimpleName(); + throw new ExecException(msg, errCode, PigException.BUG, e); + } + } + + @Override + public void cleanup() { + intermediateMin = null; + } + + @Override + public String getValue() { + return intermediateMin; + } } Added: hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java?rev=835487&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java Thu Nov 12 18:33:15 2009 @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.*; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer; + +public class AccumulativeBag implements DataBag { + private static final long serialVersionUID = 1L; + + private transient AccumulativeTupleBuffer buffer; + private int index; + + public AccumulativeBag(AccumulativeTupleBuffer buffer, int index) { + this.buffer = buffer; + this.index = index; + } + + public void add(Tuple t) { + throw new RuntimeException("AccumulativeBag does not support add operation"); + } + + public void addAll(DataBag b) { + throw new RuntimeException("AccumulativeBag does not support add operation"); + } + + public void clear() { + throw new RuntimeException("AccumulativeBag does not support clear operation"); + } + + public boolean isDistinct() { + return false; + } + + public boolean isSorted() { + return false; + } + + public AccumulativeTupleBuffer getTuplebuffer() { + return buffer; + } + + public Iterator<Tuple> iterator() { + return buffer.getTuples(index); + } + + public void markStale(boolean stale) { + + } + + public long size() { + throw new RuntimeException("AccumulativeBag does not support size() operation"); + } + + public long getMemorySize() { + return 0; + } + + public long spill() { + return 0; + } + + public void readFields(DataInput datainput) throws IOException { + throw new IOException("AccumulativeBag does not support readFields operation"); + } + + 
public void write(DataOutput dataoutput) throws IOException { + throw new IOException("AccumulativeBag does not support write operation"); + } + + public int compareTo(Object other) { + throw new RuntimeException("AccumulativeBag does not support compareTo() operation"); + } + + public boolean equals(Object other) { + if (this == other) { + return true; + } + + return false; + } + + public int hashCode() { + assert false : "hashCode not designed"; + return 42; + } + +} Added: hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=835487&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestAccumulator.java Thu Nov 12 18:33:15 2009 @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.test; + +import java.io.*; +import java.util.HashMap; +import java.util.Iterator; +import junit.framework.TestCase; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.Tuple; +import org.junit.After; +import org.junit.Before; + +public class TestAccumulator extends TestCase{ + private static final String INPUT_FILE = "AccumulatorInput.txt"; + private static final String INPUT_FILE2 = "AccumulatorInput2.txt"; + private static final String INPUT_FILE3 = "AccumulatorInput3.txt"; + + private PigServer pigServer; + private MiniCluster cluster = MiniCluster.buildCluster(); + + public TestAccumulator() throws ExecException, IOException{ + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + // pigServer = new PigServer(ExecType.LOCAL); + pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "2"); + pigServer.getPigContext().getProperties().setProperty("pig.exec.batchsize", "2"); + } + + @Before + public void setUp() throws Exception { + createFiles(); + } + + private void createFiles() throws IOException { + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + + w.println("100\tapple"); + w.println("200\torange"); + w.println("300\tstrawberry"); + w.println("300\tpear"); + w.println("100\tapple"); + w.println("300\tpear"); + w.println("400\tapple"); + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + w = new PrintWriter(new FileWriter(INPUT_FILE2)); + + w.println("100\t"); + w.println("100\t"); + w.println("200\t"); + w.println("200\t"); + w.println("300\tstrawberry"); + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2); + + w = new PrintWriter(new FileWriter(INPUT_FILE3)); + + w.println("100\t1.0"); + w.println("100\t2.0"); + w.println("200\t1.1"); + w.println("200\t2.1"); + w.println("100\t3.0"); + 
w.println("100\t4.0"); + w.println("200\t3.1"); + w.println("100\t5.0"); + w.println("300\t3.3"); + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3); + } + + @After + public void tearDown() throws Exception { + new File(INPUT_FILE).delete(); + Util.deleteFile(cluster, INPUT_FILE); + new File(INPUT_FILE2).delete(); + Util.deleteFile(cluster, INPUT_FILE2); + new File(INPUT_FILE3).delete(); + Util.deleteFile(cluster, INPUT_FILE3); + } + + + public void testAccumBasic() throws IOException{ + // test group by + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A);"); + + HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); + expected.put(100, 2); + expected.put(200, 1); + expected.put(300, 3); + expected.put(400, 1); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, " + + "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);"); + + try{ + iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + fail("accumulator should not be called."); + }catch(IOException e) { + // should throw exception from AccumulatorBagCount. 
+ } + + // test cogroup + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("C = cogroup A by id, B by id;"); + pigServer.registerQuery("D = foreach C generate group, " + + "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);"); + + HashMap<Integer, String> expected2 = new HashMap<Integer, String>(); + expected2.put(100, "2,2"); + expected2.put(200, "1,1"); + expected2.put(300, "3,3"); + expected2.put(400, "1,1"); + + + iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected2.get((Integer)t.get(0)), t.get(1).toString()+","+t.get(2).toString()); + } + } + + public void testAccumWithNegative() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, -org.apache.pig.test.utils.AccumulatorBagCount(A);"); + + HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); + expected.put(100, -2); + expected.put(200, -1); + expected.put(300, -3); + expected.put(400, -1); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + } + + public void testAccumWithAdd() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;"); + + { + HashMap<Integer, Double> expected = new HashMap<Integer, Double>(); + expected.put(100, 3.0); + expected.put(200, 2.0); + expected.put(300, 4.0); + expected.put(400, 2.0); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + 
while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1)); + } + } + + { + pigServer.registerQuery("C = foreach B generate group, " + + "org.apache.pig.test.utils.AccumulatorBagCount(A)+org.apache.pig.test.utils.AccumulatorBagCount(A);"); + + HashMap<Integer, Integer>expected = new HashMap<Integer, Integer>(); + expected.put(100, 4); + expected.put(200, 2); + expected.put(300, 6); + expected.put(400, 2); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + } + } + + public void testAccumWithMinus() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, " + + " org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);"); + + HashMap<Integer, Double> expected = new HashMap<Integer, Double>(); + expected.put(100, 4.0); + expected.put(200, 2.0); + expected.put(300, 6.0); + expected.put(400, 2.0); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1)); + } + } + + public void testAccumWithMod() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, " + + "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;"); + + HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); + expected.put(100, 0); + expected.put(200, 1); + expected.put(300, 1); + expected.put(400, 1); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + 
assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + } + + public void testAccumWithDivide() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, " + + "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;"); + + HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); + expected.put(100, 1); + expected.put(200, 0); + expected.put(300, 1); + expected.put(400, 0); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + } + + public void testAccumWithAnd() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, " + + "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " + + "org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);"); + + HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); + expected.put(100, 0); + expected.put(200, 1); + expected.put(300, 1); + expected.put(400, 1); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + } + + public void testAccumWithOr() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, " + + "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " + + "org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);"); + + HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); + expected.put(100, 1); + expected.put(200, 0); + expected.put(300, 1); + 
expected.put(400, 0); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + } + + public void testAccumWithRegexp() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, " + + "(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);"); + + HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); + expected.put(100, 1); + expected.put(200, 0); + expected.put(300, 1); + expected.put(400, 0); + + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + } + + + public void testAccumWithIsNull() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B generate group, " + + "((chararray)org.apache.pig.test.utils.AccumulativeSumBag(A) is null?0:1);"); + + HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); + expected.put(100, 0); + expected.put(200, 0); + expected.put(300, 1); + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + } + + public void testAccumWithDistinct() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);"); + pigServer.registerQuery("B = group A by id;"); + pigServer.registerQuery("C = foreach B { D = distinct A; generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};"); + + HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(); + expected.put(100, 2); + expected.put(200, 
2); + expected.put(300, 3); + expected.put(400, 2); + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1)); + } + } + + public void testAccumWithSort() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);"); + pigServer.registerQuery("B = foreach A generate id, f, id as t;"); + pigServer.registerQuery("C = group B by id;"); + pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f; generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};"); + + HashMap<Integer, String> expected = new HashMap<Integer, String>(); + expected.put(100, "(apple)(apple)"); + expected.put(200, "(orange)"); + expected.put(300, "(pear)(pear)(strawberry)"); + expected.put(400, "(apple)"); + + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1)); + } + } + + public void testAccumWithBuildin() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);"); + pigServer.registerQuery("C = group A by id;"); + pigServer.registerQuery("D = foreach C generate group, SUM(A.v), AVG(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);"); + + HashMap<Integer, Double[]> expected = new HashMap<Integer, Double[]>(); + expected.put(100, new Double[]{15.0,3.0,5.0,1.0,5.0}); + expected.put(200, new Double[]{6.3,2.1,3.0,1.1,3.1}); + expected.put(300, new Double[]{3.3,3.3,1.0,3.3,3.3}); + + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + Tuple t = iter.next(); + Double[] v = expected.get((Integer)t.get(0)); + for(int i=0; i<v.length; i++) { + assertEquals(v[i].doubleValue(), ((Number)t.get(i+1)).doubleValue(), 0.0001); + } + } + } +} Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java?rev=835487&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulativeSumBag.java Thu Nov 12 18:33:15 2009 @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.test.utils; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.pig.EvalFunc; +import org.apache.pig.Accumulator; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; + +/** + * This class is for testing of accumulator udfs + * + */ +public class AccumulativeSumBag extends EvalFunc<String> implements Accumulator<String> +{ + + StringBuffer sb; + + public AccumulativeSumBag() { + } + + public void accumulate(Tuple tuple) throws IOException { + DataBag databag = (DataBag)tuple.get(0); + if(databag == null) + return; + + if (sb == null) { + sb = new StringBuffer(); + } + + Iterator<Tuple> iterator = databag.iterator(); + while(iterator.hasNext()) { + Tuple t = iterator.next(); + if (t.size()>1 && t.get(1) == null) { + continue; + } + + sb.append(t.toString()); + } + } + + public String getValue() { + if (sb != null && sb.length()>0) { + return sb.toString(); + } + return null; + } + + public void cleanup() { + sb = null; + } + + public String exec(Tuple tuple) throws IOException { + throw new IOException("exec() should not be called"); + } +} + \ No newline at end of file Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java?rev=835487&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/utils/AccumulatorBagCount.java Thu Nov 12 18:33:15 2009 @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.test.utils; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.pig.EvalFunc; +import org.apache.pig.Accumulator; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; + +public class AccumulatorBagCount extends EvalFunc<Integer> implements Accumulator<Integer> { + + int count = 0; + + public AccumulatorBagCount() { + } + + public void accumulate(Tuple tuple) throws IOException { + DataBag databag = (DataBag)tuple.get(0); + if(databag == null) + return; + + Iterator<Tuple> iterator = databag.iterator(); + while(iterator.hasNext()) { + iterator.next(); + count++; + } + } + + public Integer getValue() { + return new Integer(count); + } + + public void cleanup() { + count = 0; + } + + public Integer exec(Tuple tuple) throws IOException { + throw new IOException("exec() should not be called."); + } +} + \ No newline at end of file Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java?rev=835487&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/utils/BagCount.java Thu Nov 12 18:33:15 2009 @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.test.utils; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.pig.EvalFunc; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; + +public class BagCount extends EvalFunc<Integer> { + + + public BagCount() { + } + + + public Integer exec(Tuple tuple) throws IOException { + DataBag databag = (DataBag)tuple.get(0); + if(databag == null) { + return new Integer(0); + } + + int count = 0; + + Iterator<Tuple> iterator = databag.iterator(); + while(iterator.hasNext()) { + iterator.next(); + count++; + } + + return new Integer(count); + } +} + \ No newline at end of file Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java?rev=835487&r1=835486&r2=835487&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java Thu Nov 12 18:33:15 2009 @@ -17,6 +17,8 @@ */ package org.apache.pig.test.utils; +import java.util.List; + import 
org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataByteArray; import org.apache.pig.impl.plan.OperatorKey; @@ -96,7 +98,10 @@ } return new Result(); } - - + + @Override + protected List<ExpressionOperator> getChildExpressions() { + return null; + } }