Author: gates Date: Tue May 27 11:29:37 2008 New Revision: 660636 URL: http://svn.apache.org/viewvc?rev=660636&view=rev Log: PIG-161. Shubham's fix for translation of LOUserFunc to POUserFunc.
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java?rev=660636&r1=660635&r2=660636&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java Tue May 27 11:29:37 2008 @@ -663,7 +663,7 @@ } p.setResultType(func.getType()); currentPlan.add(p); - List<LogicalOperator> fromList = mPlan.getPredecessors(func); + List<LogicalOperator> fromList = func.getPlan().getPredecessors(func); for(LogicalOperator op : fromList) { PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(op); try { @@ -695,7 +695,7 @@ store.setSFile(loStore.getOutputFile()); store.setPc(pc); currentPlan.add(store); - PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(mPlan.getPredecessors(loStore).get(0)); + PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(loStore.getPlan().getPredecessors(loStore).get(0)); try { currentPlan.connect(from, store); } catch (PlanException e) { @@ -730,7 +730,7 @@ currentPlan.add(physOp); - List<LogicalOperator> ops = op.mPlan.getPredecessors(op); + List<LogicalOperator> ops = op.getPlan().getPredecessors(op); for(LogicalOperator l : ops) { ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(l); @@ -750,7 +750,7 @@ currentPlan.add(physOp); LogToPhyMap.put(op, physOp); - ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.mPlan.getPredecessors(op).get(0)); + ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getPlan().getPredecessors(op).get(0)); ((PONegative)physOp).setInput(from); try { currentPlan.connect(from, physOp); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java?rev=660636&r1=660635&r2=660636&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java Tue May 27 11:29:37 2008 @@ -32,10 +32,12 @@ import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.physicalLayer.POStatus; +import org.apache.pig.impl.physicalLayer.PhysicalOperator; import org.apache.pig.impl.physicalLayer.Result; import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor; import org.apache.pig.impl.plan.VisitorException; @@ -78,35 +80,93 @@ this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(this.funcSpec); this.func.setReporter(reporter); } + + public Result processInput() throws ExecException { + + Result res = new Result(); + Tuple inpValue = null; + if (input == null && (inputs == null || inputs.size()==0)) { +// log.warn("No inputs found. Signaling End of Processing."); + res.returnStatus = POStatus.STATUS_EOP; + return res; + } + + //Should be removed once the model is clear + if(reporter!=null) reporter.progress(); + + + if(isInputAttached()) { + res.result = input; + res.returnStatus = POStatus.STATUS_OK; + detachInput(); + return res; + } else { + res.result = TupleFactory.getInstance().newTuple(); + + Result temp = null; + for(PhysicalOperator op : inputs) { + switch(op.getResultType()){ + case DataType.BAG: + temp = op.getNext(dummyBag); + break; + case DataType.BOOLEAN: + temp = op.getNext(dummyBool); + break; + case DataType.BYTEARRAY: + temp = op.getNext(dummyDBA); + break; + case DataType.CHARARRAY: + temp = op.getNext(dummyString); + break; + case DataType.DOUBLE: + temp = op.getNext(dummyDouble); + break; + case DataType.FLOAT: + temp = op.getNext(dummyFloat); + break; + case DataType.INTEGER: + temp = op.getNext(dummyInt); + break; + case DataType.LONG: + temp = op.getNext(dummyLong); + break; + case DataType.MAP: + temp = op.getNext(dummyMap); + break; + case DataType.TUPLE: + temp = op.getNext(dummyTuple); + break; + } + if(temp.returnStatus!=POStatus.STATUS_OK) + return temp; + ((Tuple)res.result).append(temp.result); + + } + res.returnStatus = temp.returnStatus; + return res; + } + } private Result getNext() throws ExecException { Tuple t = null; - Result result = new Result(); + Result result; // instantiate the function if its null if (func == null) instantiateFunc(); + result = processInput(); try { - if (inputAttached) { - result.result = func.exec(input); - if(reporter!=null) reporter.progress(); - result.returnStatus = (result.result != null) ? POStatus.STATUS_OK - : POStatus.STATUS_EOP; - return result; - } else { - Result in = inputs.get(0).getNext(t); - if (in.returnStatus == POStatus.STATUS_EOP) { - result.returnStatus = POStatus.STATUS_EOP; - return result; - } - result.result = func.exec((Tuple) in.result); - result.returnStatus = POStatus.STATUS_OK; + if(result.returnStatus == POStatus.STATUS_OK) { + result.result = func.exec((Tuple) result.result); return result; } - } catch (IOException e) { - log.error(e); - //throw new ExecException(e.getCause()); + return result; + + } catch (IOException e1) { + log.error(e1); } + + result.returnStatus = POStatus.STATUS_ERR; return result; } @@ -272,7 +332,7 @@ @Override public boolean supportsMultipleInputs() { - return false; + return true; } @Override Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=660636&r1=660635&r2=660636&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java Tue May 27 11:29:37 2008 @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.pig.ExecType; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.builtin.COUNT; import org.apache.pig.builtin.PigStorage; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.ExpressionOperator; @@ -344,6 +345,16 @@ } /[EMAIL PROTECTED] + public void testUserFunc() throws VisitorException { + String query = "foreach (group (load 'file:ABCD') all) generate " + COUNT.class.getName() + "($1) ;"; + LogicalPlan plan = buildPlan(query); + + PhysicalPlan pp = buildPhysicalPlan(plan); + + pp.explain(System.out); + }*/ + + /[EMAIL PROTECTED] public void testQuery4() throws VisitorException { String query = "foreach (load 'a') generate AVG($1, $2) ;"; LogicalPlan lp = buildPlan(query); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=660636&r1=660635&r2=660636&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java Tue May 27 11:29:37 2008 @@ -238,7 +238,7 @@ while (res.returnStatus != POStatus.STATUS_EOP) { // System.out.println(res.result); int result = (Integer) res.result; - assertEquals(2, result); + assertEquals(1, result); res = userFunc.getNext(i); } }