Author: gates
Date: Tue May 27 11:29:37 2008
New Revision: 660636

URL: http://svn.apache.org/viewvc?rev=660636&view=rev
Log:
PIG-161.  Shubham's fix for translation of LOUserFunc to POUserFunc.


Modified:
    
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
    
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java
    
incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java?rev=660636&r1=660635&r2=660636&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
 Tue May 27 11:29:37 2008
@@ -663,7 +663,7 @@
                }
                p.setResultType(func.getType());
                currentPlan.add(p);
-               List<LogicalOperator> fromList = mPlan.getPredecessors(func);
+               List<LogicalOperator> fromList = 
func.getPlan().getPredecessors(func);
                for(LogicalOperator op : fromList) {
                        PhysicalOperator<PhyPlanVisitor> from = 
LogToPhyMap.get(op);
                        try {
@@ -695,7 +695,7 @@
                store.setSFile(loStore.getOutputFile());
                store.setPc(pc);
                currentPlan.add(store);
-               PhysicalOperator<PhyPlanVisitor> from = 
LogToPhyMap.get(mPlan.getPredecessors(loStore).get(0));
+               PhysicalOperator<PhyPlanVisitor> from = 
LogToPhyMap.get(loStore.getPlan().getPredecessors(loStore).get(0));
                try {
                        currentPlan.connect(from, store);
                } catch (PlanException e) {
@@ -730,7 +730,7 @@
                
                currentPlan.add(physOp);
                
-               List<LogicalOperator> ops = op.mPlan.getPredecessors(op);
+               List<LogicalOperator> ops = op.getPlan().getPredecessors(op);
                
                for(LogicalOperator l : ops) {
                        ExpressionOperator from = (ExpressionOperator) 
LogToPhyMap.get(l);
@@ -750,7 +750,7 @@
                currentPlan.add(physOp);
                
                LogToPhyMap.put(op, physOp);
-               ExpressionOperator from = (ExpressionOperator) 
LogToPhyMap.get(op.mPlan.getPredecessors(op).get(0));
+               ExpressionOperator from = (ExpressionOperator) 
LogToPhyMap.get(op.getPlan().getPredecessors(op).get(0));
                ((PONegative)physOp).setInput(from);
                try {
                        currentPlan.connect(from, physOp);

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java?rev=660636&r1=660635&r2=660636&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java
 Tue May 27 11:29:37 2008
@@ -32,10 +32,12 @@
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
@@ -78,35 +80,93 @@
                this.func = (EvalFunc) 
PigContext.instantiateFuncFromSpec(this.funcSpec);
         this.func.setReporter(reporter);
        }
+       
+       public Result processInput() throws ExecException {
+
+               Result res = new Result();
+               Tuple inpValue = null;
+               if (input == null && (inputs == null || inputs.size()==0)) {
+//                     log.warn("No inputs found. Signaling End of 
Processing.");
+                       res.returnStatus = POStatus.STATUS_EOP;
+                       return res;
+               }
+
+               //Should be removed once the model is clear
+               if(reporter!=null) reporter.progress();
+
+               
+               if(isInputAttached()) {
+                       res.result = input;
+                       res.returnStatus = POStatus.STATUS_OK;
+                       detachInput();
+                       return res;
+               } else {
+                       res.result = TupleFactory.getInstance().newTuple();
+                       
+                       Result temp = null;
+                       for(PhysicalOperator op : inputs) {
+                               switch(op.getResultType()){
+                case DataType.BAG:
+                    temp = op.getNext(dummyBag);
+                    break;
+                case DataType.BOOLEAN:
+                    temp = op.getNext(dummyBool);
+                    break;
+                case DataType.BYTEARRAY:
+                    temp = op.getNext(dummyDBA);
+                    break;
+                case DataType.CHARARRAY:
+                    temp = op.getNext(dummyString);
+                    break;
+                case DataType.DOUBLE:
+                    temp = op.getNext(dummyDouble);
+                    break;
+                case DataType.FLOAT:
+                    temp = op.getNext(dummyFloat);
+                    break;
+                case DataType.INTEGER:
+                    temp = op.getNext(dummyInt);
+                    break;
+                case DataType.LONG:
+                    temp = op.getNext(dummyLong);
+                    break;
+                case DataType.MAP:
+                    temp = op.getNext(dummyMap);
+                    break;
+                case DataType.TUPLE:
+                    temp = op.getNext(dummyTuple);
+                    break;
+                }
+                if(temp.returnStatus!=POStatus.STATUS_OK)
+                    return temp;
+                ((Tuple)res.result).append(temp.result);
+                
+                       }
+                       res.returnStatus = temp.returnStatus;
+                       return res;
+               }
+       }
 
        private Result getNext() throws ExecException {
                Tuple t = null;
-               Result result = new Result();
+               Result result;
                // instantiate the function if its null
                if (func == null)
                        instantiateFunc();
 
+               result = processInput();
                try {
-                       if (inputAttached) {
-                               result.result = func.exec(input);
-                if(reporter!=null) reporter.progress();
-                               result.returnStatus = (result.result != null) ? 
POStatus.STATUS_OK
-                                               : POStatus.STATUS_EOP;
-                               return result;
-                       } else {
-                               Result in = inputs.get(0).getNext(t);
-                               if (in.returnStatus == POStatus.STATUS_EOP) {
-                                       result.returnStatus = 
POStatus.STATUS_EOP;
-                                       return result;
-                               }
-                               result.result = func.exec((Tuple) in.result);
-                               result.returnStatus = POStatus.STATUS_OK;
+                       if(result.returnStatus == POStatus.STATUS_OK) {
+                               result.result = func.exec((Tuple) 
result.result);
                                return result;
                        }
-               } catch (IOException e) {
-                       log.error(e);
-                       //throw new ExecException(e.getCause());
+                       return result;
+                       
+               } catch (IOException e1) {
+                       log.error(e1);
                }
+               
+               
                result.returnStatus = POStatus.STATUS_ERR;
                return result;
        }
@@ -272,7 +332,7 @@
        @Override
        public boolean supportsMultipleInputs() {
 
-               return false;
+               return true;
        }
 
        @Override

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=660636&r1=660635&r2=660636&view=diff
==============================================================================
--- 
incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java 
(original)
+++ 
incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java 
Tue May 27 11:29:37 2008
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.ExecType;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.COUNT;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.ExpressionOperator;
@@ -344,6 +345,16 @@
     }
     
     /[EMAIL PROTECTED]
+    public void testUserFunc() throws VisitorException {
+       String query = "foreach (group (load 'file:ABCD') all) generate " + 
COUNT.class.getName() + "($1) ;";
+       LogicalPlan plan = buildPlan(query);
+       
+       PhysicalPlan pp = buildPhysicalPlan(plan);
+       
+       pp.explain(System.out);
+    }*/
+    
+    /[EMAIL PROTECTED]
     public void testQuery4() throws VisitorException {
         String query = "foreach (load 'a') generate AVG($1, $2) ;";
         LogicalPlan lp = buildPlan(query);

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=660636&r1=660635&r2=660636&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java 
Tue May 27 11:29:37 2008
@@ -238,7 +238,7 @@
                while (res.returnStatus != POStatus.STATUS_EOP) {
                        // System.out.println(res.result);
                        int result = (Integer) res.result;
-                       assertEquals(2, result);
+                       assertEquals(1, result);
                        res = userFunc.getNext(i);
                }
        }


Reply via email to