Author: daijy
Date: Thu Apr 1 22:42:59 2010
New Revision: 930122
URL: http://svn.apache.org/viewvc?rev=930122&view=rev
Log:
PIG-1336: Optimize POStore serialized into JobConf
Modified:
hadoop/pig/branches/branch-0.7/CHANGES.txt
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=930122&r1=930121&r2=930122&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Thu Apr 1 22:42:59 2010
@@ -179,6 +179,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-1336: Optimize POStore serialized into JobConf (daijy)
+
PIG-1335: UDFFinder should find LoadFunc used by POCast (daijy)
PIG-1307: when we spill the DefaultDataBag we are not setting the sized
changed flag to be true. (breed via daijy)
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=930122&r1=930121&r2=930122&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Apr 1 22:42:59 2010
@@ -424,9 +424,6 @@ public class JobControlCompiler{
LinkedList<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
LinkedList<POStore> reduceStores =
PlanHelper.getStores(mro.reducePlan);
- conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
- conf.set(PIG_REDUCE_STORES,
ObjectSerializer.serialize(reduceStores));
-
for (POStore st: mapStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
@@ -447,11 +444,11 @@ public class JobControlCompiler{
POStore st;
if (reduceStores.isEmpty()) {
- st = mapStores.remove(0);
+ st = mapStores.get(0);
mro.mapPlan.remove(st);
}
else {
- st = reduceStores.remove(0);
+ st = reduceStores.get(0);
mro.reducePlan.remove(st);
}
@@ -604,6 +601,13 @@ public class JobControlCompiler{
nwJob.setMapOutputKeyClass(NullablePartitionWritable.class);
nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
}
+
+ // unset inputs for POStore, otherwise, map/reduce plan will be
unnecessarily deserialized
+ for (POStore st: mapStores) { st.setInputs(null);
st.setParentPlan(null);}
+ for (POStore st: reduceStores) { st.setInputs(null);
st.setParentPlan(null);}
+
+ conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
+ conf.set(PIG_REDUCE_STORES,
ObjectSerializer.serialize(reduceStores));
// Serialize the UDF specific context info.
UDFContext.getUDFContext().serialize(conf);