Hi guys,
I need Crunch to generate an MR pipeline with both a combiner and a reducer.
My combiner and reducer have different logic. I wonder if this is possible
in Crunch.
The problem can be simplified as follows:
Given a series of <string, int> pairs, output the largest K values per key,
joined into a single string. For example, with K=2, the output for
<"hello", 1>, <"hello", 2>, <"hello", 3>, <"world", 3> is
<"hello", "2, 3">, <"world", "3">.
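To pin down the expected behaviour, here is a minimal single-machine sketch in plain Java (no Hadoop or Crunch involved). The class name TopKExample and its helper methods are made up for illustration, and I am assuming the joined values should come out in ascending order, as in the example above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, only meant to illustrate the expected output.
class TopKExample {

    // Returns the k largest values in ascending order, using a min-heap of size <= k.
    static List<Integer> topK(Iterable<Integer> values, int k) {
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (int v : values) {
            heap.add(v);
            if (heap.size() > k) {
                heap.poll(); // drop the smallest so only the k largest remain
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        Collections.sort(result);
        return result;
    }

    // Groups <key, value> pairs by key, then joins each key's top k values.
    static Map<String, String> topKPerKey(List<Map.Entry<String, Integer>> pairs, int k) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), x -> new ArrayList<>()).add(p.getValue());
        }
        Map<String, String> out = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            out.put(e.getKey(), topK(e.getValue(), k).stream()
                    .map(String::valueOf)
                    .collect(Collectors.joining(", ")));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = List.of(
                Map.entry("hello", 1), Map.entry("hello", 2),
                Map.entry("hello", 3), Map.entry("world", 3));
        System.out.println(topKPerKey(pairs, 2)); // {hello=2, 3, world=3}
    }
}
```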
In raw MR, I would use a combiner to keep only the locally largest K
values per key, and a reducer to pick the final top K and join them:
class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // go over "values" and keep top K in memory
    // emit top K
  }
}
class MyReducer extends Reducer<Text, IntWritable, Text, Text> {
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // go over "values" and keep top K in memory, say in "int[] top"
    context.write(key, new Text(join(top, ", ")));
  }
}
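The difference between the two stages is in the types: the combiner has to emit the same value type it receives, because its output goes back into the shuffle, while the reducer changes the value type to a string. Below is a rough plain-Java simulation of the two stages for a single key. The class TwoStageTopK and the two map-side splits are made up for illustration; this is not Hadoop or Crunch code. It relies on the fact that the top K of the per-split top-K lists equals the global top K.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for MyCombiner and MyReducer, for a single key.
class TwoStageTopK {
    static final int K = 2;

    // Combiner stage: ints in, ints out (at most K values survive per split).
    static List<Integer> combine(List<Integer> values) {
        return values.stream()
                .sorted(Comparator.reverseOrder())
                .limit(K)
                .collect(Collectors.toList());
    }

    // Reducer stage: ints in, one joined string out (ascending order).
    static String reduce(List<Integer> values) {
        return combine(values).stream()
                .sorted()
                .map(String::valueOf)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // Two made-up map-side splits holding values for the key "hello".
        List<Integer> split1 = List.of(1, 2);
        List<Integer> split2 = List.of(3);

        // Each split is combined locally, then the reducer sees the union.
        List<Integer> shuffled = new ArrayList<>(combine(split1));
        shuffled.addAll(combine(split2));

        System.out.println(reduce(shuffled)); // 2, 3
    }
}
```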
Could anyone give me a hint on how to do this in Crunch? I see
PGroupedTable#combineValues, but I think it requires the reducer and
combiner to have the same signature (generic types).
Thanks,
Chao