Makes sense. Cheers for the elaborate explanation.
On Wed, Mar 11, 2015 at 4:21 PM, David Ortiz <[email protected]> wrote:
> I believe it's because the Iterable type is not a Writable type. It's
> like doing a SQL GROUP BY: any field you aren't grouping by has to have
> an aggregate function applied to it to produce output. In the case of
> Crunch, groupByKey does the first part of that, but you still need to
> define your aggregate function (max, min, first, sum, etc.) before the
> operation is complete and ready to write out.
>
> Another way to look at it: whatever job you are doing, the PGroupedTable
> you get from groupByKey is the input to the Reducer, so you still need to
> actually apply your reduce function before you get the final output.
>
> On Wed, Mar 11, 2015 at 11:01 AM Kristoffer Sjögren <[email protected]>
> wrote:
>
>> Ah, that's it, thanks!
>>
>> Still, I'm not sure why PGroupedTable.count() fails complaining about
>> WritableType. It seems like the natural thing to do. The error message is
>> also confusing, since the key type I'm using is in fact a WritableType.
>>
>> On Wed, Mar 11, 2015 at 3:39 PM, David Ortiz <[email protected]> wrote:
>>
>>> Oh! If what you want is the count of each unique combination of
>>> key/input, try changing the output from tableOf to pairs, so you get a
>>> PCollection<Pair<String, String>>. Then you can call count() on that
>>> collection to get the count of each unique pair.
>>>
>>> On Wed, Mar 11, 2015 at 10:15 AM David Ortiz <[email protected]> wrote:
>>>
>>>> Ah. Fair enough. To get that effect, I think you will need a combine
>>>> function. Under the hood, the PGroupedTable that groupByKey gives you
>>>> is something like PCollection<Pair<String, Iterable<String>>>. Offhand,
>>>> I don't know of a Writable type for Iterable, so my guess is you need
>>>> to take care of that before the count.
>>>>
>>>> On Wed, Mar 11, 2015 at 9:51 AM Kristoffer Sjögren <[email protected]>
>>>> wrote:
>>>>
>>>>> The example is incomplete.
>>>>>
>>>>> In reality, I parse keys from the string and want to count the number
>>>>> of occurrences of each unique key combination.
>>>>>
>>>>> On Wed, Mar 11, 2015 at 2:44 PM, David Ortiz <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Kristoffer,
>>>>>>
>>>>>> Based on that code snippet, why not just do:
>>>>>>
>>>>>>   PCollection<String> lines =
>>>>>>       MemPipeline.typedCollectionOf(Writables.strings(), input);
>>>>>>   PTable<String, Long> lineCount = lines.count();
>>>>>>
>>>>>> Since the initial snippet just creates a pair with two copies of the
>>>>>> input string, I believe that would accomplish what you're after. If
>>>>>> you need the String twice alongside the count, you could add a MapFn
>>>>>> afterwards to create whatever tuple structure you need.
>>>>>>
>>>>>> Thanks,
>>>>>> Dave
>>>>>>
>>>>>> On Wed, Mar 11, 2015 at 9:41 AM Kristoffer Sjögren <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Micah
>>>>>>>
>>>>>>> Ah yes, I'm using the static import from Writables.strings().
>>>>>>>
>>>>>>> Cheers,
>>>>>>> -Kristoffer
>>>>>>>
>>>>>>> On Wed, Mar 11, 2015 at 2:29 PM, Micah Whitacre <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Kristoffer,
>>>>>>>> What PTypeFamily are you using for the "tableOf(strings(),
>>>>>>>> strings())"? It looks like you are using Writables.strings() up
>>>>>>>> above, but static imports down below, so I wasn't sure if you had
>>>>>>>> switched to AvroTypeFamily instead.
>>>>>>>>
>>>>>>>> Micah
>>>>>>>>
>>>>>>>> On Wed, Mar 11, 2015 at 8:17 AM, Kristoffer Sjögren <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> I'm trying to count the occurrences of each key in a grouped table,
>>>>>>>>> but the following code snippet [1] fails [2] when calling count() on
>>>>>>>>> a MemPipeline in version 0.8.2+71-cdh4.6.0.
>>>>>>>>>
>>>>>>>>> Am I using the API incorrectly, or is this a bug?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Kristoffer
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>>
>>>>>>>>>   PCollection<String> lines =
>>>>>>>>>       MemPipeline.typedCollectionOf(Writables.strings(), input);
>>>>>>>>>   lines.parallelDo(new DoFn<String, Pair<String, String>>() {
>>>>>>>>>     @Override
>>>>>>>>>     public void process(String input,
>>>>>>>>>         Emitter<Pair<String, String>> emitter) {
>>>>>>>>>       emitter.emit(Pair.of(input, input));
>>>>>>>>>     }
>>>>>>>>>   }, tableOf(strings(), strings()))
>>>>>>>>>   .groupByKey()
>>>>>>>>>   .count();
>>>>>>>>>
>>>>>>>>> [2]
>>>>>>>>>
>>>>>>>>> java.lang.IllegalArgumentException: Key type must be of class WritableType
>>>>>>>>>   at org.apache.crunch.types.writable.Writables.tableOf(Writables.java:351)
>>>>>>>>>   at org.apache.crunch.types.writable.WritableTypeFamily.tableOf(WritableTypeFamily.java:95)
>>>>>>>>>   at org.apache.crunch.lib.Aggregate.count(Aggregate.java:65)
>>>>>>>>>   at org.apache.crunch.lib.Aggregate.count(Aggregate.java:56)
>>>>>>>>>   at org.apache.crunch.impl.mem.collect.MemCollection.count(MemCollection.java:230)
>>>>>>>>>   at mapred.functions.FunctionsTest.testGroupActionCount(FunctionsTest.java:79)
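
Judging from the stack trace, Aggregate.count() appears to build a table type keyed by the collection's own PType. For a PGroupedTable that is presumably the grouped (key, Iterable) type rather than a WritableType, which would explain the error even though the String key itself is Writable. The two workarounds discussed above can be illustrated with a plain-Java analogy. This is a sketch only, using java.util streams rather than Crunch. The class GroupCountSketch, its methods, and the parseKey function are hypothetical stand-ins for the real key-parsing logic. Approach 1 follows the SQL GROUP BY analogy (group, then aggregate each group). Approach 2 follows the suggestion to emit pairs and count them directly.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java analogy only: no Crunch classes are used here.
// parseKey is a hypothetical stand-in for the real key-parsing logic.
public class GroupCountSketch {

    // Approach 1: group first, then aggregate each group.
    // The grouped Map<String, List<String>> plays the role of the
    // PGroupedTable (key -> collection of values); taking size() is the
    // aggregate function that reduces each group to a single value.
    public static Map<String, Long> countPerKey(List<String> lines,
                                                Function<String, String> parseKey) {
        Map<String, List<String>> grouped = lines.stream()
                .collect(Collectors.groupingBy(parseKey));
        return grouped.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> (long) e.getValue().size()));
    }

    // Approach 2: skip the explicit grouping step and count each distinct
    // (key, input) pair directly, analogous to emitting pairs instead of
    // a table and calling count() on the resulting collection.
    public static Map<Map.Entry<String, String>, Long> countPairs(
            List<String> lines, Function<String, String> parseKey) {
        return lines.stream()
                .<Map.Entry<String, String>>map(
                        line -> new SimpleEntry<>(parseKey.apply(line), line))
                .collect(Collectors.groupingBy(
                        Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a:1", "a:2", "a:1", "b:9");
        // Hypothetical parser: the key is the text before the colon.
        Function<String, String> parseKey = s -> s.split(":")[0];
        System.out.println(countPerKey(lines, parseKey));
        System.out.println(countPairs(lines, parseKey));
    }
}
```

With this input, approach 1 counts 3 lines under key "a" and 1 under "b", while approach 2 counts the pair ("a", "a:1") twice and every other pair once. The difference is that approach 1 materializes each group before reducing it, whereas approach 2 never produces an unaggregated group at all.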
