Author: daijy
Date: Thu Apr 1 22:38:14 2010
New Revision: 930118
URL: http://svn.apache.org/viewvc?rev=930118&view=rev
Log:
PIG-1336: Optimize POStore serialized into JobConf
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=930118&r1=930117&r2=930118&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Apr 1 22:38:14 2010
@@ -193,6 +193,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-1336: Optimize POStore serialized into JobConf (daijy)
+
PIG-1335: UDFFinder should find LoadFunc used by POCast (daijy)
PIG-1307: when we spill the DefaultDataBag we are not setting the sized
changed flag to be true. (breed via daijy)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=930118&r1=930117&r2=930118&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Apr 1 22:38:14 2010
@@ -425,9 +425,6 @@ public class JobControlCompiler{
LinkedList<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
LinkedList<POStore> reduceStores =
PlanHelper.getStores(mro.reducePlan);
- conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
- conf.set(PIG_REDUCE_STORES,
ObjectSerializer.serialize(reduceStores));
-
for (POStore st: mapStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
@@ -448,11 +445,11 @@ public class JobControlCompiler{
POStore st;
if (reduceStores.isEmpty()) {
- st = mapStores.remove(0);
+ st = mapStores.get(0);
mro.mapPlan.remove(st);
}
else {
- st = reduceStores.remove(0);
+ st = reduceStores.get(0);
mro.reducePlan.remove(st);
}
@@ -605,6 +602,13 @@ public class JobControlCompiler{
nwJob.setMapOutputKeyClass(NullablePartitionWritable.class);
nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
}
+
+ // unset inputs for POStore, otherwise, map/reduce plan will be
unnecessarily deserialized
+ for (POStore st: mapStores) { st.setInputs(null);
st.setParentPlan(null);}
+ for (POStore st: reduceStores) { st.setInputs(null);
st.setParentPlan(null);}
+
+ conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
+ conf.set(PIG_REDUCE_STORES,
ObjectSerializer.serialize(reduceStores));
// Serialize the UDF specific context info.
UDFContext.getUDFContext().serialize(conf);