Author: gates
Date: Tue May 20 16:16:46 2008
New Revision: 658485

URL: http://svn.apache.org/viewvc?rev=658485&view=rev
Log:
Removed most of the TODO fixes to tie things together.  Also includes code from 
Shravan to rework the launcher so it can report status.


Added:
    
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java
Removed:
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapRedPhysicalPlan.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POVisitor.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PlanCompiler.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
    incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/FileSpec.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POPrinter.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java

Modified: incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java Tue May 20 
16:16:46 2008
@@ -23,8 +23,7 @@
 import java.lang.reflect.Type;
 
 import org.apache.pig.data.Tuple;
-// TODO FIX
-// import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.physicalLayer.PigProgressable;
 
@@ -77,8 +76,6 @@
         
         
         //Type check the initial, intermediate, and final functions
-        // TODO FIX
-        /*
         if (this instanceof Algebraic){
             Algebraic a = (Algebraic)this;
             
@@ -90,16 +87,13 @@
             if (getReturnTypeFromSpec(a.getFinal()) != returnType)
                     throw new RuntimeException("Final " + errMsg);
         }
-        */
         
     }
     
 
     private Type getReturnTypeFromSpec(String funcSpec){
         try{
-            // TODO FIX
-            // return 
((EvalFunc)PigContext.instantiateFuncFromSpec(funcSpec)).getReturnType();
-            return null;
+            return 
((EvalFunc)PigContext.instantiateFuncFromSpec(funcSpec)).getReturnType();
         }catch (ClassCastException e){
             throw new RuntimeException(funcSpec + " does not specify an eval 
func", e);
         }
@@ -140,8 +134,6 @@
      * @return Schema of the output
      */
     public Schema outputSchema(Schema input) {
-        // TODO FIX
-        // return new TupleSchema();
         return null;
     }
     

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Tue May 20 
16:16:46 2008
@@ -39,8 +39,7 @@
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-//TODO FIX Need to uncomment this with the right imports
-//import 
org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -50,9 +49,14 @@
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.impl.logicalLayer.LOPrinter;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.physicalLayer.POPrinter;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.WrappedIOException;
 
 
@@ -82,7 +86,8 @@
 
     Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
     Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, 
LogicalOperator>();
-    
+    Map<String, LogicalOperator> aliasOp = new HashMap<String, 
LogicalOperator>();
+    Map<String, ExpressionOperator> defineAliases = new HashMap<String, 
ExpressionOperator>();
     PigContext pigContext;
     
     private String scope = constructScope();
@@ -215,8 +220,6 @@
         }
             
         LogicalPlan lp = null;
-        Map<String, LogicalOperator> aliasOp = new HashMap<String, 
LogicalOperator>();
-        Map<String, ExpressionOperator> defineAliases = new HashMap<String, 
ExpressionOperator>();
         try {
             lp = (new LogicalPlanBuilder(pigContext).parse(scope, query,
                     aliases, opTable, aliasOp, defineAliases));
@@ -252,10 +255,25 @@
         if (!aliases.containsKey(id))
             throw new IOException("Invalid alias: " + id);
 
+        try {
+            ExecJob job = execute(id);
+            // invocation of "execute" is synchronous!
+            if (job.getStatus() == JOB_STATUS.COMPLETED) {
+                    return job.getResults();
+            } else {
+                throw new IOException("Job terminated with anomalous status "
+                    + job.getStatus().toString());
+            }
+        } catch (ExecException ee) {
+            throw WrappedIOException.wrap(
+                "Unable to open iterator for alias: " + id, ee);
+        }
+
         // TODO: front-end could actually remember what logical plans have been
         // already submitted to the back-end for compilation and
         // execution.
         
+        /*
         LogicalPlan readFrom = (LogicalPlan) aliases.get(id);
 
         try {
@@ -275,6 +293,7 @@
         catch (ExecException e) {
             throw WrappedIOException.wrap("Unable to open iterator for alias: 
" + id, e);
         }
+        */
     }
     
     /**
@@ -300,18 +319,20 @@
 
         LogicalPlan readFrom = aliases.get(id);
         
-        store(readFrom,filename,func);
+        store(id, readFrom, filename, func);
     }
         
-    public void store(LogicalPlan readFrom, String filename, String func) 
throws IOException {
-        /*
-        LogicalPlan storePlan = 
QueryParser.generateStorePlan(readFrom.getOpTable(),
-                                                              scope,
-                                                              readFrom,
-                                                              filename,
-                                                              func,
-                                                              pigContext);
+    public void store(String id, LogicalPlan readFrom, String filename, String 
func) throws IOException {
+        try {
+            LogicalPlan storePlan = QueryParser.generateStorePlan(opTable,
+                scope, readFrom, filename, func, pigContext);
+            execute(id);
+        } catch (ExecException e) {
+            throw WrappedIOException.wrap("Unable to store for alias: " + id,
+                e);
+        }
 
+        /*
         try {
             ExecPhysicalPlan pp = 
                 pigContext.getExecutionEngine().compile(storePlan, null);
@@ -334,32 +355,28 @@
      */
     public void explain(String alias,
                         PrintStream stream) throws IOException {
-        stream.println("Logical Plan:");
-        LogicalPlan lp = aliases.get(alias);
-        if (lp == null) {
-            log.error("Invalid alias: " + alias);
-            stream.println("Invalid alias: " + alias);
-            throw new IOException("Invalid alias: " + alias);
-        }
-
-        lp.explain(stream);
-        
-        // TODO FIX
-        /*
-        stream.println("-----------------------------------------------");
-        stream.println("Physical Plan:");
         try {
-            ExecPhysicalPlan pp = 
-                pigContext.getExecutionEngine().compile(lp, null);
-        
-            pp.explain(stream);
+            stream.println("Logical Plan:");
+            LogicalPlan lp = compileLp();
+            LOPrinter lv = new LOPrinter(stream, lp);
+            lv.visit();
+
+            PhysicalPlan pp = compilePp(lp);
+            stream.println("-----------------------------------------------");
+            stream.println("Physical Plan:");
+            POPrinter pv = new POPrinter(stream, pp);
+            pv.visit();
+
+            stream.println("-----------------------------------------------");
+            pigContext.getExecutionEngine().explain(pp, stream);
+        
+        } catch (VisitorException ve) {
+            throw WrappedIOException.wrap("Unable to explain alias " + alias,
+                ve);
+        } catch (ExecException ee) {
+            throw WrappedIOException.wrap("Unable to explain alias " + alias,
+                ee);
         }
-        catch (ExecException e) {
-            log.error("Failed to compile to physical plan: " + alias);
-            stream.println("Failed to compile the logical plan for " + alias + 
" into a physical plan");
-            throw WrappedIOException.wrap("Failed to compile to phyiscal plan: 
" + alias, e);
-        }
-        */
     }
 
     /**
@@ -465,4 +482,34 @@
         //
         // pigContext.getExecutionEngine().reclaimScope(this.scope);
     }
+
+    private ExecJob execute(String jobName) throws ExecException {
+        ExecJob job = null;
+
+        LogicalPlan lp = compileLp();
+        PhysicalPlan pp = compilePp(lp);
+        // execute using appropriate engine
+        return pigContext.getExecutionEngine().execute(pp, jobName);
+    }
+
+    // TODO FIX
+    private LogicalPlan compileLp() {
+        LogicalPlan lp = null;
+        // TODO, stitch together logical plans
+
+        // TODO run through validator
+
+        // TODO optimize
+
+        return lp;
+    }
+
+    private PhysicalPlan compilePp(LogicalPlan lp) throws ExecException {
+        // translate lp to physical plan
+        PhysicalPlan pp = pigContext.getExecutionEngine().compile(lp, null);
+
+        // TODO optimize
+
+        return pp;
+    }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
 Tue May 20 16:16:46 2008
@@ -18,12 +18,14 @@
 
 package org.apache.pig.backend.executionengine;
 
+import java.io.PrintStream;
 import java.util.Collection;
 import java.util.Properties;
 import java.util.Map;
 
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 
 /**
  * This is the main interface that various execution engines
@@ -88,27 +90,39 @@
      * @param properties
      * @return physical plan
      */
-    public ExecPhysicalPlan compile(LogicalPlan plan,
-                                    Properties properties)
-        throws ExecException;
+    public PhysicalPlan compile(LogicalPlan plan,
+                                Properties properties) throws ExecException;
 
-    public ExecPhysicalPlan compile(LogicalPlan[] plans,
-                                    Properties properties)
-        throws ExecException;
+    public PhysicalPlan compile(LogicalPlan[] plans,
+                                Properties properties) throws ExecException;
 
     /**
      * Execute the physical plan in blocking mode.
-     * 
-     * @throws
+     *
+     * @param plan PhysicalPlan to execute. 
+     * @param jobName Name of this plan, will be used to identify the plan
+     * @throws ExecException
      */
-    public ExecJob execute(ExecPhysicalPlan plan) throws ExecException;
+    public ExecJob execute(PhysicalPlan plan,
+                           String jobName) throws ExecException;
 
     /**
      * Execute the physical plan in non-blocking mode
      * 
+     * @param plan PhysicalPlan to submit. 
+     * @param jobName Name of this plan, will be used to identify the plan
      * @throws ExecException
      */
-    public ExecJob submit(ExecPhysicalPlan plan) throws ExecException;
+    public ExecJob submit(PhysicalPlan plan,
+                          String jobName) throws ExecException;
+
+    /**
+     * Explain executor specific information.
+     *
+     * @param plan PhysicalPlan to explain
+     * @param stream Stream to print output to
+     */
+    public void explain(PhysicalPlan plan, PrintStream stream);
 
     /**
      * Return currently running jobs (can be useful for admin purposes)

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 Tue May 20 16:16:46 2008
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PrintStream;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
@@ -42,13 +43,20 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
-import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.shock.SSHSocketImplFactory;
 
 
@@ -204,15 +212,15 @@
         throw new UnsupportedOperationException();
     }
 
-    public ExecPhysicalPlan compile(LogicalPlan plan,
-                                    Properties properties) throws 
ExecException {
+    public PhysicalPlan compile(LogicalPlan plan,
+                                Properties properties) throws ExecException {
         return compile(new LogicalPlan[] { plan }, properties);
     }
 
-    public ExecPhysicalPlan compile(LogicalPlan[] plans,
-                                    Properties properties)
+    public PhysicalPlan compile(LogicalPlan[] plans,
+                                Properties properties)
             throws ExecException {
-        // TODO FIX Need to uncomment this with the right logic
+        // TODO FIX Plug in Shubham's translator here.
         /*if (plans == null) {
             throw new ExecException("No Plans to compile");
         }
@@ -247,8 +255,35 @@
         throw new ExecException("Unsupported Operation");
     }
 
-    public ExecJob execute(ExecPhysicalPlan plan) 
-            throws ExecException {
+    public ExecJob execute(PhysicalPlan plan,
+                           String jobName) throws ExecException {
+        try {
+            PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
+            FileSpec spec = null;
+            if(!(leaf instanceof POStore)){
+                POStore str = new POStore(new OperatorKey("HExecEngine",
+                    
NodeIdGenerator.getGenerator().getNextNodeId("HExecEngine")));
+                str.setPc(pigContext);
+                spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
+                    pigContext).toString(),
+                    BinStorage.class.getName());
+                str.setSFile(spec);
+            }
+            else{
+                spec = ((POStore)leaf).getSFile();
+            }
+
+            MapReduceLauncher launcher = new MapReduceLauncher();
+            launcher.launchPig(plan, jobName, pigContext);
+            return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+
+        } catch (Exception e) {
+            // There are a lot of exceptions thrown by the launcher.  If this
+            // is an ExecException, just let it through.  Else wrap it.
+            if (e instanceof ExecException) throw (ExecException)e;
+            else throw new ExecException(e.getMessage(), e);
+        }
+
         // TODO FIX Need to uncomment this with the right logic
         /*POMapreduce pom = (POMapreduce) physicalOpTable.get(plan.getRoot());
 
@@ -287,14 +322,18 @@
         }
         
         return new HJob(JOB_STATUS.COMPLETED, pigContext, 
pom.outputFileSpec);*/
-        throw new ExecException("Unsupported Operation");
 
     }
 
-    public ExecJob submit(ExecPhysicalPlan plan) throws ExecException {
+    public ExecJob submit(PhysicalPlan plan,
+                          String jobName) throws ExecException {
         throw new UnsupportedOperationException();
     }
 
+    public void explain(PhysicalPlan plan, PrintStream stream) {
+        // TODO FIX
+    }
+
     public Collection<ExecJob> runningJobs(Properties properties) throws 
ExecException {
         throw new UnsupportedOperationException();
     }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
 Tue May 20 16:16:46 2008
@@ -18,9 +18,10 @@
 
 package org.apache.pig.backend.local.executionengine;
 
+import java.io.IOException;
+import java.io.PrintStream;
 import java.util.Collection;
 import java.util.Properties;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
@@ -35,14 +36,19 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
 import org.apache.pig.backend.executionengine.ExecScopedLogicalOperator;
 import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.HJob;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.*;
-import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
-import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.mapReduceLayer.LocalLauncher;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
 import java.util.Iterator;
 
 
@@ -56,7 +62,7 @@
     // val: the operator key for the root of the phyisical plan
     protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
     
-    protected Map<OperatorKey, ExecPhysicalOperator> physicalOpTable;
+    protected Map<OperatorKey, PhysicalOperator> physicalOpTable;
     
     // map from LOGICAL key to into about the execution
     protected Map<OperatorKey, LocalResult> materializedResults;
@@ -66,7 +72,7 @@
         this.ds = pigContext.getLfs();
         this.nodeIdGenerator = NodeIdGenerator.getGenerator(); 
         this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
-        this.physicalOpTable = new HashMap<OperatorKey, 
ExecPhysicalOperator>();
+        this.physicalOpTable = new HashMap<OperatorKey, PhysicalOperator>();
         this.materializedResults = new HashMap<OperatorKey, LocalResult>();
     }
 
@@ -97,20 +103,18 @@
     }
 
     
-    public LocalPhysicalPlan compile(LogicalPlan plan,
-                                     Properties properties) throws 
ExecException {
+    public PhysicalPlan compile(LogicalPlan plan,
+                                Properties properties) throws ExecException {
         if (plan == null) {
             throw new ExecException("No Plan to compile");
         }
 
-        // TODO FIX
-        // return compile(new ExecLogicalPlan[]{ plan } , properties);
-        return null;
+        return compile(new LogicalPlan[]{ plan } , properties);
     }
 
-    public LocalPhysicalPlan compile(LogicalPlan[] plans,
-                                     Properties properties) throws 
ExecException {
-        // TODO FIX
+    public PhysicalPlan compile(LogicalPlan[] plans,
+                                Properties properties) throws ExecException {
+        // TODO FIX Plug in logical to physical translator
         /*
         if (plans == null) {
             throw new ExecException("No Plans to compile");
@@ -140,7 +144,37 @@
         return null;
     }
 
-    public LocalJob execute(ExecPhysicalPlan plan) throws ExecException {
+    public ExecJob execute(PhysicalPlan plan,
+                            String jobName) throws ExecException {
+        try {
+            PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
+            FileSpec spec = null;
+            if(!(leaf instanceof POStore)){
+                POStore str = new POStore(new OperatorKey("HExecEngine",
+                    
NodeIdGenerator.getGenerator().getNextNodeId("HExecEngine")));
+                str.setPc(pigContext);
+                spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
+                    pigContext).toString(),
+                    BinStorage.class.getName());
+                str.setSFile(spec);
+            }
+            else{
+                spec = ((POStore)leaf).getSFile();
+            }
+
+            LocalLauncher launcher = new LocalLauncher();
+            launcher.launchPig(plan, jobName, pigContext);
+            return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+        } catch (Exception e) {
+            // There are a lot of exceptions thrown by the launcher.  If this
+            // is an ExecException, just let it through.  Else wrap it.
+            if (e instanceof ExecException) throw (ExecException)e;
+            else throw new ExecException(e.getMessage(), e);
+        }
+
+
+        // TODO Fix connect to local job runner
+        /*
         DataBag results = BagFactory.getInstance().newDefaultBag();
         try {
             PhysicalOperator pp = 
(PhysicalOperator)physicalOpTable.get(plan.getRoot());
@@ -159,12 +193,18 @@
         }
         
         return new LocalJob(results, JOB_STATUS.COMPLETED);
+        */
     }
 
-    public LocalJob submit(ExecPhysicalPlan plan) throws ExecException {
+    public LocalJob submit(PhysicalPlan plan,
+                           String jobName) throws ExecException {
         throw new UnsupportedOperationException();
     }
 
+    public void explain(PhysicalPlan plan, PrintStream stream) {
+        // TODO FIX
+    }
+
     public Collection<ExecJob> runningJobs(Properties properties) throws 
ExecException {
         return new HashSet<ExecJob>();
     }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java 
Tue May 20 16:16:46 2008
@@ -29,8 +29,8 @@
 import java.util.ArrayList;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
 import org.apache.pig.impl.util.Spillable;
-//import 
org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
 
 /**
  * A collection of Tuples.  A DataBag may or may not fit into memory.
@@ -348,13 +348,9 @@
      * Report progress to HDFS.
      */
     protected void reportProgress() {
-        // TODO FIX Need to hook this into the progress reporting
-        // infrastructure once Shravan finishs that.
-        /*
-        if (PigMapReduce.reporter != null) {
-            PigMapReduce.reporter.progress();
+        if (PhysicalOperator.reporter != null) {
+            PhysicalOperator.reporter.progress();
         }
-        */
     }
 
     public static abstract class BagDelimiterTuple extends DefaultTuple{}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Tue 
May 20 16:16:46 2008
@@ -39,8 +39,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-//TODO FIX Need to uncomment this with the right imports
-//import org.apache.pig.Main;
+import org.apache.pig.Main;
 import org.apache.pig.ExecType;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.DataStorageException;
@@ -49,11 +48,11 @@
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-//TODO FIX Need to uncomment this with the right imports
 //import 
org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
 //import 
org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
-//import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
-//import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
+import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.impl.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.WrappedIOException;
 
@@ -110,9 +109,7 @@
         this.execType = execType;
 
         initProperties();
-        // TODO FIX Need to change this after Main starts working
-//        String pigJar = JarManager.findContainingJar(Main.class);
-        String pigJar = JarManager.findContainingJar(PigContext.class);
+        String pigJar = JarManager.findContainingJar(Main.class);
         String hadoopJar = JarManager.findContainingJar(FileSystem.class);
         if (pigJar != null) {
             skipJars.add(pigJar);
@@ -135,9 +132,7 @@
             
         try{        
             // first read the properties in the jar file
-            // TODO FIX Need to uncomment this with the right class
-//            InputStream pis = 
MapReduceLauncher.class.getClassLoader().getResourceAsStream("properties");
-            InputStream pis = 
PigContext.class.getClassLoader().getResourceAsStream("properties");
+            InputStream pis = 
MapReduceLauncher.class.getClassLoader().getResourceAsStream("properties");
             if (pis != null) {
                 fileProperties.load(pis);
             }
@@ -175,8 +170,7 @@
                                        new Configuration());
                 
                 dfs = lfs;
-//              TODO FIX Need to uncomment this with the right logic           
     
-                //executionEngine = new LocalExecutionEngine(this);
+                executionEngine = new LocalExecutionEngine(this);
             }
             break;
 
@@ -234,8 +228,7 @@
     public void addJar(URL resource) throws MalformedURLException{
         if (resource != null) {
             extraJars.add(resource);
-//          TODO FIX Need to uncomment this with the right logic
-//            LogicalPlanBuilder.classloader = createCl(null);
+            LogicalPlanBuilder.classloader = createCl(null);
         }
     }
 
@@ -351,8 +344,7 @@
         for (int i = 0; i < extraJars.size(); i++) {
             urls[i + passedJar] = extraJars.get(i);
         }
-//      TODO FIX Need to uncomment this with the right logic
-//        return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
+        //return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
         return new URLClassLoader(urls, PigContext.class.getClassLoader());
     }
     
@@ -377,10 +369,8 @@
         for(String prefix: packageImportList) {
             Class c;
         try {
-//          TODO FIX Need to uncomment this with the right logic
-//            c = Class.forName(prefix+name,true, 
LogicalPlanBuilder.classloader);
-            c = Class.forName(prefix+name,true, 
PigContext.class.getClassLoader());
-            return c;
+                c = Class.forName(prefix+name,true, 
LogicalPlanBuilder.classloader);
+                return c;
             } catch (ClassNotFoundException e) {
             } catch (LinkageError e) {}
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileSpec.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileSpec.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileSpec.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileSpec.java Tue 
May 20 16:16:46 2008
@@ -19,7 +19,7 @@
 
 import java.io.Serializable;
 
-//import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigContext;
 
 
 /**
@@ -52,16 +52,9 @@
         return fileName + ":" + funcSpec;
     }
 
-    //TODO FIX
-    //Commenting out the method getFuncName as it calls getClassNameFromSpec
-    //which is part of PigContext. PigContext pulls in HExecutionEngine which
-    //is completely commented out. The import org.apache.pig.impl.PigContext
-    //is also commented out
-    /*    
     public String getFuncName(){
             return PigContext.getClassNameFromSpec(funcSpec);
     }
-    */
 
     public int getSize() {
         throw new UnsupportedOperationException("File Size not implemented 
yet");

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
 Tue May 20 16:16:46 2008
@@ -34,20 +34,4 @@
     public LogicalPlan() {
         super();
     }
-
-    /*
-    public LogicalPlan(Map<OperatorKey, LogicalOperator> keys) {
-        super(keys);
-    }
-    */
-
-    public void explain(OutputStream out) {
-        // TODO FIX
-        /*
-        LOVisitor lprinter = new LOPrinter(new PrintStream(out));
-        
-        opTable.get(root).visit(lprinter);
-        */
-    }
-
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
 Tue May 20 16:16:46 2008
@@ -25,8 +25,10 @@
 public class Launcher {
     private static final Log log = LogFactory.getLog(Launcher.class);
     
+    int totalHadoopTimeSpent;
+    
     protected Launcher(){
-        
+        totalHadoopTimeSpent = 0;
     }
     /**
      * Method to launch pig for hadoop either for a cluster's
@@ -57,7 +59,7 @@
      * @throws ExecException
      * @throws JobCreationException
      */
-    protected void launchPig(PhysicalPlan<PhysicalOperator> php, String 
grpName, PigContext pc)
+    protected boolean launchPig(PhysicalPlan<PhysicalOperator> php, String 
grpName, PigContext pc)
             throws PlanException, VisitorException, IOException, ExecException,
             JobCreationException {
         long sleepTime = 500;
@@ -87,20 +89,48 @@
             lastProg = prog;
         }
         lastProg = calculateProgress(jc, jobClient)/numMRJobs;
-        if(lastProg==1.0)
+        if(isComplete(lastProg))
             log.info("Completed Successfully");
         else{
             log.info("Unsuccessful attempt. Completed " + lastProg * 100 + "% 
of the job");
             List<Job> failedJobs = jc.getFailedJobs();
+            if(failedJobs==null)
+                throw new ExecException("Something terribly wrong with Job 
Control.");
             for (Job job : failedJobs) {
-                String MRJobID = job.getMapredJobID();
-                getErrorMessages(jobClient.getMapTaskReports(MRJobID), "map");
-                getErrorMessages(jobClient.getReduceTaskReports(MRJobID), 
"reduce");
+                getStats(job,jobClient);
             }
         }
+        List<Job> succJobs = jc.getSuccessfulJobs();
+        if(succJobs!=null)
+            for(Job job : succJobs){
+                getStats(job,jobClient);
+            }
 
         jc.stop(); 
         
+        return isComplete(lastProg);
+    }
+    
+    private boolean isComplete(double prog){
+        return (int)(Math.ceil(prog)) == (int)1;
+    }
+    
+    private void getStats(Job job, JobClient jobClient) throws IOException{
+        String MRJobID = job.getMapredJobID();
+        TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID);
+        getErrorMessages(mapRep, "map");
+        totalHadoopTimeSpent += computeTimeSpent(mapRep);
+        TaskReport[] redRep = jobClient.getReduceTaskReports(MRJobID);
+        getErrorMessages(redRep, "reduce");
+        totalHadoopTimeSpent += computeTimeSpent(mapRep);
+    }
+    
+    private int computeTimeSpent(TaskReport[] mapReports) {
+        int timeSpent = 0;
+        for (TaskReport r : mapReports) {
+            timeSpent += (r.getFinishTime() - r.getStartTime());
+        }
+        return timeSpent;
     }
     
     protected static void getErrorMessages(TaskReport reports[], String type)
@@ -159,4 +189,7 @@
             return (mapProg + redProg)/2;
         }
     }
+    public int getTotalHadoopTimeSpent() {
+        return totalHadoopTimeSpent;
+    }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
 Tue May 20 16:16:46 2008
@@ -12,7 +12,13 @@
 
 public class LocalLauncher extends Launcher{
     @Override
-    public void launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, 
PigContext pc) throws PlanException, VisitorException, IOException, 
ExecException, JobCreationException {
-        super.launchPig(php, grpName, pc);
+    public boolean launchPig(PhysicalPlan<PhysicalOperator> php,
+                             String grpName,
+                             PigContext pc) throws PlanException,
+                                                   VisitorException,
+                                                   IOException,
+                                                   ExecException,
+                                                   JobCreationException {
+        return super.launchPig(php, grpName, pc);
     }
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
 Tue May 20 16:16:46 2008
@@ -16,7 +16,13 @@
 public class MapReduceLauncher extends Launcher{
 
     @Override
-    public void launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, 
PigContext pc) throws PlanException, VisitorException, IOException, 
ExecException, JobCreationException {
-        super.launchPig(php, grpName, pc);
+    public boolean launchPig(PhysicalPlan<PhysicalOperator> php,
+                             String grpName,
+                             PigContext pc) throws PlanException,
+                                                   VisitorException,
+                                                   IOException,
+                                                   ExecException,
+                                                   JobCreationException {
+        return super.launchPig(php, grpName, pc);
     }
 }

Added: 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java?rev=658485&view=auto
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java
 (added)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java
 Tue May 20 16:16:46 2008
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer.plans;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.io.PrintStream;
+
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor mechanism printing out the logical plan.
+ */
+public class MRPrinter extends MROpPlanVisitor {
+
+    private PrintStream mStream = null;
+    private int mIndent = 0;
+
+    /**
+     * @param ps PrintStream to output plan information to
+     * @param plan MR plan to print
+     */
+    public MRPrinter(PrintStream ps, MROperPlan plan) {
+        super(plan, new DepthFirstWalker(plan));
+    }
+
+    /* TODO FIX
+    public void visit(LOAdd a) throws VisitorException {
+        visitBinary(a, "+");
+    }
+
+    public void visit(LOAnd a) throws VisitorException {
+        visitBinary(a, "AND");
+    }
+    
+    public void visit(LOBinCond bc) throws VisitorException {
+        print(bc);
+        mStream.print(" COND: (");
+        bc.getCond().visit(this);
+        mStream.print(") TRUE: (");
+        bc.getLhsOp().visit(this);
+        mStream.print(") FALSE (");
+        bc.getRhsOp().visit(this);
+        mStream.print(")");
+    }
+
+    public void visit(LOCogroup g) throws VisitorException {
+        print(g);
+        mStream.print("GROUP BY PLANS:");
+        MultiMap<LogicalOperator, LogicalPlan> plans = g.getGroupByPlans();
+        for (LogicalOperator lo : plans.keySet()) {
+            // Visit the associated plans
+            for (LogicalPlan plan : plans.get(lo)) {
+                mIndent++;
+                pushWalker(new DepthFirstWalker(plan));
+                visit();
+                popWalker();
+                mIndent--;
+            }
+            mStream.println();
+        }
+        // Visit input operators
+        for (LogicalOperator lo : plans.keySet()) {
+            // Visit the operator
+            lo.visit(this);
+        }
+    }
+        
+    public void visit(LOConst c) throws VisitorException {
+        print(c);
+        mStream.print(" VALUE (" + c.getValue() + ")");
+    }
+
+    public void visit(LOCross c) throws VisitorException {
+        print(c);
+        mStream.println();
+        super.visit(c);
+    }
+
+    public void visit(LODistinct d) throws VisitorException {
+        print(d);
+        mStream.println();
+        super.visit(d);
+    }
+
+    public void visit(LODivide d) throws VisitorException {
+        visitBinary(d, "/");
+    }
+
+    public void visit(LOEqual e) throws VisitorException {
+        visitBinary(e, "==");
+    }
+
+    public void visit(LOFilter f) throws VisitorException {
+        print(f);
+        mStream.print(" COMP: ");
+        mIndent++;
+        pushWalker(new DepthFirstWalker(f.getComparisonPlan()));
+        visit();
+        mIndent--;
+        mStream.println();
+        f.getInput().visit(this);
+    }
+
+     public void visit(LOForEach f) throws VisitorException {
+        print(f);
+        mStream.print(" PLAN: ");
+        mIndent++;
+        pushWalker(new DepthFirstWalker(f.getForEachPlan()));
+        visit();
+        mIndent--;
+        mStream.println();
+        // Visit our input
+        mPlan.getPredecessors((LogicalOperator)f).get(0).visit(this);
+    }
+ 
+    public void visit(LOGreaterThan gt) throws VisitorException {
+        visitBinary(gt, ">");
+    }
+
+    public void visit(LOGreaterThanEqual gte) throws VisitorException {
+        visitBinary(gte, ">=");
+    }
+
+    public void visit(LOLesserThan lt) throws VisitorException {
+        visitBinary(lt, "<");
+    }
+
+    public void visit(LOLesserThanEqual lte) throws VisitorException {
+        visitBinary(lte, "<=");
+    }
+
+    public void visit(LOLoad load) throws VisitorException {
+        print(load);
+        mStream.print(" FILE: " + load.getInputFile().getFileName());
+        mStream.print(" FUNC: " + load.getLoadFunc().getClass().getName());
+        mStream.println();
+    }
+
+    public void visit(LOMapLookup mlu) throws VisitorException {
+        print(mlu);
+        mStream.print("(");
+        mlu.getMap().visit(this);
+        mStream.print(")# " + mlu.getKey());
+    }
+
+    public void visit(LOMod m) throws VisitorException {
+        visitBinary(m, "MOD");
+    }
+
+    public void visit(LOMultiply m) throws VisitorException {
+        visitBinary(m, "*");
+    }
+
+    public void visit(LONegative n) throws VisitorException {
+        visitUnary(n, "-");
+    }
+
+    public void visit(LONot n) throws VisitorException {
+        visitUnary(n, "NOT");
+    }
+
+    public void visit(LONotEqual ne) throws VisitorException {
+        visitBinary(ne, "!=");
+    }
+
+    public void visit(LOOr or) throws VisitorException {
+        visitBinary(or, "OR");
+    }
+
+    public void visit(LOProject p) throws VisitorException {
+        print(p);
+        if (p.isStar()) {
+            mStream.print(" ALL ");
+        } else {
+            List<Integer> cols = p.getProjection();
+            mStream.print(" COL");
+            if (cols.size() > 1) mStream.print("S");
+            mStream.print(" (");
+            for (int i = 0; i < cols.size(); i++) {
+                if (i > 0) mStream.print(", ");
+                mStream.print(cols.get(i));
+            }
+            mStream.print(")");
+        }
+        mStream.print(" FROM ");
+        if (p.getSentinel()) {
+            // This project is connected to some other relation, don't follow
+            // that path or we'll cycle in the graph.
+            p.getExpression().name();
+        } else {
+            mIndent++;
+            p.getExpression().visit(this);
+            mIndent--;
+        }
+    }
+
+    public void visit(LORegexp r) throws VisitorException {
+        print(r);
+        mStream.print(" REGEX (" + r.getRegexp() + ") LOOKING IN (");
+        r.getOperand().visit(this);
+        mStream.print(")");
+    }
+
+    private void print(LogicalOperator lo, String name) {
+        List<EvalSpec> empty = new ArrayList<EvalSpec>();
+        print(lo, name, empty);
+    }
+
+    private void visitBinary(
+            BinaryExpressionOperator b,
+            String op) throws VisitorException {
+        print(b);
+        mStream.print(" (");
+        b.getLhsOperand().visit(this);
+        mStream.print(") " + op + " (");
+        b.getRhsOperand().visit(this);
+        mStream.print(") ");
+    }
+
+    private void visitUnary(
+            UnaryExpressionOperator e,
+            String op) throws VisitorException {
+        print(e);
+        mStream.print(op + " (");
+        e.getOperand().visit(this);
+        mStream.print(") ");
+    }
+
+    private void print(LogicalOperator lo) {
+        for (int i = 0; i < mIndent; i++) mStream.print("    ");
+
+        printName(lo);
+
+        if (!(lo instanceof ExpressionOperator)) {
+            mStream.print("Inputs: ");
+            for (LogicalOperator predecessor : mPlan.getPredecessors(lo)) {
+                printName(predecessor);
+            }
+            mStream.print("Schema: ");
+            try {
+                printSchema(lo.getSchema());
+            } catch (FrontendException fe) {
+                // ignore it, nothing we can do
+                mStream.print("()");
+            }
+        }
+        mStream.print(" : ");
+    }
+
+    private void printName(LogicalOperator lo) {
+        mStream.println(lo.name() + " key(" + lo.getOperatorKey().scope + 
+            ", " + lo.getOperatorKey().id + ") ");
+    }
+
+    private void printSchema(Schema schema) {
+        mStream.print("(");
+        for (Schema.FieldSchema fs : schema.getFields()) {
+            if (fs.alias != null) mStream.print(fs.alias + ": ");
+            mStream.print(DataType.findTypeName(fs.type));
+            if (fs.schema != null) {
+                if (fs.type == DataType.BAG) mStream.print("{");
+                printSchema(fs.schema);
+                if (fs.type == DataType.BAG) mStream.print("}");
+            }
+        }
+        mStream.print(")");
+    }
+    */
+}
+
+        

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POPrinter.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POPrinter.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POPrinter.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POPrinter.java
 Tue May 20 16:16:46 2008
@@ -18,29 +18,17 @@
 package org.apache.pig.impl.physicalLayer;
 
 import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Iterator;
-
-/*
-import org.apache.pig.backend.hadoop.executionengine.POMapreduce;
-import org.apache.pig.backend.local.executionengine.POCogroup;
-import org.apache.pig.backend.local.executionengine.POEval;
-import org.apache.pig.backend.local.executionengine.POLoad;
-import org.apache.pig.backend.local.executionengine.POSort;
-import org.apache.pig.backend.local.executionengine.POSplit;
-import org.apache.pig.backend.local.executionengine.POStore;
-import org.apache.pig.backend.local.executionengine.POUnion;
-import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.eval.EvalSpecPrinter;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.logicalLayer.OperatorKey;
-*/
 
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 
-public class POPrinter extends POVisitor {
+
+public class POPrinter extends PhyPlanVisitor {
+
+    public POPrinter(PrintStream ps, PhysicalPlan pp) {
+        super(pp, new DependencyOrderWalker(pp));
+    }
 
     // TODO FIX
     /*

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
 Tue May 20 16:16:46 2008
@@ -90,7 +90,7 @@
     // Will be used by operators to report status or transmit heartbeat
     // Should be set by the backends to appropriate implementations that
     // wrap their own version of a reporter.
-    protected static PigProgressable reporter;
+    public static PigProgressable reporter;
 
     // Dummy types used to access the getNext of appropriate
     // type. These will be null

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java Tue 
May 20 16:16:46 2008
@@ -29,18 +29,16 @@
 
 import org.apache.pig.FilterFunc;
 import org.apache.pig.LoadFunc;
-// TODO FIX
-// import org.apache.pig.PigServer;
-// import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.ExecType;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.builtin.*;
 import org.apache.pig.data.*;
-// TODO FIX
-// import org.apache.pig.impl.builtin.ShellBagEvalFunc;
-// import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.builtin.ShellBagEvalFunc;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
-// import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigContext;
 
 public class TestBuiltin extends TestCase {
     
@@ -439,8 +437,8 @@
         assertTrue(f1.equals(f2));        
     }
     
-        // TODO FIX
-        /*
+    // TODO FIX
+    /*
     @Test
     public void testShellFuncSingle() throws Throwable {
         //ShellBagEvalFunc func = new ShellBagEvalFunc("tr o 0");

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=658485&r1=658484&r2=658485&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java 
Tue May 20 16:16:46 2008
@@ -59,9 +59,7 @@
 
                @Override
                public Schema outputSchema(Schema input) {
-                       // TODO FIX
-                       // return new AtomSchema("arity");
-                       return null;
+            return new Schema(new Schema.FieldSchema(null, DataType.INTEGER)); 
                }
        }
 
@@ -219,9 +217,7 @@
 
                @Override
                public Schema outputSchema(Schema input) {
-                       // TODO FIX
-                       // return new AtomSchema("average");
-                       return null;
+            return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
                }
 
        }


Reply via email to