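Author: gates Date: Fri May 16 14:38:40 2008 New Revision: 657224 URL: http://svn.apache.org/viewvc?rev=657224&view=rev Log: PIG-162: Shravan's changes wiring PigProgressable progress reporting through physical operators and UDFs (ProgressableReporter replaces RunnableReporter), and adding DISTINCT support to the MapReduce compiler (MRCompiler).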
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld Removed: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/RunnableReporter.java Modified: incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java Modified: incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java Fri May 16 14:38:40 2008 @@ -23,9 +23,14 @@ 
import org.apache.hadoop.io.WritableComparator; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.physicalLayer.PigProgressable; public abstract class ComparisonFunc extends WritableComparator { + // If the comparison is a time consuming process + // this reporter must be used to report progress + protected PigProgressable reporter; + public ComparisonFunc() { super(TupleFactory.getInstance().tupleClass()); } @@ -48,4 +53,8 @@ * @see java.util.Comparator */ abstract public int compare(Tuple t1, Tuple t2); + + public void setReporter(PigProgressable reporter) { + this.reporter = reporter; + } } Modified: incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/EvalFunc.java Fri May 16 14:38:40 2008 @@ -26,6 +26,7 @@ // TODO FIX // import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.physicalLayer.PigProgressable; /** @@ -44,6 +45,9 @@ * */ public abstract class EvalFunc<T> { + // UDFs must use this to report progress + // if the exec is taking more that 300 ms + protected PigProgressable reporter; protected Type returnType; @@ -107,16 +111,7 @@ // report that progress is being made (otherwise hadoop times out after 600 seconds working on one outer tuple) protected void progress() { - //This part appears to be unused and is causing problems due to changing hadoop signature - /* - if (PigMapReduce.reporter != null) { - try { - PigMapReduce.reporter.progress(); - } catch (IOException ignored) { - } - } - */ - + if(reporter!=null) reporter.progress(); } /** @@ -158,4 +153,14 @@ public boolean 
isAsynchronous(){ return false; } + + + public PigProgressable getReporter() { + return reporter; + } + + + public void setReporter(PigProgressable reporter) { + this.reporter = reporter; + } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Fri May 16 14:38:40 2008 @@ -47,6 +47,8 @@ import org.apache.pig.impl.physicalLayer.plans.ExprPlan; import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.impl.physicalLayer.plans.PlanPrinter; +import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct; import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter; import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach; import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate; @@ -54,21 +56,19 @@ import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad; import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange; import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage; -import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead; import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort; import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit; import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore; import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion; import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator; import 
org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression; -import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator; import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject; import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc; import org.apache.pig.impl.plan.DepthFirstWalker; -import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.Operator; import org.apache.pig.impl.plan.OperatorPlan; +import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.plan.VisitorException; /** * The compiler that compiles a given physical plan @@ -657,6 +657,68 @@ throw pe; } } + + + + @Override + public void visitDistinct(PODistinct op) throws VisitorException { + try{ + MapReduceOper mro = compiledInputs[0]; + ExprPlan ep = new ExprPlan(); + POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); + prjStar.setResultType(DataType.TUPLE); + prjStar.setStar(true); + ep.add(prjStar); + + List<ExprPlan> eps = new ArrayList<ExprPlan>(); + eps.add(ep); + + POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope))); + lr.setIndex(0); + lr.setKeyType(DataType.TUPLE); + lr.setPlans(eps); + lr.setResultType(DataType.TUPLE); + if(!mro.isMapDone()){ + mro.mapPlan.addAsLeaf(lr); + } + else if(mro.isMapDone() && ! 
mro.isReduceDone()){ + mro.reducePlan.addAsLeaf(lr); + } + + blocking(op); + + POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope))); + pkg.setKeyType(DataType.TUPLE); + pkg.setNumInps(1); + boolean[] inner = {false}; + pkg.setInner(inner); + curMROp.reducePlan.add(pkg); + + List<ExprPlan> eps1 = new ArrayList<ExprPlan>(); + List<Boolean> flat1 = new ArrayList<Boolean>(); + ExprPlan ep1 = new ExprPlan(); + POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); + prj1.setResultType(DataType.TUPLE); + prj1.setStar(false); + prj1.setColumn(0); + prj1.setOverloaded(false); + ep1.add(prj1); + eps1.add(ep1); + flat1.add(false); + POGenerate fe1Gen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)),eps1,flat1); + fe1Gen.setResultType(DataType.TUPLE); + PhysicalPlan<PhysicalOperator> fe1Plan = new PhysicalPlan<PhysicalOperator>(); + fe1Plan.add(fe1Gen); + POForEach fe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope))); + fe1.setPlan(fe1Plan); + fe1.setResultType(DataType.TUPLE); + curMROp.reducePlan.addAsLeaf(fe1); + }catch(Exception e){ + VisitorException pe = new VisitorException(e.getMessage()); + pe.initCause(e); + throw pe; + } + } @Override public void visitSort(POSort op) throws VisitorException { @@ -909,10 +971,11 @@ POLoad ld = comp.getLoad(); pj.mapPlan.add(ld); - POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, null, - sortPlans, mAscCols, null); - - pj.mapPlan.addAsLeaf(sort); + /*POSort op = new POSort(new OperatorKey("", r.nextLong()), -1, null, + sortPlans, mAscCols, null);*/ + PODistinct op = new PODistinct(new OperatorKey("", r.nextLong()), + -1, null); + pj.mapPlan.addAsLeaf(op); POStore st = comp.getStore(); pj.mapPlan.addAsLeaf(st); @@ -920,10 +983,7 @@ MRCompiler c1 = new MRCompiler(pj.mapPlan,pc); c1.compile(); MROperPlan plan = c1.getMRPlan(); - for(int i=0;i<3;i++){ - MapReduceOper job = plan.getLeaves().get(0); - System.out.println(job.name()); - 
plan.remove(job); - } + PlanPrinter<MapReduceOper, MROperPlan> pp = new PlanPrinter<MapReduceOper, MROperPlan>(plan); + pp.print(System.out); } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java Fri May 16 14:38:40 2008 @@ -27,16 +27,12 @@ public abstract class PigMapBase extends MapReduceBase{ private final Log log = LogFactory.getLog(getClass()); - protected String inputFile = null; - //Map Plan protected PhysicalPlan<PhysicalOperator> mp; - //The reporter that handles communicating progress - protected RunnableReporter runnableReporter; - - //The thread used to run the runnableReporter - protected Thread reporterThread; + // Reporter that will be used by operators + // to transmit heartbeat + ProgressableReporter pigReporter; /** * Will be called when all the tuples in the input @@ -45,13 +41,8 @@ @Override public void close() throws IOException { super.close(); - if(runnableReporter!=null){ - runnableReporter.setDone(true); - runnableReporter = null; - } - reporterThread = null; + PhysicalOperator.setReporter(null); mp = null; - inputFile = null; } /** @@ -61,7 +52,6 @@ @Override public void configure(JobConf job) { super.configure(job); - inputFile = job.get("map.input.file", "UNKNOWN"); try { mp = (PhysicalPlan<PhysicalOperator>) ObjectSerializer.deserialize(job .get("pig.mapPlan")); @@ -77,9 +67,8 @@ // till here long sleepTime = job.getLong("pig.reporter.sleep.time", 10000); - runnableReporter = new RunnableReporter(sleepTime); - reporterThread = new Thread(runnableReporter); - 
reporterThread.start(); + + pigReporter = new ProgressableReporter(); } catch (IOException e) { log.error(e.getMessage() + "was caused by:"); log.error(e.getCause().getMessage()); @@ -99,8 +88,8 @@ OutputCollector<WritableComparable, Writable> oc, Reporter reporter) throws IOException { - runnableReporter.setReporter(reporter); - runnableReporter.setInputFile(inputFile); + pigReporter.setRep(reporter); + PhysicalOperator.setReporter(pigReporter); if(mp.isEmpty()){ try{ Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java Fri May 16 14:38:40 2008 @@ -38,10 +38,8 @@ import org.apache.pig.data.IndexedTuple; import org.apache.pig.data.TargetedTuple; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.logicalLayer.OperatorKey; import org.apache.pig.impl.physicalLayer.POStatus; import org.apache.pig.impl.physicalLayer.Result; -import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage; import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator; @@ -78,141 +76,6 @@ WritableComparable wcKey = DataType.getWritableComparableTypes(key); oc.collect(wcKey, it); } - /*private final Log log = LogFactory.getLog(getClass()); - - private String inputFile = null; - - //Map Plan - private PhysicalPlan<PhysicalOperator> mp; - - //The reporter that handles communicating progress - RunnableReporter runnableReporter; - - //The thread used to run the runnableReporter 
- Thread reporterThread; - - *//** - * Configures the mapper with the map plan and the - * reproter thread - *//* - @Override - public void configure(JobConf jConf) { - super.configure(jConf); - inputFile = jConf.get("map.input.file", "UNKNOWN"); - try { - mp = (PhysicalPlan<PhysicalOperator>) ObjectSerializer.deserialize(jConf - .get("pig.mapPlan")); - - // To be removed - if(mp.isEmpty()) - log.debug("Map Plan empty!"); - else{ - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - mp.explain(baos); - log.debug(baos.toString()); - } - // till here - - long sleepTime = jConf.getLong("pig.reporter.sleep.time", 10000); - runnableReporter = new RunnableReporter(sleepTime); - reporterThread = new Thread(runnableReporter); - reporterThread.start(); - } catch (IOException e) { - log.error(e.getMessage() + "was caused by:"); - log.error(e.getCause().getMessage()); - } - } - - *//** - * The map function that attaches the inpTuple appropriately - * and executes the map plan if its not empty. Collects the - * result of execution into oc or the input directly to oc - * if map plan empty. The collection is done after extracting - * the key and indexed tuple. 
- *//* - public void map(Text key, TargetedTuple inpTuple, - OutputCollector<WritableComparable, Writable> oc, - Reporter reporter) throws IOException { - - runnableReporter.setReporter(reporter); - runnableReporter.setInputFile(inputFile); - - if(mp.isEmpty()){ - try{ - collectKeyAndTuple(oc,inpTuple.toTuple()); - } catch (ExecException e) { - IOException ioe = new IOException(e.getMessage()); - ioe.initCause(e.getCause()); - throw ioe; - } - return; - } - - for (OperatorKey targetKey : inpTuple.targetOps) { - PhysicalOperator<PhyPlanVisitor> target = mp.getOperator(targetKey); - Tuple t = inpTuple.toTuple(); - target.attachInput(t); - } - - List<PhysicalOperator> leaves = mp.getLeaves(); - - PhysicalOperator leaf = leaves.get(0); - try { - while(true){ - Result res = leaf.getNext(inpTuple); - if(res.returnStatus==POStatus.STATUS_OK){ - collectKeyAndTuple(oc,(Tuple)res.result); - continue; - } - - if(res.returnStatus==POStatus.STATUS_EOP) - return; - - if(res.returnStatus==POStatus.STATUS_NULL) - continue; - - if(res.returnStatus==POStatus.STATUS_ERR){ - IOException ioe = new IOException("Received Error while " + - "processing the map plan."); - throw ioe; - } - } - } catch (ExecException e) { - IOException ioe = new IOException(e.getMessage()); - ioe.initCause(e.getCause()); - throw ioe; - } - } - - *//** - * Assumes that the tuple is of the form (key,indexedTuple) and - * extracts the key & indexed tuple. 
The key is then converted - * to the appropriate Hadoop type and the Hadoop type and IndexedTup - * are collected into the output collector - * @param oc - Output Collector - * @param tuple - The tuple which is the result of a LR either directly - * or by loading a file which has the output of a LR - * @throws ExecException - * @throws IOException - *//* - private void collectKeyAndTuple(OutputCollector<WritableComparable, Writable> oc, Tuple tuple) throws ExecException, IOException { - Object key = tuple.get(0); - IndexedTuple it = (IndexedTuple)tuple.get(1); - WritableComparable wcKey = DataType.getWritableComparableTypes(key); - oc.collect(wcKey, it); - } - - *//** - * Will be called when all the tuples in the input - * are done. So reporter thread should be stopped. - *//* - @Override - public void close() throws IOException { - super.close(); - if(runnableReporter!=null) - runnableReporter.setDone(true); - }*/ - } public static class Reduce extends MapReduceBase @@ -230,11 +93,7 @@ //plan private POPackage pack; - //The reporter that handles communicating progress - RunnableReporter runnableReporter; - - //The thread used to run the runnableReporter - Thread reporterThread; + ProgressableReporter pigReporter; /** * Configures the Reduce plan, the POPackage operator @@ -258,9 +117,8 @@ // till here long sleepTime = jConf.getLong("pig.reporter.sleep.time", 10000); - runnableReporter = new RunnableReporter(sleepTime); - reporterThread = new Thread(runnableReporter); - reporterThread.start(); + + pigReporter = new ProgressableReporter(); } catch (IOException e) { log.error(e.getMessage() + "was caused by:"); log.error(e.getCause().getMessage()); @@ -278,7 +136,7 @@ OutputCollector<WritableComparable, Writable> oc, Reporter reporter) throws IOException { - runnableReporter.setReporter(reporter); + pigReporter.setRep(reporter); Object k = DataType.convertToPigType(key); pack.attachInput(k, indInp); @@ -345,8 +203,9 @@ @Override public void close() throws IOException 
{ super.close(); - if(runnableReporter!=null) - runnableReporter.setDone(true); + /*if(runnableReporter!=null) + runnableReporter.setDone(true);*/ + PhysicalOperator.setReporter(null); } } Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java?rev=657224&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/ProgressableReporter.java Fri May 16 14:38:40 2008 @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.impl.mapReduceLayer; + +import org.apache.hadoop.mapred.Reporter; +import org.apache.pig.impl.physicalLayer.PigProgressable; + +public class ProgressableReporter implements PigProgressable { + Reporter rep; + + public ProgressableReporter(){ + + } + + public ProgressableReporter(Reporter rep) { + super(); + this.rep = rep; + } + + public void progress() { + rep.progress(); + } + + public void progress(String msg) { + rep.setStatus(msg); + } + + public void setRep(Reporter rep) { + this.rep = rep; + } + +} Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PigProgressable.java Fri May 16 14:38:40 2008 @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.pig.impl.physicalLayer; public interface PigProgressable { Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java Fri May 16 14:38:40 2008 @@ -214,7 +214,7 @@ } } - + if(reporter!=null) reporter.progress(); //CreateTuple(data); res.result = CreateTuple(data); res.returnStatus = POStatus.STATUS_OK; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java Fri May 16 14:38:40 2008 @@ -170,6 +170,7 @@ while (indTupIter.hasNext()) { IndexedTuple it = indTupIter.next(); dbs[it.index].add(it.toTuple()); + if(reporter!=null) reporter.progress(); } //Construct the output tuple by appending Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java?rev=657224&r1=657223&r2=657224&view=diff 
============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUnion.java Fri May 16 14:38:40 2008 @@ -118,6 +118,7 @@ Result res; while(true){ + if(reporter!=null) reporter.progress(); res = inputs.get(ind).getNext(t); if(res.returnStatus == POStatus.STATUS_NULL) continue; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java Fri May 16 14:38:40 2008 @@ -28,6 +28,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.OperatorKey; import org.apache.pig.impl.physicalLayer.POStatus; +import org.apache.pig.impl.physicalLayer.PigProgressable; import org.apache.pig.impl.physicalLayer.Result; import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.impl.plan.Operator; @@ -85,6 +86,11 @@ // The result of performing the operation along with the output protected Result res = null; + + // Will be used by operators to report status or transmit heartbeat + // Should be set by the backends to appropriate implementations that + // wrap their own version of a reporter. + protected static PigProgressable reporter; // Dummy types used to access the getNext of appropriate // type. 
These will be null @@ -197,6 +203,7 @@ * @throws ExecException */ public Result processInput() throws ExecException { + Result res = new Result(); Tuple inpValue = null; if (input == null && (inputs == null || inputs.size()==0)) { @@ -204,6 +211,10 @@ res.returnStatus = POStatus.STATUS_EOP; return res; } + + //Should be removed once the model is clear + if(reporter!=null) reporter.progress(); + if (!isInputAttached()) return inputs.get(0).getNext(inpValue); else { @@ -256,4 +267,8 @@ return res; } + public static void setReporter(PigProgressable reporter) { + PhysicalOperator.reporter = reporter; + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java Fri May 16 14:38:40 2008 @@ -17,7 +17,11 @@ public class POUserComparisonFunc extends POUserFunc { - transient ComparisonFunc func; + /** + * + */ + private static final long serialVersionUID = 1L; + transient ComparisonFunc func; private Log log = LogFactory.getLog(getClass()); public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec, ComparisonFunc func) { @@ -34,6 +38,7 @@ private void instantiateFunc() { this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec); + this.func.setReporter(reporter); } public ComparisonFunc getComparator() { Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java Fri May 16 14:38:40 2008 @@ -38,12 +38,15 @@ import org.apache.pig.impl.physicalLayer.POStatus; import org.apache.pig.impl.physicalLayer.Result; import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor; -import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.impl.plan.VisitorException; public class POUserFunc extends ExpressionOperator { - transient EvalFunc func; + /** + * + */ + private static final long serialVersionUID = 1L; + transient EvalFunc func; Tuple t1, t2; private final Log log = LogFactory.getLog(getClass()); String funcSpec; @@ -73,6 +76,7 @@ private void instantiateFunc() { this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(this.funcSpec); + this.func.setReporter(reporter); } private Result getNext() throws ExecException { @@ -85,6 +89,7 @@ try { if (inputAttached) { result.result = func.exec(input); + if(reporter!=null) reporter.progress(); result.returnStatus = (result.result != null) ? 
POStatus.STATUS_OK : POStatus.STATUS_EOP; return result; Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java?rev=657224&r1=657223&r2=657224&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java Fri May 16 14:38:40 2008 @@ -42,6 +42,7 @@ import org.apache.pig.impl.physicalLayer.plans.ExprPlan; import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.physicalLayer.plans.PlanPrinter; +import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct; import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter; import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach; import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate; @@ -95,7 +96,7 @@ GenPhyOp.setR(r); GenPhyOp.setPc(pc); - int numTests = 15; + int numTests = 16; // int numTests = 9; tests = new String[numTests]; int cnt = -1; @@ -107,6 +108,7 @@ for (int i = 1; i <= 3; i++) tests[++cnt] = "intTestSpl" + i; tests[++cnt] = "intTestSortUDF1"; + tests[++cnt] = "intTestDistinct1"; } @After @@ -782,6 +784,28 @@ php.addAsLeaf(st); } + public static void intTestDistinct1() throws PlanException, ExecException{ + php = new PhysicalPlan<PhysicalOperator>(); + PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter(); + php.merge(ldFil1); + + PODistinct op = new PODistinct(new OperatorKey("", r.nextLong()), + -1, null); + + php.addAsLeaf(op); + + PhysicalPlan<PhysicalOperator> grpChain1 = GenPhyOp.grpChain(); + php.merge(grpChain1); + php.connect(op,grpChain1.getRoots().get(0)); + + PODistinct op1 = new PODistinct(new OperatorKey("", r.nextLong()), + -1, null); + + php.addAsLeaf(op1); + POStore st = 
GenPhyOp.topStoreOp(); + php.addAsLeaf(st); + } + public static class WeirdComparator extends ComparisonFunc { @Override Added: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld?rev=657224&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld Fri May 16 14:38:40 2008 @@ -0,0 +1,41 @@ +MapReduce(-1) - MRCompiler-176: +| Store(DummyFil:DummyLdr) - -6079615556647418436 +| | +| |---For Each - MRCompiler-180 +| | | +| | POGenerate(false) - MRCompiler-179 +| | | | +| | | Project(0) - MRCompiler-178 +| | +| |---Package - MRCompiler-177 +| Load(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - MRCompiler-175 +| +|---MapReduce(-1) - MRCompiler-171: + | Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - MRCompiler-174 + | | + | |---Local Rearrange - MRCompiler-173 + | | | + | | Project(*) - MRCompiler-172 + | | + | |---Package - -8219725798912083822 + | Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - MRCompiler-170 + | + |---MapReduce(-1) - MRCompiler-162: + | Store(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - MRCompiler-169 + | | + | |---Local Rearrange - --3889827013424534115 + | | + | |---For Each - MRCompiler-168 + | | | + | | POGenerate(false) - MRCompiler-167 + | | | | + | | | Project(0) - MRCompiler-166 + | | + | |---Package - MRCompiler-165 + | Local Rearrange - MRCompiler-164 + | | | + | | Project(*) - MRCompiler-163 + | | + | |---Filter - --1613182091613226659 + | | + | |---Load(DummyFil:DummyLdr) - -5321755951016030071 \ No newline at end of file