Hi Chao,

I don't think it's currently possible to have separate combiner and reducer 
logic.

Actually, looking at what you want to do, it seems that there isn't a really 
simple way to do it in Crunch, which seems surprising because doing something 
with the top-n values per key seems like it would come up pretty often.

The best way I can think of accomplishing it would to be to do something like 
this:

        // Convert the original PTable into a table of <K, Collection<V>> with 
single values in the collection
        PTable<K,V> input = …;
        PTable<K, Collection<V>> tableOfCollections = input.parallelDo(new 
ValueToSingleElementCollectionFn());

        // Use a custom CombineFn that collects the top values per key, does a 
nested loop over the 
        // incoming iterable of collections
        PTable<K, Collection<V>> topValuesPerKey = 
                tableOfCollections.groupByKey().combineValues(new 
NestedLoopTopKCombineFn());

        PTable<K, String> withJoinedValues = topValuesPerKey.paralleDo(new 
JoinValuesAsStringFn());


This feels pretty hacky, but as far as I can see it's the easiest way to use a 
Combiner as part of the
top-k selection. I'm a bit worried about the impact that the use of the 
single-element collections would
introduce as well, and have a nagging feeling that there must be a better way, 
but I don't see it at the moment.

- Gabriel

On 25 Sep 2013, at 04:59, Chao Shi <[email protected]> wrote:

> Hi guys,
> 
> I need to have crunch generating a MR pipeline with a combiner and reducer. 
> My combiner and reducer have different logic. I wonder if this is possible in 
> crunch.
> 
> The problem can be simplified as the following:
> 
> Give a series of <string, int> pairs, output the largest K values per key, 
> and join them to a string. For example, suppose K=2, the output of <"hello", 
> 1>, <"hello", 2>, <"hello", 3>, <"world", 3> is <"hello", "2, 3">, <"world", 
> "3">.
> 
> In raw MR, I would like to use a combiner to determine the locally largest 
> value per key. 
> 
> class MyCombiner extneds Reducer<Text, IntWritable, Text, intWritable> {
>   void reduce(Text key, Iterable<IntWritable> values, Context context) {
>     go over "values" and keep top K in memory
>     emit top K
>   }
> }
> 
> class MyReducer extends Reducer<Text, IntWritable, Text, Text> {
>   void reduce(Text key, Iterable<IntWritable> values, Context context) {
>     go over "values" and keep top K in memory, assuming saving to "int[] top";
>     context.write(key, join(top, ", "));
>   }
> }
> 
> Could anyone give me a hint on how to do this in crunch? I see 
> PGroupedTable#combineValues, but I think it requires the reducer and combiner 
> has the same signature (generic types).
> 
> Thanks,
> Chao

Reply via email to