Author: gates Date: Fri Mar 28 21:37:30 2008 New Revision: 642504 URL: http://svn.apache.org/viewvc?rev=642504&view=rev Log: Fixed PigCombine to not do initialization on every call to reduce, but instead only do it once in the call to configure.
Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=642504&r1=642503&r2=642504&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Fri Mar 28 21:37:30 2008 @@ -184,5 +184,9 @@ PIG-94: changes for M2 of streaming: input/ouptut/ ship/cache error handling + + PIG-108: Fixed PigCombine to not do initialization on every call to + reduce, but instead only do it once in the call to configure. (joa23 via + gates). PIG-172: dealing with NULL error messages in exceptions Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=642504&r1=642503&r2=642504&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Fri Mar 28 21:37:30 2008 @@ -17,7 +17,6 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapreduceExec; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -48,23 +47,18 @@ private JobConf job; private CombineDataOutputCollector finalout; private DataCollector evalPipe; - private OutputCollector oc; private int index; private int inputCount; private DataBag bags[]; + private PigContext pigContext; + private EvalSpec esp; public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { try { - PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext")); if (evalPipe == null) { - inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size(); - oc = output; - finalout = new CombineDataOutputCollector(oc); - String evalSpec = job.get("pig.combineFunc", ""); - EvalSpec esp = (EvalSpec)ObjectSerializer.deserialize(evalSpec); - if(esp != null) esp.instantiateFunc(pigContext); + finalout = new CombineDataOutputCollector(output); evalPipe = esp.setupPipe(null, finalout); //throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString()); @@ -112,6 +106,17 @@ */ public void configure(JobConf job) { this.job = job; + try { + this.pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext")); + this.inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size(); + String evalSpec = job.get("pig.combineFunc", ""); + this.esp = (EvalSpec)ObjectSerializer.deserialize(evalSpec); + if(esp != null) { + esp.instantiateFunc(pigContext); + } + } catch (IOException e) { + throw new RuntimeException("unable to deserialize data", e); + } } /**