My goal is to store the triplet (user, total1, total2) in a database. For now I'll go with my "PairCombinator". Thanks
2014-07-09 20:29 GMT+02:00 Sam Goodwin <[email protected]>: > Oops, yeah that field is now gone. Do you plan on using these values down > stream or do you just need to store the values somewhere? Here is how you > can change it but it doesn't give you a stream with the two counts. > > GroupedStream g = myStream.groupBy(new Fields("user")); > > g.persistentAggregate(new MemoryMapState.Factory(), > new Fields("some_int_1"), > new Sum(), > new Fields("total_some_int_1")); > > > g.persistentAggregate(new MemoryMapState.Factory(), > new Fields("some_int_2"), > new Sum(), > new Fields("total_some_int_2")); > > > > On Wed, Jul 9, 2014 at 7:20 PM, c_inconnu <[email protected]> wrote: > >> Thank you >> >> While asking this question, I have actually implemented a quick and dirty >> combinator that combine a pair, and that's working. But there is a lot of >> edge cases (I think) and so I'm not satisfied with it. Will need a lot more >> work. >> >> As for your first suggestion, this unfortunately does not work : >> >> Trying to select non-existent field: 'some_int_2' from stream containing >> fields fields: <[user, total_some_int_1]> >> >> What's the purpose of newValuesStream() then ? >> >> >> 2014-07-09 19:20 GMT+02:00 Sam Goodwin <[email protected]>: >> >>> This may work. >>> >>> myStream.groupBy(new Fields("user")) >>> .persistentAggregate(new MemoryMapState.Factory(), >>> new Fields("some_int_1"), >>> new Sum(), >>> new Fields("total_some_int_1") >>> .newValuesStream() >>> .persistentAggregate(new MemoryMapState.Factory(), >>> new Fields("some_int_2"), >>> new Sum(), >>> new Fields("total_some_int_2"); >>> >>> You could also create your own Function to combine 1 and 2 into a single >>> object and implement your own CombinerAggregator for it. This is how you >>> could do it with only one aggregate step and it also lets your store the >>> values together. >>> >>> >>> On Wed, Jul 9, 2014 at 7:38 AM, c_inconnu <[email protected]> wrote: >>> >>> Hi, >>>> >>>> I'm new to trident and I'm trying to solve the following use case. I've >>>> read several times the documentation and I've been quite deep in the code >>>> base but couldn't figure out how to do it... >>>> >>>> My input tuples have the keys (user, some_int_1, some_int_2). >>>> My output should be the sum of some_int_1 and the sum of some_int_2 >>>> (two fields, not one) by user, ie. (user, total_some_int_1, >>>> total_some_int_2). >>>> >>>> I've tried something like that: >>>> >>>> myStream.groupBy(new Fields("user")) >>>> .persistentAggregate(new MemoryMapState.Factory(), >>>> new Fields("some_int_1", "some_int_2"), >>>> new Sum(), >>>> new Fields("total_some_int_1", "total_ >>>> some_int_2")) >>>> >>>> but got >>>> >>>> Exception in thread "main" java.lang.IllegalArgumentException: Combiner >>>> aggs only take a single field as input. Got this instead: [ >>>> total_some_int_1, total_some_int_2] >>>> >>>> what is the correct way of doing this ? >>>> thanks >>>> >>>> >> >
