Author: olga
Date: Mon Jan  5 15:57:28 2009
New Revision: 731777

URL: http://svn.apache.org/viewvc?rev=731777&view=rev
Log:
PIG-580: use of combiner for distinct agg computations
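
For context: the pattern this change targets is a distinct applied inside a
nested foreach and fed into an aggregate, as in the scripts quoted in the
CombinerOptimizer comments and tests below, e.g.

    b = group a by name;
    c = foreach b { x = distinct a.age; generate group, COUNT(x); };

Before this change, any distinct (or foreach) inside such an inner plan made
CombinerOptimizer mark the plan non-algebraic and skip the combiner entirely;
now the distinct is decomposed into the Initial/Intermediate/Final phases of
the new builtin Distinct UDF, so duplicate elimination can begin in the map
and combine stages.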

Added:
    hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java
Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
    hadoop/pig/branches/types/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Mon Jan  5 15:57:28 2009
@@ -344,4 +344,6 @@
 
     PIG-522: make negation work (pradeepk via olgan)
 
-    PIG-563: support for multiple combiner invocations
+    PIG-563: support for multiple combiner invocations (pradeepk via olgan)
+
+    PIG-580: using combiner to compute distinct aggs (pradeepk via olgan)

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Jan  5 15:57:28 2009
@@ -264,7 +264,7 @@
 
     public void explain(PhysicalPlan plan, PrintStream stream) {
         try {
-            PlanPrinter printer = new PlanPrinter(plan);
+            PlanPrinter printer = new PlanPrinter(plan, stream);
             printer.visit();
             stream.println();
 

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Mon Jan  5 15:57:28 2009
@@ -18,11 +18,13 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataType;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
@@ -43,10 +45,12 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
 
@@ -93,6 +97,8 @@
  */
 public class CombinerOptimizer extends MROpPlanVisitor {
 
+    private static final String DISTINCT_UDF_CLASSNAME = org.apache.pig.builtin.Distinct.class.getName();
+
     private Log log = LogFactory.getLog(getClass());
 
     private enum ExprType { SIMPLE_PROJECT, ALGEBRAIC, NOT_ALGEBRAIC,
@@ -340,7 +346,8 @@
         for (int i = 0; i < plans.size(); i++) {
             ExprType t = algebraic(plans.get(i), flattens.get(i), i);
             types.add(t);
-            atLeastOneAlgebraic |= (t == ExprType.ALGEBRAIC);
+            atLeastOneAlgebraic |= 
+                (t == ExprType.ALGEBRAIC || t == ExprType.DISTINCT);
             noNonAlgebraics &= (t != ExprType.NOT_ALGEBRAIC);
         }
         if (!atLeastOneAlgebraic || !noNonAlgebraics) return null;
@@ -368,7 +375,10 @@
         AlgebraicPlanChecker apc = new AlgebraicPlanChecker(pp);
         apc.visit();
         if (apc.sawNonAlgebraic) return ExprType.NOT_ALGEBRAIC;
-
+        if(apc.sawDistinctAgg) return ExprType.DISTINCT;
+        
+        // we did not see a non-algebraic or a distinct so far
+        // proceed to check leaves
         PhysicalOperator leaf = leaves.get(0);
         if (leaf instanceof POProject) {
             POProject proj = (POProject)leaf;
@@ -414,6 +424,49 @@
                 changeFunc(mfe, mPlans.get(i), POUserFunc.INITIAL);
                 changeFunc(cfe, cPlans.get(i), POUserFunc.INTERMEDIATE);
                 changeFunc(rfe, rPlans.get(i), POUserFunc.FINAL);
+            } else if (exprs.get(i) == ExprType.DISTINCT) {
+                // A PODistinct in the plan will always have
+                // a Project[bag](*) as its successor.
+                // We will replace it with a POUserFunc with "Distinct" as 
+                // the underlying UDF. 
+                // In the map and combine, we will make this POUserFunc
+                // the leaf of the plan by removing other operators which
+                // are descendants up to the leaf.
+                // In the reduce we will keep descendants intact. Further
+                // down in fixProjectAndInputs we will change the inputs to
+                // this POUserFunc in the combine and reduce plans to be
+                // just projections of the column "i"
+                PhysicalPlan[] plans = new PhysicalPlan[] { 
+                        mPlans.get(i), cPlans.get(i), rPlans.get(i) };
+                byte[] funcTypes = new byte[] { POUserFunc.INITIAL, 
+                        POUserFunc.INTERMEDIATE, POUserFunc.FINAL };
+                for (int j = 0; j < plans.length; j++) {
+                    DistinctPatcher dp = new DistinctPatcher(plans[j]);
+                    try {
+                        dp.visit();
+                    } catch (VisitorException e) {
+                        throw new PlanException(e);
+                    }
+                    
+                    
+                    PhysicalOperator leaf = plans[j].getLeaves().get(0);
+                    // make the Distinct POUserFunc the leaf in the map and combine plans.
+                    if( j != plans.length - 1) {
+                        while(!((leaf instanceof POUserFunc) && 
+                                ((POUserFunc)leaf).getFuncSpec().getClassName().startsWith(DISTINCT_UDF_CLASSNAME))) {
+                            plans[j].remove(leaf);
+                            // get the new leaf
+                            leaf = plans[j].getLeaves().get(0);
+                        }
+                        
+                    } 
+                    // Also set the Distinct's function to type Initial in map
+                    // to type Intermediate in combine plan and to type Final in
+                    // the reduce
+                    POUserFunc distinctFunc = (POUserFunc)getDistinctUserFunc(plans[j], leaf);
+                    distinctFunc.setAlgebraicFunction(funcTypes[j]);
+                }
+                
             }
         }
 
@@ -439,44 +492,94 @@
         // they are in ( we just want to take output from the combine and
        // use that as input in the reduce/combine plan).  UDFs will be left the same but their
         // inputs altered.  Any straight projections will also be altered.
-        fixProjectAndInputs(cPlans);
-        fixProjectAndInputs(rPlans);
+        fixProjectAndInputs(cPlans, exprs);
+        fixProjectAndInputs(rPlans, exprs);
+        
+        
+        // we have modified the foreach inner plans - so set them
+        // again for the foreach so that foreach can do any re-initialization
+        // around them.
+        // FIXME - this is a necessary evil right now because the leaves are explicitly
+        // stored in the POForeach as a list rather than computed each time at
+        // run time from the plans for optimization. Do we want to have the Foreach
+        // compute the leaves each time and have Java optimize it (will Java optimize?)?
+        mfe.setInputPlans(mPlans);
+        cfe.setInputPlans(cPlans);
+        rfe.setInputPlans(rPlans);
     }
 
     /**
      * @param plans
+     * @param exprs 
      * @throws PlanException 
      */
-    private void fixProjectAndInputs(List<PhysicalPlan> plans) throws PlanException {
+    private void fixProjectAndInputs(List<PhysicalPlan> plans, List<ExprType> exprs) throws PlanException {
         for (int i = 0; i < plans.size(); i++) {
-            List<PhysicalOperator> leaves = plans.get(i).getLeaves();
-            if (leaves == null || leaves.size() != 1) {
-                throw new RuntimeException("Expected to find plan with single leaf!");
-            }
-            PhysicalOperator leaf = leaves.get(0);
+                List<PhysicalOperator> leaves = plans.get(i).getLeaves();
+                if (leaves == null || leaves.size() != 1) {
+                    throw new RuntimeException("Expected to find plan with single leaf!");
+                }
+                PhysicalOperator leaf = leaves.get(0);
+                // the combine plan could have an extra foreach inner plan
+                // to project the key - so make sure we check the index
+                // before looking in exprs
+                if(i < exprs.size()  && exprs.get(i) == ExprType.DISTINCT) {
+                    // if there is a distinct agg, we have to
+                    // look for the Distinct POUserFunc and 
+                    // change its input to be a project of
+                    // column "i"
+                    PhysicalOperator op = getDistinctUserFunc(plans.get(i), leaf);
+                    setProjectInput(op, plans.get(i), i);
+                } else {
+                    // Leaf should be either a projection or a UDF
+                    if (leaf instanceof POProject) {
+                        ((POProject)leaf).setColumn(i);
+                    } else if (leaf instanceof POUserFunc) {
+                        setProjectInput(leaf, plans.get(i), i);
+                    }
+              }
+        }
+    }
 
-            // Leaf should be either a projection or a UDF
-            if (leaf instanceof POProject) {
-                ((POProject)leaf).setColumn(i);
-            } else if (leaf instanceof POUserFunc) {
-                String scope = leaf.getOperatorKey().scope;
-                POProject proj = new POProject(new OperatorKey(scope, 
-                    NodeIdGenerator.getGenerator().getNextNodeId(scope)),
-                    leaf.getRequestedParallelism(), i);
-                proj.setResultType(DataType.BAG);
-                // Remove old connections and elements from the plan
-                plans.get(i).trimAbove(leaf);
-                plans.get(i).add(proj);
-                plans.get(i).connect(proj, leaf);
-                List<PhysicalOperator> inputs =
-                    new ArrayList<PhysicalOperator>(1);
-                inputs.add(proj);
-                leaf.setInputs(inputs);
+    /**
+     * @param op
+     * @param index 
+     * @param plan 
+     * @throws PlanException 
+     */
+    private void setProjectInput(PhysicalOperator op, PhysicalPlan plan, int index) throws PlanException {
+        String scope = op.getOperatorKey().scope;
+        POProject proj = new POProject(new OperatorKey(scope, 
+            NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+            op.getRequestedParallelism(), index);
+        proj.setResultType(DataType.BAG);
+        // Remove old connections and elements from the plan
+        plan.trimAbove(op);
+        plan.add(proj);
+        plan.connect(proj, op);
+        List<PhysicalOperator> inputs =
+            new ArrayList<PhysicalOperator>(1);
+        inputs.add(proj);
+        op.setInputs(inputs);
+        
+    }
+
+    /**
+     * @param plan
+     * @param operator
+     * @return
+     */
+    private PhysicalOperator getDistinctUserFunc(PhysicalPlan plan, PhysicalOperator operator) {
+        if(operator instanceof POUserFunc ) { 
+            if(((POUserFunc)operator).getFuncSpec().getClassName().startsWith(DISTINCT_UDF_CLASSNAME)) {
+                return operator;
             }
         }
-
+        return getDistinctUserFunc(plan, plan.getPredecessors(operator).get(0));
+        
     }
 
+
     /**
      * @param fe
      */
@@ -520,16 +623,106 @@
 
     private class AlgebraicPlanChecker extends PhyPlanVisitor {
         boolean sawNonAlgebraic = false;
+        boolean sawDistinctAgg = false;
+        private boolean sawForeach = false;
 
         AlgebraicPlanChecker(PhysicalPlan plan) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
         }
 
+        /* (non-Javadoc)
+         * @see org.apache.pig.impl.plan.PlanVisitor#visit()
+         */
+        @Override
+        public void visit() throws VisitorException {
+            super.visit();
+            // if we saw foreach and distinct agg it's ok
+            // else if we only saw foreach, mark it as non algebraic
+            if(sawForeach && !sawDistinctAgg) {
+                sawNonAlgebraic = true;
+            }
+        }
+        
         @Override
         public void visitDistinct(PODistinct distinct) throws VisitorException {
+            if(sawDistinctAgg) {
+                // we want to combine only in the case where there is only
+                // one PODistinct which is the only input to an agg
+                // we apparently have seen a PODistinct before, so lets not
+                // combine.
+                sawNonAlgebraic = true;
+            }
+            // check that this distinct is the only input to an agg
+            // We could have the following two cases
+            // script 1:
+            // ..
+            // b = group a by ...
+            // c = foreach b { x = distinct a; generate AGG(x), ...}
+            // The above script leads to the following plan for AGG(x):
+            // POUserFunc(org.apache.pig.builtin.COUNT)[long] 
+            //   |
+            //   |---Project[bag][*] 
+            //       |
+            //       |---PODistinct[bag] 
+            //           |
+            //           |---Project[tuple][1] 
+
+            // script 2:
+            // ..
+            // b = group a by ...
+            // c = foreach b { x = distinct a; generate AGG(x.$1), ...}
+            // The above script leads to the following plan for AGG(x.$1):
+            // POUserFunc(org.apache.pig.builtin.IntSum)[long]
+            //   |
+            //   |---Project[bag][1]
+            //       |
+            //       |---Project[bag][*]
+            //           |
+            //           |---PODistinct[bag]
+            //               |
+            //               |---Project[tuple][1]
+            // So tracing from the PODistinct to its successors up to the leaf, we should
+            // see a Project[bag][*] as the immediate successor and an optional Project[bag]
+            // as the next successor till we see the leaf.
+            PhysicalOperator leaf = mPlan.getLeaves().get(0);
+            // the leaf has to be a POUserFunc (need not be algebraic)
+            if(leaf instanceof POUserFunc) {
+                List<PhysicalOperator> immediateSuccs = mPlan.getSuccessors(distinct);
+                if(immediateSuccs.size() == 1 && immediateSuccs.get(0) instanceof POProject) {
+                    if(checkSuccessorIsLeaf(leaf, immediateSuccs.get(0))) { // script 1 above
+                        sawDistinctAgg = true;
+                        return;
+                    } else { // check for script 2 scenario above
+                        List<PhysicalOperator> nextSuccs = mPlan.getSuccessors(immediateSuccs.get(0));
+                        if(nextSuccs.size() == 1) {
+                            PhysicalOperator op = nextSuccs.get(0);
+                            if(op instanceof POProject) {
+                                if(checkSuccessorIsLeaf(leaf, op)) {
+                                    sawDistinctAgg = true;
+                                    return;
+                                }
+                            }
+                        }
+                        
+                    }
+                }
+            }
+            // if we did not return above, that means we did not see
+            // the pattern we expected
             sawNonAlgebraic = true;
         }
-
+        
+        private boolean checkSuccessorIsLeaf(PhysicalOperator leaf, PhysicalOperator opToCheck) {
+            List<PhysicalOperator> succs = mPlan.getSuccessors(opToCheck);
+            if(succs.size() == 1) {
+                PhysicalOperator op = succs.get(0);
+                if(op == leaf) {
+                    return true;
+                }
+            }
+            return false;
+        }
+        
         @Override
         public void visitFilter(POFilter filter) throws VisitorException {
             sawNonAlgebraic = true;
@@ -537,7 +730,11 @@
 
         @Override
         public void visitPOForEach(POForEach fe) throws VisitorException {
-            sawNonAlgebraic = true;
+            // we need to allow foreach as input for distinct
+            // but don't want it for other things (why?). So lets
+            // flag the presence of Foreach and if this is present
+            // with a distinct agg, it will be allowed.
+            sawForeach = true;
         }
 
         @Override
@@ -546,6 +743,84 @@
         }
 
     }
+    
+    /**
+     * A visitor to replace   
+     * Project[bag][*] 
+     *  |
+     *  |---PODistinct[bag]
+     * with 
+     * POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]    
+     */
+    private class DistinctPatcher extends PhyPlanVisitor {
+
+        public boolean patched = false;
+        /**
+         * @param plan
+         * @param walker
+         */
+        public DistinctPatcher(PhysicalPlan plan,
+                PlanWalker<PhysicalOperator, PhysicalPlan> walker) {
+            super(plan, walker);
+        }
+
+        /**
+         * @param physicalPlan
+         */
+        public DistinctPatcher(PhysicalPlan physicalPlan) {
+            this(physicalPlan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
+         */
+        @Override
+        public void visitProject(POProject proj) throws VisitorException {
+            // check if this project is preceded by PODistinct and
+            // has the return type bag
+            
+            
+            List<PhysicalOperator> preds = mPlan.getPredecessors(proj);
+            if(preds == null) return; // this is a leaf project and so not interesting for patching
+            PhysicalOperator pred = preds.get(0);
+            if(preds.size() == 1 && pred instanceof PODistinct) {
+                if(patched) {
+                    // we should not already have been patched since the
+                    // Project-Distinct pair should occur only once
+                    throw new VisitorException(
+                            "Unexpected Project-Distinct pair while trying to 
set up plans for use with combiner.");
+                }
+                // we have to stick in the POUserfunc(org.apache.pig.builtin.Distinct)[DataBag]
+                // in place of the Project-PODistinct pair
+                PhysicalOperator distinctPredecessor = mPlan.getPredecessors(pred).get(0);
+                
+                try {
+                    String scope = proj.getOperatorKey().scope;
+                    List<PhysicalOperator> funcInput = new ArrayList<PhysicalOperator>();
+                    FuncSpec fSpec = new FuncSpec(DISTINCT_UDF_CLASSNAME);
+                    funcInput.add(distinctPredecessor);
+                    // explicitly set distinctPredecessor's result type to
+                    // be tuple - this is relevant when distinctPredecessor is
+                    // originally a POForeach with return type BAG - we need to
+                    // set it to tuple so we get a stream of tuples. 
+                    distinctPredecessor.setResultType(DataType.TUPLE);
+                    POUserFunc func = new POUserFunc(new OperatorKey(scope, 
+                            NodeIdGenerator.getGenerator().getNextNodeId(scope)),-1, funcInput, fSpec);
+                    func.setResultType(DataType.BAG);
+                    mPlan.replace(proj, func);
+                    mPlan.remove(pred);
+                    // connect the newly added "func" to
+                    // the predecessor of the earlier PODistinct
+                    mPlan.connect(distinctPredecessor, func);
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }
+                patched = true;
+            } 
+        }
+
+    }
+
 
     // Reset any member variables since we may have already visited one
     // combine.

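For orientation, a sketch of the plan shapes this rewrite should produce for
"script 1" above (assuming setAlgebraicFunction() swaps the POUserFunc's
FuncSpec to the corresponding Initial/Intermediate/Final inner class of
org.apache.pig.builtin.Distinct, and with "i" the column position assigned by
fixProjectAndInputs()):

    map plan (leaf):   POUserFunc(Distinct.Initial)
                         |
                         |---Project[tuple][1]

    combine plan:      POUserFunc(Distinct.Intermediate)
                         |
                         |---Project[bag][i]

    reduce plan:       POUserFunc(COUNT)
                         |
                         |---POUserFunc(Distinct.Final)
                               |
                               |---Project[bag][i]
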
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Mon Jan  5 15:57:28 2009
@@ -52,19 +52,19 @@
         mStream.println("MapReduce node " + mr.getOperatorKey().toString());
         if (mr.mapPlan != null && mr.mapPlan.size() > 0) {
             mStream.println("Map Plan");
-            PlanPrinter printer = new PlanPrinter(mr.mapPlan);
+            PlanPrinter printer = new PlanPrinter(mr.mapPlan, mStream);
             printer.visit();
             mStream.println("--------");
         }
         if (mr.combinePlan != null && mr.combinePlan.size() > 0) {
             mStream.println("Combine Plan");
-            PlanPrinter printer = new PlanPrinter(mr.combinePlan);
+            PlanPrinter printer = new PlanPrinter(mr.combinePlan, mStream);
             printer.visit();
             mStream.println("--------");
         }
         if (mr.reducePlan != null && mr.reducePlan.size() > 0) {
             mStream.println("Reduce Plan");
-            PlanPrinter printer = new PlanPrinter(mr.reducePlan);
+            PlanPrinter printer = new PlanPrinter(mr.reducePlan, mStream);
             printer.visit();
             mStream.println("--------");
         }

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Mon Jan  5 15:57:28 2009
@@ -118,6 +118,29 @@
         }
         super.remove(op);
     }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.OperatorPlan#replace(org.apache.pig.impl.plan.Operator, org.apache.pig.impl.plan.Operator)
+     */
+    @Override
+    public void replace(PhysicalOperator oldNode, PhysicalOperator newNode)
+            throws PlanException {
+        List<PhysicalOperator> oldNodeSuccessors = getSuccessors(oldNode);
+        super.replace(oldNode, newNode);
+        if(oldNodeSuccessors != null) {
+            for (PhysicalOperator preds : oldNodeSuccessors) {
+                List<PhysicalOperator> inputs = preds.getInputs();
+                // now replace oldNode with newNode in
+                // the input list of oldNode's successors
+                for(int i = 0; i < inputs.size(); i++) {
+                    if(inputs.get(i) == oldNode) {
+                        inputs.set(i, newNode);
+                    }
+                }    
+            }
+        }
+        
+    }
 
     /* (non-Javadoc)
     * @see org.apache.pig.impl.plan.OperatorPlan#add(org.apache.pig.impl.plan.Operator)

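The replace() override above rewires the plan's edges, but PhysicalOperators
also cache their inputs in a plain list, so stale references to the old node
must be patched by hand. A minimal self-contained sketch of that invariant
(hypothetical Node class, not Pig's API):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class Node {
        final String name;
        final List<Node> inputs = new ArrayList<Node>();
        Node(String name) { this.name = name; }
    }

    public class ReplaceSketch {
        // Mirror of the fix above: after the graph edges are rewired,
        // swap oldNode for newNode in each successor's cached input list.
        static void patchInputs(List<Node> successors, Node oldNode, Node newNode) {
            for (Node succ : successors) {
                List<Node> inputs = succ.inputs;
                for (int i = 0; i < inputs.size(); i++) {
                    if (inputs.get(i) == oldNode) {
                        inputs.set(i, newNode);
                    }
                }
            }
        }

        public static void main(String[] args) {
            Node proj = new Node("Project[bag][*]");
            Node count = new Node("POUserFunc(COUNT)");
            count.inputs.add(proj); // successor caches its input reference
            Node func = new Node("POUserFunc(Distinct)");
            patchInputs(Arrays.asList(count), proj, func);
            System.out.println(count.inputs.get(0).name); // POUserFunc(Distinct)
        }
    }
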
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Mon Jan  5 15:57:28 2009
@@ -20,6 +20,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -48,15 +49,22 @@
     int levelCntr = -1;
 
     OutputStream printer;
+    
+    PrintStream stream = System.out;
 
     public PlanPrinter(P plan) {
         super(plan, new DepthFirstWalker<O, P>(plan));
     }
+    
+    public PlanPrinter(P plan, PrintStream stream) {
+        super(plan, new DepthFirstWalker<O, P>(plan));
+        this.stream = stream;
+    }
 
     @Override
     public void visit() throws VisitorException {
         try {
-            System.out.write(depthFirstPP().getBytes());
+            stream.write(depthFirstPP().getBytes());
         } catch (IOException e) {
             throw new VisitorException(e.getMessage());
         }
@@ -89,7 +97,7 @@
             }
         }
         if (newPredecessors.size() > 0) {
-            System.out.println();
+            stream.println();
             breadthFirst(newPredecessors, seen);
         }
     }
@@ -182,35 +190,35 @@
 
     private void dispTabs() {
         for (int i = 0; i < levelCntr; i++)
-            System.out.print(TAB1);
+            stream.print(TAB1);
     }
 
     public void visitLoad(POLoad op) {
-        System.out.print(op.name() + "   ");
+        stream.print(op.name() + "   ");
     }
 
     public void visitStore(POStore op) {
-        System.out.print(op.name() + "   ");
+        stream.print(op.name() + "   ");
     }
 
     public void visitFilter(POFilter op) {
-        System.out.print(op.name() + "   ");
+        stream.print(op.name() + "   ");
     }
 
     public void visitLocalRearrange(POLocalRearrange op) {
-        System.out.print(op.name() + "   ");
+        stream.print(op.name() + "   ");
     }
 
     public void visitGlobalRearrange(POGlobalRearrange op) {
-        System.out.print(op.name() + "   ");
+        stream.print(op.name() + "   ");
     }
 
     public void visitPackage(POPackage op) {
-        System.out.print(op.name() + "   ");
+        stream.print(op.name() + "   ");
     }
 
     public void visitStartMap(POUnion op) {
-        System.out.print(op.name() + "   ");
+        stream.print(op.name() + "   ");
     }
     
 }

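Because PlanPrinter now writes to a caller-supplied PrintStream rather than
System.out, explain output can be captured in memory and asserted on, which
is what the new TestCombiner tests below do. A tiny self-contained sketch of
that pattern (the println stands in for a real pigServer.explain() call):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class CaptureSketch {
        public static void main(String[] args) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            ps.println("Combine Plan"); // stand-in for pigServer.explain("c", ps)
            // same regex the tests use to detect a combine phase
            System.out.println(baos.toString().matches("(?si).*combine plan.*"));
        }
    }
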
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Mon Jan  5 15:57:28 2009
@@ -32,6 +32,7 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -42,7 +43,7 @@
  * 
  * 
  */
-public class PODistinct extends PhysicalOperator {
+public class PODistinct extends PhysicalOperator implements Cloneable {
 
     private boolean inputsAccumulated = false;
     private DataBag distinctBag = null;
@@ -121,5 +122,13 @@
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitDistinct(this);
     }
+    /* (non-Javadoc)
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#clone()
+     */
+    @Override
+    public PODistinct clone() throws CloneNotSupportedException {
+        // TODO Auto-generated method stub
+        return new PODistinct(new OperatorKey(this.mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)), this.requestedParallelism, this.inputs);
+    }
 
 }

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Mon Jan  5 15:57:28 2009
@@ -497,9 +497,11 @@
                 flattens.add(b);
             }
         }
-        return new POForEach(new OperatorKey(mKey.scope, 
-            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
-            requestedParallelism, plans, flattens);
+        POForEach clone = new POForEach(new OperatorKey(mKey.scope, 
+                NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+                requestedParallelism, plans, flattens);
+        clone.setResultType(getResultType());
+        return clone;
     }
 
     public boolean inProcessing()

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Mon Jan  5 15:57:28 2009
@@ -186,7 +186,7 @@
 
     public void explain(PhysicalPlan plan, PrintStream stream) {
         try {
-            PlanPrinter printer = new PlanPrinter(plan);
+            PlanPrinter printer = new PlanPrinter(plan, stream);
             printer.visit();
             stream.println();
 

Added: hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java?rev=731777&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java Mon Jan  5 15:57:28 2009
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.WrappedIOException;
+
+/**
+ * Find the distinct set of tuples in a bag.
+ * This is a blocking operator. All the input is put in the hashset implemented
+ * in DistinctDataBag which also provides the other DataBag interfaces.
+ * 
+ * 
+ */
+public class Distinct  extends EvalFunc<DataBag> implements Algebraic {
+
+    private static BagFactory bagFactory = BagFactory.getInstance();
+    private static TupleFactory tupleFactory = TupleFactory.getInstance();
+    /* (non-Javadoc)
+     * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+     */
+    @Override
+    public DataBag exec(Tuple input) throws IOException {
+        return getDistinct(input);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.Algebraic#getFinal()
+     */
+    @Override
+    public String getFinal() {
+        return Final.class.getName();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.Algebraic#getInitial()
+     */
+    @Override
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.Algebraic#getIntermed()
+     */
+    @Override
+    public String getIntermed() {
+        return Intermediate.class.getName();
+    }
+
+    static public class Initial extends EvalFunc<Tuple> {
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            // the input has  a single field which is a tuple
+            // representing the data we want to distinct. 
+            // unwrap, put in a bag and send down
+            try {
+                DataBag bag = bagFactory.newDefaultBag();
+                bag.add((Tuple)input.get(0));
+                return tupleFactory.newTuple(bag);
+            } catch (ExecException e) {
+                throw WrappedIOException.wrap(e);
+            }
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> {
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            return tupleFactory.newTuple(getDistinctFromNestedBags(input));
+        }
+    }
+
+    static public class Final extends EvalFunc<DataBag> {
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public DataBag exec(Tuple input) throws IOException {
+            return getDistinctFromNestedBags(input);
+        }
+    }
+    
+    static private DataBag getDistinctFromNestedBags(Tuple input) throws IOException {
+        DataBag result = bagFactory.newDistinctBag();
+        try {
+            DataBag bg = (DataBag)input.get(0);
+            for (Tuple tuple : bg) {
+                // Each tuple has a single column
+                // which is a bag. Get tuples out of it
+                // and distinct over all tuples
+                for (Tuple t : (DataBag)tuple.get(0)) {
+                    result.add(t);
+                }
+            }
+        } catch (ExecException e) {
+           throw WrappedIOException.wrap(e);
+        }
+        return result;
+    }
+    
+    static protected DataBag getDistinct(Tuple input) throws IOException {
+        try {
+            DataBag inputBg = (DataBag)input.get(0);
+            DataBag result = bagFactory.newDistinctBag();
+            for (Tuple tuple : inputBg) {
+                result.add(tuple);
+            }
+            return result;
+        } catch (ExecException e) {
+             throw WrappedIOException.wrap(e);
+        }
+    }
+
+}

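To make the algebra concrete, a minimal sketch using plain Java collections
instead of Pig's Tuple/DataBag types (illustrative names, not Pig API):
Initial wraps each row in a singleton bag; Intermediate and Final union
nested bags while discarding duplicates, so duplicate elimination starts on
the map side. The data mirrors TestBuiltin.testDistinct below.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    public class DistinctAlgebraSketch {
        // analogue of Distinct.Initial: one row becomes a singleton bag
        static List<Integer> initial(int row) {
            return Arrays.asList(row);
        }
        // analogue of Distinct.Intermediate/Final: union nested bags, dropping duplicates
        static Set<Integer> merge(List<List<Integer>> bags) {
            Set<Integer> out = new LinkedHashSet<Integer>();
            for (List<Integer> bag : bags) {
                out.addAll(bag);
            }
            return out;
        }
        public static void main(String[] args) {
            List<List<Integer>> part1 = new ArrayList<List<Integer>>();
            for (int r : new int[] {1, 2, 3, 1}) part1.add(initial(r));
            List<List<Integer>> part2 = new ArrayList<List<Integer>>();
            for (int r : new int[] {4, 5, 3}) part2.add(initial(r));
            // combine stage distincts each partition, final stage merges them
            List<List<Integer>> combined = new ArrayList<List<Integer>>();
            combined.add(new ArrayList<Integer>(merge(part1)));
            combined.add(new ArrayList<Integer>(merge(part2)));
            System.out.println(merge(combined)); // [1, 2, 3, 4, 5]
        }
    }
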
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java Mon Jan  5 15:57:28 2009
@@ -105,4 +105,8 @@
     public Properties getProperties() {
         return ConfigurationUtil.toProperties(m_conf);
     }
+    
+    public FileSystem getFileSystem() {
+        return m_fileSys;
+    }
 }

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java Mon Jan  5 15:57:28 2009
@@ -908,6 +908,40 @@
 
     }
     
+    
+    @Test
+    public void testDistinct() throws Exception {
+    
+        Integer[] inp = new Integer[] { 1, 2 , 3, 1 ,4, 5, 3};
+        DataBag inputBag = Util.createBagOfOneColumn(inp);
+        EvalFunc<Tuple> initial = new Distinct.Initial();
+        DataBag intermedInputBg1 = bagFactory.newDefaultBag();
+        DataBag intermedInputBg2 = bagFactory.newDefaultBag();
+        int i = 0;
+        for (Tuple t : inputBag) {
+            Tuple initialOutput = initial.exec(tupleFactory.newTuple(t));
+            if(i < inp.length/2 ) {
+                intermedInputBg1.add(initialOutput);
+            } else {
+                intermedInputBg2.add(initialOutput);
+            }
+            i++;
+        }
+        
+        EvalFunc<Tuple> intermed = new Distinct.Intermediate();
+        
+        DataBag finalInputBg = bagFactory.newDefaultBag();
+        finalInputBg.add(intermed.exec(tupleFactory.newTuple(intermedInputBg1)));
+        finalInputBg.add(intermed.exec(tupleFactory.newTuple(intermedInputBg2)));
+        EvalFunc<DataBag> fin = new Distinct.Final();
+        DataBag result = fin.exec(tupleFactory.newTuple(finalInputBg));
+        
+        Integer[] exp = new Integer[] { 1, 2, 3, 4, 5};
+        DataBag expectedBag = Util.createBagOfOneColumn(exp);
+        assertEquals(expectedBag, result);
+        
+    }
+    
     @Test
     public void testCONCAT() throws Exception {
         

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java Mon Jan  5 15:57:28 2009
@@ -17,11 +17,13 @@
  */
 package org.apache.pig.test;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
@@ -105,13 +107,20 @@
                 input[i] = Integer.toString(0);
             }
         }
-        File f1 = Util.createFile(input);
+        Util.createInputFile(cluster, "MultiCombinerUseInput.txt", input);
         Properties props = cluster.getProperties();
         props.setProperty("io.sort.mb", "1");
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
-        pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString()) + "' as (x:int);");
+        pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
         pigServer.registerQuery("b = group a all;");
         pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), 
MIN(a.$0), MAX(a.$0), AVG(a.$0);");
+
+        // make sure there is a combine plan in the explain output
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        pigServer.explain("c", ps);
+        assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+
         Iterator<Tuple> it = pigServer.openIterator("c");
         Tuple t = it.next();
         assertEquals(512000L, t.get(0));
@@ -120,6 +129,156 @@
         assertEquals(1, t.get(3));
         assertEquals(0.5, t.get(4));
         assertFalse(it.hasNext());
+        Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
+    }
+    
+    @Test
+    public void testDistinctAggs1() throws Exception {
+        // test the use of combiner for distinct aggs:
+        String input[] = {
+                "pig1\t18\t2.1",
+                "pig2\t24\t3.3",
+                "pig5\t45\t2.4",
+                "pig1\t18\t2.1",
+                "pig1\t19\t2.1",
+                "pig2\t24\t4.5",
+                "pig1\t20\t3.1" };
+
+        Util.createInputFile(cluster, "distinctAggs1Input.txt", input);
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load 'distinctAggs1Input.txt' as (name:chararray, age:int, gpa:double);");
+        pigServer.registerQuery("b = group a by name;");
+        pigServer.registerQuery("c = foreach b  {" +
+                       "        x = distinct a.age;" +
+                       "        y = distinct a.gpa;" +
+                       "        z = distinct a;" +
+                       "        generate group, COUNT(x), SUM(x.age), 
SUM(y.gpa), SUM(a.age), " +
+                       "                       SUM(a.gpa), COUNT(z.age), 
COUNT(z), SUM(z.age);};");
+        
+        // make sure there is a combine plan in the explain output
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        pigServer.explain("c", ps);
+        assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+
+        HashMap<String, Object[]> results = new HashMap<String, Object[]>();
+        results.put("pig1", new Object[] 
{"pig1",3L,57L,5.2,75L,9.4,3L,3L,57L});
+        results.put("pig2", new Object[] 
{"pig2",1L,24L,7.8,48L,7.8,2L,2L,48L});
+        results.put("pig5", new Object[] 
{"pig5",1L,45L,2.4,45L,2.4,1L,1L,45L});
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            List<Object> fields = t.getAll();
+            Object[] expected = results.get((String)fields.get(0));
+            int i = 0;
+            for (Object field : fields) {
+                assertEquals(expected[i++], field);
+            }
+        }
+        Util.deleteFile(cluster, "distinctAggs1Input.txt");
+        
+    }
+
+    @Test
+    public void testDistinctNoCombiner() throws Exception {
+        // test that combiner is NOT invoked when
+        // one of the elements in the foreach generate
+        // is a distinct() as the leaf
+        String input[] = {
+                "pig1\t18\t2.1",
+                "pig2\t24\t3.3",
+                "pig5\t45\t2.4",
+                "pig1\t18\t2.1",
+                "pig1\t19\t2.1",
+                "pig2\t24\t4.5",
+                "pig1\t20\t3.1" };
+
+        Util.createInputFile(cluster, "distinctNoCombinerInput.txt", input);
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load 'distinctNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
+        pigServer.registerQuery("b = group a by name;");
+        pigServer.registerQuery("c = foreach b  {" +
+                "        z = distinct a;" +
+                "        generate group, z, SUM(a.age), SUM(a.gpa);};");
+        
+        // make sure there is no combine plan in the explain output
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        pigServer.explain("c", ps);
+        assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+
+        HashMap<String, Object[]> results = new HashMap<String, Object[]>();
+        results.put("pig1", new Object[] {"pig1","bag-place-holder",75L,9.4});
+        results.put("pig2", new Object[] {"pig2","bag-place-holder",48L,7.8});
+        results.put("pig5", new Object[] {"pig5","bag-place-holder",45L,2.4});
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            List<Object> fields = t.getAll();
+            Object[] expected = results.get((String)fields.get(0));
+            int i = 0;
+            for (Object field : fields) {
+                if(i == 1) {
+                    // ignore the second field which is a bag
+                    // for comparison here
+                    i++;
+                    continue;
+                }
+                assertEquals(expected[i++], field);
+            }
+        }
+        Util.deleteFile(cluster, "distinctNoCombinerInput.txt");
+        
+    }
+
+    @Test
+    public void testForEachNoCombiner() throws Exception {
+        // test that combiner is NOT invoked when
+        // one of the elements in the foreach generate
+        // has a foreach in the plan without a distinct agg
+        String input[] = {
+                "pig1\t18\t2.1",
+                "pig2\t24\t3.3",
+                "pig5\t45\t2.4",
+                "pig1\t18\t2.1",
+                "pig1\t19\t2.1",
+                "pig2\t24\t4.5",
+                "pig1\t20\t3.1" };
+
+        Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
+        pigServer.registerQuery("b = group a by name;");
+        pigServer.registerQuery("c = foreach b  {" +
+                "        z = a.age;" +
+                "        generate group, z, SUM(a.age), SUM(a.gpa);};");
+        
+        // make sure there is no combine plan in the explain output
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        pigServer.explain("c", ps);
+        assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+
+        HashMap<String, Object[]> results = new HashMap<String, Object[]>();
+        results.put("pig1", new Object[] {"pig1","bag-place-holder",75L,9.4});
+        results.put("pig2", new Object[] {"pig2","bag-place-holder",48L,7.8});
+        results.put("pig5", new Object[] {"pig5","bag-place-holder",45L,2.4});
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            List<Object> fields = t.getAll();
+            Object[] expected = results.get((String)fields.get(0));
+            int i = 0;
+            for (Object field : fields) {
+                if(i == 1) {
+                    // ignore the second field which is a bag
+                    // for comparison here
+                    i++;
+                    continue;
+                }
+                assertEquals(expected[i++], field);
+            }
+        }
+        Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
+        
     }
 
 }

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/Util.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/Util.java Mon Jan  5 15:57:28 2009
@@ -29,6 +29,9 @@
 import static java.util.regex.Matcher.quoteReplacement;
 import junit.framework.Assert;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -118,6 +121,16 @@
         return b;
     }
     
+    static public<T> DataBag createBagOfOneColumn(T[] input) throws ExecException {
+        DataBag result = mBagFactory.newDefaultBag();
+        for (int i = 0; i < input.length; i++) {
+            Tuple t = mTupleFactory.newTuple(1);
+            t.set(0, input[i]);
+            result.add(t);
+        }
+        return result;
+    }
+    
     static public Map<Object, Object> createMap(String[] contents)
     {
         Map<Object, Object> m = new HashMap<Object, Object>();
@@ -179,6 +192,44 @@
                pw.close();
                return f;
        }
+       
+       /**
+     * Helper to create a dfs file on the Minicluster DFS with given
+     * input data for use in test cases.
+     * 
+     * @param miniCluster reference to the Minicluster where the file should be created
+     * @param fileName pathname of the file to be created
+     * @param inputData input for test cases, each string in inputData[] is written
+     *                  on one line
+     * @throws IOException
+     */
+    static public void createInputFile(MiniCluster miniCluster, String fileName, 
+                                       String[] inputData) throws IOException {
+        FileSystem fs = miniCluster.getFileSystem();
+        if(fs.exists(new Path(fileName))) {
+            throw new IOException("File " + fileName + " already exists on the minicluster");
+        }
+        FSDataOutputStream stream = fs.create(new Path(fileName));
+        PrintWriter pw = new PrintWriter(new OutputStreamWriter(stream, "UTF-8"));
+        for (int i=0; i<inputData.length; i++){
+            pw.println(inputData[i]);
+        }
+        pw.close();
+    }
+    
+    /**
+     * Helper to remove a dfs file from the minicluster DFS
+     * 
+     * @param miniCluster reference to the Minicluster where the file should be deleted
+     * @param fileName pathname of the file to be deleted
+     * @throws IOException
+     */
+    static public void deleteFile(MiniCluster miniCluster, String fileName) throws IOException {
+        FileSystem fs = miniCluster.getFileSystem();
+        fs.delete(new Path(fileName), true);
+    }
 
        /**
        * Helper function to check if the result of a Pig Query is in line with 

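For completeness, a local-filesystem analogue of the createInputFile() /
deleteFile() helpers above (a sketch using only java.io; the real helpers
write to the MiniCluster DFS through Hadoop's FileSystem API):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;

    public class CreateInputFileSketch {
        // same contract as Util.createInputFile: one input string per
        // line, UTF-8 encoded, refusing to overwrite an existing file
        static void createInputFile(File f, String[] inputData) throws IOException {
            if (f.exists()) {
                throw new IOException("File " + f + " already exists");
            }
            PrintWriter pw = new PrintWriter(new OutputStreamWriter(new FileOutputStream(f), "UTF-8"));
            for (String line : inputData) {
                pw.println(line);
            }
            pw.close();
        }
        public static void main(String[] args) throws IOException {
            File f = new File("sketch-input.txt");
            createInputFile(f, new String[] {"pig1\t18\t2.1", "pig2\t24\t3.3"});
            f.delete(); // analogue of Util.deleteFile
        }
    }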
