Author: gates Date: Mon Nov 16 21:43:41 2009 New Revision: 880975 URL: http://svn.apache.org/viewvc?rev=880975&view=rev Log: PIG-1085: Pass JobConf and UDF specific configuration information to UDFs.
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=880975&r1=880974&r2=880975&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Mon Nov 16 21:43:41 2009 @@ -23,6 +23,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-1085: Pass JobConf and UDF specific configuration information to UDFs + (gates) OPTIMIZATIONS Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=880975&r1=880974&r2=880975&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Nov 16 21:43:41 2009 @@ -81,6 +81,7 @@ import org.apache.pig.impl.util.JarManager; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Pair; +import org.apache.pig.impl.util.UDFContext; /** * This is compiler class that takes an MROperPlan and converts @@ -596,6 +597,9 @@ jobConf.setOutputCommitter(PigOutputCommitter.class); Job job = new Job(jobConf); jobStoreMap.put(job,new Pair<List<POStore>, Path>(storeLocations, tmpLocation)); + + // Serialize the UDF specific context info. + UDFContext.getUDFContext().serialize(jobConf); return job; } catch (JobCreationException jce) { throw jce; Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=880975&r1=880974&r2=880975&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Mon Nov 16 21:43:41 2009 @@ -49,6 +49,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.SpillableMemoryManager; +import org.apache.pig.impl.util.UDFContext; public abstract class PigMapBase extends MapReduceBase{ private static final Tuple DUMMYTUPLE = null; @@ -166,6 +167,12 @@ keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0]; pigReporter = new ProgressableReporter(); + + // Get the UDF specific context + UDFContext udfc = UDFContext.getUDFContext(); + udfc.addJobConf(job); + udfc.deserialize(); + if(!(mp.isEmpty())) { List<OperatorKey> targetOpKeys = (ArrayList<OperatorKey>)ObjectSerializer.deserialize(job.get("map.target.ops")); @@ -178,7 +185,6 @@ } - } catch (IOException ioe) { String msg = "Problem while configuring map plan."; throw new RuntimeException(msg, ioe); Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=880975&r1=880974&r2=880975&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Nov 16 21:43:41 2009 @@ -57,6 +57,7 @@ import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.SpillableMemoryManager; +import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.WrappedIOException; import org.apache.pig.data.DataBag; @@ -301,6 +302,12 @@ roots = rp.getRoots().toArray(new PhysicalOperator[1]); leaf = rp.getLeaves().get(0); } + + // Get the UDF specific context + UDFContext udfc = UDFContext.getUDFContext(); + udfc.addJobConf(jConf); + udfc.deserialize(); + } catch (IOException ioe) { String msg = "Problem while configuring reduce plan."; throw new RuntimeException(msg, ioe);