Ah, fair enough. To get that effect, I think you will need a combine function, e.g. combineValues() on the grouped table. Under the hood, the PGroupedTable that groupByKey() returns is something like a PCollection<Pair<String, Iterable<String>>>. Offhand, I don't know of a Writable type for Iterable, which would presumably explain the "Key type must be of class WritableType" error, so my guess is that you need to collapse the Iterable before calling count().
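
For what it's worth, here is a rough plain-Java sketch of what the per-key count amounts to once the values are collapsed. It deliberately does not use the Crunch API. The class name, parseKey(), and the comma-separated input format are all made up for illustration, since I don't know how you actually parse your keys.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names come from Crunch.
class KeyCountSketch {

    // Hypothetical key parser: takes the text before the first comma as the key.
    static String parseKey(String line) {
        int comma = line.indexOf(',');
        return comma < 0 ? line : line.substring(0, comma);
    }

    // Treats every line as (key, 1) and sums the ones per key, which is
    // the work a combine step would do on the grouped values.
    static Map<String, Long> countByKey(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            counts.merge(parseKey(line), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a,1", "b,2", "a,3");
        System.out.println(countByKey(input)); // prints {a=2, b=1}
    }
}
```

The idea in Crunch would be the same: emit a Long of 1 per parsed key, so the grouped values are something you can sum rather than an Iterable<String> you have to serialize.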

On Wed, Mar 11, 2015 at 9:51 AM Kristoffer Sjögren <[email protected]> wrote:

> The example is incomplete.
>
> In reality I parse keys from the string and want to count number of
> occurrences for each unique key combination.
>
> On Wed, Mar 11, 2015 at 2:44 PM, David Ortiz <[email protected]> wrote:
>
>> Kristoffer,
>>
>> Based on that code snippet, why not just do:
>>
>> PCollection<String> lines =
>>     MemPipeline.typedCollectionOf(Writables.strings(), input);
>> PTable<String, Long> lineCount = lines.count();
>>
>> Since the initial snippet is just creating a pair with two copies of the
>> input string, I believe that would accomplish what you're after. If you
>> need the String twice with the count you could add a MapFn afterwards to
>> create whatever Tuple structure you need.
>>
>> Thanks,
>> Dave
>>
>>
>> On Wed, Mar 11, 2015 at 9:41 AM Kristoffer Sjögren <[email protected]>
>> wrote:
>>
>>> Hi Micah
>>>
>>> Ah yes, i'm using the static import from Writables.string().
>>>
>>> Cheers,
>>> -Kristoffer
>>>
>>> On Wed, Mar 11, 2015 at 2:29 PM, Micah Whitacre <[email protected]>
>>> wrote:
>>>
>>>> Kristoffer,
>>>> What PTypeFamily are you using for the "tableOf(strings(),
>>>> strings())"? It looks like you are using Writables.strings() up above but
>>>> looks like you are using static imports down below so wasn't sure if you
>>>> had switched to AvroTypeFamily instead.
>>>>
>>>> Micah
>>>>
>>>> On Wed, Mar 11, 2015 at 8:17 AM, Kristoffer Sjögren <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I'm trying to count the occurrence of a key in a grouped table. But
>>>>> the following code snippet [1] fails [2] when calling count() on a
>>>>> MemPipeline in version 0.8.2+71-cdh4.6.0.
>>>>>
>>>>> Am I using the API incorrectly or is this a bug?
>>>>>
>>>>> Cheers,
>>>>> -Kristoffer
>>>>>
>>>>> [1]
>>>>>
>>>>> PCollection<String> lines =
>>>>>     MemPipeline.typedCollectionOf(Writables.strings(), input);
>>>>> lines.parallelDo(new DoFn<String, Pair<String, String>>() {
>>>>>   @Override
>>>>>   public void process(String input, Emitter<Pair<String, String>> emitter) {
>>>>>     emitter.emit(Pair.of(input, input));
>>>>>   }
>>>>> }, tableOf(strings(), strings()))
>>>>> .groupByKey()
>>>>> .count();
>>>>>
>>>>> [2]
>>>>>
>>>>> java.lang.IllegalArgumentException: Key type must be of class WritableType
>>>>>   at org.apache.crunch.types.writable.Writables.tableOf(Writables.java:351)
>>>>>   at org.apache.crunch.types.writable.WritableTypeFamily.tableOf(WritableTypeFamily.java:95)
>>>>>   at org.apache.crunch.lib.Aggregate.count(Aggregate.java:65)
>>>>>   at org.apache.crunch.lib.Aggregate.count(Aggregate.java:56)
>>>>>   at org.apache.crunch.impl.mem.collect.MemCollection.count(MemCollection.java:230)
>>>>>   at mapred.functions.FunctionsTest.testGroupActionCount(FunctionsTest.java:79)
>>>>>
>>>>
>>>>
>>>
>
