Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java?rev=656043&r1=656042&r2=656043&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java Tue May 13 15:52:02 2008 @@ -38,26 +38,26 @@ super(plan, walker); } - /** - * check if the transform should be done. If this is being called then - * the pattern matches, but there may be other criteria that must be met - * as well. - * @param nodes - List of nodes declared in transform ($1 = nodes[0], - * etc.) Remember that somes entries in node[] may be NULL since they may - * not be created until after the transform. - * @returns - true if the transform should be done. - */ - public abstract boolean check(List<O> nodes); + /** + * check if the transform should be done. If this is being called then + * the pattern matches, but there may be other criteria that must be met + * as well. + * @param nodes - List of nodes declared in transform ($1 = nodes[0], + * etc.) Remember that somes entries in node[] may be NULL since they may + * not be created until after the transform. + * @returns - true if the transform should be done. + */ + public abstract boolean check(List<O> nodes); - /** - * Transform the tree - * @param nodes - List of nodes declared in transform ($1 = nodes[0], - * etc.) This call must destruct any nodes that are being removed as part - * of the transform and remove them from the nodes vector and construct - * any that are being created as part of the transform and add them at the - * appropriate point to the nodes vector. - */ - public abstract void transform(List<O> nodes); + /** + * Transform the tree + * @param nodes - List of nodes declared in transform ($1 = nodes[0], + * etc.) This call must destruct any nodes that are being removed as part + * of the transform and remove them from the nodes vector and construct + * any that are being created as part of the transform and add them at the + * appropriate point to the nodes vector. + */ + public abstract void transform(List<O> nodes); }
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=656043&r1=656042&r2=656043&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue May 13 15:52:02 2008 @@ -694,12 +694,12 @@ } } - @Test + @Test public void testQueryFail17(){ buildPlan("a = load 'a' as (url, host, rank);"); buildPlan("b = group a by url; "); try { - LogicalPlan lp = buildPlan("c = foreach b generate group.url;"); + LogicalPlan lp = buildPlan("c = foreach b generate group.url;"); } catch (AssertionFailedError e) { assertTrue(e.getMessage().contains("Exception")); } @@ -713,7 +713,7 @@ //buildPlan("b = group a by 2*3;"); //String query = "d = foreach b generate group;"; buildPlan(query); - buildPlan("e = foreach a generate name, details;"); + buildPlan("e = foreach a generate name, details;"); } @Test @@ -723,15 +723,15 @@ buildPlan("b = group a by details;"); String query = "d = foreach b generate group.age;"; buildPlan(query); - buildPlan("e = foreach a generate name, details;"); - buildPlan("f = LOAD 'myfile' AS (garage: bag{num_tools: integer}, links: bag{websites: chararray}, page: bag{something_stupid: tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: float, bite_the_array: bytearray), bag_of_unknown: bag{}});"); + buildPlan("e = foreach a generate name, details;"); + buildPlan("f = LOAD 'myfile' AS (garage: bag{num_tools: integer}, links: bag{websites: chararray}, page: bag{something_stupid: tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: float, bite_the_array: bytearray), bag_of_unknown: bag{}});"); } @Test public void testQueryFail18() { String query = "foreach (load 'myfile' as (col1, col2 : (sub1, sub2), col3 : (bag1))) generate col1 ;"; try { - buildPlan(query); + buildPlan(query); } catch (AssertionFailedError e) { assertTrue(e.getMessage().contains("Exception")); } @@ -744,7 +744,7 @@ String query = "c = cross a,b;"; buildPlan(query); try { - buildPlan("d = order c by name, b::name, height, a::gpa;"); + buildPlan("d = order c by name, b::name, height, a::gpa;"); } catch (AssertionFailedError e) { assertTrue(e.getMessage().contains("Exception")); } @@ -783,7 +783,7 @@ //Just the top level roots and their children //Need a recursive one to travel down the tree - /* + /* for(LogicalOperator op: lp.getRoots()) { System.err.println("Logical Plan Root: " + op.getClass().getName() + " object " + op); @@ -797,7 +797,7 @@ } } } - */ + */ assertTrue(lp != null); return lp; } catch (IOException e) { Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java?rev=656043&r1=656042&r2=656043&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java Tue May 13 15:52:02 2008 @@ -40,68 +40,68 @@ import junit.framework.TestCase; public class TestPOBinCond extends TestCase { - Random r = new Random(); - DataBag bag = BagFactory.getInstance().newDefaultBag(); - final int MAX = 10; - - @Before - @Override - public void setUp() { - for(int i = 0; i < 10; i ++) { - Tuple t = TupleFactory.getInstance().newTuple(); - t.append(r.nextInt(2)); - t.append(0); - t.append(1); - bag.add(t); - } - } - - public void testPOBinCond() throws ExecException, PlanException { - ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst(); - rt.setValue(1); - rt.setResultType(DataType.INTEGER); - - POProject prj1 = GenPhyOp.exprProject(); - prj1.setColumn(0); - prj1.setResultType(DataType.INTEGER); - - EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr(); - equal.setLhs(prj1); - equal.setRhs(rt); - - POProject prjLhs = GenPhyOp.exprProject(); - prjLhs.setResultType(DataType.INTEGER); - prjLhs.setColumn(1); - - POProject prjRhs = GenPhyOp.exprProject(); - prjRhs.setResultType(DataType.INTEGER); - prjRhs.setColumn(2); - - POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs); - op.setResultType(DataType.INTEGER); - - ExprPlan plan = new ExprPlan(); - plan.add(op); - plan.add(prjLhs); - plan.add(prjRhs); - plan.add(equal); - plan.connect(equal, op); - plan.connect(prjLhs, op); - plan.connect(prjRhs, op); - - plan.add(prj1); - plan.add(rt); - plan.connect(prj1, equal); - plan.connect(rt, equal); - - for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { - Tuple t = it.next(); - plan.attachInput(t); - Integer i = (Integer) t.get(0); - assertEquals(1, i | (Integer)op.getNext(i).result); -// System.out.println(t + " " + op.getNext(i).result.toString()); - } - - - } + Random r = new Random(); + DataBag bag = BagFactory.getInstance().newDefaultBag(); + final int MAX = 10; + + @Before + @Override + public void setUp() { + for(int i = 0; i < 10; i ++) { + Tuple t = TupleFactory.getInstance().newTuple(); + t.append(r.nextInt(2)); + t.append(0); + t.append(1); + bag.add(t); + } + } + + public void testPOBinCond() throws ExecException, PlanException { + ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst(); + rt.setValue(1); + rt.setResultType(DataType.INTEGER); + + POProject prj1 = GenPhyOp.exprProject(); + prj1.setColumn(0); + prj1.setResultType(DataType.INTEGER); + + EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr(); + equal.setLhs(prj1); + equal.setRhs(rt); + + POProject prjLhs = GenPhyOp.exprProject(); + prjLhs.setResultType(DataType.INTEGER); + prjLhs.setColumn(1); + + POProject prjRhs = GenPhyOp.exprProject(); + prjRhs.setResultType(DataType.INTEGER); + prjRhs.setColumn(2); + + POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs); + op.setResultType(DataType.INTEGER); + + ExprPlan plan = new ExprPlan(); + plan.add(op); + plan.add(prjLhs); + plan.add(prjRhs); + plan.add(equal); + plan.connect(equal, op); + plan.connect(prjLhs, op); + plan.connect(prjRhs, op); + + plan.add(prj1); + plan.add(rt); + plan.connect(prj1, equal); + plan.connect(rt, equal); + + for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { + Tuple t = it.next(); + plan.attachInput(t); + Integer i = (Integer) t.get(0); + assertEquals(1, i | (Integer)op.getNext(i).result); +// System.out.println(t + " " + op.getNext(i).result.toString()); + } + + + } } Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java?rev=656043&r1=656042&r2=656043&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java Tue May 13 15:52:02 2008 @@ -41,49 +41,49 @@ import org.junit.Test; public class TestPODistinct extends TestCase { - DataBag input = BagFactory.getInstance().newDefaultBag(); - Random r = new Random(); - final int MAX_VALUE = 10; - final int MAX_SAMPLES = 100; + DataBag input = BagFactory.getInstance().newDefaultBag(); + Random r = new Random(); + final int MAX_VALUE = 10; + final int MAX_SAMPLES = 100; - @Before - public void setUp() { - TupleFactory tf = TupleFactory.getInstance(); - for (int i = 0; i < MAX_SAMPLES; i++) { - Tuple t = tf.newTuple(); - t.append(r.nextInt(MAX_VALUE)); - input.add(t); - // System.out.println(t); - } - // System.out.println(); - } + @Before + public void setUp() { + TupleFactory tf = TupleFactory.getInstance(); + for (int i = 0; i < MAX_SAMPLES; i++) { + Tuple t = tf.newTuple(); + t.append(r.nextInt(MAX_VALUE)); + input.add(t); + // System.out.println(t); + } + // System.out.println(); + } - @Test - public void testPODistict() throws ExecException { - PORead read = new PORead(new OperatorKey("", r.nextLong()), input); - List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); - inputs.add(read); - PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()), - -1, inputs); - Map<Tuple, Integer> output = new HashMap<Tuple, Integer>(); - Tuple t = null; - Result res = distinct.getNext(t); - t = (Tuple) res.result; - while (res.returnStatus != POStatus.STATUS_EOP) { - if (output.containsKey(t)) { - int i = output.get(t); - output.put(t, ++i); - } else { - output.put(t, 1); - } - res = distinct.getNext(t); - t = (Tuple) res.result; - } - for (Map.Entry<Tuple, Integer> e : output.entrySet()) { - int i = e.getValue(); - // System.out.println(e.getKey()); - assertEquals(1, i); - } - } + @Test + public void testPODistict() throws ExecException { + PORead read = new PORead(new OperatorKey("", r.nextLong()), input); + List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); + inputs.add(read); + PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()), + -1, inputs); + Map<Tuple, Integer> output = new HashMap<Tuple, Integer>(); + Tuple t = null; + Result res = distinct.getNext(t); + t = (Tuple) res.result; + while (res.returnStatus != POStatus.STATUS_EOP) { + if (output.containsKey(t)) { + int i = output.get(t); + output.put(t, ++i); + } else { + output.put(t, 1); + } + res = distinct.getNext(t); + t = (Tuple) res.result; + } + for (Map.Entry<Tuple, Integer> e : output.entrySet()) { + int i = e.getValue(); + // System.out.println(e.getKey()); + assertEquals(1, i); + } + } } Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java?rev=656043&r1=656042&r2=656043&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java Tue May 13 15:52:02 2008 @@ -36,113 +36,113 @@ import junit.framework.TestCase; public class TestPONegative extends TestCase { - - DataBag bag = BagFactory.getInstance().newDefaultBag(); - Random r = new Random(); - TupleFactory tf = TupleFactory.getInstance(); - final int MAX = 10; - - public void testPONegInt () throws PlanException, ExecException { - for(int i = 0; i < MAX; i++) { - Tuple t = tf.newTuple(); - t.append(r.nextInt()); - bag.add(t); - } - - POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0); - prj.setResultType(DataType.INTEGER); - PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj); - pn.setResultType(DataType.INTEGER); - - ExprPlan plan = new ExprPlan(); - plan.add(prj); plan.add(pn); - plan.connect(prj, pn); - - for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { - Tuple t = it.next(); - plan.attachInput(t); - Integer expected = -(Integer)t.get(0); - int output = (Integer) pn.getNext(expected).result; - assertEquals(expected.intValue(), output); - } - - } - - public void testPONegLong () throws PlanException, ExecException { - for(int i = 0; i < MAX; i++) { - Tuple t = tf.newTuple(); - t.append(r.nextLong()); - bag.add(t); - } - - POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0); - prj.setResultType(DataType.LONG); - PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj); - pn.setResultType(DataType.LONG); - - ExprPlan plan = new ExprPlan(); - plan.add(prj); plan.add(pn); - plan.connect(prj, pn); - - for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { - Tuple t = it.next(); - plan.attachInput(t); - Long expected = -(Long)t.get(0); - long output = (Long) pn.getNext(expected).result; - assertEquals(expected.longValue(), output); - } - - } - - public void testPONegDouble() throws PlanException, ExecException { - for(int i = 0; i < MAX; i++) { - Tuple t = tf.newTuple(); - t.append(r.nextDouble()); - bag.add(t); - } - - POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0); - prj.setResultType(DataType.DOUBLE); - PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj); - pn.setResultType(DataType.DOUBLE); - - ExprPlan plan = new ExprPlan(); - plan.add(prj); plan.add(pn); - plan.connect(prj, pn); - - for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { - Tuple t = it.next(); - plan.attachInput(t); - Double expected = -(Double)t.get(0); - double output = (Double) pn.getNext(expected).result; - assertEquals(expected.doubleValue(), output); - } - - } - - public void testPONegFloat() throws PlanException, ExecException { - for(int i = 0; i < MAX; i++) { - Tuple t = tf.newTuple(); - t.append(r.nextFloat()); - bag.add(t); - } - - POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0); - prj.setResultType(DataType.FLOAT); - PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj); - pn.setResultType(DataType.FLOAT); - - ExprPlan plan = new ExprPlan(); - plan.add(prj); plan.add(pn); - plan.connect(prj, pn); - - for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { - Tuple t = it.next(); - plan.attachInput(t); - Float expected = -(Float)t.get(0); - float output = (Float) pn.getNext(expected).result; - assertEquals(expected.floatValue(), output); - } - - } + + DataBag bag = BagFactory.getInstance().newDefaultBag(); + Random r = new Random(); + TupleFactory tf = TupleFactory.getInstance(); + final int MAX = 10; + + public void testPONegInt () throws PlanException, ExecException { + for(int i = 0; i < MAX; i++) { + Tuple t = tf.newTuple(); + t.append(r.nextInt()); + bag.add(t); + } + + POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0); + prj.setResultType(DataType.INTEGER); + PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj); + pn.setResultType(DataType.INTEGER); + + ExprPlan plan = new ExprPlan(); + plan.add(prj); plan.add(pn); + plan.connect(prj, pn); + + for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { + Tuple t = it.next(); + plan.attachInput(t); + Integer expected = -(Integer)t.get(0); + int output = (Integer) pn.getNext(expected).result; + assertEquals(expected.intValue(), output); + } + + } + + public void testPONegLong () throws PlanException, ExecException { + for(int i = 0; i < MAX; i++) { + Tuple t = tf.newTuple(); + t.append(r.nextLong()); + bag.add(t); + } + + POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0); + prj.setResultType(DataType.LONG); + PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj); + pn.setResultType(DataType.LONG); + + ExprPlan plan = new ExprPlan(); + plan.add(prj); plan.add(pn); + plan.connect(prj, pn); + + for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { + Tuple t = it.next(); + plan.attachInput(t); + Long expected = -(Long)t.get(0); + long output = (Long) pn.getNext(expected).result; + assertEquals(expected.longValue(), output); + } + + } + + public void testPONegDouble() throws PlanException, ExecException { + for(int i = 0; i < MAX; i++) { + Tuple t = tf.newTuple(); + t.append(r.nextDouble()); + bag.add(t); + } + + POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0); + prj.setResultType(DataType.DOUBLE); + PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj); + pn.setResultType(DataType.DOUBLE); + + ExprPlan plan = new ExprPlan(); + plan.add(prj); plan.add(pn); + plan.connect(prj, pn); + + for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { + Tuple t = it.next(); + plan.attachInput(t); + Double expected = -(Double)t.get(0); + double output = (Double) pn.getNext(expected).result; + assertEquals(expected.doubleValue(), output); + } + + } + + public void testPONegFloat() throws PlanException, ExecException { + for(int i = 0; i < MAX; i++) { + Tuple t = tf.newTuple(); + t.append(r.nextFloat()); + bag.add(t); + } + + POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0); + prj.setResultType(DataType.FLOAT); + PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj); + pn.setResultType(DataType.FLOAT); + + ExprPlan plan = new ExprPlan(); + plan.add(prj); plan.add(pn); + plan.connect(prj, pn); + + for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) { + Tuple t = it.next(); + plan.attachInput(t); + Float expected = -(Float)t.get(0); + float output = (Float) pn.getNext(expected).result; + assertEquals(expected.floatValue(), output); + } + + } } Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=656043&r1=656042&r2=656043&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Tue May 13 15:52:02 2008 @@ -42,183 +42,183 @@ import org.junit.Test; public class TestPOSort extends TestCase { - Random r = new Random(); - int MAX_TUPLES = 10; + Random r = new Random(); + int MAX_TUPLES = 10; - @Test - public void testPOSortAscString() throws ExecException { - DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, - MAX_TUPLES, 100); - List<ExprPlan> sortPlans = new LinkedList<ExprPlan>(); - POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0); - pr1.setResultType(DataType.CHARARRAY); - ExprPlan expPlan = new ExprPlan(); - expPlan.add(pr1); - sortPlans.add(expPlan); - List<Boolean> mAscCols = new LinkedList<Boolean>(); - mAscCols.add(true); - PORead read = new PORead(new OperatorKey("", r.nextLong()), input); - List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); - inputs.add(read); - POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, - sortPlans, mAscCols, null); - Tuple t = null; - Result res1 = sort.getNext(t); - // System.out.println(res1.result); - Result res2 = sort.getNext(t); - while (res2.returnStatus != POStatus.STATUS_EOP) { - Object i1 = ((Tuple) res1.result).get(0); - Object i2 = ((Tuple) res2.result).get(0); - int i = DataType.compare(i1, i2); - // System.out.println(res2.result + " i = " + i); - assertEquals(true, (i <= 0)); - res1 = res2; - res2 = sort.getNext(t); - } - } - - @Test - public void testPOSortDescString() throws ExecException { - DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, - MAX_TUPLES, 100); - List<ExprPlan> sortPlans = new LinkedList<ExprPlan>(); - POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0); - pr1.setResultType(DataType.CHARARRAY); - ExprPlan expPlan = new ExprPlan(); - expPlan.add(pr1); - sortPlans.add(expPlan); - List<Boolean> mAscCols = new LinkedList<Boolean>(); - mAscCols.add(false); - PORead read = new PORead(new OperatorKey("", r.nextLong()), input); - List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); - inputs.add(read); - POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, - sortPlans, mAscCols, null); - Tuple t = null; - Result res1 = sort.getNext(t); - // System.out.println(res1.result); - Result res2 = sort.getNext(t); - while (res2.returnStatus != POStatus.STATUS_EOP) { - Object i1 = ((Tuple) res1.result).get(0); - Object i2 = ((Tuple) res2.result).get(0); - int i = DataType.compare(i1, i2); - // System.out.println(res2.result + " i = " + i); - assertEquals(true, (i >= 0)); - res1 = res2; - res2 = sort.getNext(t); - } - } - - @Test - public void testPOSortAsc() throws ExecException { - DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, - MAX_TUPLES, 100); - List<ExprPlan> sortPlans = new LinkedList<ExprPlan>(); - POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1); - pr1.setResultType(DataType.INTEGER); - ExprPlan expPlan = new ExprPlan(); - expPlan.add(pr1); - sortPlans.add(expPlan); - List<Boolean> mAscCols = new LinkedList<Boolean>(); - mAscCols.add(true); - PORead read = new PORead(new OperatorKey("", r.nextLong()), input); - List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); - inputs.add(read); - POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, - sortPlans, mAscCols, null); - Tuple t = null; - Result res1 = sort.getNext(t); - // System.out.println(res1.result); - Result res2 = sort.getNext(t); - while (res2.returnStatus != POStatus.STATUS_EOP) { - Object i1 = ((Tuple) res1.result).get(1); - Object i2 = ((Tuple) res2.result).get(1); - int i = DataType.compare(i1, i2); - assertEquals(true, (i <= 0)); - // System.out.println(res2.result); - res1 = res2; - res2 = sort.getNext(t); - } - } - - @Test - public void testPOSortDesc() throws ExecException { - DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, - MAX_TUPLES, 100); - List<ExprPlan> sortPlans = new LinkedList<ExprPlan>(); - POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1); - pr1.setResultType(DataType.INTEGER); - ExprPlan expPlan = new ExprPlan(); - expPlan.add(pr1); - sortPlans.add(expPlan); - List<Boolean> mAscCols = new LinkedList<Boolean>(); - mAscCols.add(false); - PORead read = new PORead(new OperatorKey("", r.nextLong()), input); - List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); - inputs.add(read); - POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, - sortPlans, mAscCols, null); - Tuple t = null; - Result res1 = sort.getNext(t); - // System.out.println(res1.result); - Result res2 = sort.getNext(t); - while (res2.returnStatus != POStatus.STATUS_EOP) { - Object i1 = ((Tuple) res1.result).get(1); - Object i2 = ((Tuple) res2.result).get(1); - int i = DataType.compare(i1, i2); - assertEquals(true, (i >= 0)); - // System.out.println(res2.result); - res1 = res2; - res2 = sort.getNext(t); - } - } - - @Test - public void testPOSortUDF() throws ExecException { - DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, - MAX_TUPLES, 100); - PORead read = new PORead(new OperatorKey("", r.nextLong()), input); - List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); - inputs.add(read); - String funcName = WeirdComparator.class.getName() + "()"; - /*POUserFunc comparator = new POUserFunc( - new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/ - POUserFunc comparator = new POUserComparisonFunc( - new OperatorKey("", r.nextLong()), -1, null, funcName); - POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, - null, null, comparator); - Tuple t = null; - Result res1 = sort.getNext(t); - // System.out.println(res1.result); - Result res2 = sort.getNext(t); - while (res2.returnStatus != POStatus.STATUS_EOP) { - int i1 = (Integer) ((Tuple) res1.result).get(1); - int i2 = (Integer) ((Tuple) res2.result).get(1); - int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50); - assertEquals(true, (i <= 0)); - System.out.println(i + " : " + res2.result); - res1 = res2; - res2 = sort.getNext(t); - } - } - - // sorts values in ascending order of their distance from 50 - public static class WeirdComparator extends ComparisonFunc { - - @Override - public int compare(Tuple t1, Tuple t2) { - // TODO Auto-generated method stub - int result = 0; - try { - int i1 = (Integer) t1.get(1); - int i2 = (Integer) t2.get(1); - result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50); - } catch (ExecException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return result; - } + @Test + public void testPOSortAscString() throws ExecException { + DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, + MAX_TUPLES, 100); + List<ExprPlan> sortPlans = new LinkedList<ExprPlan>(); + POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0); + pr1.setResultType(DataType.CHARARRAY); + ExprPlan expPlan = new ExprPlan(); + expPlan.add(pr1); + sortPlans.add(expPlan); + List<Boolean> mAscCols = new LinkedList<Boolean>(); + mAscCols.add(true); + PORead read = new PORead(new OperatorKey("", r.nextLong()), input); + List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); + inputs.add(read); + POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, + sortPlans, mAscCols, null); + Tuple t = null; + Result res1 = sort.getNext(t); + // System.out.println(res1.result); + Result res2 = sort.getNext(t); + while (res2.returnStatus != POStatus.STATUS_EOP) { + Object i1 = ((Tuple) res1.result).get(0); + Object i2 = ((Tuple) res2.result).get(0); + int i = DataType.compare(i1, i2); + // System.out.println(res2.result + " i = " + i); + assertEquals(true, (i <= 0)); + res1 = res2; + res2 = sort.getNext(t); + } + } + + @Test + public void testPOSortDescString() throws ExecException { + DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, + MAX_TUPLES, 100); + List<ExprPlan> sortPlans = new LinkedList<ExprPlan>(); + POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0); + pr1.setResultType(DataType.CHARARRAY); + ExprPlan expPlan = new ExprPlan(); + expPlan.add(pr1); + sortPlans.add(expPlan); + List<Boolean> mAscCols = new LinkedList<Boolean>(); + mAscCols.add(false); + PORead read = new PORead(new OperatorKey("", r.nextLong()), input); + List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); + inputs.add(read); + POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, + sortPlans, mAscCols, null); + Tuple t = null; + Result res1 = sort.getNext(t); + // System.out.println(res1.result); + Result res2 = sort.getNext(t); + while (res2.returnStatus != POStatus.STATUS_EOP) { + Object i1 = ((Tuple) res1.result).get(0); + Object i2 = ((Tuple) res2.result).get(0); + int i = DataType.compare(i1, i2); + // System.out.println(res2.result + " i = " + i); + assertEquals(true, (i >= 0)); + res1 = res2; + res2 = sort.getNext(t); + } + } + + @Test + public void testPOSortAsc() throws ExecException { + DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, + MAX_TUPLES, 100); + List<ExprPlan> sortPlans = new LinkedList<ExprPlan>(); + POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1); + pr1.setResultType(DataType.INTEGER); + ExprPlan expPlan = new ExprPlan(); + expPlan.add(pr1); + sortPlans.add(expPlan); + List<Boolean> mAscCols = new LinkedList<Boolean>(); + mAscCols.add(true); + PORead read = new PORead(new OperatorKey("", r.nextLong()), input); + List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); + inputs.add(read); + POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, + sortPlans, mAscCols, null); + Tuple t = null; + Result res1 = sort.getNext(t); + // System.out.println(res1.result); + Result res2 = sort.getNext(t); + while (res2.returnStatus != POStatus.STATUS_EOP) { + Object i1 = ((Tuple) res1.result).get(1); + Object i2 = ((Tuple) res2.result).get(1); + int i = DataType.compare(i1, i2); + assertEquals(true, (i <= 0)); + // System.out.println(res2.result); + res1 = res2; + res2 = sort.getNext(t); + } + } + + @Test + public void testPOSortDesc() throws ExecException { + DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, + MAX_TUPLES, 100); + List<ExprPlan> sortPlans = new LinkedList<ExprPlan>(); + POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1); + pr1.setResultType(DataType.INTEGER); + ExprPlan expPlan = new ExprPlan(); + expPlan.add(pr1); + sortPlans.add(expPlan); + List<Boolean> mAscCols = new LinkedList<Boolean>(); + mAscCols.add(false); + PORead read = new PORead(new OperatorKey("", r.nextLong()), input); + List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); + inputs.add(read); + POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, + sortPlans, mAscCols, null); + Tuple t = null; + Result res1 = sort.getNext(t); + // System.out.println(res1.result); + Result res2 = sort.getNext(t); + while (res2.returnStatus != POStatus.STATUS_EOP) { + Object i1 = ((Tuple) res1.result).get(1); + Object i2 = ((Tuple) res2.result).get(1); + int i = DataType.compare(i1, i2); + assertEquals(true, (i >= 0)); + // System.out.println(res2.result); + res1 = res2; + res2 = sort.getNext(t); + } + } + + @Test + public void testPOSortUDF() throws ExecException { + DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, + MAX_TUPLES, 100); + PORead read = new PORead(new OperatorKey("", r.nextLong()), input); + List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); + inputs.add(read); + String funcName = WeirdComparator.class.getName() + "()"; + /*POUserFunc comparator = new POUserFunc( + new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/ + POUserFunc comparator = new POUserComparisonFunc( + new OperatorKey("", r.nextLong()), -1, null, funcName); + POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs, + null, null, comparator); + Tuple t = null; + Result res1 = sort.getNext(t); + // System.out.println(res1.result); + Result res2 = sort.getNext(t); + while (res2.returnStatus != POStatus.STATUS_EOP) { + int i1 = (Integer) ((Tuple) res1.result).get(1); + int i2 = (Integer) ((Tuple) res2.result).get(1); + int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50); + assertEquals(true, (i <= 0)); + System.out.println(i + " : " + res2.result); + res1 = res2; + res2 = sort.getNext(t); + } + } + + // sorts values in ascending order of their distance from 50 + public static class WeirdComparator extends ComparisonFunc { + + @Override + public int compare(Tuple t1, Tuple t2) { + // TODO Auto-generated method stub + int result = 0; + try { + int i1 = (Integer) t1.get(1); + int i2 = (Integer) t2.get(1); + result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50); + } catch (ExecException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return result; + } - } + } } Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=656043&r1=656042&r2=656043&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java Tue May 13 15:52:02 2008 @@ -47,286 +47,286 @@ import org.junit.Test; public class TestPOUserFunc extends TestCase { - Random r = new Random(); - int MAX_TUPLES = 10; + Random r = new Random(); + int MAX_TUPLES = 10; - public static class ARITY extends EvalFunc<Integer> { + public static class ARITY extends EvalFunc<Integer> { - @Override - public Integer exec(Tuple input) throws IOException { - return new Integer(input.size()); - } - - @Override - public Schema outputSchema(Schema input) { - // TODO FIX - // return new AtomSchema("arity"); - return null; - } - } - - public static class WeirdComparator extends ComparisonFunc { - - @Override - public int compare(Tuple t1, Tuple t2) { - // TODO Auto-generated method stub - Object o1 = null; - Object o2 = null; - try { - o1 = t1.get(2); - o2 = t2.get(2); - } catch (ExecException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - int i1 = (Integer) o1 - 2; - int i2 = (Integer) o2 - 2; - - return (int) (i1 * i1 - i2 * i2); - } - - } - - /** - * Generates the average of the values of the first field of a tuple. This - * class is Algebraic in implemenation, so if possible the execution will be - * split into a local and global application - */ - public static class AVG extends EvalFunc<Double> implements Algebraic { - - private static TupleFactory mTupleFactory = TupleFactory.getInstance(); - - @Override - public Double exec(Tuple input) throws IOException { - double sum = 0; - double count = 0; - - try { - sum = sum(input); - count = count(input); - } catch (ExecException e) { - e.printStackTrace(); - } - - double avg = 0; - if (count > 0) - avg = sum / count; - - return new Double(avg); - } - - public String getInitial() { - return Initial.class.getName(); - } - - public String getIntermed() { - return Intermed.class.getName(); - } - - public String getFinal() { - return Final.class.getName(); - } - - static public class Initial extends EvalFunc<Tuple> { - @Override - public Tuple exec(Tuple input) throws IOException { - try { - Tuple t = mTupleFactory.newTuple(2); - t.set(0, new Double(sum(input))); - t.set(1, new Long(count(input))); - return t; - } catch (ExecException t) { - throw new RuntimeException(t.getMessage() + ": " + input); - } - } - } - - static public class Intermed extends EvalFunc<Tuple> { - @Override - public Tuple exec(Tuple input) throws IOException { - DataBag b = null; - Tuple t = null; - try { - b = (DataBag) input.get(0); - t = combine(b); - } catch (ExecException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return t; - } - } - - static public class Final extends EvalFunc<Double> { - @Override - public Double exec(Tuple input) throws IOException { - double sum = 0; - double count = 0; - try { - DataBag b = (DataBag) input.get(0); - Tuple combined = combine(b); - - sum = (Double) combined.get(0); - count = (Long) combined.get(1); - } catch (ExecException e) { - e.printStackTrace(); - } - - double avg = 0; - if (count > 0) { - avg = sum / count; - } - return new Double(avg); - } - } - - static protected Tuple combine(DataBag values) throws ExecException { - double sum = 0; - long count = 0; - - Tuple output = mTupleFactory.newTuple(2); - - for (Iterator<Tuple> it = values.iterator(); it.hasNext();) { - Tuple t = it.next(); - sum += (Double) t.get(0); - count += (Long) t.get(1); - } - - output.set(0, new Double(sum)); - output.set(1, new Long(count)); - return output; - } - - static protected long count(Tuple input) throws ExecException { - DataBag values = (DataBag) input.get(0); - return values.size(); - } - - static protected double sum(Tuple input) throws ExecException { - DataBag values = (DataBag) input.get(0); - - double sum = 0; - for (Iterator<Tuple> it = values.iterator(); it.hasNext();) { - Tuple t = it.next(); - Double d = DataType.toDouble(t.get(0)); - if (d == null) - continue; - sum += d; - } - - return sum; - } - - @Override - public Schema outputSchema(Schema input) { - // TODO FIX - // return new AtomSchema("average"); - return null; - } - - } - - @Test - public void testUserFuncArity() throws ExecException { - DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, - MAX_TUPLES, 100); - String funcSpec = ARITY.class.getName() + "()"; - PORead read = new PORead(new OperatorKey("", r.nextLong()), input); - List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); - inputs.add(read); - POUserFunc userFunc = new POUserFunc(new OperatorKey("", r.nextLong()), - -1, inputs, funcSpec); - Result res = new Result(); - Integer i = null; - res = userFunc.getNext(i); - while (res.returnStatus != POStatus.STATUS_EOP) { - // System.out.println(res.result); - int result = (Integer) res.result; - assertEquals(2, result); - res = userFunc.getNext(i); - } - } - - @Test - public void testUDFCompare() throws ExecException { - DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2, - 100); - String funcSpec = WeirdComparator.class.getName() + "()"; - POUserFunc userFunc = new POUserComparisonFunc(new OperatorKey("", r.nextLong()), - -1, null, funcSpec); - Iterator<Tuple> it = input.iterator(); - Tuple t1 = it.next(); - Tuple t2 = it.next(); - t1.append(2); - t2.append(3); - ((POUserComparisonFunc)userFunc).attachInput(t1, t2); - Integer i = null; - // System.out.println(t1 + " " + t2); - int result = (Integer) (userFunc.getNext(i).result); - assertEquals(-1, result); - } - - @Test - public void testAlgebraicAVG() throws IOException, ExecException { - int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; - byte INIT = 0; - byte INTERMED = 1; - byte FINAL = 2; - Tuple tup1 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), - input); - Tuple tup2 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), - input); - // System.out.println("Input = " + tup1); - String funcSpec = AVG.class.getName() + "()"; - - POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, - null, funcSpec); - - TupleFactory tf = TupleFactory.getInstance(); - - po.setAlgebraicFunction(INIT); - po.attachInput(tup1); - Tuple t = null; - Result res = po.getNext(t); - Tuple outputInitial1 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result - : null; - Tuple outputInitial2 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result - : null; - System.out.println(outputInitial1 + " " + outputInitial2); - assertEquals(outputInitial1, outputInitial2); - double sum = (Double) outputInitial1.get(0); - long count = (Long) outputInitial1.get(1); - assertEquals(55.0, sum); - assertEquals(10, count); - DataBag bag = BagFactory.getInstance().newDefaultBag(); - bag.add(outputInitial1); - bag.add(outputInitial2); - Tuple outputInitial = tf.newTuple(); - outputInitial.append(bag); - // Tuple outputIntermed = intermed.exec(outputInitial); - po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null, - funcSpec); - po.setAlgebraicFunction(INTERMED); - po.attachInput(outputInitial); - res = po.getNext(t); - Tuple outputIntermed = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result - : null; - - sum = (Double) outputIntermed.get(0); - count = (Long) outputIntermed.get(1); - assertEquals(110.0, sum); - assertEquals(20, count); - System.out.println(outputIntermed); - po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null, - funcSpec); - po.setAlgebraicFunction(FINAL); - po.attachInput(outputInitial); - res = po.getNext(t); - Double output = (res.returnStatus == POStatus.STATUS_OK) ? (Double) res.result - : null; - // Double output = fin.exec(outputInitial); - assertEquals(5.5, output); - // System.out.println("output = " + output); + @Override + public Integer exec(Tuple input) throws IOException { + return new Integer(input.size()); + } + + @Override + public Schema outputSchema(Schema input) { + // TODO FIX + // return new AtomSchema("arity"); + return null; + } + } + + public static class WeirdComparator extends ComparisonFunc { + + @Override + public int compare(Tuple t1, Tuple t2) { + // TODO Auto-generated method stub + Object o1 = null; + Object o2 = null; + try { + o1 = t1.get(2); + o2 = t2.get(2); + } catch (ExecException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + int i1 = (Integer) o1 - 2; + int i2 = (Integer) o2 - 2; + + return (int) (i1 * i1 - i2 * i2); + } + + } + + /** + * Generates the average of the values of the first field of a tuple. This + * class is Algebraic in implemenation, so if possible the execution will be + * split into a local and global application + */ + public static class AVG extends EvalFunc<Double> implements Algebraic { + + private static TupleFactory mTupleFactory = TupleFactory.getInstance(); + + @Override + public Double exec(Tuple input) throws IOException { + double sum = 0; + double count = 0; + + try { + sum = sum(input); + count = count(input); + } catch (ExecException e) { + e.printStackTrace(); + } + + double avg = 0; + if (count > 0) + avg = sum / count; + + return new Double(avg); + } + + public String getInitial() { + return Initial.class.getName(); + } + + public String getIntermed() { + return Intermed.class.getName(); + } + + public String getFinal() { + return Final.class.getName(); + } + + static public class Initial extends EvalFunc<Tuple> { + @Override + public Tuple exec(Tuple input) throws IOException { + try { + Tuple t = mTupleFactory.newTuple(2); + t.set(0, new Double(sum(input))); + t.set(1, new Long(count(input))); + return t; + } catch (ExecException t) { + throw new RuntimeException(t.getMessage() + ": " + input); + } + } + } + + static public class Intermed extends EvalFunc<Tuple> { + @Override + public Tuple exec(Tuple input) throws IOException { + DataBag b = null; + Tuple t = null; + try { + b = (DataBag) input.get(0); + t = combine(b); + } catch (ExecException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return t; + } + } + + static public class Final extends EvalFunc<Double> { + @Override + public Double exec(Tuple input) throws IOException { + double sum = 0; + double count = 0; + try { + DataBag b = (DataBag) input.get(0); + Tuple combined = combine(b); + + sum = (Double) combined.get(0); + count = (Long) combined.get(1); + } catch (ExecException e) { + e.printStackTrace(); + } + + double avg = 0; + if (count > 0) { + avg = sum / count; + } + return new Double(avg); + } + } + + static protected Tuple combine(DataBag values) throws ExecException { + double sum = 0; + long count = 0; + + Tuple output = mTupleFactory.newTuple(2); + + for (Iterator<Tuple> it = values.iterator(); it.hasNext();) { + Tuple t = it.next(); + sum += (Double) t.get(0); + count += (Long) t.get(1); + } + + output.set(0, new Double(sum)); + output.set(1, new Long(count)); + return output; + } + + static protected long count(Tuple input) throws ExecException { + DataBag values = (DataBag) input.get(0); + return values.size(); + } + + static protected double sum(Tuple input) throws ExecException { + DataBag values = (DataBag) input.get(0); + + double sum = 0; + for (Iterator<Tuple> it = values.iterator(); it.hasNext();) { + Tuple t = it.next(); + Double d = DataType.toDouble(t.get(0)); + if (d == null) + continue; + sum += d; + } + + return sum; + } + + @Override + public Schema outputSchema(Schema input) { + // TODO FIX + // return new AtomSchema("average"); + return null; + } + + } + + @Test + public void testUserFuncArity() throws ExecException { + DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, + MAX_TUPLES, 100); + String funcSpec = ARITY.class.getName() + "()"; + PORead read = new PORead(new OperatorKey("", r.nextLong()), input); + List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>(); + inputs.add(read); + POUserFunc userFunc = new POUserFunc(new OperatorKey("", r.nextLong()), + -1, inputs, funcSpec); + Result res = new Result(); + Integer i = null; + res = userFunc.getNext(i); + while (res.returnStatus != POStatus.STATUS_EOP) { + // System.out.println(res.result); + int result = (Integer) res.result; + assertEquals(2, result); + res = userFunc.getNext(i); + } + } + + @Test + public void testUDFCompare() throws ExecException { + DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2, + 100); + String funcSpec = WeirdComparator.class.getName() + "()"; + POUserFunc userFunc = new POUserComparisonFunc(new OperatorKey("", r.nextLong()), + -1, null, funcSpec); + Iterator<Tuple> it = input.iterator(); + Tuple t1 = it.next(); + Tuple t2 = it.next(); + t1.append(2); + t2.append(3); + ((POUserComparisonFunc)userFunc).attachInput(t1, t2); + Integer i = null; + // System.out.println(t1 + " " + t2); + int result = (Integer) (userFunc.getNext(i).result); + assertEquals(-1, result); + } + + @Test + public void testAlgebraicAVG() throws IOException, ExecException { + int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + byte INIT = 0; + byte INTERMED = 1; + byte FINAL = 2; + Tuple tup1 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), + input); + Tuple tup2 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), + input); + // System.out.println("Input = " + tup1); + String funcSpec = AVG.class.getName() + "()"; + + POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, + null, funcSpec); + + TupleFactory tf = TupleFactory.getInstance(); + + po.setAlgebraicFunction(INIT); + po.attachInput(tup1); + Tuple t = null; + Result res = po.getNext(t); + Tuple outputInitial1 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result + : null; + Tuple outputInitial2 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result + : null; + System.out.println(outputInitial1 + " " + outputInitial2); + assertEquals(outputInitial1, outputInitial2); + double sum = (Double) outputInitial1.get(0); + long count = (Long) outputInitial1.get(1); + assertEquals(55.0, sum); + assertEquals(10, count); + DataBag bag = BagFactory.getInstance().newDefaultBag(); + bag.add(outputInitial1); + bag.add(outputInitial2); + Tuple outputInitial = tf.newTuple(); + outputInitial.append(bag); + // Tuple outputIntermed = intermed.exec(outputInitial); + po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null, + funcSpec); + po.setAlgebraicFunction(INTERMED); + po.attachInput(outputInitial); + res = po.getNext(t); + Tuple outputIntermed = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result + : null; + + sum = (Double) outputIntermed.get(0); + count = (Long) outputIntermed.get(1); + assertEquals(110.0, sum); + assertEquals(20, count); + System.out.println(outputIntermed); + po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null, + funcSpec); + po.setAlgebraicFunction(FINAL); + po.attachInput(outputInitial); + res = po.getNext(t); + Double output = (res.returnStatus == POStatus.STATUS_OK) ? (Double) res.result + : null; + // Double output = fin.exec(outputInitial); + assertEquals(5.5, output); + // System.out.println("output = " + output); - } + } }