Author: daijy
Date: Thu Apr 1 22:42:59 2010
New Revision: 930122

URL: http://svn.apache.org/viewvc?rev=930122&view=rev
Log:
PIG-1336: Optimize POStore serialized into JobConf

Modified:
    hadoop/pig/branches/branch-0.7/CHANGES.txt
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java

Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=930122&r1=930121&r2=930122&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Thu Apr 1 22:42:59 2010
@@ -179,6 +179,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-1336: Optimize POStore serialized into JobConf (daijy)
+
 PIG-1335: UDFFinder should find LoadFunc used by POCast (daijy)
 
 PIG-1307: when we spill the DefaultDataBag we are not setting the sized changed flag to be true. (breed via daijy)

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=930122&r1=930121&r2=930122&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Apr 1 22:42:59 2010
@@ -424,9 +424,6 @@ public class JobControlCompiler{
             LinkedList<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
             LinkedList<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan);
 
-            conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
-            conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
-
             for (POStore st: mapStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
@@ -447,11 +444,11 @@ public class JobControlCompiler{
 
                 POStore st;
                 if (reduceStores.isEmpty()) {
-                    st = mapStores.remove(0);
+                    st = mapStores.get(0);
                     mro.mapPlan.remove(st);
                 }
                 else {
-                    st = reduceStores.remove(0);
+                    st = reduceStores.get(0);
                     mro.reducePlan.remove(st);
                 }
 
@@ -604,6 +601,13 @@ public class JobControlCompiler{
                 nwJob.setMapOutputKeyClass(NullablePartitionWritable.class);
                 nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
             }
+
+            // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized
+            for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);}
+            for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);}
+
+            conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
+            conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
 
             // Serialize the UDF specific context info.
             UDFContext.getUDFContext().serialize(conf);