Author: gates
Date: Mon Nov 16 21:43:41 2009
New Revision: 880975

URL: http://svn.apache.org/viewvc?rev=880975&view=rev
Log:
PIG-1085:  Pass JobConf and UDF specific configuration information to UDFs.


Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov 16 21:43:41 2009
@@ -23,6 +23,8 @@
 INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
+PIG-1085:  Pass JobConf and UDF specific configuration information to UDFs
+                  (gates)
 
 OPTIMIZATIONS
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Nov 16 21:43:41 2009
@@ -81,6 +81,7 @@
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
 
 /**
  * This is compiler class that takes an MROperPlan and converts
@@ -596,6 +597,9 @@
             jobConf.setOutputCommitter(PigOutputCommitter.class);
             Job job = new Job(jobConf);
             jobStoreMap.put(job,new Pair<List<POStore>, Path>(storeLocations, 
tmpLocation));
+            
+            // Serialize the UDF specific context info.
+            UDFContext.getUDFContext().serialize(jobConf);
             return job;
         } catch (JobCreationException jce) {
                throw jce;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Mon Nov 16 21:43:41 2009
@@ -49,6 +49,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
 
 public abstract class PigMapBase extends MapReduceBase{
     private static final Tuple DUMMYTUPLE = null;
@@ -166,6 +167,12 @@
             keyType = 
((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
             
             pigReporter = new ProgressableReporter();
+                
+            // Get the UDF specific context
+            UDFContext udfc = UDFContext.getUDFContext();
+            udfc.addJobConf(job);
+            udfc.deserialize();
+            
             if(!(mp.isEmpty())) {
                 List<OperatorKey> targetOpKeys = 
                     
(ArrayList<OperatorKey>)ObjectSerializer.deserialize(job.get("map.target.ops"));
@@ -178,7 +185,6 @@
             }
             
             
-            
         } catch (IOException ioe) {
             String msg = "Problem while configuring map plan.";
             throw new RuntimeException(msg, ioe);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=880975&r1=880974&r2=880975&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Nov 16 21:43:41 2009
@@ -57,6 +57,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.WrappedIOException;
 
 import org.apache.pig.data.DataBag;
@@ -301,6 +302,12 @@
                     roots = rp.getRoots().toArray(new PhysicalOperator[1]);
                     leaf = rp.getLeaves().get(0);
                 }
+                
+                // Get the UDF specific context
+               UDFContext udfc = UDFContext.getUDFContext();
+               udfc.addJobConf(jConf);
+               udfc.deserialize();
+            
             } catch (IOException ioe) {
                 String msg = "Problem while configuring reduce plan.";
                 throw new RuntimeException(msg, ioe);


Reply via email to