Author: gates
Date: Fri May 16 14:38:40 2008
New Revision: 657224

URL: http://svn.apache.org/viewvc?rev=657224&view=rev
Log:
PIG-162 Shravan's addition of PigProgressable and distinct for MR.


Added:
    
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java
    
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
Removed:
    
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/RunnableReporter.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
    incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java

Modified: incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java Fri May 
16 14:38:40 2008
@@ -23,9 +23,14 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.physicalLayer.PigProgressable;
 
 
 public abstract class ComparisonFunc extends WritableComparator {
+    // If the comparison is a time consuming process
+    // this reporter must be used to report progress
+    protected PigProgressable reporter;
+    
     public ComparisonFunc() {
         super(TupleFactory.getInstance().tupleClass());
     }
@@ -48,4 +53,8 @@
      * @see java.util.Comparator
      */
     abstract public int compare(Tuple t1, Tuple t2);
+
+    public void setReporter(PigProgressable reporter) {
+        this.reporter = reporter;
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java Fri May 16 
14:38:40 2008
@@ -26,6 +26,7 @@
 // TODO FIX
 // import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.physicalLayer.PigProgressable;
 
 
 /**
@@ -44,6 +45,9 @@
  *
  */
 public abstract class EvalFunc<T>  {
+    // UDFs must use this to report progress
+    // if the exec is taking more than 300 ms
+    protected PigProgressable reporter;
     
     protected Type returnType;
     
@@ -107,16 +111,7 @@
         
     // report that progress is being made (otherwise hadoop times out after 
600 seconds working on one outer tuple)
     protected void progress() { 
-        //This part appears to be unused and is causing problems due to 
changing hadoop signature
-        /*
-        if (PigMapReduce.reporter != null) {
-            try {
-                PigMapReduce.reporter.progress();
-            } catch (IOException ignored) {
-            }
-        }
-        */
-        
+        if(reporter!=null) reporter.progress();
     }
 
     /**
@@ -158,4 +153,14 @@
     public boolean isAsynchronous(){
         return false;
     }
+
+
+    public PigProgressable getReporter() {
+        return reporter;
+    }
+
+
+    public void setReporter(PigProgressable reporter) {
+        this.reporter = reporter;
+    }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
 Fri May 16 14:38:40 2008
@@ -47,6 +47,8 @@
 import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
@@ -54,21 +56,19 @@
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import 
org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
-import 
org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
 import 
org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
 import 
org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
 import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
 
 /**
  * The compiler that compiles a given physical plan
@@ -657,6 +657,68 @@
             throw pe;
         }
     }
+    
+    
+
+    @Override
+    public void visitDistinct(PODistinct op) throws VisitorException {
+        try{
+            MapReduceOper mro = compiledInputs[0];
+            ExprPlan ep = new ExprPlan();
+            POProject prjStar = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            prjStar.setResultType(DataType.TUPLE);
+            prjStar.setStar(true);
+            ep.add(prjStar);
+            
+            List<ExprPlan> eps = new ArrayList<ExprPlan>();
+            eps.add(ep);
+            
+            POLocalRearrange lr = new POLocalRearrange(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            lr.setIndex(0);
+            lr.setKeyType(DataType.TUPLE);
+            lr.setPlans(eps);
+            lr.setResultType(DataType.TUPLE);
+            if(!mro.isMapDone()){
+                mro.mapPlan.addAsLeaf(lr);
+            }
+            else if(mro.isMapDone() && ! mro.isReduceDone()){
+                mro.reducePlan.addAsLeaf(lr);
+            }
+            
+            blocking(op);
+            
+            POPackage pkg = new POPackage(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            pkg.setKeyType(DataType.TUPLE);
+            pkg.setNumInps(1);
+            boolean[] inner = {false}; 
+            pkg.setInner(inner);
+            curMROp.reducePlan.add(pkg);
+            
+            List<ExprPlan> eps1 = new ArrayList<ExprPlan>();
+            List<Boolean> flat1 = new ArrayList<Boolean>();
+            ExprPlan ep1 = new ExprPlan();
+            POProject prj1 = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            prj1.setResultType(DataType.TUPLE);
+            prj1.setStar(false);
+            prj1.setColumn(0);
+            prj1.setOverloaded(false);
+            ep1.add(prj1);
+            eps1.add(ep1);
+            flat1.add(false);
+            POGenerate fe1Gen = new POGenerate(new 
OperatorKey(scope,nig.getNextNodeId(scope)),eps1,flat1);
+            fe1Gen.setResultType(DataType.TUPLE);
+            PhysicalPlan<PhysicalOperator> fe1Plan = new 
PhysicalPlan<PhysicalOperator>();
+            fe1Plan.add(fe1Gen);
+            POForEach fe1 = new POForEach(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            fe1.setPlan(fe1Plan);
+            fe1.setResultType(DataType.TUPLE);
+            curMROp.reducePlan.addAsLeaf(fe1);
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
 
     @Override
     public void visitSort(POSort op) throws VisitorException {
@@ -909,10 +971,11 @@
         POLoad ld = comp.getLoad();
         pj.mapPlan.add(ld);
 
-        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, null,
-                sortPlans, mAscCols, null);
-        
-        pj.mapPlan.addAsLeaf(sort);
+        /*POSort op = new POSort(new OperatorKey("", r.nextLong()), -1, null,
+                sortPlans, mAscCols, null);*/
+        PODistinct op = new PODistinct(new OperatorKey("", r.nextLong()),
+                -1, null);
+        pj.mapPlan.addAsLeaf(op);
         
         POStore st = comp.getStore();
         pj.mapPlan.addAsLeaf(st);
@@ -920,10 +983,7 @@
         MRCompiler c1 = new MRCompiler(pj.mapPlan,pc);
         c1.compile();
         MROperPlan plan = c1.getMRPlan();
-        for(int i=0;i<3;i++){
-            MapReduceOper job = plan.getLeaves().get(0);
-            System.out.println(job.name());
-            plan.remove(job);
-        }
+        PlanPrinter<MapReduceOper, MROperPlan> pp = new 
PlanPrinter<MapReduceOper, MROperPlan>(plan);
+        pp.print(System.out);
     }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java
 Fri May 16 14:38:40 2008
@@ -27,16 +27,12 @@
 public abstract class PigMapBase extends MapReduceBase{
     private final Log log = LogFactory.getLog(getClass());
     
-    protected String inputFile = null;
-    
     //Map Plan
     protected PhysicalPlan<PhysicalOperator> mp;
     
-    //The reporter that handles communicating progress
-    protected RunnableReporter runnableReporter;
-    
-    //The thread used to run the runnableReporter
-    protected Thread reporterThread;
+    // Reporter that will be used by operators
+    // to transmit heartbeat
+    ProgressableReporter pigReporter;
     
     /**
      * Will be called when all the tuples in the input
@@ -45,13 +41,8 @@
     @Override
     public void close() throws IOException {
         super.close();
-        if(runnableReporter!=null){
-            runnableReporter.setDone(true);
-            runnableReporter = null;
-        }
-        reporterThread = null;
+        PhysicalOperator.setReporter(null);
         mp = null;
-        inputFile = null;
     }
 
     /**
@@ -61,7 +52,6 @@
     @Override
     public void configure(JobConf job) {
         super.configure(job);
-        inputFile = job.get("map.input.file", "UNKNOWN");
         try {
             mp = (PhysicalPlan<PhysicalOperator>) 
ObjectSerializer.deserialize(job
                     .get("pig.mapPlan"));
@@ -77,9 +67,8 @@
             // till here
             
             long sleepTime = job.getLong("pig.reporter.sleep.time", 10000);
-            runnableReporter = new RunnableReporter(sleepTime);
-            reporterThread = new Thread(runnableReporter);
-            reporterThread.start();
+            
+            pigReporter = new ProgressableReporter();
         } catch (IOException e) {
             log.error(e.getMessage() + "was caused by:");
             log.error(e.getCause().getMessage());
@@ -99,8 +88,8 @@
             OutputCollector<WritableComparable, Writable> oc,
             Reporter reporter) throws IOException {
         
-        runnableReporter.setReporter(reporter);
-        runnableReporter.setInputFile(inputFile);
+        pigReporter.setRep(reporter);
+        PhysicalOperator.setReporter(pigReporter);
         
         if(mp.isEmpty()){
             try{

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
 Fri May 16 14:38:40 2008
@@ -38,10 +38,8 @@
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.TargetedTuple;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
@@ -78,141 +76,6 @@
             WritableComparable wcKey = 
DataType.getWritableComparableTypes(key);
             oc.collect(wcKey, it);
         }
-        /*private final Log log = LogFactory.getLog(getClass());
-        
-        private String inputFile = null;
-        
-        //Map Plan
-        private PhysicalPlan<PhysicalOperator> mp;
-        
-        //The reporter that handles communicating progress        
-        RunnableReporter runnableReporter;
-        
-        //The thread used to run the runnableReporter        
-        Thread reporterThread;
-        
-        *//**
-         * Configures the mapper with the map plan and the
-         * reproter thread
-         *//*
-        @Override
-        public void configure(JobConf jConf) {
-            super.configure(jConf);
-            inputFile = jConf.get("map.input.file", "UNKNOWN");
-            try {
-                mp = (PhysicalPlan<PhysicalOperator>) 
ObjectSerializer.deserialize(jConf
-                        .get("pig.mapPlan"));
-                
-                // To be removed
-                if(mp.isEmpty())
-                    log.debug("Map Plan empty!");
-                else{
-                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    mp.explain(baos);
-                    log.debug(baos.toString());
-                }
-                // till here
-                
-                long sleepTime = jConf.getLong("pig.reporter.sleep.time", 
10000);
-                runnableReporter = new RunnableReporter(sleepTime);
-                reporterThread = new Thread(runnableReporter);
-                reporterThread.start();
-            } catch (IOException e) {
-                log.error(e.getMessage() + "was caused by:");
-                log.error(e.getCause().getMessage());
-            }
-        }
-        
-        *//**
-         * The map function that attaches the inpTuple appropriately
-         * and executes the map plan if its not empty. Collects the
-         * result of execution into oc or the input directly to oc
-         * if map plan empty. The collection is done after extracting
-         * the key and indexed tuple.
-         *//*
-        public void map(Text key, TargetedTuple inpTuple,
-                OutputCollector<WritableComparable, Writable> oc,
-                Reporter reporter) throws IOException {
-            
-            runnableReporter.setReporter(reporter);
-            runnableReporter.setInputFile(inputFile);
-            
-            if(mp.isEmpty()){
-                try{
-                    collectKeyAndTuple(oc,inpTuple.toTuple());
-                } catch (ExecException e) {
-                    IOException ioe = new IOException(e.getMessage());
-                    ioe.initCause(e.getCause());
-                    throw ioe;
-                }
-                return;
-            }
-            
-            for (OperatorKey targetKey : inpTuple.targetOps) {
-                PhysicalOperator<PhyPlanVisitor> target = 
mp.getOperator(targetKey);
-                Tuple t = inpTuple.toTuple();
-                target.attachInput(t);
-            }
-            
-            List<PhysicalOperator> leaves = mp.getLeaves();
-            
-            PhysicalOperator leaf = leaves.get(0);
-            try {
-                while(true){
-                    Result res = leaf.getNext(inpTuple);
-                    if(res.returnStatus==POStatus.STATUS_OK){
-                        collectKeyAndTuple(oc,(Tuple)res.result);
-                        continue;
-                    }
-                    
-                    if(res.returnStatus==POStatus.STATUS_EOP)
-                        return;
-                    
-                    if(res.returnStatus==POStatus.STATUS_NULL)
-                        continue;
-                    
-                    if(res.returnStatus==POStatus.STATUS_ERR){
-                        IOException ioe = new IOException("Received Error 
while " +
-                                "processing the map plan.");
-                        throw ioe;
-                    }
-                }
-            } catch (ExecException e) {
-                IOException ioe = new IOException(e.getMessage());
-                ioe.initCause(e.getCause());
-                throw ioe;
-            }
-        }
-        
-        *//**
-         * Assumes that the tuple is of the form (key,indexedTuple) and
-         * extracts the key & indexed tuple. The key is then converted
-         * to the appropriate Hadoop type and the Hadoop type and IndexedTup
-         * are collected into the output collector
-         * @param oc - Output Collector
-         * @param tuple - The tuple which is the result of a LR either directly
-         *                  or by loading a file which has the output of a LR
-         * @throws ExecException
-         * @throws IOException
-         *//*
-        private void collectKeyAndTuple(OutputCollector<WritableComparable, 
Writable> oc, Tuple tuple) throws ExecException, IOException {
-            Object key = tuple.get(0);
-            IndexedTuple it = (IndexedTuple)tuple.get(1);
-            WritableComparable wcKey = 
DataType.getWritableComparableTypes(key);
-            oc.collect(wcKey, it);
-        }
-        
-        *//**
-         * Will be called when all the tuples in the input
-         * are done. So reporter thread should be stopped.
-         *//*
-        @Override
-        public void close() throws IOException {
-            super.close();
-            if(runnableReporter!=null)
-                runnableReporter.setDone(true);
-        }*/
-
     }
 
     public static class Reduce extends MapReduceBase
@@ -230,11 +93,7 @@
         //plan
         private POPackage pack;
         
-        //The reporter that handles communicating progress        
-        RunnableReporter runnableReporter;
-        
-        //The thread used to run the runnableReporter        
-        Thread reporterThread;
+        ProgressableReporter pigReporter;
         
         /**
          * Configures the Reduce plan, the POPackage operator
@@ -258,9 +117,8 @@
                 // till here
                 
                 long sleepTime = jConf.getLong("pig.reporter.sleep.time", 
10000);
-                runnableReporter = new RunnableReporter(sleepTime);
-                reporterThread = new Thread(runnableReporter);
-                reporterThread.start();
+
+                pigReporter = new ProgressableReporter();
             } catch (IOException e) {
                 log.error(e.getMessage() + "was caused by:");
                 log.error(e.getCause().getMessage());
@@ -278,7 +136,7 @@
                 OutputCollector<WritableComparable, Writable> oc,
                 Reporter reporter) throws IOException {
             
-            runnableReporter.setReporter(reporter);
+            pigReporter.setRep(reporter);
             
             Object k = DataType.convertToPigType(key);
             pack.attachInput(k, indInp);
@@ -345,8 +203,9 @@
         @Override
         public void close() throws IOException {
             super.close();
-            if(runnableReporter!=null)
-                runnableReporter.setDone(true);
+            /*if(runnableReporter!=null)
+                runnableReporter.setDone(true);*/
+            PhysicalOperator.setReporter(null);
         }
     }
     

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java?rev=657224&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java
 Fri May 16 14:38:40 2008
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.impl.physicalLayer.PigProgressable;
+
+public class ProgressableReporter implements PigProgressable {
+    Reporter rep;
+    
+    public ProgressableReporter(){
+        
+    }
+    
+    public ProgressableReporter(Reporter rep) {
+        super();
+        this.rep = rep;
+    }
+
+    public void progress() {
+        rep.progress();
+    }
+
+    public void progress(String msg) {
+        rep.setStatus(msg);
+    }
+
+    public void setRep(Reporter rep) {
+        this.rep = rep;
+    }
+
+}

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java
 Fri May 16 14:38:40 2008
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.impl.physicalLayer;
 
 public interface PigProgressable {

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
 Fri May 16 14:38:40 2008
@@ -214,7 +214,7 @@
                     }
                     
                 }
-                
+                if(reporter!=null) reporter.progress();
                 //CreateTuple(data);
                 res.result = CreateTuple(data);
                 res.returnStatus = POStatus.STATUS_OK;

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
 Fri May 16 14:38:40 2008
@@ -170,6 +170,7 @@
         while (indTupIter.hasNext()) {
             IndexedTuple it = indTupIter.next();
             dbs[it.index].add(it.toTuple());
+            if(reporter!=null) reporter.progress();
         }
         
         //Construct the output tuple by appending

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java
 Fri May 16 14:38:40 2008
@@ -118,6 +118,7 @@
             Result res;
             
             while(true){
+                if(reporter!=null) reporter.progress();
                 res = inputs.get(ind).getNext(t);
                 if(res.returnStatus == POStatus.STATUS_NULL)
                     continue;

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
 Fri May 16 14:38:40 2008
@@ -28,6 +28,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.PigProgressable;
 import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.Operator;
@@ -85,6 +86,11 @@
 
     // The result of performing the operation along with the output
     protected Result res = null;
+    
+    // Will be used by operators to report status or transmit heartbeat
+    // Should be set by the backends to appropriate implementations that
+    // wrap their own version of a reporter.
+    protected static PigProgressable reporter;
 
     // Dummy types used to access the getNext of appropriate
     // type. These will be null
@@ -197,6 +203,7 @@
      * @throws ExecException
      */
     public Result processInput() throws ExecException {
+        
         Result res = new Result();
         Tuple inpValue = null;
         if (input == null && (inputs == null || inputs.size()==0)) {
@@ -204,6 +211,10 @@
             res.returnStatus = POStatus.STATUS_EOP;
             return res;
         }
+        
+        //Should be removed once the model is clear
+        if(reporter!=null) reporter.progress();
+            
         if (!isInputAttached())
             return inputs.get(0).getNext(inpValue);
         else {
@@ -256,4 +267,8 @@
         return res;
     }
 
+    public static void setReporter(PigProgressable reporter) {
+        PhysicalOperator.reporter = reporter;
+    }
+
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java
 Fri May 16 14:38:40 2008
@@ -17,7 +17,11 @@
 
 public class POUserComparisonFunc extends POUserFunc {
 
-       transient ComparisonFunc func;
+       /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    transient ComparisonFunc func;
        private Log log = LogFactory.getLog(getClass());
        
        public POUserComparisonFunc(OperatorKey k, int rp, List inp, String 
funcSpec, ComparisonFunc func) {
@@ -34,6 +38,7 @@
        
        private void instantiateFunc() {
                this.func = (ComparisonFunc) 
PigContext.instantiateFuncFromSpec(this.funcSpec);
+        this.func.setReporter(reporter);
        }
        
        public ComparisonFunc getComparator() {

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java
 Fri May 16 14:38:40 2008
@@ -38,12 +38,15 @@
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
-import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class POUserFunc extends ExpressionOperator {
 
-       transient EvalFunc func;
+       /**
+     * Serialization version identifier (serialVersionUID) for this class.
+     */
+    private static final long serialVersionUID = 1L;
+    transient EvalFunc func;
        Tuple t1, t2;
        private final Log log = LogFactory.getLog(getClass());
        String funcSpec;
@@ -73,6 +76,7 @@
 
        private void instantiateFunc() {
                this.func = (EvalFunc) 
PigContext.instantiateFuncFromSpec(this.funcSpec);
+        this.func.setReporter(reporter);
        }
 
        private Result getNext() throws ExecException {
@@ -85,6 +89,7 @@
                try {
                        if (inputAttached) {
                                result.result = func.exec(input);
+                if(reporter!=null) reporter.progress();
                                result.returnStatus = (result.result != null) ? 
POStatus.STATUS_OK
                                                : POStatus.STATUS_EOP;
                                return result;

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java?rev=657224&r1=657223&r2=657224&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java 
Fri May 16 14:38:40 2008
@@ -42,6 +42,7 @@
 import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
 import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
@@ -95,7 +96,7 @@
         GenPhyOp.setR(r);
         
         GenPhyOp.setPc(pc);
-        int numTests = 15;
+        int numTests = 16;
 //        int numTests = 9;
         tests = new String[numTests];
         int cnt = -1;
@@ -107,6 +108,7 @@
         for (int i = 1; i <= 3; i++)
             tests[++cnt] = "intTestSpl" + i;
         tests[++cnt] = "intTestSortUDF1";
+        tests[++cnt] = "intTestDistinct1";
     }
 
     @After
@@ -782,6 +784,28 @@
         php.addAsLeaf(st);
     }
     
+    public static void intTestDistinct1() throws PlanException, ExecException{
+        php = new PhysicalPlan<PhysicalOperator>();
+        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
+        php.merge(ldFil1);
+        
+        PODistinct op = new PODistinct(new OperatorKey("", r.nextLong()),
+                -1, null);
+        
+        php.addAsLeaf(op);
+        
+        PhysicalPlan<PhysicalOperator> grpChain1 = GenPhyOp.grpChain();
+        php.merge(grpChain1);
+        php.connect(op,grpChain1.getRoots().get(0));
+        
+        PODistinct op1 = new PODistinct(new OperatorKey("", r.nextLong()),
+                -1, null);
+        
+        php.addAsLeaf(op1);
+        POStore st = GenPhyOp.topStoreOp();
+        php.addAsLeaf(st);
+    }
+    
     public static class WeirdComparator extends ComparisonFunc {
 
         @Override

Added: 
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld?rev=657224&view=auto
==============================================================================
--- 
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
 (added)
+++ 
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
 Fri May 16 14:38:40 2008
@@ -0,0 +1,41 @@
+MapReduce(-1) - MRCompiler-176:
+|   Store(DummyFil:DummyLdr) - -6079615556647418436
+|   |
+|   |---For Each - MRCompiler-180
+|       |   |
+|       |   POGenerate(false)  - MRCompiler-179
+|       |   |   |
+|       |   |   Project(0) - MRCompiler-178
+|       |
+|       |---Package - MRCompiler-177
+|   Load(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) 
- MRCompiler-175
+|
+|---MapReduce(-1) - MRCompiler-171:
+    |   
Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - 
MRCompiler-174
+    |   |
+    |   |---Local Rearrange - MRCompiler-173
+    |       |   |
+    |       |   Project(*) - MRCompiler-172
+    |       |
+    |       |---Package - -8219725798912083822
+    |   
Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - 
MRCompiler-170
+    |
+    |---MapReduce(-1) - MRCompiler-162:
+        |   
Store(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - 
MRCompiler-169
+        |   |
+        |   |---Local Rearrange - --3889827013424534115
+        |       |
+        |       |---For Each - MRCompiler-168
+        |           |   |
+        |           |   POGenerate(false)  - MRCompiler-167
+        |           |   |   |
+        |           |   |   Project(0) - MRCompiler-166
+        |           |
+        |           |---Package - MRCompiler-165
+        |   Local Rearrange - MRCompiler-164
+        |   |   |
+        |   |   Project(*) - MRCompiler-163
+        |   |
+        |   |---Filter - --1613182091613226659
+        |       |
+        |       |---Load(DummyFil:DummyLdr) - -5321755951016030071
\ No newline at end of file


Reply via email to