Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
 Tue May 13 15:52:02 2008
@@ -38,26 +38,26 @@
         super(plan, walker);
     }
 
-       /**
-        * check if the transform should be done.  If this is being called then
-        * the pattern matches, but there may be other criteria that must be met
-        * as well.
-        * @param nodes - List of nodes declared in transform ($1 = nodes[0],
-        * etc.)  Remember that somes entries in node[] may be NULL since they 
may
-        * not be created until after the transform.
-        * @returns - true if the transform should be done.
-        */
-       public abstract boolean check(List<O> nodes);
+    /**
+     * check if the transform should be done.  If this is being called then
+     * the pattern matches, but there may be other criteria that must be met
+     * as well.
+     * @param nodes - List of nodes declared in transform ($1 = nodes[0],
+     * etc.)  Remember that somes entries in node[] may be NULL since they may
+     * not be created until after the transform.
+     * @returns - true if the transform should be done.
+     */
+    public abstract boolean check(List<O> nodes);
 
-       /**
-        * Transform the tree
-        * @param nodes - List of nodes declared in transform ($1 = nodes[0],
-        * etc.)  This call must destruct any nodes that are being removed as 
part
-        * of the transform and remove them from the nodes vector and construct
-        * any that are being created as part of the transform and add them at 
the
-        * appropriate point to the nodes vector.
-        */
-       public abstract void transform(List<O> nodes);
+    /**
+     * Transform the tree
+     * @param nodes - List of nodes declared in transform ($1 = nodes[0],
+     * etc.)  This call must destruct any nodes that are being removed as part
+     * of the transform and remove them from the nodes vector and construct
+     * any that are being created as part of the transform and add them at the
+     * appropriate point to the nodes vector.
+     */
+    public abstract void transform(List<O> nodes);
 
 }
 

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- 
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
 (original)
+++ 
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
 Tue May 13 15:52:02 2008
@@ -694,12 +694,12 @@
         }
     } 
 
-       @Test
+    @Test
     public void testQueryFail17(){
         buildPlan("a = load 'a' as (url, host, rank);");
         buildPlan("b = group a by url; ");
         try {
-               LogicalPlan lp = buildPlan("c = foreach b generate group.url;");
+            LogicalPlan lp = buildPlan("c = foreach b generate group.url;");
         } catch (AssertionFailedError e) {
             assertTrue(e.getMessage().contains("Exception"));
         }
@@ -713,7 +713,7 @@
         //buildPlan("b = group a by 2*3;");
         //String query = "d = foreach b generate group;";
         buildPlan(query);
-               buildPlan("e = foreach a generate name, details;");
+        buildPlan("e = foreach a generate name, details;");
     }
 
     @Test
@@ -723,15 +723,15 @@
         buildPlan("b = group a by details;");
         String query = "d = foreach b generate group.age;";
         buildPlan(query);
-               buildPlan("e = foreach a generate name, details;");
-               buildPlan("f = LOAD 'myfile' AS (garage: bag{num_tools: 
integer}, links: bag{websites: chararray}, page: bag{something_stupid: 
tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: 
float, bite_the_array: bytearray), bag_of_unknown: bag{}});");
+        buildPlan("e = foreach a generate name, details;");
+        buildPlan("f = LOAD 'myfile' AS (garage: bag{num_tools: integer}, 
links: bag{websites: chararray}, page: bag{something_stupid: tuple(yeah_double: 
double)}, coordinates: bag{another_tuple: tuple(ok_float: float, 
bite_the_array: bytearray), bag_of_unknown: bag{}});");
     }
 
     @Test
     public void testQueryFail18() {
         String query = "foreach (load 'myfile' as (col1, col2 : (sub1, sub2), 
col3 : (bag1))) generate col1 ;";
         try {
-               buildPlan(query);
+            buildPlan(query);
         } catch (AssertionFailedError e) {
             assertTrue(e.getMessage().contains("Exception"));
         }
@@ -744,7 +744,7 @@
         String query = "c = cross a,b;";
         buildPlan(query);
         try {
-               buildPlan("d = order c by name, b::name, height, a::gpa;");
+            buildPlan("d = order c by name, b::name, height, a::gpa;");
         } catch (AssertionFailedError e) {
             assertTrue(e.getMessage().contains("Exception"));
         }
@@ -783,7 +783,7 @@
             
             //Just the top level roots and their children
             //Need a recursive one to travel down the tree
-                       /*
+            /*
             for(LogicalOperator op: lp.getRoots()) {
                 System.err.println("Logical Plan Root: " + 
op.getClass().getName() + " object " + op);    
 
@@ -797,7 +797,7 @@
                     }
                 }
             }
-                       */
+            */
             assertTrue(lp != null);
             return lp;
         } catch (IOException e) {

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java 
Tue May 13 15:52:02 2008
@@ -40,68 +40,68 @@
 import junit.framework.TestCase;
 
 public class TestPOBinCond extends TestCase {
-       Random r = new Random();
-       DataBag bag = BagFactory.getInstance().newDefaultBag();
-       final int MAX = 10;
-       
-       @Before
-       @Override
-       public void setUp() {
-               for(int i = 0; i < 10; i ++) {
-                       Tuple t = TupleFactory.getInstance().newTuple();
-                       t.append(r.nextInt(2));
-                       t.append(0);
-                       t.append(1);
-                       bag.add(t);
-               }
-       }
-       
-       public void testPOBinCond() throws ExecException, PlanException {
-               ConstantExpression rt = (ConstantExpression) 
GenPhyOp.exprConst();
-               rt.setValue(1);
-               rt.setResultType(DataType.INTEGER);
-               
-               POProject prj1 = GenPhyOp.exprProject();
-               prj1.setColumn(0);
-               prj1.setResultType(DataType.INTEGER);
-               
-               EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr();
-               equal.setLhs(prj1);
-               equal.setRhs(rt);
-               
-               POProject prjLhs = GenPhyOp.exprProject();
-               prjLhs.setResultType(DataType.INTEGER);
-               prjLhs.setColumn(1);
-               
-               POProject prjRhs = GenPhyOp.exprProject();
-               prjRhs.setResultType(DataType.INTEGER);
-               prjRhs.setColumn(2);
-               
-               POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), 
-1, equal, prjLhs, prjRhs);
-               op.setResultType(DataType.INTEGER);
-               
-               ExprPlan plan = new ExprPlan();
-               plan.add(op);
-               plan.add(prjLhs);
-               plan.add(prjRhs);
-               plan.add(equal);
-               plan.connect(equal, op);
-               plan.connect(prjLhs, op);
-               plan.connect(prjRhs, op);
-               
-               plan.add(prj1);
-               plan.add(rt);
-               plan.connect(prj1, equal);
-               plan.connect(rt, equal);
-               
-               for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-                       Tuple t = it.next();
-                       plan.attachInput(t);
-                       Integer i = (Integer) t.get(0);
-                       assertEquals(1, i | (Integer)op.getNext(i).result);
-//                     System.out.println(t + " " + 
op.getNext(i).result.toString());
-               }
-               
-               
-       }
+    Random r = new Random();
+    DataBag bag = BagFactory.getInstance().newDefaultBag();
+    final int MAX = 10;
+    
+    @Before
+    @Override
+    public void setUp() {
+        for(int i = 0; i < 10; i ++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(r.nextInt(2));
+            t.append(0);
+            t.append(1);
+            bag.add(t);
+        }
+    }
+    
+    public void testPOBinCond() throws ExecException, PlanException {
+        ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst();
+        rt.setValue(1);
+        rt.setResultType(DataType.INTEGER);
+        
+        POProject prj1 = GenPhyOp.exprProject();
+        prj1.setColumn(0);
+        prj1.setResultType(DataType.INTEGER);
+        
+        EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr();
+        equal.setLhs(prj1);
+        equal.setRhs(rt);
+        
+        POProject prjLhs = GenPhyOp.exprProject();
+        prjLhs.setResultType(DataType.INTEGER);
+        prjLhs.setColumn(1);
+        
+        POProject prjRhs = GenPhyOp.exprProject();
+        prjRhs.setResultType(DataType.INTEGER);
+        prjRhs.setColumn(2);
+        
+        POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), -1, 
equal, prjLhs, prjRhs);
+        op.setResultType(DataType.INTEGER);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(op);
+        plan.add(prjLhs);
+        plan.add(prjRhs);
+        plan.add(equal);
+        plan.connect(equal, op);
+        plan.connect(prjLhs, op);
+        plan.connect(prjRhs, op);
+        
+        plan.add(prj1);
+        plan.add(rt);
+        plan.connect(prj1, equal);
+        plan.connect(rt, equal);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Integer i = (Integer) t.get(0);
+            assertEquals(1, i | (Integer)op.getNext(i).result);
+//            System.out.println(t + " " + op.getNext(i).result.toString());
+        }
+        
+        
+    }
 }

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java 
Tue May 13 15:52:02 2008
@@ -41,49 +41,49 @@
 import org.junit.Test;
 
 public class TestPODistinct extends TestCase {
-       DataBag input = BagFactory.getInstance().newDefaultBag();
-       Random r = new Random();
-       final int MAX_VALUE = 10;
-       final int MAX_SAMPLES = 100;
+    DataBag input = BagFactory.getInstance().newDefaultBag();
+    Random r = new Random();
+    final int MAX_VALUE = 10;
+    final int MAX_SAMPLES = 100;
 
-       @Before
-       public void setUp() {
-               TupleFactory tf = TupleFactory.getInstance();
-               for (int i = 0; i < MAX_SAMPLES; i++) {
-                       Tuple t = tf.newTuple();
-                       t.append(r.nextInt(MAX_VALUE));
-                       input.add(t);
-                       // System.out.println(t);
-               }
-               // System.out.println();
-       }
+    @Before
+    public void setUp() {
+        TupleFactory tf = TupleFactory.getInstance();
+        for (int i = 0; i < MAX_SAMPLES; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextInt(MAX_VALUE));
+            input.add(t);
+            // System.out.println(t);
+        }
+        // System.out.println();
+    }
 
-       @Test
-       public void testPODistict() throws ExecException {
-               PORead read = new PORead(new OperatorKey("", r.nextLong()), 
input);
-               List<PhysicalOperator> inputs = new 
LinkedList<PhysicalOperator>();
-               inputs.add(read);
-               PODistinct distinct = new PODistinct(new OperatorKey("", 
r.nextLong()),
-                               -1, inputs);
-               Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
-               Tuple t = null;
-               Result res = distinct.getNext(t);
-               t = (Tuple) res.result;
-               while (res.returnStatus != POStatus.STATUS_EOP) {
-                       if (output.containsKey(t)) {
-                               int i = output.get(t);
-                               output.put(t, ++i);
-                       } else {
-                               output.put(t, 1);
-                       }
-                       res = distinct.getNext(t);
-                       t = (Tuple) res.result;
-               }
-               for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
-                       int i = e.getValue();
-                       // System.out.println(e.getKey());
-                       assertEquals(1, i);
-               }
-       }
+    @Test
+    public void testPODistict() throws ExecException {
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()),
+                -1, inputs);
+        Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
+        Tuple t = null;
+        Result res = distinct.getNext(t);
+        t = (Tuple) res.result;
+        while (res.returnStatus != POStatus.STATUS_EOP) {
+            if (output.containsKey(t)) {
+                int i = output.get(t);
+                output.put(t, ++i);
+            } else {
+                output.put(t, 1);
+            }
+            res = distinct.getNext(t);
+            t = (Tuple) res.result;
+        }
+        for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
+            int i = e.getValue();
+            // System.out.println(e.getKey());
+            assertEquals(1, i);
+        }
+    }
 
 }

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java 
Tue May 13 15:52:02 2008
@@ -36,113 +36,113 @@
 import junit.framework.TestCase;
 
 public class TestPONegative extends TestCase {
-       
-       DataBag bag = BagFactory.getInstance().newDefaultBag();
-       Random r = new Random();
-       TupleFactory tf = TupleFactory.getInstance();
-       final int MAX = 10;
-       
-       public void testPONegInt () throws PlanException, ExecException {
-               for(int i = 0; i < MAX; i++) {
-                       Tuple t = tf.newTuple();
-                       t.append(r.nextInt());
-                       bag.add(t);
-               }
-               
-               POProject prj = new POProject(new OperatorKey("", 
r.nextLong()), -1, 0);
-               prj.setResultType(DataType.INTEGER);
-               PONegative pn = new PONegative(new OperatorKey("", 
r.nextLong()), -1, prj);
-               pn.setResultType(DataType.INTEGER);
-               
-               ExprPlan plan = new ExprPlan();
-               plan.add(prj); plan.add(pn);
-               plan.connect(prj, pn);
-               
-               for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-                       Tuple t = it.next();
-                       plan.attachInput(t);
-                       Integer expected = -(Integer)t.get(0);
-                       int output = (Integer) pn.getNext(expected).result;
-                       assertEquals(expected.intValue(), output);
-               }
-               
-       }
-       
-       public void testPONegLong () throws PlanException, ExecException {
-               for(int i = 0; i < MAX; i++) {
-                       Tuple t = tf.newTuple();
-                       t.append(r.nextLong());
-                       bag.add(t);
-               }
-               
-               POProject prj = new POProject(new OperatorKey("", 
r.nextLong()), -1, 0);
-               prj.setResultType(DataType.LONG);
-               PONegative pn = new PONegative(new OperatorKey("", 
r.nextLong()), -1, prj);
-               pn.setResultType(DataType.LONG);
-               
-               ExprPlan plan = new ExprPlan();
-               plan.add(prj); plan.add(pn);
-               plan.connect(prj, pn);
-               
-               for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-                       Tuple t = it.next();
-                       plan.attachInput(t);
-                       Long expected = -(Long)t.get(0);
-                       long output = (Long) pn.getNext(expected).result;
-                       assertEquals(expected.longValue(), output);
-               }
-               
-       }
-       
-       public void testPONegDouble() throws PlanException, ExecException {
-               for(int i = 0; i < MAX; i++) {
-                       Tuple t = tf.newTuple();
-                       t.append(r.nextDouble());
-                       bag.add(t);
-               }
-               
-               POProject prj = new POProject(new OperatorKey("", 
r.nextLong()), -1, 0);
-               prj.setResultType(DataType.DOUBLE);
-               PONegative pn = new PONegative(new OperatorKey("", 
r.nextLong()), -1, prj);
-               pn.setResultType(DataType.DOUBLE);
-               
-               ExprPlan plan = new ExprPlan();
-               plan.add(prj); plan.add(pn);
-               plan.connect(prj, pn);
-               
-               for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-                       Tuple t = it.next();
-                       plan.attachInput(t);
-                       Double expected = -(Double)t.get(0);
-                       double output = (Double) pn.getNext(expected).result;
-                       assertEquals(expected.doubleValue(), output);
-               }
-               
-       }
-       
-       public void testPONegFloat() throws PlanException, ExecException {
-               for(int i = 0; i < MAX; i++) {
-                       Tuple t = tf.newTuple();
-                       t.append(r.nextFloat());
-                       bag.add(t);
-               }
-               
-               POProject prj = new POProject(new OperatorKey("", 
r.nextLong()), -1, 0);
-               prj.setResultType(DataType.FLOAT);
-               PONegative pn = new PONegative(new OperatorKey("", 
r.nextLong()), -1, prj);
-               pn.setResultType(DataType.FLOAT);
-               
-               ExprPlan plan = new ExprPlan();
-               plan.add(prj); plan.add(pn);
-               plan.connect(prj, pn);
-               
-               for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
-                       Tuple t = it.next();
-                       plan.attachInput(t);
-                       Float expected = -(Float)t.get(0);
-                       float output = (Float) pn.getNext(expected).result;
-                       assertEquals(expected.floatValue(), output);
-               }
-               
-       }
+    
+    DataBag bag = BagFactory.getInstance().newDefaultBag();
+    Random r = new Random();
+    TupleFactory tf = TupleFactory.getInstance();
+    final int MAX = 10;
+    
+    public void testPONegInt () throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextInt());
+            bag.add(t);
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 
0);
+        prj.setResultType(DataType.INTEGER);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, 
prj);
+        pn.setResultType(DataType.INTEGER);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Integer expected = -(Integer)t.get(0);
+            int output = (Integer) pn.getNext(expected).result;
+            assertEquals(expected.intValue(), output);
+        }
+        
+    }
+    
+    public void testPONegLong () throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextLong());
+            bag.add(t);
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 
0);
+        prj.setResultType(DataType.LONG);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, 
prj);
+        pn.setResultType(DataType.LONG);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Long expected = -(Long)t.get(0);
+            long output = (Long) pn.getNext(expected).result;
+            assertEquals(expected.longValue(), output);
+        }
+        
+    }
+    
+    public void testPONegDouble() throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextDouble());
+            bag.add(t);
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 
0);
+        prj.setResultType(DataType.DOUBLE);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, 
prj);
+        pn.setResultType(DataType.DOUBLE);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Double expected = -(Double)t.get(0);
+            double output = (Double) pn.getNext(expected).result;
+            assertEquals(expected.doubleValue(), output);
+        }
+        
+    }
+    
+    public void testPONegFloat() throws PlanException, ExecException {
+        for(int i = 0; i < MAX; i++) {
+            Tuple t = tf.newTuple();
+            t.append(r.nextFloat());
+            bag.add(t);
+        }
+        
+        POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 
0);
+        prj.setResultType(DataType.FLOAT);
+        PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, 
prj);
+        pn.setResultType(DataType.FLOAT);
+        
+        ExprPlan plan = new ExprPlan();
+        plan.add(prj); plan.add(pn);
+        plan.connect(prj, pn);
+        
+        for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan.attachInput(t);
+            Float expected = -(Float)t.get(0);
+            float output = (Float) pn.getNext(expected).result;
+            assertEquals(expected.floatValue(), output);
+        }
+        
+    }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Tue 
May 13 15:52:02 2008
@@ -42,183 +42,183 @@
 import org.junit.Test;
 
 public class TestPOSort extends TestCase {
-       Random r = new Random();
-       int MAX_TUPLES = 10;
+    Random r = new Random();
+    int MAX_TUPLES = 10;
 
-       @Test
-       public void testPOSortAscString() throws ExecException {
-               DataBag input = (DataBag) 
GenRandomData.genRandSmallTupDataBag(r,
-                               MAX_TUPLES, 100);
-               List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-               POProject pr1 = new POProject(new OperatorKey("", 
r.nextLong()), -1, 0);
-               pr1.setResultType(DataType.CHARARRAY);
-               ExprPlan expPlan = new ExprPlan();
-               expPlan.add(pr1);
-               sortPlans.add(expPlan);
-               List<Boolean> mAscCols = new LinkedList<Boolean>();
-               mAscCols.add(true);
-               PORead read = new PORead(new OperatorKey("", r.nextLong()), 
input);
-               List<PhysicalOperator> inputs = new 
LinkedList<PhysicalOperator>();
-               inputs.add(read);
-               POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, 
inputs,
-                               sortPlans, mAscCols, null);
-               Tuple t = null;
-               Result res1 = sort.getNext(t);
-               // System.out.println(res1.result);
-               Result res2 = sort.getNext(t);
-               while (res2.returnStatus != POStatus.STATUS_EOP) {
-                       Object i1 = ((Tuple) res1.result).get(0);
-                       Object i2 = ((Tuple) res2.result).get(0);
-                       int i = DataType.compare(i1, i2);
-                       // System.out.println(res2.result + " i = " + i);
-                       assertEquals(true, (i <= 0));
-                       res1 = res2;
-                       res2 = sort.getNext(t);
-               }
-       }
-
-       @Test
-       public void testPOSortDescString() throws ExecException {
-               DataBag input = (DataBag) 
GenRandomData.genRandSmallTupDataBag(r,
-                               MAX_TUPLES, 100);
-               List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-               POProject pr1 = new POProject(new OperatorKey("", 
r.nextLong()), -1, 0);
-               pr1.setResultType(DataType.CHARARRAY);
-               ExprPlan expPlan = new ExprPlan();
-               expPlan.add(pr1);
-               sortPlans.add(expPlan);
-               List<Boolean> mAscCols = new LinkedList<Boolean>();
-               mAscCols.add(false);
-               PORead read = new PORead(new OperatorKey("", r.nextLong()), 
input);
-               List<PhysicalOperator> inputs = new 
LinkedList<PhysicalOperator>();
-               inputs.add(read);
-               POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, 
inputs,
-                               sortPlans, mAscCols, null);
-               Tuple t = null;
-               Result res1 = sort.getNext(t);
-               // System.out.println(res1.result);
-               Result res2 = sort.getNext(t);
-               while (res2.returnStatus != POStatus.STATUS_EOP) {
-                       Object i1 = ((Tuple) res1.result).get(0);
-                       Object i2 = ((Tuple) res2.result).get(0);
-                       int i = DataType.compare(i1, i2);
-                       // System.out.println(res2.result + " i = " + i);
-                       assertEquals(true, (i >= 0));
-                       res1 = res2;
-                       res2 = sort.getNext(t);
-               }
-       }
-
-       @Test
-       public void testPOSortAsc() throws ExecException {
-               DataBag input = (DataBag) 
GenRandomData.genRandSmallTupDataBag(r,
-                               MAX_TUPLES, 100);
-               List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-               POProject pr1 = new POProject(new OperatorKey("", 
r.nextLong()), -1, 1);
-               pr1.setResultType(DataType.INTEGER);
-               ExprPlan expPlan = new ExprPlan();
-               expPlan.add(pr1);
-               sortPlans.add(expPlan);
-               List<Boolean> mAscCols = new LinkedList<Boolean>();
-               mAscCols.add(true);
-               PORead read = new PORead(new OperatorKey("", r.nextLong()), 
input);
-               List<PhysicalOperator> inputs = new 
LinkedList<PhysicalOperator>();
-               inputs.add(read);
-               POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, 
inputs,
-                               sortPlans, mAscCols, null);
-               Tuple t = null;
-               Result res1 = sort.getNext(t);
-               // System.out.println(res1.result);
-               Result res2 = sort.getNext(t);
-               while (res2.returnStatus != POStatus.STATUS_EOP) {
-                       Object i1 = ((Tuple) res1.result).get(1);
-                       Object i2 = ((Tuple) res2.result).get(1);
-                       int i = DataType.compare(i1, i2);
-                       assertEquals(true, (i <= 0));
-                       // System.out.println(res2.result);
-                       res1 = res2;
-                       res2 = sort.getNext(t);
-               }
-       }
-
-       @Test
-       public void testPOSortDesc() throws ExecException {
-               DataBag input = (DataBag) 
GenRandomData.genRandSmallTupDataBag(r,
-                               MAX_TUPLES, 100);
-               List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
-               POProject pr1 = new POProject(new OperatorKey("", 
r.nextLong()), -1, 1);
-               pr1.setResultType(DataType.INTEGER);
-               ExprPlan expPlan = new ExprPlan();
-               expPlan.add(pr1);
-               sortPlans.add(expPlan);
-               List<Boolean> mAscCols = new LinkedList<Boolean>();
-               mAscCols.add(false);
-               PORead read = new PORead(new OperatorKey("", r.nextLong()), 
input);
-               List<PhysicalOperator> inputs = new 
LinkedList<PhysicalOperator>();
-               inputs.add(read);
-               POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, 
inputs,
-                               sortPlans, mAscCols, null);
-               Tuple t = null;
-               Result res1 = sort.getNext(t);
-               // System.out.println(res1.result);
-               Result res2 = sort.getNext(t);
-               while (res2.returnStatus != POStatus.STATUS_EOP) {
-                       Object i1 = ((Tuple) res1.result).get(1);
-                       Object i2 = ((Tuple) res2.result).get(1);
-                       int i = DataType.compare(i1, i2);
-                       assertEquals(true, (i >= 0));
-                       // System.out.println(res2.result);
-                       res1 = res2;
-                       res2 = sort.getNext(t);
-               }
-       }
-
-       @Test
-       public void testPOSortUDF() throws ExecException {
-               DataBag input = (DataBag) 
GenRandomData.genRandSmallTupDataBag(r,
-                               MAX_TUPLES, 100);
-               PORead read = new PORead(new OperatorKey("", r.nextLong()), 
input);
-               List<PhysicalOperator> inputs = new 
LinkedList<PhysicalOperator>();
-               inputs.add(read);
-               String funcName = WeirdComparator.class.getName() + "()";
-               /*POUserFunc comparator = new POUserFunc(
-                               new OperatorKey("", r.nextLong()), -1, inputs, 
funcName);*/
-               POUserFunc comparator = new POUserComparisonFunc(
-                               new OperatorKey("", r.nextLong()), -1, null, 
funcName);
-               POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, 
inputs,
-                               null, null, comparator);
-               Tuple t = null;
-               Result res1 = sort.getNext(t);
-               // System.out.println(res1.result);
-               Result res2 = sort.getNext(t);
-               while (res2.returnStatus != POStatus.STATUS_EOP) {
-                       int i1 = (Integer) ((Tuple) res1.result).get(1);
-                       int i2 = (Integer) ((Tuple) res2.result).get(1);
-                       int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
-                       assertEquals(true, (i <= 0));
-                       System.out.println(i + " : " + res2.result);
-                       res1 = res2;
-                       res2 = sort.getNext(t);
-               }
-       }
-
-       // sorts values in ascending order of their distance from 50
-       public static class WeirdComparator extends ComparisonFunc {
-
-               @Override
-               public int compare(Tuple t1, Tuple t2) {
-                       // TODO Auto-generated method stub
-                       int result = 0;
-                       try {
-                               int i1 = (Integer) t1.get(1);
-                               int i2 = (Integer) t2.get(1);
-                               result = (i1 - 50) * (i1 - 50) - (i2 - 50) * 
(i2 - 50);
-                       } catch (ExecException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       }
-                       return result;
-               }
+    @Test
+    public void testPOSortAscString() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 
0);
+        pr1.setResultType(DataType.CHARARRAY);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(true);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                sortPlans, mAscCols, null);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            Object i1 = ((Tuple) res1.result).get(0);
+            Object i2 = ((Tuple) res2.result).get(0);
+            int i = DataType.compare(i1, i2);
+            // System.out.println(res2.result + " i = " + i);
+            assertEquals(true, (i <= 0));
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    @Test
+    public void testPOSortDescString() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 
0);
+        pr1.setResultType(DataType.CHARARRAY);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                sortPlans, mAscCols, null);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            Object i1 = ((Tuple) res1.result).get(0);
+            Object i2 = ((Tuple) res2.result).get(0);
+            int i = DataType.compare(i1, i2);
+            // System.out.println(res2.result + " i = " + i);
+            assertEquals(true, (i >= 0));
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    @Test
+    public void testPOSortAsc() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 
1);
+        pr1.setResultType(DataType.INTEGER);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(true);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                sortPlans, mAscCols, null);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            Object i1 = ((Tuple) res1.result).get(1);
+            Object i2 = ((Tuple) res2.result).get(1);
+            int i = DataType.compare(i1, i2);
+            assertEquals(true, (i <= 0));
+            // System.out.println(res2.result);
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    @Test
+    public void testPOSortDesc() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 
1);
+        pr1.setResultType(DataType.INTEGER);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                sortPlans, mAscCols, null);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            Object i1 = ((Tuple) res1.result).get(1);
+            Object i2 = ((Tuple) res2.result).get(1);
+            int i = DataType.compare(i1, i2);
+            assertEquals(true, (i >= 0));
+            // System.out.println(res2.result);
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    @Test
+    public void testPOSortUDF() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        String funcName = WeirdComparator.class.getName() + "()";
+        /*POUserFunc comparator = new POUserFunc(
+                new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
+        POUserFunc comparator = new POUserComparisonFunc(
+                new OperatorKey("", r.nextLong()), -1, null, funcName);
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+                null, null, comparator);
+        Tuple t = null;
+        Result res1 = sort.getNext(t);
+        // System.out.println(res1.result);
+        Result res2 = sort.getNext(t);
+        while (res2.returnStatus != POStatus.STATUS_EOP) {
+            int i1 = (Integer) ((Tuple) res1.result).get(1);
+            int i2 = (Integer) ((Tuple) res2.result).get(1);
+            int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+            assertEquals(true, (i <= 0));
+            System.out.println(i + " : " + res2.result);
+            res1 = res2;
+            res2 = sort.getNext(t);
+        }
+    }
+
+    // sorts values in ascending order of their distance from 50
+    public static class WeirdComparator extends ComparisonFunc {
+
+        @Override
+        public int compare(Tuple t1, Tuple t2) {
+            // TODO Auto-generated method stub
+            int result = 0;
+            try {
+                int i1 = (Integer) t1.get(1);
+                int i2 = (Integer) t2.get(1);
+                result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+            } catch (ExecException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+            return result;
+        }
 
-       }
+    }
 }

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java 
Tue May 13 15:52:02 2008
@@ -47,286 +47,286 @@
 import org.junit.Test;
 
 public class TestPOUserFunc extends TestCase {
-       Random r = new Random();
-       int MAX_TUPLES = 10;
+    Random r = new Random();
+    int MAX_TUPLES = 10;
 
-       public static class ARITY extends EvalFunc<Integer> {
+    public static class ARITY extends EvalFunc<Integer> {
 
-               @Override
-               public Integer exec(Tuple input) throws IOException {
-                       return new Integer(input.size());
-               }
-
-               @Override
-               public Schema outputSchema(Schema input) {
-                       // TODO FIX
-                       // return new AtomSchema("arity");
-                       return null;
-               }
-       }
-
-       public static class WeirdComparator extends ComparisonFunc {
-
-               @Override
-               public int compare(Tuple t1, Tuple t2) {
-                       // TODO Auto-generated method stub
-                       Object o1 = null;
-                       Object o2 = null;
-                       try {
-                               o1 = t1.get(2);
-                               o2 = t2.get(2);
-                       } catch (ExecException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       }
-                       int i1 = (Integer) o1 - 2;
-                       int i2 = (Integer) o2 - 2;
-
-                       return (int) (i1 * i1 - i2 * i2);
-               }
-
-       }
-
-       /**
-        * Generates the average of the values of the first field of a tuple. 
This
-        * class is Algebraic in implemenation, so if possible the execution 
will be
-        * split into a local and global application
-        */
-       public static class AVG extends EvalFunc<Double> implements Algebraic {
-
-               private static TupleFactory mTupleFactory = 
TupleFactory.getInstance();
-
-               @Override
-               public Double exec(Tuple input) throws IOException {
-                       double sum = 0;
-                       double count = 0;
-                       
-                       try {
-                               sum = sum(input);
-                               count = count(input);
-                       } catch (ExecException e) {
-                               e.printStackTrace();
-                       }
-
-                       double avg = 0;
-                       if (count > 0)
-                               avg = sum / count;
-
-                       return new Double(avg);
-               }
-
-               public String getInitial() {
-                       return Initial.class.getName();
-               }
-
-               public String getIntermed() {
-                       return Intermed.class.getName();
-               }
-
-               public String getFinal() {
-                       return Final.class.getName();
-               }
-
-               static public class Initial extends EvalFunc<Tuple> {
-                       @Override
-                       public Tuple exec(Tuple input) throws IOException {
-                               try {
-                                       Tuple t = mTupleFactory.newTuple(2);
-                                       t.set(0, new Double(sum(input)));
-                                       t.set(1, new Long(count(input)));
-                                       return t;
-                               } catch (ExecException t) {
-                                       throw new 
RuntimeException(t.getMessage() + ": " + input);
-                               }
-                       }
-               }
-
-               static public class Intermed extends EvalFunc<Tuple> {
-                       @Override
-                       public Tuple exec(Tuple input) throws IOException {
-                               DataBag b = null;
-                               Tuple t = null;
-                               try {
-                                       b = (DataBag) input.get(0);
-                                       t = combine(b);
-                               } catch (ExecException e) {
-                                       // TODO Auto-generated catch block
-                                       e.printStackTrace();
-                               }
-                               return t;
-                       }
-               }
-
-               static public class Final extends EvalFunc<Double> {
-                       @Override
-                       public Double exec(Tuple input) throws IOException {
-                               double sum = 0;
-                               double count = 0;
-                               try {
-                                       DataBag b = (DataBag) input.get(0);
-                                       Tuple combined = combine(b);
-
-                                       sum = (Double) combined.get(0);
-                                       count = (Long) combined.get(1);
-                               } catch (ExecException e) {
-                                       e.printStackTrace();
-                               }
-
-                               double avg = 0;
-                               if (count > 0) {
-                                       avg = sum / count;
-                               }
-                               return new Double(avg);
-                       }
-               }
-
-               static protected Tuple combine(DataBag values) throws 
ExecException {
-                       double sum = 0;
-                       long count = 0;
-
-                       Tuple output = mTupleFactory.newTuple(2);
-
-                       for (Iterator<Tuple> it = values.iterator(); 
it.hasNext();) {
-                               Tuple t = it.next();
-                               sum += (Double) t.get(0);
-                               count += (Long) t.get(1);
-                       }
-
-                       output.set(0, new Double(sum));
-                       output.set(1, new Long(count));
-                       return output;
-               }
-
-               static protected long count(Tuple input) throws ExecException {
-                       DataBag values = (DataBag) input.get(0);
-                       return values.size();
-               }
-
-               static protected double sum(Tuple input) throws ExecException {
-                       DataBag values = (DataBag) input.get(0);
-
-                       double sum = 0;
-                       for (Iterator<Tuple> it = values.iterator(); 
it.hasNext();) {
-                               Tuple t = it.next();
-                               Double d = DataType.toDouble(t.get(0));
-                               if (d == null)
-                                       continue;
-                               sum += d;
-                       }
-
-                       return sum;
-               }
-
-               @Override
-               public Schema outputSchema(Schema input) {
-                       // TODO FIX
-                       // return new AtomSchema("average");
-                       return null;
-               }
-
-       }
-
-       @Test
-       public void testUserFuncArity() throws ExecException {
-               DataBag input = (DataBag) 
GenRandomData.genRandSmallTupDataBag(r,
-                               MAX_TUPLES, 100);
-               String funcSpec = ARITY.class.getName() + "()";
-               PORead read = new PORead(new OperatorKey("", r.nextLong()), 
input);
-               List<PhysicalOperator> inputs = new 
LinkedList<PhysicalOperator>();
-               inputs.add(read);
-               POUserFunc userFunc = new POUserFunc(new OperatorKey("", 
r.nextLong()),
-                               -1, inputs, funcSpec);
-               Result res = new Result();
-               Integer i = null;
-               res = userFunc.getNext(i);
-               while (res.returnStatus != POStatus.STATUS_EOP) {
-                       // System.out.println(res.result);
-                       int result = (Integer) res.result;
-                       assertEquals(2, result);
-                       res = userFunc.getNext(i);
-               }
-       }
-
-       @Test
-       public void testUDFCompare() throws ExecException {
-               DataBag input = (DataBag) 
GenRandomData.genRandSmallTupDataBag(r, 2,
-                               100);
-               String funcSpec = WeirdComparator.class.getName() + "()";
-               POUserFunc userFunc = new POUserComparisonFunc(new 
OperatorKey("", r.nextLong()),
-                               -1, null, funcSpec);
-               Iterator<Tuple> it = input.iterator();
-               Tuple t1 = it.next();
-               Tuple t2 = it.next();
-               t1.append(2);
-               t2.append(3);
-               ((POUserComparisonFunc)userFunc).attachInput(t1, t2);
-               Integer i = null;
-               // System.out.println(t1 + " " + t2);
-               int result = (Integer) (userFunc.getNext(i).result);
-               assertEquals(-1, result);
-       }
-
-       @Test
-       public void testAlgebraicAVG() throws IOException, ExecException {
-               int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-               byte INIT = 0;
-               byte INTERMED = 1;
-               byte FINAL = 2;
-               Tuple tup1 = 
Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
-                               input);
-               Tuple tup2 = 
Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
-                               input);
-               // System.out.println("Input = " + tup1);
-               String funcSpec = AVG.class.getName() + "()";
-
-               POUserFunc po = new POUserFunc(new OperatorKey("", 
r.nextLong()), -1,
-                               null, funcSpec);
-
-               TupleFactory tf = TupleFactory.getInstance();
-
-               po.setAlgebraicFunction(INIT);
-               po.attachInput(tup1);
-               Tuple t = null;
-               Result res = po.getNext(t);
-               Tuple outputInitial1 = (res.returnStatus == POStatus.STATUS_OK) 
? (Tuple) res.result
-                               : null;
-               Tuple outputInitial2 = (res.returnStatus == POStatus.STATUS_OK) 
? (Tuple) res.result
-                               : null;
-               System.out.println(outputInitial1 + " " + outputInitial2);
-               assertEquals(outputInitial1, outputInitial2);
-               double sum = (Double) outputInitial1.get(0);
-               long count = (Long) outputInitial1.get(1);
-               assertEquals(55.0, sum);
-               assertEquals(10, count);
-               DataBag bag = BagFactory.getInstance().newDefaultBag();
-               bag.add(outputInitial1);
-               bag.add(outputInitial2);
-               Tuple outputInitial = tf.newTuple();
-               outputInitial.append(bag);
-               // Tuple outputIntermed = intermed.exec(outputInitial);
-               po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
-                               funcSpec);
-               po.setAlgebraicFunction(INTERMED);
-               po.attachInput(outputInitial);
-               res = po.getNext(t);
-               Tuple outputIntermed = (res.returnStatus == POStatus.STATUS_OK) 
? (Tuple) res.result
-                               : null;
-
-               sum = (Double) outputIntermed.get(0);
-               count = (Long) outputIntermed.get(1);
-               assertEquals(110.0, sum);
-               assertEquals(20, count);
-               System.out.println(outputIntermed);
-               po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
-                               funcSpec);
-               po.setAlgebraicFunction(FINAL);
-               po.attachInput(outputInitial);
-               res = po.getNext(t);
-               Double output = (res.returnStatus == POStatus.STATUS_OK) ? 
(Double) res.result
-                               : null;
-               // Double output = fin.exec(outputInitial);
-               assertEquals(5.5, output);
-               // System.out.println("output = " + output);
+        @Override
+        public Integer exec(Tuple input) throws IOException {
+            return new Integer(input.size());
+        }
+
+        @Override
+        public Schema outputSchema(Schema input) {
+            // TODO FIX
+            // return new AtomSchema("arity");
+            return null;
+        }
+    }
+
+    public static class WeirdComparator extends ComparisonFunc {
+
+        @Override
+        public int compare(Tuple t1, Tuple t2) {
+            // TODO Auto-generated method stub
+            Object o1 = null;
+            Object o2 = null;
+            try {
+                o1 = t1.get(2);
+                o2 = t2.get(2);
+            } catch (ExecException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+            int i1 = (Integer) o1 - 2;
+            int i2 = (Integer) o2 - 2;
+
+            return (int) (i1 * i1 - i2 * i2);
+        }
+
+    }
+
+    /**
+     * Generates the average of the values of the first field of a tuple. This
+     * class is Algebraic in implemenation, so if possible the execution will 
be
+     * split into a local and global application
+     */
+    public static class AVG extends EvalFunc<Double> implements Algebraic {
+
+        private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+        @Override
+        public Double exec(Tuple input) throws IOException {
+            double sum = 0;
+            double count = 0;
+            
+            try {
+                sum = sum(input);
+                count = count(input);
+            } catch (ExecException e) {
+                e.printStackTrace();
+            }
+
+            double avg = 0;
+            if (count > 0)
+                avg = sum / count;
+
+            return new Double(avg);
+        }
+
+        public String getInitial() {
+            return Initial.class.getName();
+        }
+
+        public String getIntermed() {
+            return Intermed.class.getName();
+        }
+
+        public String getFinal() {
+            return Final.class.getName();
+        }
+
+        static public class Initial extends EvalFunc<Tuple> {
+            @Override
+            public Tuple exec(Tuple input) throws IOException {
+                try {
+                    Tuple t = mTupleFactory.newTuple(2);
+                    t.set(0, new Double(sum(input)));
+                    t.set(1, new Long(count(input)));
+                    return t;
+                } catch (ExecException t) {
+                    throw new RuntimeException(t.getMessage() + ": " + input);
+                }
+            }
+        }
+
+        static public class Intermed extends EvalFunc<Tuple> {
+            @Override
+            public Tuple exec(Tuple input) throws IOException {
+                DataBag b = null;
+                Tuple t = null;
+                try {
+                    b = (DataBag) input.get(0);
+                    t = combine(b);
+                } catch (ExecException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+                return t;
+            }
+        }
+
+        static public class Final extends EvalFunc<Double> {
+            @Override
+            public Double exec(Tuple input) throws IOException {
+                double sum = 0;
+                double count = 0;
+                try {
+                    DataBag b = (DataBag) input.get(0);
+                    Tuple combined = combine(b);
+
+                    sum = (Double) combined.get(0);
+                    count = (Long) combined.get(1);
+                } catch (ExecException e) {
+                    e.printStackTrace();
+                }
+
+                double avg = 0;
+                if (count > 0) {
+                    avg = sum / count;
+                }
+                return new Double(avg);
+            }
+        }
+
+        static protected Tuple combine(DataBag values) throws ExecException {
+            double sum = 0;
+            long count = 0;
+
+            Tuple output = mTupleFactory.newTuple(2);
+
+            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+                Tuple t = it.next();
+                sum += (Double) t.get(0);
+                count += (Long) t.get(1);
+            }
+
+            output.set(0, new Double(sum));
+            output.set(1, new Long(count));
+            return output;
+        }
+
+        static protected long count(Tuple input) throws ExecException {
+            DataBag values = (DataBag) input.get(0);
+            return values.size();
+        }
+
+        static protected double sum(Tuple input) throws ExecException {
+            DataBag values = (DataBag) input.get(0);
+
+            double sum = 0;
+            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+                Tuple t = it.next();
+                Double d = DataType.toDouble(t.get(0));
+                if (d == null)
+                    continue;
+                sum += d;
+            }
+
+            return sum;
+        }
+
+        @Override
+        public Schema outputSchema(Schema input) {
+            // TODO FIX
+            // return new AtomSchema("average");
+            return null;
+        }
+
+    }
+
+    @Test
+    public void testUserFuncArity() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+                MAX_TUPLES, 100);
+        String funcSpec = ARITY.class.getName() + "()";
+        PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+        inputs.add(read);
+        POUserFunc userFunc = new POUserFunc(new OperatorKey("", r.nextLong()),
+                -1, inputs, funcSpec);
+        Result res = new Result();
+        Integer i = null;
+        res = userFunc.getNext(i);
+        while (res.returnStatus != POStatus.STATUS_EOP) {
+            // System.out.println(res.result);
+            int result = (Integer) res.result;
+            assertEquals(2, result);
+            res = userFunc.getNext(i);
+        }
+    }
+
+    @Test
+    public void testUDFCompare() throws ExecException {
+        DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2,
+                100);
+        String funcSpec = WeirdComparator.class.getName() + "()";
+        POUserFunc userFunc = new POUserComparisonFunc(new OperatorKey("", 
r.nextLong()),
+                -1, null, funcSpec);
+        Iterator<Tuple> it = input.iterator();
+        Tuple t1 = it.next();
+        Tuple t2 = it.next();
+        t1.append(2);
+        t2.append(3);
+        ((POUserComparisonFunc)userFunc).attachInput(t1, t2);
+        Integer i = null;
+        // System.out.println(t1 + " " + t2);
+        int result = (Integer) (userFunc.getNext(i).result);
+        assertEquals(-1, result);
+    }
+
+    @Test
+    public void testAlgebraicAVG() throws IOException, ExecException {
+        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+        byte INIT = 0;
+        byte INTERMED = 1;
+        byte FINAL = 2;
+        Tuple tup1 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+                input);
+        Tuple tup2 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+                input);
+        // System.out.println("Input = " + tup1);
+        String funcSpec = AVG.class.getName() + "()";
+
+        POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1,
+                null, funcSpec);
+
+        TupleFactory tf = TupleFactory.getInstance();
+
+        po.setAlgebraicFunction(INIT);
+        po.attachInput(tup1);
+        Tuple t = null;
+        Result res = po.getNext(t);
+        Tuple outputInitial1 = (res.returnStatus == POStatus.STATUS_OK) ? 
(Tuple) res.result
+                : null;
+        Tuple outputInitial2 = (res.returnStatus == POStatus.STATUS_OK) ? 
(Tuple) res.result
+                : null;
+        System.out.println(outputInitial1 + " " + outputInitial2);
+        assertEquals(outputInitial1, outputInitial2);
+        double sum = (Double) outputInitial1.get(0);
+        long count = (Long) outputInitial1.get(1);
+        assertEquals(55.0, sum);
+        assertEquals(10, count);
+        DataBag bag = BagFactory.getInstance().newDefaultBag();
+        bag.add(outputInitial1);
+        bag.add(outputInitial2);
+        Tuple outputInitial = tf.newTuple();
+        outputInitial.append(bag);
+        // Tuple outputIntermed = intermed.exec(outputInitial);
+        po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
+                funcSpec);
+        po.setAlgebraicFunction(INTERMED);
+        po.attachInput(outputInitial);
+        res = po.getNext(t);
+        Tuple outputIntermed = (res.returnStatus == POStatus.STATUS_OK) ? 
(Tuple) res.result
+                : null;
+
+        sum = (Double) outputIntermed.get(0);
+        count = (Long) outputIntermed.get(1);
+        assertEquals(110.0, sum);
+        assertEquals(20, count);
+        System.out.println(outputIntermed);
+        po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
+                funcSpec);
+        po.setAlgebraicFunction(FINAL);
+        po.attachInput(outputInitial);
+        res = po.getNext(t);
+        Double output = (res.returnStatus == POStatus.STATUS_OK) ? (Double) 
res.result
+                : null;
+        // Double output = fin.exec(outputInitial);
+        assertEquals(5.5, output);
+        // System.out.println("output = " + output);
 
-       }
+    }
 }


Reply via email to