Author: gates
Date: Mon May 19 08:15:19 2008
New Revision: 657848

URL: http://svn.apache.org/viewvc?rev=657848&view=rev
Log:
Fixed a few TODO FIXs, added DOUBLE to isAtomic in DataType.


Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java

Modified: incubator/pig/branches/types/build.xml
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Mon May 19 08:15:19 2008
@@ -134,6 +134,7 @@
     <target name="compile-sources">
         <javac encoding="${build.encoding}" srcdir="${sources}"
             includes="**/plan/*.java, **/plan/optimizer/*.java, 
**/data/*.java, **/pig/builtin/*.java,
+                **/impl/io/*.java, **/impl/mapReduceLayer/*.java,
                 **/test/utils/*.java, **/test/TestOperatorPlan.java, 
**/test/TestBuiltin.java,
                 **/test/TestConstExpr.java, **/test/TestFilter.java, 
**/test/TestPhyOp.java,
                 **/test/TestAdd.java, **/test/TestSubtract.java, 
**/test/TestMultiply.java,

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Mon May 19 
08:15:19 2008
@@ -257,7 +257,6 @@
         
         LogicalPlan readFrom = (LogicalPlan) aliases.get(id);
 
-        // TODO FIX Make this work
         try {
             ExecPhysicalPlan pp = 
                 pigContext.getExecutionEngine().compile(readFrom, null);

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/ARITY.java Mon May 
19 08:15:19 2008
@@ -20,6 +20,7 @@
 import java.io.IOException;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -32,8 +33,6 @@
 
     @Override
     public Schema outputSchema(Schema input) {
-        // TODO FIX
-        // return new AtomSchema("arity");
-        return null;
+        return new Schema(new Schema.FieldSchema(null, DataType.INTEGER)); 
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java Mon May 19 
08:15:19 2008
@@ -162,9 +162,7 @@
     
     @Override
     public Schema outputSchema(Schema input) {
-        // TODO FIX
-        // return new AtomSchema("average");
-        return null;
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Mon May 
19 08:15:19 2008
@@ -127,9 +127,7 @@
 
     @Override
     public Schema outputSchema(Schema input) {
-        // TODO FIX 
-        // return new AtomSchema("count" + count++);
-        return null;
+        return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
     }
 
     private static int count = 1;

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java Mon May 19 
08:15:19 2008
@@ -113,9 +113,7 @@
 
     @Override
     public Schema outputSchema(Schema input) {
-        // TODO FIX
-        // return new AtomSchema("max" + count++);
-        return null;
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
 
     private static int count = 1;

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java Mon May 19 
08:15:19 2008
@@ -109,9 +109,7 @@
 
     @Override
     public Schema outputSchema(Schema input) {
-        // TODO FIX
-        // return new AtomSchema("min" + count++);
-        return null;
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
 
     private static int count = 1;

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java Mon May 19 
08:15:19 2008
@@ -112,9 +112,7 @@
 
     @Override
     public Schema outputSchema(Schema input) {
-        // TODO FIX
-        // return new AtomSchema("sum" + count++);
-        return null;
+        return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE)); 
     }
 
     private static int count = 1;

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataType.java 
(original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataType.java Mon May 
19 08:15:19 2008
@@ -426,7 +426,8 @@
     public static boolean isAtomic(byte dataType) {
         return ((dataType == BYTEARRAY) || (dataType == CHARARRAY) ||
             (dataType == INTEGER) || (dataType == LONG) || 
-            (dataType == FLOAT) || (dataType == BOOLEAN));
+            (dataType == FLOAT) || (dataType == DOUBLE) ||
+            (dataType == BOOLEAN));
     }
 
     /**

Modified: 
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java 
Mon May 19 08:15:19 2008
@@ -348,7 +348,8 @@
      * Report progress to HDFS.
      */
     protected void reportProgress() {
-        // TODO FIX
+        // TODO FIX Need to hook this into the progress reporting
+        // infrastructure once Shravan finishes that.
         /*
         if (PigMapReduce.reporter != null) {
             PigMapReduce.reporter.progress();

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
 Mon May 19 08:15:19 2008
@@ -18,12 +18,15 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Iterator;
+import java.util.List;
 import java.io.PrintStream;
 
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.MultiMap;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -31,40 +34,194 @@
  */
 public class LOPrinter extends LOVisitor {
 
-    public LOPrinter(LogicalPlan plan) {
+    private PrintStream mStream = null;
+    private int mIndent = 0;
+
+    /**
+     * Creates a printer that writes plan information to the given stream.
+     * @param ps PrintStream to output plan information to
+     * @param plan Logical plan to print
+     */
+    public LOPrinter(PrintStream ps, LogicalPlan plan) {
         super(plan, new DepthFirstWalker(plan));
+        // Keep the stream: every visit method writes through mStream, which
+        // would otherwise remain null.
+        mStream = ps;
     }
 
-    // TODO FIX
-    /*
-    private PrintStream mStream = null;
-
-    public LOPrinter(PrintStream ps) {
-        mStream = ps;
+    public void visit(LOAdd a) throws VisitorException {
+        visitBinary(a, "+");
     }
 
-    /**
-     * Only LOCogroup.visit() should ever call this method.
-     */
-    /*
-    @Override
-    public void visitCogroup(LOCogroup g) {
-        print(g, g.name(), g.getSpecs());
-        super.visitCogroup(g);
+    public void visit(LOAnd a) throws VisitorException {
+        visitBinary(a, "AND");
     }
-        
-    /**
-     * Only LOEval.visit() should ever call this method.
-     */
-    /*
-    @Override
-    public void visitEval(LOEval e) {
-        List<EvalSpec> ls = new ArrayList<EvalSpec>();
-        ls.add(e.getSpec());
-        print(e, e.name(), ls);
-        super.visitEval(e);
+    
+    public void visit(LOBinCond bc) throws VisitorException {
+        print(bc);
+        mStream.print(" COND: (");
+        bc.getCond().visit(this);
+        mStream.print(") TRUE: (");
+        bc.getLhsOp().visit(this);
+        mStream.print(") FALSE (");
+        bc.getRhsOp().visit(this);
+        mStream.print(")");
+    }
+
+    public void visit(LOCogroup g) throws VisitorException {
+        print(g);
+        mStream.print("GROUP BY PLANS:");
+        MultiMap<LogicalOperator, LogicalPlan> plans = g.getGroupByPlans();
+        for (LogicalOperator lo : plans.keySet()) {
+            // Visit the associated plans
+            for (LogicalPlan plan : plans.get(lo)) {
+                mIndent++;
+                pushWalker(new DepthFirstWalker(plan));
+                visit();
+                popWalker();
+                mIndent--;
+            }
+            mStream.println();
+        }
+        // Visit input operators
+        for (LogicalOperator lo : plans.keySet()) {
+            // Visit the operator
+            lo.visit(this);
+        }
     }
         
+    public void visit(LOConst c) throws VisitorException {
+        print(c);
+        mStream.print(" VALUE (" + c.getValue() + ")");
+    }
+
+    public void visit(LOCross c) throws VisitorException {
+        print(c);
+        mStream.println();
+        super.visit(c);
+    }
+
+    public void visit(LODistinct d) throws VisitorException {
+        print(d);
+        mStream.println();
+        super.visit(d);
+    }
+
+    public void visit(LODivide d) throws VisitorException {
+        visitBinary(d, "/");
+    }
+
+    public void visit(LOEqual e) throws VisitorException {
+        visitBinary(e, "==");
+    }
+
+    public void visit(LOFilter f) throws VisitorException {
+        print(f);
+        mStream.print(" COMP: ");
+        mIndent++;
+        pushWalker(new DepthFirstWalker(f.getComparisonPlan()));
+        visit();
+        // Restore the outer walker once the comparison plan has been
+        // printed, mirroring the push/pop pairing in visit(LOCogroup).
+        popWalker();
+        mIndent--;
+        mStream.println();
+        f.getInput().visit(this);
+    }
+
+     public void visit(LOForEach f) throws VisitorException {
+        print(f);
+        mStream.print(" PLAN: ");
+        mIndent++;
+        pushWalker(new DepthFirstWalker(f.getForEachPlan()));
+        visit();
+        // Restore the outer walker once the foreach plan has been printed,
+        // mirroring the push/pop pairing in visit(LOCogroup).
+        popWalker();
+        mIndent--;
+        mStream.println();
+        // Visit our input
+        mPlan.getPredecessors((LogicalOperator)f).get(0).visit(this);
+    }
+ 
+    public void visit(LOGreaterThan gt) throws VisitorException {
+        visitBinary(gt, ">");
+    }
+
+    public void visit(LOGreaterThanEqual gte) throws VisitorException {
+        visitBinary(gte, ">=");
+    }
+
+    public void visit(LOLesserThan lt) throws VisitorException {
+        visitBinary(lt, "<");
+    }
+
+    public void visit(LOLesserThanEqual lte) throws VisitorException {
+        visitBinary(lte, "<=");
+    }
+
+    public void visit(LOLoad load) throws VisitorException {
+        print(load);
+        mStream.print(" FILE: " + load.getInputFile().getFileName());
+        mStream.print(" FUNC: " + load.getLoadFunc().getClass().getName());
+        mStream.println();
+    }
+
+    public void visit(LOMapLookup mlu) throws VisitorException {
+        print(mlu);
+        mStream.print("(");
+        mlu.getMap().visit(this);
+        mStream.print(")# " + mlu.getKey());
+    }
+
+    public void visit(LOMod m) throws VisitorException {
+        visitBinary(m, "MOD");
+    }
+
+    public void visit(LOMultiply m) throws VisitorException {
+        visitBinary(m, "*");
+    }
+
+    public void visit(LONegative n) throws VisitorException {
+        visitUnary(n, "-");
+    }
+
+    public void visit(LONot n) throws VisitorException {
+        visitUnary(n, "NOT");
+    }
+
+    public void visit(LONotEqual ne) throws VisitorException {
+        visitBinary(ne, "!=");
+    }
+
+    public void visit(LOOr or) throws VisitorException {
+        visitBinary(or, "OR");
+    }
+
+    public void visit(LOProject p) throws VisitorException {
+        print(p);
+        if (p.isStar()) {
+            mStream.print(" ALL ");
+        } else {
+            List<Integer> cols = p.getProjection();
+            mStream.print(" COL");
+            if (cols.size() > 1) mStream.print("S");
+            mStream.print(" (");
+            for (int i = 0; i < cols.size(); i++) {
+                if (i > 0) mStream.print(", ");
+                mStream.print(cols.get(i));
+            }
+            mStream.print(")");
+        }
+        mStream.print(" FROM ");
+        if (p.getSentinel()) {
+            // This project is connected to some other relation, don't follow
+            // that path or we'll cycle in the graph.  Print just the name of
+            // that relation so " FROM " is not left dangling.
+            mStream.print(p.getExpression().name());
+        } else {
+            mIndent++;
+            p.getExpression().visit(this);
+            mIndent--;
+        }
+    }
+
+    public void visit(LORegexp r) throws VisitorException {
+        print(r);
+        mStream.print(" REGEX (" + r.getRegexp() + ") LOOKING IN (");
+        r.getOperand().visit(this);
+        mStream.print(")");
+    }
+
     /**
      * Only LOUnion.visit() should ever call this method.
      */
@@ -76,16 +233,6 @@
     }
         
     /**
-     * Only LOLoad.visit() should ever call this method.
-     */
-    /*
-    @Override
-    public void visitLoad(LOLoad l) {
-        print(l, l.name());
-        super.visitLoad(l);
-    }
-        
-    /**
      * Only LOSort.visit() should ever call this method.
      */
     /*
@@ -121,55 +268,67 @@
         List<EvalSpec> empty = new ArrayList<EvalSpec>();
         print(lo, name, empty);
     }
+    */
 
-    private void print(
-            LogicalOperator lo,
-            String name,
-            List<EvalSpec> specs) {
-        mStream.println(name);
-        mStream.println("Object id: " + lo.hashCode());
-        mStream.print("Inputs: ");
-        List<OperatorKey> inputs = lo.getInputs();
-        Iterator<OperatorKey> i = inputs.iterator();
-        while (i.hasNext()) {
-            LogicalOperator input = lo.getOpTable().get(i.next());
-            mStream.print(input.hashCode() + " ");
-        }
-        mStream.println();
-
-        mStream.print("Schema: ");
-        printSchema(lo.outputSchema(), 0);
-        mStream.println();
-
-        mStream.println("EvalSpecs:");
-        //printSpecs(specs);
-        Iterator<EvalSpec> j = specs.iterator();
-        while (j.hasNext()) {
-            j.next().visit(new EvalSpecPrinter(mStream));
+    private void visitBinary(
+            BinaryExpressionOperator b,
+            String op) throws VisitorException {
+        print(b);
+        mStream.print(" (");
+        b.getLhsOperand().visit(this);
+        mStream.print(") " + op + " (");
+        b.getRhsOperand().visit(this);
+        mStream.print(") ");
+    }
+
+    private void visitUnary(
+            UnaryExpressionOperator e,
+            String op) throws VisitorException {
+        print(e);
+        mStream.print(op + " (");
+        e.getOperand().visit(this);
+        mStream.print(") ");
+    }
+
+    private void print(LogicalOperator lo) {
+        for (int i = 0; i < mIndent; i++) mStream.print("    ");
+
+        printName(lo);
+
+        if (!(lo instanceof ExpressionOperator)) {
+            mStream.print("Inputs: ");
+            for (LogicalOperator predecessor : mPlan.getPredecessors(lo)) {
+                printName(predecessor);
+            }
+            mStream.print("Schema: ");
+            try {
+                printSchema(lo.getSchema());
+            } catch (FrontendException fe) {
+                // ignore it, nothing we can do
+                mStream.print("()");
+            }
         }
+        mStream.print(" : ");
     }
 
-    private void printSchema(Schema schema, int pos) {
-        if (schema instanceof AtomSchema) {
-            String a = schema.getAlias();
-            if (a == null) mStream.print("$" + pos);
-            else mStream.print(a);
-        } else if (schema instanceof TupleSchema) {
-            mStream.print("(");
-            TupleSchema ts = (TupleSchema)schema;
-            int sz = ts.numFields();
-            for (int j = 0; j < sz; j++) {
-                if (j != 0) mStream.print(", ");
-                Schema s = ts.schemaFor(j);
-                if (s == null) mStream.print("$" + j);
-                else printSchema(s, j);
+    private void printName(LogicalOperator lo) {
+        mStream.println(lo.name() + " key(" + lo.getOperatorKey().scope + 
+            ", " + lo.getOperatorKey().id + ") ");
+    }
+
+    private void printSchema(Schema schema) {
+        // A missing schema is printed as empty, matching the "()" fallback
+        // print() uses when getSchema() fails.
+        if (schema == null) {
+            mStream.print("()");
+            return;
+        }
+        mStream.print("(");
+        boolean first = true;
+        for (Schema.FieldSchema fs : schema.getFields()) {
+            // Separate fields so adjacent entries do not run together.
+            if (!first) mStream.print(", ");
+            first = false;
+            if (fs.alias != null) mStream.print(fs.alias + ": ");
+            mStream.print(DataType.findTypeName(fs.type));
+            if (fs.schema != null) {
+                if (fs.type == DataType.BAG) mStream.print("{");
+                printSchema(fs.schema);
+                if (fs.type == DataType.BAG) mStream.print("}");
             }
-            mStream.print(")");
-        } else {
-            throw new AssertionError("Unknown schema type.");
         }
+        mStream.print(")");
     }
-    */
 }
 
         

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
 Mon May 19 08:15:19 2008
@@ -115,6 +115,10 @@
         return mExp;
     }
 
+    public boolean isStar() { 
+        return mIsStar;
+    }
+
     public List<Integer> getProjection() {
         return mProjection;
     }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
 Mon May 19 08:15:19 2008
@@ -120,6 +120,9 @@
         for(LogicalOperator op: cg.getInputs()) {
             for(LogicalPlan lp: mapGByPlans.get(op)) {
                 if (null != lp) {
+                    // TODO FIX - How do we know this should be a
+                    // DependencyOrderWalker?  We should be replicating the
+                    // walker the current visitor is using.
                     PlanWalker w = new DependencyOrderWalker(lp);
                     pushWalker(w);
                     for(LogicalOperator logicalOp: lp.getRoots()) {

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java?rev=657848&r1=657847&r2=657848&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
 Mon May 19 08:15:19 2008
@@ -17,8 +17,6 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
-// TODO FIX
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.Map;
@@ -33,7 +31,6 @@
  * 
  */
 public class LogicalPlanBuilder {
-    // TODO FIX
 
     public static ClassLoader classloader = 
LogicalPlanBuilder.class.getClassLoader();
     private PigContext pigContext;


Reply via email to