Author: gates Date: Thu May 29 08:13:56 2008 New Revision: 661348 URL: http://svn.apache.org/viewvc?rev=661348&view=rev Log: PIG-158 Santhosh's fix so that UDF return types are figured out from the UDF when the user does not explicitly declare them.
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=661348&r1=661347&r2=661348&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu May 29 08:13:56 2008 @@ -251,7 +251,11 @@ */ public Iterator<Tuple> openIterator(String id) throws IOException { try { - ExecJob job = execute(id); + LogicalOperator op = aliasOp.get(id); + if(null == op) { + throw new IOException("Unable to find an operator for alias " + id); + } + ExecJob job = execute(getPlanFromAlias(id, op.getClass().getName())); // invocation of "execute" is synchronous! 
if (job.getStatus() == JOB_STATUS.COMPLETED) { return job.getResults(); @@ -301,8 +305,8 @@ String func) throws IOException { try { LogicalPlan storePlan = QueryParser.generateStorePlan(opTable, - scope, readFrom, filename, func, aliasOp.get(id)); - execute(id); + scope, readFrom, filename, func, aliasOp.get(id), aliases); + execute(storePlan); } catch (Exception e) { throw WrappedIOException.wrap("Unable to store for alias: " + id, e); @@ -321,7 +325,11 @@ public void explain(String alias, PrintStream stream) throws IOException { try { - LogicalPlan lp = compileLp(alias, "explain"); + LogicalOperator op = aliasOp.get(alias); + if(null == op) { + throw new IOException("Unable to find an operator for alias " + alias); + } + LogicalPlan lp = compileLp(getPlanFromAlias(alias, op.getClass().getName()), "explain"); stream.println("Logical Plan:"); LOPrinter lv = new LOPrinter(stream, lp); lv.visit(); @@ -453,21 +461,24 @@ } private ExecJob execute( - String jobName) throws FrontendException, ExecException { + LogicalPlan lp) throws FrontendException, ExecException { ExecJob job = null; - LogicalPlan lp = compileLp(jobName, "execute"); - PhysicalPlan pp = compilePp(lp); + LogicalPlan typeCheckedLp = compileLp(lp, "execute"); + PhysicalPlan pp = compilePp(typeCheckedLp); // execute using appropriate engine - return pigContext.getExecutionEngine().execute(pp, jobName); + return pigContext.getExecutionEngine().execute(pp, "execute"); } private LogicalPlan compileLp( - String alias, - String op) throws ExecException, FrontendException { + LogicalPlan lp, + String operation) throws ExecException, FrontendException { // Look up the logical plan in the aliases map. That plan will be // properly connected to all the others. 
- LogicalPlan lp = getPlanFromAlias(alias, op); + + if(null == lp) { + throw new FrontendException("Cannot operate on null logical plan"); + } // run through validator LogicalPlanValidationExecutor validator = Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java?rev=661348&r1=661347&r2=661348&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java Thu May 29 08:13:56 2008 @@ -379,6 +379,8 @@ } poPackage.setKeyType(type); poPackage.setResultType(DataType.TUPLE); + poPackage.setNumInps(count); + poPackage.setInner(cg.getInner()); LogToPhyMap.put(cg, poPackage); } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=661348&r1=661347&r2=661348&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu May 29 08:13:56 2008 @@ -31,6 +31,7 @@ package org.apache.pig.impl.logicalLayer.parser; import java.io.*; import java.util.*; +import java.lang.reflect.Type; import org.apache.pig.impl.logicalLayer.*; import org.apache.pig.impl.logicalLayer.schema.*; import org.apache.pig.data.DataType; @@ -103,7 +104,8 @@ LogicalPlan readFrom, String fileName, String func, - LogicalOperator input) throws FrontendException { + LogicalOperator input, + 
Map<LogicalOperator, LogicalPlan> aliases) throws FrontendException { if (func == null) { func = PigStorage.class.getName(); @@ -113,11 +115,11 @@ long storeNodeId = NodeIdGenerator.getGenerator().getNextNodeId(scope); - LogicalPlan rootPlan = new LogicalPlan(); + LogicalPlan storePlan = new LogicalPlan(); LogicalOperator store; try { - store = new LOStore(rootPlan, + store = new LOStore(storePlan, new OperatorKey(scope, storeNodeId), new FileSpec(fileName, func)); } catch (IOException ioe) { @@ -125,15 +127,16 @@ } try { - rootPlan.add(store); - rootPlan.add(input); - rootPlan.connect(input, store); - attachPlan(rootPlan, input, readFrom); + storePlan.add(store); + storePlan.add(input); + storePlan.connect(input, store); + attachPlan(storePlan, input, readFrom); } catch (ParseException pe) { throw new FrontendException(pe.getMessage()); } - - return readFrom; + + aliases.put(store, storePlan); + return storePlan; } static String unquote(String s) { @@ -814,6 +817,7 @@ Token t1; String funcName; List<ExpressionOperator> args; + byte type = DataType.BOOLEAN; log.trace("Entering PUnaryCond"); } { @@ -913,10 +917,10 @@ log.debug("PUnaryCond: Connected operator " + cond.getClass().getName() + " " + cond + " to " + lhs + " logical plan " + lp); } ) -| LOOKAHEAD(EvalFunction() "(") - (funcName=EvalFunction() "(" args=EvalArgs(over,specs,lp,input) ")" +| LOOKAHEAD(EvalFunction(type) "(") + (funcName=EvalFunction(type) "(" args=EvalArgs(over,specs,lp,input) ")" { - cond = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, DataType.BOOLEAN); + cond = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, type); lp.add(cond); log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp); for(ExpressionOperator exprOp: args) { @@ -1031,6 +1035,7 @@ ) | <ALL> { es = new LOConst(groupByPlan = new LogicalPlan(), new OperatorKey(scope, getNextId()), "all"); + es.setType(DataType.CHARARRAY); 
groupByPlan.add(es); log.debug("GroupItem: Added operator " + es.getClass().getName() + " " + es + " to logical plan " + groupByPlan); listPlans.add(groupByPlan); @@ -1776,10 +1781,11 @@ String funcName; List<ExpressionOperator> args; ExpressionOperator userFunc; + byte type = DataType.BYTEARRAY; log.trace("Entering FuncEvalSpec"); } { - funcName=EvalFunction() "(" args=EvalArgs(over,specs,lp,input) ")" + funcName=EvalFunction(type) "(" args=EvalArgs(over,specs,lp,input) ")" { //check if the function name is an alias //if the user has defined an alias then @@ -1791,7 +1797,7 @@ if(null == userAliasFunc) { - userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, DataType.BYTEARRAY); + userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, type); } else { //we have an alias int expectedNumArgs = userAliasFunc.getArguments().size(); @@ -1856,12 +1862,17 @@ { String funcName; List<ExpressionOperator> args; + byte userSpecifiedType = DataType.BYTEARRAY; + boolean userSpecified = false; byte type = DataType.BYTEARRAY; log.trace("Entering FuncDeclareSpec"); } { - [type = Type()] funcName=EvalFunction() "(" args=FuncDeclareArgs(lp) ")" + [userSpecifiedType = Type() {userSpecified = true;}] funcName=EvalFunction(type) "(" args=FuncDeclareArgs(lp) ")" { + if(userSpecified && (userSpecifiedType != type)) { + throw new ParseException("User specified return type " + DataType.findTypeName(userSpecifiedType) + " and deduced return type " + DataType.findTypeName(type) + " are not the same"); + } ExpressionOperator userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, type); log.trace("Exiting FuncDeclareSpec"); return userFunc; @@ -2085,7 +2096,7 @@ // These the simple non-terminals that are shared across many -String EvalFunction() : +String EvalFunction(byte type) : { String funcName; log.trace("Entering EvalFunction"); @@ -2099,6 +2110,9 @@ //Commented out the code for instaniateFunc as it's 
failing try{ EvalFunc ef = (EvalFunc) pigContext.instantiateFuncFromAlias(funcName); + Type javaType = ((EvalFunc)ef).getReturnType(); + log.debug("Type: " + javaType); + type = DataType.findType(javaType); }catch (Exception e){ throw new ParseException(e.getMessage()); }