Author: gates
Date: Fri Mar 28 21:37:30 2008
New Revision: 642504

URL: http://svn.apache.org/viewvc?rev=642504&view=rev
Log:
Fixed PigCombine to not do initialization on every call to reduce, but instead 
only do it once in the call to configure.  

Modified:
    incubator/pig/trunk/CHANGES.txt
    
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=642504&r1=642503&r2=642504&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri Mar 28 21:37:30 2008
@@ -184,5 +184,9 @@
 
     PIG-94: changes for M2 of streaming: input/ouptut/ ship/cache error
     handling
+       
+       PIG-108: Fixed PigCombine to not do initialization on every call to 
+       reduce, but instead only do it once in the call to configure.  (joa23 via
+       gates).
 
     PIG-172: dealing with NULL error messages in exceptions

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=642504&r1=642503&r2=642504&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Fri Mar 28 21:37:30 2008
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapreduceExec;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -48,23 +47,18 @@
     private JobConf         job;
     private CombineDataOutputCollector finalout;
     private DataCollector   evalPipe;
-    private OutputCollector oc;
     private int             index;
     private int             inputCount;
     private DataBag         bags[];
+    private PigContext pigContext;
+    private EvalSpec esp;
 
     public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
             throws IOException {
 
         try {
-            PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext"));
             if (evalPipe == null) {
-                inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size();
-                oc = output;
-                finalout = new CombineDataOutputCollector(oc);
-                String evalSpec = job.get("pig.combineFunc", "");
-                EvalSpec esp = (EvalSpec)ObjectSerializer.deserialize(evalSpec);
-                if(esp != null) esp.instantiateFunc(pigContext);
+                finalout = new CombineDataOutputCollector(output);
                 evalPipe = esp.setupPipe(null, finalout);
                 //throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString());
                 
@@ -112,6 +106,17 @@
      */
     public void configure(JobConf job) {
         this.job = job;
+        try {
+            this.pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext"));
+            this.inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size();
+            String evalSpec = job.get("pig.combineFunc", "");
+            this.esp = (EvalSpec)ObjectSerializer.deserialize(evalSpec);
+            if(esp != null) {
+              esp.instantiateFunc(pigContext);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("unable to deserialize data", e);
+        }
     }
 
     /**


Reply via email to