Author: gates
Date: Thu Dec 11 22:48:23 2008
New Revision: 725925
URL: http://svn.apache.org/viewvc?rev=725925&view=rev
Log:
PIG-556. Changed FindQuantiles to call progress(). Fixed issue with the reporter
passed to EvalFunc being null. Fixed issue with the sampling phase of an order by
query running more than one reducer.
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
Modified: hadoop/pig/branches/types/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=725925&r1=725924&r2=725925&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Thu Dec 11 22:48:23 2008
@@ -331,3 +331,9 @@
PIG-449: Schemas for bags should contain tuples all the time (pradeepk via
olgan)
+
+ PIG-543: Restore local mode to truly run locally instead of use map
+ reduce. (shubhamc via gates)
+
+ PIG-556: Changed FindQuantiles to report progress. Fixed issue with null
+ reporter being passed to EvalFuncs. (gates)
Modified: hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java?rev=725925&r1=725924&r2=725925&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/EvalFunc.java Thu Dec 11 22:48:23 2008
@@ -23,6 +23,9 @@
import java.lang.reflect.Type;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -36,19 +39,15 @@
* The programmer should not make assumptions about state maintained
* between invocations of the invoke() method since the Pig runtime
* will schedule and localize invocations based on information provided
- * at runtime.
- *
- * The programmer should not directly extend this class but instead
- * extend one of the subclasses where we have bound the parameter T
- * to a specific Datum
- *
- * @author [email protected]
- *
+ * at runtime. The programmer also should not make assumptions about when or
+ * how many times the class will be instantiated, since it may be instantiated
+ * multiple times in both the front and back end.
*/
public abstract class EvalFunc<T> {
// UDFs must use this to report progress
// if the exec is taking more than 300 ms
protected PigProgressable reporter;
+ protected Log log = LogFactory.getLog(getClass());
private static int nextSchemaId; // for assigning unique ids to UDF columns
protected String getSchemaName(String name, Schema input) {
@@ -118,7 +117,8 @@
// report that progress is being made (otherwise hadoop times out after 600 seconds working on one outer tuple)
protected void progress() {
- if(reporter!=null) reporter.progress();
+ if (reporter != null) reporter.progress();
+ else log.warn("No reporter object provided to UDF " + this.getClass().getName());
}
/**
@@ -166,7 +166,7 @@
}
- public void setReporter(PigProgressable reporter) {
+ public final void setReporter(PigProgressable reporter) {
this.reporter = reporter;
}
Modified:
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=725925&r1=725924&r2=725925&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Dec 11 22:48:23 2008
@@ -1172,7 +1172,7 @@
mro.reducePlan.connect(nfe3, str);
mro.setReduceDone(true);
-// mro.requestedParallelism = rp;
+ mro.requestedParallelism = 1;
return mro;
}
Modified:
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=725925&r1=725924&r2=725925&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Dec 11 22:48:23 2008
@@ -96,6 +96,12 @@
public Result processInput() throws ExecException {
+ // Make sure the reporter is set, because it isn't getting carried
+ // across in the serialization (the cause is not yet known). Calling
+ // setReporter every time is probably as cheap as checking whether it
+ // has already been set (and the JIT will hopefully inline it).
+ func.setReporter(reporter);
+
Result res = new Result();
Tuple inpValue = null;
if (input == null && (inputs == null || inputs.size()==0)) {
Modified:
hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=725925&r1=725924&r2=725925&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu Dec 11 22:48:23 2008
@@ -125,6 +125,7 @@
nextQuantile+=toSkip+1;
}
i++;
+ if (i % 1000 == 0) progress();
}
return output;
}
Modified:
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=725925&r1=725924&r2=725925&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Dec 11 22:48:23 2008
@@ -1454,7 +1454,6 @@
/* (non-Javadoc)
* @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
*/
- @Override
public int compare(Pair<Long, FuncSpec> o1, Pair<Long, FuncSpec> o2) {
if(o1.first < o2.first)
return -1;
Modified:
hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=725925&r1=725924&r2=725925&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Thu Dec 11 22:48:23 2008
@@ -50,7 +50,7 @@
| |
|
|---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -144
|
- |---MapReduce(-1,FindQuantiles,TestMRCompiler$WeirdComparator) - -130:
+ |---MapReduce(1,FindQuantiles,TestMRCompiler$WeirdComparator) - -130:
|
Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -143
| |
| |---New For Each(false)[tuple] - -142