Author: pradeepkth Date: Tue Apr 14 17:57:56 2009 New Revision: 764904 URL: http://svn.apache.org/viewvc?rev=764904&view=rev Log: PIG-627: multiquery support incremental patch (hagleitn via pradeepkth)
Modified: hadoop/pig/branches/multiquery/CHANGES.txt hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java Modified: hadoop/pig/branches/multiquery/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/CHANGES.txt (original) +++ hadoop/pig/branches/multiquery/CHANGES.txt Tue Apr 14 17:57:56 2009 @@ -600,3 +600,5 @@ PIG-627: multiquery support incremental patch (hagleitn via pradeepkth) PIG-627: multiquery support incremental patch (hagleitn via pradeepkth) + + PIG-627: multiquery support incremental patch (hagleitn via pradeepkth) Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Tue Apr 14 17:57:56 2009 @@ -884,7 +884,8 @@ this.batchMode = batchMode; this.processedStores = 0; this.ignoreNumStores = 0; - this.jobName = "DefaultJobName"; + this.jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, + PigContext.JOB_NAME_PREFIX+":DefaultJobName"); this.lp = new LogicalPlan(); }; @@ -901,7 +902,7 @@ boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); } void execute() throws ExecException, FrontendException { - pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + jobName); + pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName); PigServer.this.execute(null); processedStores = storeOpTable.keySet().size(); } @@ -911,7 +912,7 @@ } void setJobName(String name) { - jobName = name; + jobName = PigContext.JOB_NAME_PREFIX+":"+name; } LogicalPlan getPlan(String alias) throws IOException { Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Apr 14 17:57:56 2009 @@ -222,10 +222,6 @@ POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan); pkgAnnotator.visit(); - // check whether stream operator is present - MRStreamHandler checker = new MRStreamHandler(plan); - checker.visit(); - // optimize joins LastInputStreamingOptimizer liso = new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize); @@ -250,6 +246,12 @@ // NoopFilterRemover. NoopStoreRemover sRem = new NoopStoreRemover(plan); sRem.visit(); + + // check whether stream operator is present + // after MultiQueryOptimizer because it can shift streams from + // map to reduce, etc. + MRStreamHandler checker = new MRStreamHandler(plan); + checker.visit(); return plan; } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java Tue Apr 14 17:57:56 2009 @@ -71,6 +71,7 @@ for (Pair<POFilter, PhysicalPlan> pair: removalQ) { removeFilter(pair.first, pair.second); } + removalQ.clear(); } @Override @@ -94,7 +95,15 @@ private void removeFilter(POFilter filter, PhysicalPlan plan) { if (plan.size() > 1) { try { + List<PhysicalOperator> fInputs = filter.getInputs(); + List<PhysicalOperator> sucs = plan.getSuccessors(filter); + plan.removeAndReconnect(filter); + if(sucs!=null && sucs.size()!=0){ + for (PhysicalOperator suc : sucs) { + suc.setInputs(fInputs); + } + } } catch (PlanException pe) { log.info("Couldn't remove a filter in optimizer: "+pe.getMessage()); } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Tue Apr 14 17:57:56 2009 @@ -65,6 +65,7 @@ for (PhysicalPlan plan : inpPlans) { pushWalker(mCurrentWalker.spawnChildWalker(plan)); visit(); + popWalker(); } } @@ -115,6 +116,7 @@ for (PhysicalPlan plan : inpPlans) { pushWalker(mCurrentWalker.spawnChildWalker(plan)); visit(); + popWalker(); } } @@ -258,6 +260,7 @@ for (PhysicalPlan plan : inpPlans) { pushWalker(mCurrentWalker.spawnChildWalker(plan)); visit(); + popWalker(); } } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Tue Apr 14 17:57:56 2009 @@ -95,6 +95,8 @@ private static Result empty = new Result(POStatus.STATUS_NULL, null); + private boolean inpEOP = false; + /** * Constructs an operator with the specified key * @param k the operator key @@ -200,6 +202,12 @@ @Override public Result getNext(Tuple t) throws ExecException { + + if (this.parentPlan.endOfAllInput) { + + return getStreamCloseResult(); + + } if (processedSet.cardinality() == myPlans.size()) { @@ -258,5 +266,53 @@ return res; } + + private Result getStreamCloseResult() throws ExecException { + Result res = null; + + while (true) { + + if (processedSet.cardinality() == myPlans.size()) { + Result inp = processInput(); + if (inp.returnStatus == POStatus.STATUS_OK) { + Tuple tuple = (Tuple)inp.result; + for (PhysicalPlan pl : myPlans) { + pl.attachInput(tuple); + } + inpEOP = false; + } else if (inp.returnStatus == POStatus.STATUS_EOP){ + inpEOP = true; + } else if (inp.returnStatus == POStatus.STATUS_NULL) { + inpEOP = false; + } else if (inp.returnStatus == POStatus.STATUS_ERR) { + return inp; + } + processedSet.clear(); + } + + int idx = processedSet.nextClearBit(0); + PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0); + + res = leaf.getNext(dummyTuple); + + if (res.returnStatus == POStatus.STATUS_EOP) { + processedSet.set(idx++); + if (idx < myPlans.size()) { + continue; + } + } else { + break; + } + + if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) { + continue; + } else { + break; + } + } + + return res; + + } } Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Apr 14 17:57:56 2009 @@ -65,6 +65,7 @@ import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.datastorage.ContainerDescriptor; import org.apache.pig.backend.datastorage.ElementDescriptor; +import org.apache.hadoop.fs.Path; public class QueryParser { private PigContext pigContext; @@ -213,6 +214,13 @@ path = uri.getSchemeSpecificPart(); } + if ((scheme == null && pigContext.getExecType() == ExecType.MAPREDUCE) || + "hdfs".equalsIgnoreCase(scheme)) { + // We need to get the path from a hadoop path object, + // otherwise special glob characters could get removed. + path = new Path(fname).toUri().getPath(); + } + if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) { if (pigContext.getExecType() != ExecType.LOCAL) { if (fname.startsWith(FileLocalizer.LOCAL_PREFIX)) { Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java Tue Apr 14 17:57:56 2009 @@ -322,6 +322,7 @@ if (batch) { setBatchOn(); + mPigServer.setJobName(script); try { loadScript(script, true, mLoadOnly, params, files); executeBatch(); Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java (original) +++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLoad.java Tue Apr 14 17:57:56 2009 @@ -132,7 +132,7 @@ @Test public void testLoadRemoteRelScheme() throws Exception { - checkLoadPath("hdfs:test","/tmp/test"); + checkLoadPath("test","/tmp/test"); } @Test Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java?rev=764904&r1=764903&r2=764904&view=diff ============================================================================== --- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java (original) +++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java Tue Apr 14 17:57:56 2009 @@ -217,7 +217,7 @@ @Test public void testStoreRemoteRelScheme() throws Exception { - checkStorePath("hdfs:test","/tmp/test"); + checkStorePath("test","/tmp/test"); } @Test