It feels to me like the answer to that is yes: there is some reason why I can't run a combiner on a job that also has a map-side output, but for the life of me I can't remember what it was at the moment.
On Thu, May 31, 2018 at 1:17 PM, Stephen Patel wrote:

> I've observed that if an MR job with a combine fn is executed and that job
> also has a map phase output, the combine fn will not run as a Combiner.
> Removing the map-side output will cause the combine fn to run as a Combiner.
> Is this intended behavior? Here's a unit test to illustrate this behavior:
>
> import static org.junit.Assert.assertTrue;
>
> import java.io.IOException;
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.crunch.PCollection;
> import org.apache.crunch.PTable;
> import org.apache.crunch.Pipeline;
> import org.apache.crunch.PipelineResult;
> import org.apache.crunch.PipelineResult.StageResult;
> import org.apache.crunch.impl.mr.MRPipeline;
> import org.apache.crunch.types.avro.Avros;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.mapred.Task;
> import org.apache.hadoop.mapreduce.MRConfig;
> import org.junit.Test;
> import org.junit.rules.TemporaryFolder;
>
> public class CombineTest {
>
>   @SuppressWarnings("deprecation")
>   @Test
>   public void test() throws IOException {
>     // Setup
>     TemporaryFolder folder = new TemporaryFolder();
>     folder.create();
>     Configuration conf = new Configuration();
>     conf.set("crunch.tmp.dir", folder.newFolder("crunch_tmp").toURI().toString());
>     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
>
>     Pipeline p = new MRPipeline(getClass(), conf);
>     List<Integer> input = Arrays.asList(1, 2, 3, 1, 2, 3);
>     PCollection<Integer> collection = p.create(input, Avros.ints());
>
>     // Remove this cache call and this test succeeds
>     collection.cache();
>     PTable<Integer, Long> count = collection.count().cache();
>     PipelineResult run = p.run();
>     StageResult stageResult = run.getStageResults().get(0);
>     assertTrue(
>         "No Combiner was used",
>         stageResult.getCounterValue(Task.Counter.COMBINE_INPUT_RECORDS) == input.size());
>     assertTrue(
>         "No Combiner was used",
>         stageResult.getCounterValue(Task.Counter.COMBINE_OUTPUT_RECORDS)
>             == input.stream().distinct().count());
>     p.done();
>   }
> }
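To make the two assertions concrete, here is a minimal plain-Java sketch of what a combiner does for a count. It uses no Hadoop or Crunch APIs, and the class and method names (`CombinerSketch`, `mapPhase`, `combine`) are hypothetical. It assumes count() behaves like a map to `(value, 1L)` pairs followed by a summing combine, and that all six records land in a single map task, which is what the test's second assertion implicitly expects. Treat this as an illustration of the counter arithmetic, not a description of Crunch's planner.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombinerSketch {

  /** Hypothetical map side of a count: emits one (value, 1L) pair per input record. */
  static List<Map.Entry<Integer, Long>> mapPhase(List<Integer> input) {
    List<Map.Entry<Integer, Long>> pairs = new ArrayList<>();
    for (Integer v : input) {
      pairs.add(new AbstractMap.SimpleEntry<>(v, 1L));
    }
    return pairs;
  }

  /** Hypothetical combiner: sums partial counts per key within one map task, before the shuffle. */
  static Map<Integer, Long> combine(List<Map.Entry<Integer, Long>> pairs) {
    Map<Integer, Long> combined = new TreeMap<>();
    for (Map.Entry<Integer, Long> e : pairs) {
      combined.merge(e.getKey(), e.getValue(), Long::sum);
    }
    return combined;
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 1, 2, 3);
    List<Map.Entry<Integer, Long>> mapOutput = mapPhase(input);
    Map<Integer, Long> combinerOutput = combine(mapOutput);
    // Analogous to COMBINE_INPUT_RECORDS (input.size()) and
    // COMBINE_OUTPUT_RECORDS (number of distinct values) in the test above.
    System.out.println("combine input records:  " + mapOutput.size());
    System.out.println("combine output records: " + combinerOutput.size());
    System.out.println(combinerOutput); // {1=2, 2=2, 3=2}
  }
}
```

If no combiner runs, the first counter stays at 0 instead of 6, which is why the test fails when the extra map-side output is present.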
