Hi Chao,
I don't think it's currently possible to have separate combiner and reducer
logic.
Actually, looking at what you want to do, it seems that there isn't a really
simple way to do it in Crunch, which seems surprising because doing something
with the top-n values per key seems like it would come up pretty often.
The best way I can think of to accomplish it would be to do something like
this:
// Convert the original PTable into a table of <K, Collection<V>> with
// single values in each collection
// (the PType arguments that parallelDo requires are omitted here)
PTable<K, V> input = …;
PTable<K, Collection<V>> tableOfCollections =
    input.parallelDo(new ValueToSingleElementCollectionFn());

// Use a custom CombineFn that collects the top values per key by doing a
// nested loop over the incoming iterable of collections
PTable<K, Collection<V>> topValuesPerKey =
    tableOfCollections.groupByKey().combineValues(new NestedLoopTopKCombineFn());

// Join the top values for each key into a single string
PTable<K, String> withJoinedValues =
    topValuesPerKey.parallelDo(new JoinValuesAsStringFn());
This feels pretty hacky, but as far as I can see it's the easiest way to use a
Combiner as part of the top-k selection. I'm a bit worried about the overhead
that the single-element collections would introduce as well, and have a nagging
feeling that there must be a better way, but I don't see it at the moment.
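
To make the combine and join steps a bit more concrete, here is a rough sketch
of the logic in plain Java, with no Crunch dependency. The class and method
names (TopKSketch, topK, join) are made up for illustration, and the values are
assumed to be Integers as in the original question. The nested loop is the part
that would live inside something like NestedLoopTopKCombineFn. Because the
output has the same shape as each input collection, it should not matter
whether the combiner runs zero, one, or several times before the reduce.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

// Hypothetical helper class; not part of Crunch.
class TopKSketch {

    // Nested loop over an iterable of collections, keeping only the k
    // largest values seen so far in a min-heap.
    static List<Integer> topK(Iterable<? extends Collection<Integer>> collections, int k) {
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (Collection<Integer> collection : collections) {
            for (Integer value : collection) {
                heap.add(value);
                if (heap.size() > k) {
                    heap.poll(); // drop the smallest of the k+1 candidates
                }
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        Collections.sort(result); // ascending order, so the join is deterministic
        return result;
    }

    // Join the surviving values into a single comma-separated string.
    static String join(Collection<Integer> values) {
        return values.stream().map(String::valueOf).collect(Collectors.joining(", "));
    }
}
```

With K=2 and the single-element collections [1], [2], [3] for "hello", topK
should return [2, 3], and join turns that into "2, 3".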
- Gabriel
On 25 Sep 2013, at 04:59, Chao Shi <[email protected]> wrote:
> Hi guys,
>
> I need to have Crunch generate an MR pipeline with a combiner and a reducer.
> My combiner and reducer have different logic. I wonder if this is possible in
> Crunch.
>
> The problem can be simplified as the following:
>
> Given a series of <string, int> pairs, output the largest K values per key,
> and join them into a string. For example, suppose K=2, the output of <"hello",
> 1>, <"hello", 2>, <"hello", 3>, <"world", 3> is <"hello", "2, 3">, <"world",
> "3">.
>
> In raw MR, I would like to use a combiner to determine the locally largest
> K values per key.
>
> class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
>   void reduce(Text key, Iterable<IntWritable> values, Context context) {
>     // go over "values" and keep top K in memory
>     // emit top K
>   }
> }
>
> class MyReducer extends Reducer<Text, IntWritable, Text, Text> {
>   void reduce(Text key, Iterable<IntWritable> values, Context context) {
>     // go over "values" and keep top K in memory, assuming saving to "int[] top";
>     context.write(key, new Text(join(top, ", ")));
>   }
> }
>
> Could anyone give me a hint on how to do this in Crunch? I see
> PGroupedTable#combineValues, but I think it requires the reducer and combiner
> to have the same signature (generic types).
>
> Thanks,
> Chao