Hi Gabriel,

That's a clever trick. I agree that this kind of MR logic is pretty common, so it would be nice to add such a feature to Crunch.

At first glance, I think the problem with PTable#collectValues is that it returns a PTable rather than a PGroupedTable (I haven't checked the internal logic yet).
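To make the two stages concrete, here is a minimal sketch in plain Java, with no Crunch or Hadoop types, of the logic the combiner and the reducer would each need. The names TopKSketch, mergeTopK and joinValues are hypothetical and only for illustration; they are not part of any Crunch API. mergeTopK corresponds to the nested loop over the incoming collections that a custom CombineFn would perform, and joinValues to the final join-as-string step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

// Hypothetical helper class: illustrates the per-key logic only,
// independent of Crunch and Hadoop.
class TopKSketch {

    // Combiner-side step: nested loop over the partial collections for one key,
    // keeping only the k largest values seen so far.
    static List<Integer> mergeTopK(Iterable<? extends Collection<Integer>> partials, int k) {
        // Min-heap: the smallest retained value sits at the head and is evicted first.
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (Collection<Integer> partial : partials) {
            for (Integer value : partial) {
                heap.add(value);
                if (heap.size() > k) {
                    heap.poll(); // drop the current smallest, so at most k values remain
                }
            }
        }
        List<Integer> top = new ArrayList<>(heap);
        Collections.sort(top); // ascending order, matching the "2, 3" example
        return top;
    }

    // Reducer-side step: join the surviving values into a single string.
    static String joinValues(Collection<Integer> values) {
        return values.stream().map(String::valueOf).collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // The three single-element collections for key "hello", with K = 2.
        List<List<Integer>> hello =
                Arrays.asList(Arrays.asList(1), Arrays.asList(2), Arrays.asList(3));
        System.out.println(joinValues(mergeTopK(hello, 2))); // prints "2, 3"
    }
}
```

Because mergeTopK takes collections in and returns a collection, it can be applied repeatedly (once per combiner pass and once more in the reducer) without changing the result, which is what lets the combiner and reducer share one signature.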
2013/9/25 Gabriel Reid <[email protected]>

> Hi Chao,
>
> I don't think it's currently possible to have separate combiner and
> reducer logic.
>
> Actually, looking at what you want to do, it seems that there isn't a
> really simple way to do it in Crunch, which seems surprising because doing
> something with the top-n values per key seems like it would come up pretty
> often.
>
> The best way I can think of accomplishing it would be to do something
> like this:
>
>     // Convert the original PTable into a table of <K, Collection<V>>
>     // with single values in the collection
>     PTable<K, V> input = …;
>     PTable<K, Collection<V>> tableOfCollections = input.parallelDo(new
>         ValueToSingleElementCollectionFn());
>
>     // Use a custom CombineFn that collects the top values per key,
>     // does a nested loop over the incoming iterable of collections
>     PTable<K, Collection<V>> topValuesPerKey =
>         tableOfCollections.groupByKey().combineValues(new
>         NestedLoopTopKCombineFn());
>
>     PTable<K, String> withJoinedValues = topValuesPerKey.parallelDo(new
>         JoinValuesAsStringFn());
>
> This feels pretty hacky, but as far as I can see it's the easiest way to
> use a Combiner as part of the top-k selection. I'm a bit worried about the
> impact that the use of the single-element collections would introduce as
> well, and have a nagging feeling that there must be a better way, but I
> don't see it at the moment.
>
> - Gabriel
>
> On 25 Sep 2013, at 04:59, Chao Shi <[email protected]> wrote:
>
> > Hi guys,
> >
> > I need to have Crunch generate an MR pipeline with a combiner and a
> > reducer. My combiner and reducer have different logic. I wonder if this
> > is possible in Crunch.
> >
> > The problem can be simplified as follows:
> >
> > Given a series of <string, int> pairs, output the largest K values per
> > key, and join them into a string. For example, suppose K=2; the output
> > for <"hello", 1>, <"hello", 2>, <"hello", 3>, <"world", 3> is
> > <"hello", "2, 3">, <"world", "3">.
> >
> > In raw MR, I would like to use a combiner to determine the locally
> > largest values per key.
> >
> > class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
> >   void reduce(Text key, Iterable<IntWritable> values, Context context) {
> >     // go over "values" and keep the top K in memory
> >     // emit the top K
> >   }
> > }
> >
> > class MyReducer extends Reducer<Text, IntWritable, Text, Text> {
> >   void reduce(Text key, Iterable<IntWritable> values, Context context) {
> >     // go over "values" and keep the top K in memory, saving them to
> >     // "int[] top"
> >     context.write(key, join(top, ", "));
> >   }
> > }
> >
> > Could anyone give me a hint on how to do this in Crunch? I see
> > PGroupedTable#combineValues, but I think it requires the reducer and
> > combiner to have the same signature (generic types).
> >
> > Thanks,
> > Chao
