Author: tucu Date: Mon Jan 6 18:35:26 2014 New Revision: 1555968 URL: http://svn.apache.org/r1555968 Log: MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jan 6 18:35:26 2014 @@ -196,6 +196,8 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI with Hadoop 2.0 (Gera Shegalov via Sandy Ryza) + MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu) + OPTIMIZATIONS MAPREDUCE-5484. 
YarnChild unnecessarily loads job conf twice (Sandy Ryza) Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Mon Jan 6 18:35:26 2014 @@ -949,6 +949,23 @@ public class JobConf extends Configurati return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS); } + /** + * Get the user defined {@link RawComparator} comparator for + * grouping keys of inputs to the combiner. + * + * @return the combiner grouping comparator, else the output key comparator. + * @see #setCombinerKeyGroupingComparator(Class) for details. + */ + public RawComparator getCombinerKeyGroupingComparator() { + Class<? extends RawComparator> theClass = getClass( + JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class); + if (theClass == null) { + return getOutputKeyComparator(); + } + + return ReflectionUtils.newInstance(theClass, this); + } + /** * Get the user defined {@link WritableComparable} comparator for * grouping keys of inputs to the reduce. @@ -966,6 +983,37 @@ public class JobConf extends Configurati return ReflectionUtils.newInstance(theClass, this); } + /** + * Set the user defined {@link RawComparator} comparator for + * grouping keys in the input to the combiner.
+ * + * <p>This comparator should be provided if the equivalence rules for keys + * for sorting the intermediates are different from those for grouping keys + * before each call to + * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p> + * + * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + * in a single call to the combiner if K1 and K2 compare as equal.</p> + * + * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + * how keys are sorted, this can be used in conjunction to simulate + * <i>secondary sort on values</i>.</p> + * + * <p><i>Note</i>: This is not a guarantee of the combiner sort being + * <i>stable</i> in any sense. (In any case, with the order of available + * map-outputs to the combiner being non-deterministic, it wouldn't make + * that much sense.)</p> + * + * @param theClass the comparator class to be used for grouping keys for the + * combiner. It should implement <code>RawComparator</code>. + * @see #setOutputKeyComparatorClass(Class) + */ + public void setCombinerKeyGroupingComparator( + Class<? extends RawComparator> theClass) { + setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS, + theClass, RawComparator.class); + } + /** * Set the user defined {@link RawComparator} comparator for * grouping keys in the input to the reduce. @@ -989,7 +1037,9 @@ public class JobConf extends Configurati * * @param theClass the comparator class to be used for grouping keys. * It should implement <code>RawComparator</code>. - * @see #setOutputKeyComparatorClass(Class) + * @see #setOutputKeyComparatorClass(Class) + * @see #setCombinerKeyGroupingComparator(Class) for setting a + * comparator for the combiner. */ public void setOutputValueGroupingComparator( Class<?
extends RawComparator> theClass) { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Mon Jan 6 18:35:26 2014 @@ -1575,7 +1575,8 @@ abstract public class Task implements Wr combinerClass = cls; keyClass = (Class<K>) job.getMapOutputKeyClass(); valueClass = (Class<V>) job.getMapOutputValueClass(); - comparator = (RawComparator<K>) job.getOutputKeyComparator(); + comparator = (RawComparator<K>) + job.getCombinerKeyGroupingComparator(); } @SuppressWarnings("unchecked") @@ -1624,7 +1625,7 @@ abstract public class Task implements Wr this.taskId = taskId; keyClass = (Class<K>) context.getMapOutputKeyClass(); valueClass = (Class<V>) context.getMapOutputValueClass(); - comparator = (RawComparator<K>) context.getSortComparator(); + comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator(); this.committer = committer; } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1555968&r1=1555967&r2=1555968&view=diff 
============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Mon Jan 6 18:35:26 2014 @@ -949,10 +949,26 @@ public class Job extends JobContextImpl } /** + * Define the comparator that controls which keys are grouped together + * for a single call to the combiner, + * {@link Reducer#reduce(Object, Iterable, + * org.apache.hadoop.mapreduce.Reducer.Context)} + * + * @param cls the raw comparator to use + * @throws IllegalStateException if the job is submitted + */ + public void setCombinerKeyGroupingComparatorClass( + Class<? extends RawComparator> cls) throws IllegalStateException { + ensureState(JobState.DEFINE); + conf.setCombinerKeyGroupingComparator(cls); + } + + /** * Define the comparator that controls how the keys are sorted before they * are passed to the {@link Reducer}. * @param cls the raw comparator * @throws IllegalStateException if the job is submitted + * @see #setCombinerKeyGroupingComparatorClass(Class) */ public void setSortComparatorClass(Class<? extends RawComparator> cls ) throws IllegalStateException { @@ -967,6 +983,8 @@ public class Job extends JobContextImpl * org.apache.hadoop.mapreduce.Reducer.Context)} * @param cls the raw comparator to use * @throws IllegalStateException if the job is submitted + * @see #setCombinerKeyGroupingComparatorClass(Class) for setting a + * comparator for the combiner. */ public void setGroupingComparatorClass(Class<?
extends RawComparator> cls ) throws IllegalStateException { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java Mon Jan 6 18:35:26 2014 @@ -167,13 +167,24 @@ public interface JobContext extends MRJo */ public String getJar(); - /** - * Get the user defined {@link RawComparator} comparator for - * grouping keys of inputs to the reduce. - * + /** + * Get the user defined {@link RawComparator} comparator for + * grouping keys of inputs to the combiner. + * * @return comparator set by the user for grouping values. - * @see Job#setGroupingComparatorClass(Class) for details. + * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details. */ + public RawComparator<?> getCombinerKeyGroupingComparator(); + + /** + * Get the user defined {@link RawComparator} comparator for + * grouping keys of inputs to the reduce. + * + * @return comparator set by the user for grouping values. + * @see Job#setGroupingComparatorClass(Class) for details. + * @see #getCombinerKeyGroupingComparator() for getting the + * comparator used by the combiner.
+ */ public RawComparator<?> getGroupingComparator(); /** Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Jan 6 18:35:26 2014 @@ -93,6 +93,8 @@ public interface MRJobConfig { public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class"; + public static final String COMBINER_GROUP_COMPARATOR_CLASS = "mapreduce.job.combiner.group.comparator.class"; + public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class"; public static final String WORKING_DIR = "mapreduce.job.working.dir"; Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Mon Jan 6 18:35:26 2014 @@ -167,6 +167,11 @@ class ChainMapContextImpl<KEYIN, VALUEIN } @Override + public RawComparator<?> getCombinerKeyGroupingComparator() { + return base.getCombinerKeyGroupingComparator(); + } + + @Override public RawComparator<?> getGroupingComparator() { return base.getGroupingComparator(); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Mon Jan 6 18:35:26 2014 @@ -160,6 +160,11 @@ class ChainReduceContextImpl<KEYIN, VALU } @Override + public RawComparator<?> getCombinerKeyGroupingComparator() { + return base.getCombinerKeyGroupingComparator(); + } + + @Override public RawComparator<?> getGroupingComparator() { return base.getGroupingComparator(); } Modified: 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Mon Jan 6 18:35:26 2014 @@ -169,6 +169,11 @@ public class WrappedMapper<KEYIN, VALUEI } @Override + public RawComparator<?> getCombinerKeyGroupingComparator() { + return mapContext.getCombinerKeyGroupingComparator(); + } + + @Override public RawComparator<?> getGroupingComparator() { return mapContext.getGroupingComparator(); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original) +++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Mon Jan 6 18:35:26 2014 @@ -162,6 +162,11 @@ public class WrappedReducer<KEYIN, VALUE } @Override + public RawComparator<?> getCombinerKeyGroupingComparator() { + return reduceContext.getCombinerKeyGroupingComparator(); + } + + @Override public RawComparator<?> getGroupingComparator() { return reduceContext.getGroupingComparator(); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Mon Jan 6 18:35:26 2014 @@ -252,6 +252,17 @@ public class JobContextImpl implements J return conf.getJar(); } + /** + * Get the user defined {@link RawComparator} comparator for + * grouping keys of inputs to the combiner. + * + * @return comparator set by the user for grouping values. + * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details. + */ + public RawComparator<?> getCombinerKeyGroupingComparator() { + return conf.getCombinerKeyGroupingComparator(); + } + /** * Get the user defined {@link RawComparator} comparator for * grouping keys of inputs to the reduce. 
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1555968&r1=1555967&r2=1555968&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Mon Jan 6 18:35:26 2014 @@ -582,7 +582,7 @@ public class MergeManagerImpl<K, V> impl Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass(); Class<V> valClass = (Class<V>) job.getMapOutputValueClass(); RawComparator<K> comparator = - (RawComparator<K>)job.getOutputKeyComparator(); + (RawComparator<K>)job.getCombinerKeyGroupingComparator(); try { CombineValuesIterator values = new CombineValuesIterator( kvIter, comparator, keyClass, valClass, job, Reporter.NULL, Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java?rev=1555968&view=auto ============================================================================== --- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java Mon Jan 6 18:35:26 2014 @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; + +public class TestOldCombinerGrouping { + private static String TEST_ROOT_DIR = + new File("build", UUID.randomUUID().toString()).getAbsolutePath(); + + public static class Map implements + Mapper<LongWritable, Text, Text, LongWritable> { + @Override + public void map(LongWritable key, Text value, + OutputCollector<Text, LongWritable> output, Reporter reporter) + throws IOException { + String v = value.toString(); + String k = v.substring(0, v.indexOf(",")); + v = v.substring(v.indexOf(",") + 1); + output.collect(new Text(k), new LongWritable(Long.parseLong(v))); + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(JobConf job) { + } + } + + public static class Reduce implements + Reducer<Text, LongWritable, Text, LongWritable> { + + @Override + public void reduce(Text key, Iterator<LongWritable> values, + OutputCollector<Text, LongWritable> output, Reporter reporter) + throws IOException { + LongWritable maxValue = null; + while (values.hasNext()) { + LongWritable value = values.next(); + if (maxValue == null) { + maxValue = value; + } else if (value.compareTo(maxValue) > 0) { + maxValue = value; + } + } + output.collect(key, maxValue); + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(JobConf job) { + } + } + + public static class Combiner extends Reduce { + } + + public static class GroupComparator implements RawComparator<Text> { + 
+ @Override + public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3, + int i4) { + // A serialized Text is a vint length prefix followed by UTF-8 bytes. + int n1 = org.apache.hadoop.io.WritableUtils.decodeVIntSize(bytes[i]); + int n2 = org.apache.hadoop.io.WritableUtils.decodeVIntSize(bytes2[i3]); + Text t1 = new Text(), t2 = new Text(); + t1.set(bytes, i + n1, i2 - n1); + t2.set(bytes2, i3 + n2, i4 - n2); + return compare(t1, t2); + } + + @Override + public int compare(Text o1, Text o2) { + String s1 = o1.toString(); + String s2 = o2.toString(); + s1 = s1.substring(0, s1.indexOf("|")); + s2 = s2.substring(0, s2.indexOf("|")); + return s1.compareTo(s2); + } + + } + + @Test + public void testCombiner() throws Exception { + if (!new File(TEST_ROOT_DIR).mkdirs()) { + throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR); + } + File in = new File(TEST_ROOT_DIR, "input"); + if (!in.mkdirs()) { + throw new RuntimeException("Could not create test dir: " + in); + } + File out = new File(TEST_ROOT_DIR, "output"); + PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt"))); + pw.println("A|a,1"); + pw.println("A|b,2"); + pw.println("B|a,3"); + pw.println("B|b,4"); + pw.println("B|c,5"); + pw.close(); + JobConf job = new JobConf(); + job.set("mapreduce.framework.name", "local"); + TextInputFormat.setInputPaths(job, new Path(in.getPath())); + TextOutputFormat.setOutputPath(job, new Path(out.getPath())); + job.setMapperClass(Map.class); + job.setReducerClass(Reduce.class); + job.setInputFormat(TextInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setOutputFormat(TextOutputFormat.class); + job.setOutputValueGroupingComparator(GroupComparator.class); + + job.setCombinerClass(Combiner.class); + job.setCombinerKeyGroupingComparator(GroupComparator.class); + job.setInt("min.num.spills.for.combine", 0); + + JobClient client = new JobClient(job); + RunningJob runningJob = client.submitJob(job); + runningJob.waitForCompletion(); + if (runningJob.isSuccessful()) { + Counters counters =
runningJob.getCounters(); + + long combinerInputRecords = counters.getGroup( + "org.apache.hadoop.mapreduce.TaskCounter"). + getCounter("COMBINE_INPUT_RECORDS"); + long combinerOutputRecords = counters.getGroup( + "org.apache.hadoop.mapreduce.TaskCounter"). + getCounter("COMBINE_OUTPUT_RECORDS"); + Assert.assertTrue(combinerInputRecords > 0); + Assert.assertTrue(combinerInputRecords > combinerOutputRecords); + + BufferedReader br = new BufferedReader(new FileReader( + new File(out, "part-00000"))); + Set<String> output = new HashSet<String>(); + String line = br.readLine(); + Assert.assertNotNull(line); + output.add(line.substring(0, 1) + line.substring(4, 5)); + line = br.readLine(); + Assert.assertNotNull(line); + output.add(line.substring(0, 1) + line.substring(4, 5)); + line = br.readLine(); + Assert.assertNull(line); + br.close(); + + Set<String> expected = new HashSet<String>(); + expected.add("A2"); + expected.add("B5"); + + Assert.assertEquals(expected, output); + + } else { + Assert.fail("Job failed"); + } + } + +} Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java?rev=1555968&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java Mon Jan 6 18:35:26 2014 @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import junit.framework.Assert; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +public class TestNewCombinerGrouping { + private static String TEST_ROOT_DIR = + new File("build", UUID.randomUUID().toString()).getAbsolutePath(); + + public static class Map extends + Mapper<LongWritable, Text, Text, LongWritable> { + + @Override + protected void map(LongWritable key, Text value, + Context context) + throws IOException, InterruptedException { + String v = value.toString(); + String k = v.substring(0, v.indexOf(",")); + v = v.substring(v.indexOf(",") + 1); + context.write(new Text(k), new LongWritable(Long.parseLong(v))); + } + } + + public static 
class Reduce extends + Reducer<Text, LongWritable, Text, LongWritable> { + + @Override + protected void reduce(Text key, Iterable<LongWritable> values, + Context context) + throws IOException, InterruptedException { + LongWritable maxValue = null; + for (LongWritable value : values) { + if (maxValue == null) { + maxValue = value; + } else if (value.compareTo(maxValue) > 0) { + maxValue = value; + } + } + context.write(key, maxValue); + } + } + + public static class Combiner extends Reduce { + } + + public static class GroupComparator implements RawComparator<Text> { + @Override + public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3, + int i4) { + // A serialized Text is a vint length prefix followed by UTF-8 bytes. + int n1 = org.apache.hadoop.io.WritableUtils.decodeVIntSize(bytes[i]); + int n2 = org.apache.hadoop.io.WritableUtils.decodeVIntSize(bytes2[i3]); + Text t1 = new Text(), t2 = new Text(); + t1.set(bytes, i + n1, i2 - n1); + t2.set(bytes2, i3 + n2, i4 - n2); + return compare(t1, t2); + } + + @Override + public int compare(Text o1, Text o2) { + String s1 = o1.toString(); + String s2 = o2.toString(); + s1 = s1.substring(0, s1.indexOf("|")); + s2 = s2.substring(0, s2.indexOf("|")); + return s1.compareTo(s2); + } + + } + + @Test + public void testCombiner() throws Exception { + if (!new File(TEST_ROOT_DIR).mkdirs()) { + throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR); + } + File in = new File(TEST_ROOT_DIR, "input"); + if (!in.mkdirs()) { + throw new RuntimeException("Could not create test dir: " + in); + } + File out = new File(TEST_ROOT_DIR, "output"); + PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt"))); + pw.println("A|a,1"); + pw.println("A|b,2"); + pw.println("B|a,3"); + pw.println("B|b,4"); + pw.println("B|c,5"); + pw.close(); + JobConf conf = new JobConf(); + conf.set("mapreduce.framework.name", "local"); + Job job = new Job(conf); + TextInputFormat.setInputPaths(job, new Path(in.getPath())); + TextOutputFormat.setOutputPath(job, new Path(out.getPath())); + + job.setMapperClass(Map.class); +
job.setReducerClass(Reduce.class); + job.setInputFormatClass(TextInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setOutputFormatClass(TextOutputFormat.class); + job.setGroupingComparatorClass(GroupComparator.class); + + job.setCombinerKeyGroupingComparatorClass(GroupComparator.class); + job.setCombinerClass(Combiner.class); + job.getConfiguration().setInt("min.num.spills.for.combine", 0); + + job.submit(); + job.waitForCompletion(false); + if (job.isSuccessful()) { + Counters counters = job.getCounters(); + + long combinerInputRecords = counters.findCounter( + "org.apache.hadoop.mapreduce.TaskCounter", + "COMBINE_INPUT_RECORDS").getValue(); + long combinerOutputRecords = counters.findCounter( + "org.apache.hadoop.mapreduce.TaskCounter", + "COMBINE_OUTPUT_RECORDS").getValue(); + Assert.assertTrue(combinerInputRecords > 0); + Assert.assertTrue(combinerInputRecords > combinerOutputRecords); + + BufferedReader br = new BufferedReader(new FileReader( + new File(out, "part-r-00000"))); + Set<String> output = new HashSet<String>(); + String line = br.readLine(); + Assert.assertNotNull(line); + output.add(line.substring(0, 1) + line.substring(4, 5)); + line = br.readLine(); + Assert.assertNotNull(line); + output.add(line.substring(0, 1) + line.substring(4, 5)); + line = br.readLine(); + Assert.assertNull(line); + br.close(); + + Set<String> expected = new HashSet<String>(); + expected.add("A2"); + expected.add("B5"); + + Assert.assertEquals(expected, output); + + } else { + Assert.fail("Job failed"); + } + } + +}