I believe it's because Iterable is not a Writable type. It's like doing a SQL GROUP BY: any field you aren't grouping by needs an aggregate function to produce an output. In Crunch, groupByKey does the first part of that, but you still need to apply your aggregate function (max, min, first, sum, etc.) before the operation is complete and ready to write out.
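To make the GROUP BY analogy concrete, here is a rough sketch in plain Java collections. It is only an analogy, not Crunch code: the class GroupThenAggregate and the methods groupByKey and countPerKey are names I made up for illustration. The first step yields a key mapped to a collection of values (the "grouped" shape), and only the second step collapses each group to a single value you could write out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java analogy only; none of this is Crunch API.
class GroupThenAggregate {

    // Step 1: grouping alone. Each key maps to the collection of its values,
    // which is roughly the shape a reduce step receives as input.
    static Map<String, List<String>> groupByKey(List<String[]> pairs) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] pair : pairs) {
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }

    // Step 2: the aggregate (here, a count) collapses each group to one value.
    static Map<String, Long> countPerKey(Map<String, List<String>> grouped) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            counts.put(e.getKey(), (long) e.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same shape as the original snippet: each input paired with itself.
        List<String[]> pairs = Arrays.asList(
            new String[] {"a", "a"},
            new String[] {"b", "b"},
            new String[] {"a", "a"});
        Map<String, List<String>> grouped = groupByKey(pairs);
        System.out.println(grouped);               // {a=[a, a], b=[b]}
        System.out.println(countPerKey(grouped));  // {a=2, b=1}
    }
}
```

The intermediate map is the part with no single-value representation per key, which is why something has to reduce it before it can be treated as final output.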
Another way to look at it: whatever job you are doing, the PGroupedTable you get from groupByKey is the input to the Reducer, so you still need to run your reduce function before you get the final output.

On Wed, Mar 11, 2015 at 11:01 AM Kristoffer Sjögren <[email protected]> wrote:

> Ah, that's it, thanks!
>
> Still, i'm not sure why PGroupedTable.count() fails complaining about
> WritableType. It seems to be the natural thing to do? The error message is
> also confusing since the key type i'm using is in fact WritableType.
>
> On Wed, Mar 11, 2015 at 3:39 PM, David Ortiz <[email protected]> wrote:
>
>> Oh! If what you want is the count of each unique combination of
>> key/input, try changing the output from tableOf to pairs, so you get a
>> PCollection<Pair<String, String>>, then you can do a count on that
>> collection to get the count of each unique pair.
>>
>> On Wed, Mar 11, 2015 at 10:15 AM David Ortiz <[email protected]> wrote:
>>
>>> Ah. Fair enough. To get that effect, you will need to do a combine
>>> function I think. Under the hood, that PGroupedTable groupByKey gives you
>>> something like PCollection<String, Iterable<String>>. Off hand, I don't
>>> know of a Writable type for Iterable, so my guess is you need to take care
>>> of that before the count.
>>>
>>> On Wed, Mar 11, 2015 at 9:51 AM Kristoffer Sjögren <[email protected]> wrote:
>>>
>>>> The example is incomplete.
>>>>
>>>> In reality I parse keys from the string and want to count number of
>>>> occurrences for each unique key combination.
>>>>
>>>> On Wed, Mar 11, 2015 at 2:44 PM, David Ortiz <[email protected]> wrote:
>>>>
>>>>> Kristoffer,
>>>>>
>>>>> Based on that code snippet, why not just do:
>>>>>
>>>>> PCollection<String> lines =
>>>>>     MemPipeline.typedCollectionOf(Writables.strings(), input);
>>>>> PTable<String, Long> lineCount = lines.count();
>>>>>
>>>>> Since the initial snippet is just creating a pair with two copies of
>>>>> the input string, I believe that would accomplish what you're after. If
>>>>> you need the String twice with the count you could add a MapFn afterwards
>>>>> to create whatever Tuple structure you need.
>>>>>
>>>>> Thanks,
>>>>> Dave
>>>>>
>>>>> On Wed, Mar 11, 2015 at 9:41 AM Kristoffer Sjögren <[email protected]> wrote:
>>>>>
>>>>>> Hi Micah
>>>>>>
>>>>>> Ah yes, i'm using the static import from Writables.string().
>>>>>>
>>>>>> Cheers,
>>>>>> -Kristoffer
>>>>>>
>>>>>> On Wed, Mar 11, 2015 at 2:29 PM, Micah Whitacre <[email protected]> wrote:
>>>>>>
>>>>>>> Kristoffer,
>>>>>>> What PTypeFamily are you using for the "tableOf(strings(),
>>>>>>> strings())"? It looks like you are using Writables.strings() up above
>>>>>>> but looks like you are using static imports down below so wasn't sure
>>>>>>> if you had switched to AvroTypeFamily instead.
>>>>>>>
>>>>>>> Micah
>>>>>>>
>>>>>>> On Wed, Mar 11, 2015 at 8:17 AM, Kristoffer Sjögren <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> I'm trying to count the occurrence of a key in a grouped table. But
>>>>>>>> the following code snippet [1] fails [2] when calling count() on a
>>>>>>>> MemPipeline in version 0.8.2+71-cdh4.6.0.
>>>>>>>>
>>>>>>>> Am I using the API incorrectly or is this a bug?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -Kristoffer
>>>>>>>>
>>>>>>>> [1]
>>>>>>>>
>>>>>>>> PCollection<String> lines =
>>>>>>>>     MemPipeline.typedCollectionOf(Writables.strings(), input);
>>>>>>>> lines.parallelDo(new DoFn<String, Pair<String, String>>() {
>>>>>>>>   @Override
>>>>>>>>   public void process(String input, Emitter<Pair<String, String>> emitter) {
>>>>>>>>     emitter.emit(Pair.of(input, input));
>>>>>>>>   }
>>>>>>>> }, tableOf(strings(), strings()))
>>>>>>>> .groupByKey()
>>>>>>>> .count();
>>>>>>>>
>>>>>>>> [2]
>>>>>>>>
>>>>>>>> java.lang.IllegalArgumentException: Key type must be of class WritableType
>>>>>>>>   at org.apache.crunch.types.writable.Writables.tableOf(Writables.java:351)
>>>>>>>>   at org.apache.crunch.types.writable.WritableTypeFamily.tableOf(WritableTypeFamily.java:95)
>>>>>>>>   at org.apache.crunch.lib.Aggregate.count(Aggregate.java:65)
>>>>>>>>   at org.apache.crunch.lib.Aggregate.count(Aggregate.java:56)
>>>>>>>>   at org.apache.crunch.impl.mem.collect.MemCollection.count(MemCollection.java:230)
>>>>>>>>   at mapred.functions.FunctionsTest.testGroupActionCount(FunctionsTest.java:79)
