I believe the answer is yes, this is intended behavior: there is some reason
why a combiner cannot be used together with a map-side output, but for the
life of me I cannot remember what that reason is at the moment.

On Thu, May 31, 2018 at 1:17 PM, Stephen Patel <[email protected]>
wrote:

> I've observed that if an MR job with a combine fn is executed and that job
> also has a map phase output, the combine fn will not run as a Combiner.
> Removing the map side output will cause the combiner to run as a Combiner.
> Is this intended behavior?  Here's a unit test to illustrate this behavior:
>
> import static org.junit.Assert.assertTrue;
>
> import java.io.IOException;
>
> import java.util.Arrays;
>
> import java.util.List;
>
> import org.apache.crunch.PCollection;
>
> import org.apache.crunch.PTable;
>
> import org.apache.crunch.Pipeline;
>
> import org.apache.crunch.PipelineResult;
>
> import org.apache.crunch.PipelineResult.StageResult;
>
> import org.apache.crunch.impl.mr.MRPipeline;
>
> import org.apache.crunch.types.avro.Avros;
>
> import org.apache.hadoop.conf.Configuration;
>
> import org.apache.hadoop.mapred.Task;
>
> import org.apache.hadoop.mapreduce.MRConfig;
>
> import org.junit.Test;
>
> import org.junit.rules.TemporaryFolder;
>
>
> public class CombineTest {
>
>   @SuppressWarnings("deprecation")
>
>   @Test
>
>   public void test() throws IOException {
>
>     //Setup
>
>     TemporaryFolder folder = new TemporaryFolder();
>
>     folder.create();
>
>     Configuration conf = new Configuration();
>
>     conf.set("crunch.tmp.dir", folder.newFolder("crunch_tmp").
> toURI().toString());
>
>     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
>
>
>     Pipeline p = new MRPipeline(getClass(), conf);
>
>     List<Integer> input = Arrays.asList(1, 2, 3, 1, 2, 3);
>
>     PCollection<Integer> collection = p.create(input, Avros.ints());
>
>
>     //Remove this cache call and this test succeeds
>
>     collection.cache();
>
>     PTable<Integer, Long> count = collection.count().cache();
>
>     PipelineResult run = p.run();
>
>     StageResult stageResult = run.getStageResults().get(0);
>
>     assertTrue(
>
>         "No Combiner was used",
>
>         stageResult.getCounterValue(Task.Counter.COMBINE_INPUT_RECORDS) ==
>  input.size());
>
>     assertTrue(
>
>         "No Combiner was used",
>
>         stageResult.getCounterValue(Task.Counter.COMBINE_OUTPUT_RECORDS)
>
>             == input.stream().distinct().count());
>
>     p.done();
>
>   }
>
> }
>
>
>

Reply via email to