Author: gates Date: Fri May 23 16:58:27 2008 New Revision: 659721 URL: http://svn.apache.org/viewvc?rev=659721&view=rev Log: Fixes in PigServer to deal with the change of the alias map from String->LogicalPlan to LogicalOperator->LogicalPlan. Added TestMapReduce to build.xml to begin end-to-end testing. It currently fails, but I'm leaving it in so we can test with it.
Modified: incubator/pig/branches/types/build.xml incubator/pig/branches/types/src/org/apache/pig/PigServer.java Modified: incubator/pig/branches/types/build.xml URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=659721&r1=659720&r2=659721&view=diff ============================================================================== --- incubator/pig/branches/types/build.xml (original) +++ incubator/pig/branches/types/build.xml Fri May 23 16:58:27 2008 @@ -222,47 +222,6 @@ <batchtest fork="yes" todir="${test.log.dir}" unless="testcase"> <fileset dir="test"> - <!-- - <include name="**/TestBuiltin.java" /> - <include name="**/TestOperatorPlan.java" /> - <include name="**/TestPhyOp.java" /> - <include name="**/TestConstExpr.java" /> - <include name="**/TestProject.java" /> - <include name="**/TestFilter.java" /> - <include name="**/TestAdd.java" /> - <include name="**/TestSubtract.java" /> - <include name="**/TestMultiply.java" /> - <include name="**/TestDivide.java" /> - <include name="**/TestMod.java" /> - <include name="**/TestGreaterThan.java" /> - <include name="**/TestGTOrEqual.java" /> - <include name="**/TestLessThan.java" /> - <include name="**/TestLTOrEqual.java" /> - <include name="**/TestEqualTo.java" /> - <include name="**/TestNotEqualTo.java" /> - <include name="**/TestPOGenerate.java" /> - <include name="**/TestPOSort.java" /> - <include name="**/TestPOUserFunc.java" /> - <include name="**/TestPODistinct.java" /> - <include name="**/TestLoad.java" /> - <include name="**/TestStore.java" /> - <include name="**/TestPackage.java" /> - <include name="**/TestLocalRearrange.java" /> - <include name="**/TestForEach.java" /> - <include name="**/TestUnion.java" /> - <include name="**/TestMRCompiler.java" /> - <include name="**/TestJobSubmission.java" /> - <include name="**/TestInputOutputFileValidator.java" /> - <include name="**/TestTypeCheckingValidator.java" /> - <include name="**/TestSchema.java" /> - <include 
name="**/TestLogicalPlanBuilder.java" /> - <include name="**/TestLocalJobSubmission.java" /> - <include name="**/TestPOMapLookUp.java" /> - <include name="**/TestPOBinCond.java" /> - <include name="**/TestPONegative.java" /> - <include name="**/TestGrunt.java" /> - <include name="**/TestPOCast.java" /> - --> <include name="**/*Test*.java" /> <!-- Excluced because they are end-to-end, don't work yet. --> <exclude name="**/TestAlgebraicEval.java" /> @@ -272,7 +231,7 @@ <exclude name="**/TestFilterOpNumeric.java" /> <exclude name="**/TestFilterOpString.java" /> <exclude name="**/TestInfixArithmetic.java" /> - <exclude name="**/TestMapReduce.java" /> + <!-- <exclude name="**/TestMapReduce.java" /> --> <exclude name="**/TestPigFile.java" /> <exclude name="**/TestPigSplit.java" /> <exclude name="**/TestStoreOld.java" /> Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=659721&r1=659720&r2=659721&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri May 23 16:58:27 2008 @@ -231,17 +231,13 @@ } public void dumpSchema(String alias) throws IOException{ - LogicalPlan lp = aliases.get(alias); - if (lp == null) - throw new IOException("Invalid alias - " + alias); - try { + LogicalPlan lp = getPlanFromAlias(alias, "describe"); Schema schema = lp.getLeaves().get(0).getSchema(); System.out.println(schema.toString()); } catch (FrontendException fe) { - IOException ioe = new IOException(fe.getMessage()); - ioe.initCause(fe); - throw ioe; + throw WrappedIOException.wrap( + "Unable to describe schema for alias " + alias, fe); } } @@ -254,9 +250,6 @@ * result */ public Iterator<Tuple> openIterator(String id) throws IOException { - if (!aliases.containsKey(aliasOp.get(id))) - throw 
new IOException("Invalid alias: " + id); - try { ExecJob job = execute(id); // invocation of "execute" is synchronous! @@ -266,36 +259,10 @@ throw new IOException("Job terminated with anomalous status " + job.getStatus().toString()); } - } catch (ExecException ee) { + } catch (Exception e) { throw WrappedIOException.wrap( - "Unable to open iterator for alias: " + id, ee); - } - - // TODO: front-end could actually remember what logical plans have been - // already submitted to the back-end for compilation and - // execution. - - /* - LogicalPlan readFrom = (LogicalPlan) aliases.get(id); - - try { - ExecPhysicalPlan pp = - pigContext.getExecutionEngine().compile(readFrom, null); - - ExecJob job = pigContext.getExecutionEngine().execute(pp); - - // invocation of "execute" is synchronous! - if (job.getStatus() == JOB_STATUS.COMPLETED) { - return job.getResults(); - } - else { - throw new IOException("Job terminated with anomalous status " + job.getStatus().toString()); - } - } - catch (ExecException e) { - throw WrappedIOException.wrap("Unable to open iterator for alias: " + id, e); + "Unable to open iterator for alias: " + id, e); } - */ } /** @@ -312,39 +279,35 @@ /** * forces execution of query (and all queries from which it reads), in order to store result in file */ - public void store(String id, String filename, String func) throws IOException{ - if (!aliases.containsKey(id)) + public void store( + String id, + String filename, + String func) throws IOException{ + if (!aliasOp.containsKey(id)) throw new IOException("Invalid alias: " + id); - if (FileLocalizer.fileExists(filename, pigContext)) - throw new IOException("Output file " + filename + " already exists. 
Can't overwrite."); - - LogicalPlan readFrom = aliases.get(id); - - store(id, readFrom, filename, func); + try { + LogicalPlan readFrom = getPlanFromAlias(id, "store"); + store(id, readFrom, filename, func); + } catch (FrontendException fe) { + throw WrappedIOException.wrap("Unable to store alias " + id, fe); + } } - public void store(String id, LogicalPlan readFrom, String filename, String func) throws IOException { + public void store( + String id, + LogicalPlan readFrom, + String filename, + String func) throws IOException { try { LogicalPlan storePlan = QueryParser.generateStorePlan(opTable, scope, readFrom, filename, func, pigContext); execute(id); - } catch (ExecException e) { - throw WrappedIOException.wrap("Unable to store for alias: " + id, - e); + } catch (Exception e) { + throw WrappedIOException.wrap("Unable to store for alias: " + + id, e); } - /* - try { - ExecPhysicalPlan pp = - pigContext.getExecutionEngine().compile(storePlan, null); - - pigContext.getExecutionEngine().execute(pp); - } - catch (ExecException e) { - throw WrappedIOException.wrap("Unable to store alias " + readFrom.getAlias(), e); - } - */ } /** @@ -372,12 +335,9 @@ stream.println("-----------------------------------------------"); pigContext.getExecutionEngine().explain(pp, stream); - } catch (VisitorException ve) { - throw WrappedIOException.wrap("Unable to explain alias " + alias, - ve); - } catch (ExecException ee) { - throw WrappedIOException.wrap("Unable to explain alias " + alias, - ee); + } catch (Exception e) { + throw WrappedIOException.wrap("Unable to explain alias " + + alias, e); } } @@ -492,7 +452,8 @@ // pigContext.getExecutionEngine().reclaimScope(this.scope); } - private ExecJob execute(String jobName) throws ExecException { + private ExecJob execute( + String jobName) throws FrontendException, ExecException { ExecJob job = null; LogicalPlan lp = compileLp(jobName, "execute"); @@ -501,13 +462,12 @@ return pigContext.getExecutionEngine().execute(pp, jobName); } - 
private LogicalPlan compileLp(String alias, String op) throws ExecException { + private LogicalPlan compileLp( + String alias, + String op) throws ExecException, FrontendException { // Look up the logical plan in the aliases map. That plan will be // properly connected to all the others. - LogicalPlan lp = aliases.get(alias); - if (lp == null) { - throw new ExecException("No alias " + alias + " to " + op); - } + LogicalPlan lp = getPlanFromAlias(alias, op); // run through validator LogicalPlanValidationExecutor validator = @@ -556,4 +516,22 @@ return pp; } + + private LogicalPlan getPlanFromAlias( + String alias, + String operation) throws FrontendException { + LogicalOperator lo = aliasOp.get(alias); + if (lo == null) { + throw new FrontendException("No alias " + alias + " to " + + operation); + } + LogicalPlan lp = aliases.get(lo); + if (lp == null) { + throw new FrontendException("No plan for " + alias + " to " + + operation); + } + return lp; + } + + }