Author: daijy
Date: Thu Apr  1 22:38:14 2010
New Revision: 930118

URL: http://svn.apache.org/viewvc?rev=930118&view=rev
Log:
PIG-1336: Optimize POStore serialized into JobConf

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=930118&r1=930117&r2=930118&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Apr  1 22:38:14 2010
@@ -193,6 +193,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-1336: Optimize POStore serialized into JobConf (daijy)
+
 PIG-1335: UDFFinder should find LoadFunc used by POCast (daijy)
 
 PIG-1307: when we spill the DefaultDataBag we are not setting the sized changed flag to be true. (breed via daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=930118&r1=930117&r2=930118&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Apr  1 22:38:14 2010
@@ -425,9 +425,6 @@ public class JobControlCompiler{
             LinkedList<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
             LinkedList<POStore> reduceStores = 
PlanHelper.getStores(mro.reducePlan);
             
-            conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
-            conf.set(PIG_REDUCE_STORES, 
ObjectSerializer.serialize(reduceStores));
-            
             for (POStore st: mapStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
@@ -448,11 +445,11 @@ public class JobControlCompiler{
                 
                 POStore st;
                 if (reduceStores.isEmpty()) {
-                    st = mapStores.remove(0);
+                    st = mapStores.get(0);
                     mro.mapPlan.remove(st);
                 }
                 else {
-                    st = reduceStores.remove(0);
+                    st = reduceStores.get(0);
                     mro.reducePlan.remove(st);
                 }
 
@@ -605,6 +602,13 @@ public class JobControlCompiler{
                 nwJob.setMapOutputKeyClass(NullablePartitionWritable.class);
                 
nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
             }
+            
+            // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized
+            for (POStore st: mapStores) { st.setInputs(null); 
st.setParentPlan(null);}
+            for (POStore st: reduceStores) { st.setInputs(null); 
st.setParentPlan(null);}
+
+            conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
+            conf.set(PIG_REDUCE_STORES, 
ObjectSerializer.serialize(reduceStores));
       
             // Serialize the UDF specific context info.
             UDFContext.getUDFContext().serialize(conf);


Reply via email to