Author: gates
Date: Thu Dec 11 22:48:23 2008
New Revision: 725925

URL: http://svn.apache.org/viewvc?rev=725925&view=rev
Log:
PIG-556. Changed FindQuantiles to call progress(). Fixed issue with reporter passed to EvalFunc being null. Fixed issue with sampling phase of order by query running more than one reduce.

Modified: hadoop/pig/branches/types/CHANGES.txt hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Modified: hadoop/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=725925&r1=725924&r2=725925&view=diff ============================================================================== --- hadoop/pig/branches/types/CHANGES.txt (original) +++ hadoop/pig/branches/types/CHANGES.txt Thu Dec 11 22:48:23 2008 @@ -331,3 +331,9 @@ PIG-449: Schemas for bags should contain tuples all the time (pradeepk via olgan) + + PIG-543: Restore local mode to truly run locally instead of use map + reduce. (shubhamc via gates) + + PIG-556: Changed FindQuantiles to report progress. Fixed issue with null + reporter being passed to EvalFuncs. 
(gates) Modified: hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java?rev=725925&r1=725924&r2=725925&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java Thu Dec 11 22:48:23 2008 @@ -23,6 +23,9 @@ import java.lang.reflect.Type; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -36,19 +39,15 @@ * The programmer should not make assumptions about state maintained * between invocations of the invoke() method since the Pig runtime * will schedule and localize invocations based on information provided - * at runtime. - * - * The programmer should not directly extend this class but instead - * extend one of the subclasses where we have bound the parameter T - * to a specific Datum - * - * @author database-syst...@yahoo.research - * + * at runtime. The programmer also should not make assumptions about when or + * how many times the class will be instantiated, since it may be instantiated + * multiple times in both the front and back end. 
*/ public abstract class EvalFunc<T> { // UDFs must use this to report progress // if the exec is taking more that 300 ms protected PigProgressable reporter; + protected Log log = LogFactory.getLog(getClass()); private static int nextSchemaId; // for assigning unique ids to UDF columns protected String getSchemaName(String name, Schema input) { @@ -118,7 +117,8 @@ // report that progress is being made (otherwise hadoop times out after 600 seconds working on one outer tuple) protected void progress() { - if(reporter!=null) reporter.progress(); + if (reporter != null) reporter.progress(); + else log.warn("No reporter object provided to UDF " + this.getClass().getName()); } /** @@ -166,7 +166,7 @@ } - public void setReporter(PigProgressable reporter) { + public final void setReporter(PigProgressable reporter) { this.reporter = reporter; } Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=725925&r1=725924&r2=725925&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Dec 11 22:48:23 2008 @@ -1172,7 +1172,7 @@ mro.reducePlan.connect(nfe3, str); mro.setReduceDone(true); -// mro.requestedParallelism = rp; + mro.requestedParallelism = 1; return mro; } Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=725925&r1=725924&r2=725925&view=diff 
============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Dec 11 22:48:23 2008 @@ -96,6 +96,12 @@ public Result processInput() throws ExecException { + // Make sure the reporter is set, because it isn't getting carried + // across in the serialization (don't know why). I suspect it's as + // cheap to call the setReporter call everytime as to check whether I + // have (hopefully java will inline it). + func.setReporter(reporter); + Result res = new Result(); Tuple inpValue = null; if (input == null && (inputs == null || inputs.size()==0)) { Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=725925&r1=725924&r2=725925&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original) +++ hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu Dec 11 22:48:23 2008 @@ -125,6 +125,7 @@ nextQuantile+=toSkip+1; } i++; + if (i % 1000 == 0) progress(); } return output; } Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=725925&r1=725924&r2=725925&view=diff ============================================================================== --- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original) +++ 
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Dec 11 22:48:23 2008 @@ -1454,7 +1454,6 @@ /* (non-Javadoc) * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object) */ - @Override public int compare(Pair<Long, FuncSpec> o1, Pair<Long, FuncSpec> o2) { if(o1.first < o2.first) return -1; Modified: hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=725925&r1=725924&r2=725925&view=diff ============================================================================== --- hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original) +++ hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Thu Dec 11 22:48:23 2008 @@ -50,7 +50,7 @@ | | | |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -144 | - |---MapReduce(-1,FindQuantiles,TestMRCompiler$WeirdComparator) - -130: + |---MapReduce(1,FindQuantiles,TestMRCompiler$WeirdComparator) - -130: | Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -143 | | | |---New For Each(false)[tuple] - -142