Author: gates Date: Thu May 22 12:04:25 2008 New Revision: 659206 URL: http://svn.apache.org/viewvc?rev=659206&view=rev Log: PIG-159 Santhosh's work to chain together logical plans.
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=659206&r1=659205&r2=659206&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu May 22 12:04:25 2008 @@ -84,7 +84,7 @@ } - Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>(); + Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>(); Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>(); Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>(); Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>(); @@ -252,7 +252,7 @@ * result */ public Iterator<Tuple> openIterator(String id) throws IOException { - if (!aliases.containsKey(id)) + if (!aliases.containsKey(aliasOp.get(id))) throw new IOException("Invalid alias: " + id); try { @@ -471,7 +471,14 @@ } public Map<String, LogicalPlan> getAliases() { - return this.aliases; + Map<String, LogicalPlan> aliasPlans = new HashMap<String, LogicalPlan>(); + for(LogicalOperator op: this.aliases.keySet()) { + String alias = op.getAlias(); + if(null != alias) { + aliasPlans.put(alias, this.aliases.get(op)); + } + } + return aliasPlans; } public void shutdown() { Modified: incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java?rev=659206&r1=659205&r2=659206&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java Thu May 22 12:04:25 2008 @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.util.Iterator; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,6 +29,7 @@ import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.LogicalPlan; +import org.apache.pig.impl.logicalLayer.LogicalOperator; public class StandAloneParser { @@ -66,9 +68,10 @@ try{ pig.registerQuery(query); System.out.print("Current aliases: "); - for (Iterator<String> it = pig.getAliases().keySet().iterator(); it.hasNext(); ) { + Map<String, LogicalPlan> aliasPlan = pig.getAliases(); + for (Iterator<String> it = aliasPlan.keySet().iterator(); it.hasNext(); ) { String alias = it.next(); - LogicalPlan lp = pig.getAliases().get(alias); + LogicalPlan lp = aliasPlan.get(alias); System.out.print(alias + "->" + lp.getLeaves().get(0).getSchema()); if (it.hasNext()) System.out.print(", \n"); else System.out.print("\n"); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java?rev=659206&r1=659205&r2=659206&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java Thu May 22 12:04:25 2008 @@ -40,7 +40,7 @@ public LogicalPlan parse(String scope, String query, - Map<String, LogicalPlan> aliases, + Map<LogicalOperator, LogicalPlan> aliases, Map<OperatorKey, LogicalOperator> opTable, Map<String, LogicalOperator> aliasOp, Map<String, ExpressionOperator> defineAliases) Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=659206&r1=659205&r2=659206&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu May 22 12:04:25 2008 @@ -52,7 +52,7 @@ public class QueryParser { private PigContext pigContext; - private Map<String, LogicalPlan> aliases; + private Map<LogicalOperator, LogicalPlan> aliases; private Map<OperatorKey, LogicalOperator> opTable; private String scope; private NodeIdGenerator nodeIdGen; @@ -68,7 +68,7 @@ public QueryParser(InputStream in, PigContext pigContext, String scope, - Map<String, LogicalPlan> aliases, + Map<LogicalOperator, LogicalPlan> aliases, Map<OperatorKey, LogicalOperator> opTable, Map<String, LogicalOperator> aliasOp, Map<String, ExpressionOperator> defineAliases) { @@ -270,8 +270,8 @@ void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, LogicalPlan condPlan, int index) throws PlanException{ LogicalOperator splitOutput = new LOSplitOutput(lp, new OperatorKey(scope, getNextId()), index, condPlan); splitOp.addOutput(splitOutput); - //splitOp.addOutputAlias(alias, condPlan); addAlias(alias, splitOutput); + addLogicalPlan(splitOutput, lp); lp.add(splitOutput); log.debug("Added alias: " + splitOutput.getAlias() + " class: " @@ -332,6 +332,34 @@ //END private static Map<String, Byte> nameToTypeMap = DataType.genNameToTypeMap(); + + public void addLogicalPlan(LogicalOperator op, LogicalPlan plan) { + aliases.put(op, plan); + } + + public LogicalPlan getLogicalPlan(LogicalOperator op) { + return aliases.get(op); + } + + public void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan) throws ParseException { + log.trace("Entering attachPlan"); + lp.add(root); + log.debug("Added operator " + root + " to the logical plan " + lp); + if(null == rootPlan.getPredecessors(root)) { + log.trace("Exiting attachPlan"); + return; + } + for(LogicalOperator rootPred: rootPlan.getPredecessors(root)) { + attachPlan(lp, rootPred, rootPlan); + try { + lp.connect(rootPred, root); + log.debug("Connected operator " + rootPred + " to " + root + " in the logical plan " + lp); + } catch (FrontendException fee) { + throw new ParseException(fee.getMessage()); + } + } + log.trace("Exiting attachPlan"); + } } @@ -435,8 +463,29 @@ ) { if(null != root) { - log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema()); + log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases); + addLogicalPlan(root, lp); + log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema()); } + + ArrayList<LogicalOperator> roots = new ArrayList<LogicalOperator>(lp.getRoots().size()); + for(LogicalOperator op: lp.getRoots()) { + roots.add(op); + } + + for(LogicalOperator op: roots) { + //At this point we have a logical plan for the pig statement + //In order to construct the entire logical plan we need to traverse + //each root and get the logical plan it belongs to. From each of those + //plans we need the predecessors of the root of the current logical plan + //and so on. This is a computationally intensive operatton but should + //be fine as its restricted to the parser + + LogicalPlan rootPlan = aliases.get(op); + if(null != rootPlan) { + attachPlan(lp, op, rootPlan); + } + } log.trace("Exiting Parse"); return lp; @@ -564,7 +613,7 @@ { ( ( - (<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema);log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.info("Load as atomschema()");schema.printAliases();}) ]) + (<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema);log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ]) | ((<GROUP> | <COGROUP>) op = CogroupClause(lp)) | (<FILTER> op = FilterClause(lp)) | (<ORDER> op = OrderClause(lp)) @@ -1485,10 +1534,8 @@ } { ( -// lhs = UnaryExpr(over,specs,lp,input) lhs = CastExpr(over,specs,lp,input) ( -// ( t = <STAR> | t = "/" | t = "%") rhs = UnaryExpr(over,specs,lp,input) ( t = <STAR> | t = "/" | t = "%") rhs = CastExpr(over,specs,lp,input) { assertAtomic(lhs,true); @@ -1564,7 +1611,6 @@ log.trace("Entering NegativeExpr"); } { -// "-" c1=UnaryExpr(over,specs,lp,input) "-" c1=CastExpr(over,specs,lp,input) { ExpressionOperator eOp = new LONegative(lp, new OperatorKey(scope, getNextId()), c1); @@ -1768,12 +1814,6 @@ [type = Type()] funcName=EvalFunction() "(" args=FuncDeclareArgs(lp) ")" { ExpressionOperator userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, type); - //lp.add(userFunc); - //log.debug("FuncDeclareSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp); - //for(ExpressionOperator exprOp: args) { - //lp.connect(exprOp, userFunc); - //log.debug("FuncDeclareSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp); - //} log.trace("Exiting FuncDeclareSpec"); return userFunc; } @@ -1815,8 +1855,6 @@ project.setAlias(t.image); } item = project; - //lp.add(project); - log.debug("FuncDeclareArgsItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp); } ) {log.trace("Exiting FuncDeclareArgsItem");return item;} Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=659206&r1=659205&r2=659206&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java Thu May 22 12:04:25 2008 @@ -37,6 +37,7 @@ import org.apache.pig.impl.logicalLayer.ExpressionOperator; import org.apache.pig.impl.logicalLayer.LogToPhyTranslationVisitor; import org.apache.pig.impl.logicalLayer.LogicalOperator; +import org.apache.pig.impl.logicalLayer.LOLoad; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder; import org.apache.pig.impl.logicalLayer.OperatorKey; @@ -382,13 +383,12 @@ defineAliases); List<LogicalOperator> roots = lp.getRoots(); + if(roots.size() > 0) { - if (logicalOpTable.get(roots.get(0)) instanceof LogicalOperator){ - System.out.println(query); - System.out.println(logicalOpTable.get(roots.get(0))); - } - if ((roots.get(0)).getAlias()!=null){ - aliases.put((roots.get(0)).getAlias(), lp); + for(LogicalOperator op: roots) { + if (!(op instanceof LOLoad)){ + throw new Exception("Cannot have a root that is not the load operator LOLoad. Found " + op.getClass().getName()); + } } } @@ -427,7 +427,7 @@ return null; } - Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>(); + Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>(); Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>(); Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>(); Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>(); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=659206&r1=659205&r2=659206&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu May 22 12:04:25 2008 @@ -807,7 +807,7 @@ @Test public void testQuery68() { buildPlan(" a = load 'input1';"); - buildPlan(" b = foreach a generate {(16, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12l, {()})};"); + buildPlan(" b = foreach a generate 10, {(16, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12l, {()})};"); } @Test @@ -875,7 +875,8 @@ @Test public void testQuery75() { - buildPlan("union (load 'a'), (load 'b'), (load 'c');"); + buildPlan("a = union (load 'a'), (load 'b'), (load 'c');"); + buildPlan("b = foreach a {generate $0;} parallel 10;"); } // Helper Functions @@ -901,12 +902,10 @@ List<LogicalOperator> roots = lp.getRoots(); if(roots.size() > 0) { - if (logicalOpTable.get(roots.get(0)) instanceof LogicalOperator){ - System.out.println(query); - System.out.println(logicalOpTable.get(roots.get(0))); - } - if ((roots.get(0)).getAlias()!=null){ - aliases.put((roots.get(0)).getAlias(), lp); + for(LogicalOperator op: roots) { + if (!(op instanceof LOLoad)){ + throw new Exception("Cannot have a root that is not the load operator LOLoad. Found " + op.getClass().getName()); + } } } @@ -946,7 +945,7 @@ return null; } - Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>(); + Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>(); Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>(); Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>(); Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>(); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java?rev=659206&r1=659205&r2=659206&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java Thu May 22 12:04:25 2008 @@ -207,7 +207,7 @@ System.out.println(logicalOpTable.get(roots.get(0))); } if ((roots.get(0)).getAlias()!=null){ - aliases.put((roots.get(0)).getAlias(), lp); + aliases.put(roots.get(0), lp); } } @@ -232,7 +232,7 @@ TypeCheckingTestUtil.printTypeGraph(plan) ; } - Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>(); + Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>(); Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>(); Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>(); Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>();