Author: gates
Date: Mon Nov 16 21:43:41 2009
New Revision: 880975
URL: http://svn.apache.org/viewvc?rev=880975&view=rev
Log:
PIG-1085: Pass JobConf and UDF specific configuration information to UDFs.
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov 16 21:43:41 2009
@@ -23,6 +23,8 @@
INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1085: Pass JobConf and UDF specific configuration information to UDFs
+ (gates)
OPTIMIZATIONS
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Mon Nov 16 21:43:41 2009
@@ -81,6 +81,7 @@
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
/**
* This is compiler class that takes an MROperPlan and converts
@@ -596,6 +597,9 @@
jobConf.setOutputCommitter(PigOutputCommitter.class);
Job job = new Job(jobConf);
jobStoreMap.put(job,new Pair<List<POStore>, Path>(storeLocations,
tmpLocation));
+
+ // Serialize the UDF specific context info.
+ UDFContext.getUDFContext().serialize(jobConf);
return job;
} catch (JobCreationException jce) {
throw jce;
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
Mon Nov 16 21:43:41 2009
@@ -49,6 +49,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
public abstract class PigMapBase extends MapReduceBase{
private static final Tuple DUMMYTUPLE = null;
@@ -166,6 +167,12 @@
keyType =
((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
pigReporter = new ProgressableReporter();
+
+ // Get the UDF specific context
+ UDFContext udfc = UDFContext.getUDFContext();
+ udfc.addJobConf(job);
+ udfc.deserialize();
+
if(!(mp.isEmpty())) {
List<OperatorKey> targetOpKeys =
(ArrayList<OperatorKey>)ObjectSerializer.deserialize(job.get("map.target.ops"));
@@ -178,7 +185,6 @@
}
-
} catch (IOException ioe) {
String msg = "Problem while configuring map plan.";
throw new RuntimeException(msg, ioe);
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
Mon Nov 16 21:43:41 2009
@@ -57,6 +57,7 @@
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.data.DataBag;
@@ -301,6 +302,12 @@
roots = rp.getRoots().toArray(new PhysicalOperator[1]);
leaf = rp.getLeaves().get(0);
}
+
+ // Get the UDF specific context
+ UDFContext udfc = UDFContext.getUDFContext();
+ udfc.addJobConf(jConf);
+ udfc.deserialize();
+
} catch (IOException ioe) {
String msg = "Problem while configuring reduce plan.";
throw new RuntimeException(msg, ioe);