I've observed that when an MR job with a combine fn also has a map-phase
output, the combine fn is not run as a Combiner. Removing the map-side
output causes the combine fn to run as a Combiner again.
Is this intended behavior? Here's a unit test that illustrates it:
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.PipelineResult.StageResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.MRConfig;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/**
 * Reproduction test: checks whether the first stage of a Crunch MR pipeline
 * reports combiner activity when the input collection is itself cached in
 * addition to the cached {@code count()} result.
 *
 * <p>The test asserts on the {@code COMBINE_INPUT_RECORDS} and
 * {@code COMBINE_OUTPUT_RECORDS} task counters of the first stage result.
 */
public class CombineTest {

  /**
   * Runs a small count job under the local MR framework and asserts that the
   * combine counters of the first stage match the input size and the number
   * of distinct input values.
   *
   * @throws IOException if the temporary working directory cannot be created
   */
  @SuppressWarnings("deprecation") // Task.Counter is the deprecated counter enum
  @Test
  public void test() throws IOException {
    // Setup: the folder is managed manually (not as a @Rule), so it must be
    // removed explicitly -- done in the outer finally below.
    TemporaryFolder folder = new TemporaryFolder();
    folder.create();
    try {
      Configuration conf = new Configuration();
      conf.set("crunch.tmp.dir", folder.newFolder("crunch_tmp").toURI().toString());
      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);

      Pipeline p = new MRPipeline(getClass(), conf);
      try {
        List<Integer> input = Arrays.asList(1, 2, 3, 1, 2, 3);
        PCollection<Integer> collection = p.create(input, Avros.ints());

        // Remove this cache call and this test succeeds
        collection.cache();

        // The returned table is not inspected; only the stage counters are.
        collection.count().cache();

        PipelineResult run = p.run();
        StageResult stageResult = run.getStageResults().get(0);

        // Every input record should have been fed to the combiner ...
        assertTrue(
            "No Combiner was used",
            stageResult.getCounterValue(Task.Counter.COMBINE_INPUT_RECORDS)
                == input.size());
        // ... and the combiner should have emitted one record per distinct key.
        assertTrue(
            "No Combiner was used",
            stageResult.getCounterValue(Task.Counter.COMBINE_OUTPUT_RECORDS)
                == input.stream().distinct().count());
      } finally {
        // Always release pipeline resources, even when an assertion fails.
        p.done();
      }
    } finally {
      // Always remove the temporary directory tree created above.
      folder.delete();
    }
  }
}