My goal is to store the triplet (user, total1, total2) in a database. For
now I'll go with my "PairCombinator". Thanks
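
For the archive, here is a minimal sketch of what such a pair combiner could look like. It is not my actual "PairCombinator" code. The PairCombiner interface is a local stand-in that mirrors the init/combine/zero shape of Trident's CombinerAggregator (the real init() takes a TridentTuple), and the names Totals, PairSum and PairSumDemo are made up for illustration.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Stand-in (assumption) for Trident's CombinerAggregator<T>;
// the real init() receives a TridentTuple rather than a List<Object>.
interface PairCombiner<T> extends Serializable {
    T init(List<Object> tuple);
    T combine(T a, T b);
    T zero();
}

// Hypothetical value object holding both running totals.
final class Totals implements Serializable {
    final long total1;
    final long total2;

    Totals(long total1, long total2) {
        this.total1 = total1;
        this.total2 = total2;
    }
}

// Sums two numeric input fields in a single aggregation step.
final class PairSum implements PairCombiner<Totals> {
    @Override
    public Totals init(List<Object> tuple) {
        // Position 0 is some_int_1, position 1 is some_int_2.
        return new Totals(((Number) tuple.get(0)).longValue(),
                          ((Number) tuple.get(1)).longValue());
    }

    @Override
    public Totals combine(Totals a, Totals b) {
        return new Totals(a.total1 + b.total1, a.total2 + b.total2);
    }

    @Override
    public Totals zero() {
        return new Totals(0L, 0L);
    }
}

final class PairSumDemo {
    // Folds tuples the way a combiner is applied: zero, then init + combine.
    static Totals fold(PairCombiner<Totals> agg, List<List<Object>> tuples) {
        Totals acc = agg.zero();
        for (List<Object> t : tuples) {
            acc = agg.combine(acc, agg.init(t));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<List<Object>> tuples = Arrays.asList(
                Arrays.<Object>asList(1, 10),
                Arrays.<Object>asList(2, 20));
        Totals t = fold(new PairSum(), tuples);
        System.out.println(t.total1 + " " + t.total2); // prints "3 30"
    }
}
```

In a real topology the class would presumably implement CombinerAggregator<Totals> and be passed to persistentAggregate with both input fields and a single output field, so that the stored value per user is one Totals object. I have not verified that against every Trident version.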



2014-07-09 20:29 GMT+02:00 Sam Goodwin <[email protected]>:

> Oops, yeah, that field is now gone. Do you plan on using these values
> downstream, or do you just need to store them somewhere? Here is how you
> can change it, though it doesn't give you a single stream with both counts.
>
> GroupedStream g = myStream.groupBy(new Fields("user"));
>
> g.persistentAggregate(new MemoryMapState.Factory(),
>                              new Fields("some_int_1"),
>                              new Sum(),
>                              new Fields("total_some_int_1"));
>
>
> g.persistentAggregate(new MemoryMapState.Factory(),
>                              new Fields("some_int_2"),
>                              new Sum(),
>                              new Fields("total_some_int_2"));
>
>
>
> On Wed, Jul 9, 2014 at 7:20 PM, c_inconnu <[email protected]> wrote:
>
>> Thank you
>>
>> While asking this question, I actually implemented a quick and dirty
>> combinator that combines a pair, and that's working. But there are a lot of
>> edge cases (I think), so I'm not satisfied with it. It will need a lot more
>> work.
>>
>> As for your first suggestion, unfortunately it does not work:
>>
>> Trying to select non-existent field: 'some_int_2' from stream containing
>> fields fields: <[user, total_some_int_1]>
>>
>> What's the purpose of newValuesStream() then?
>>
>>
>> 2014-07-09 19:20 GMT+02:00 Sam Goodwin <[email protected]>:
>>
>>> This may work.
>>>
>>> myStream.groupBy(new Fields("user"))
>>>         .persistentAggregate(new MemoryMapState.Factory(),
>>>                              new Fields("some_int_1"),
>>>                              new Sum(),
>>>                              new Fields("total_some_int_1"))
>>>         .newValuesStream()
>>>         .persistentAggregate(new MemoryMapState.Factory(),
>>>                              new Fields("some_int_2"),
>>>                              new Sum(),
>>>                              new Fields("total_some_int_2"));
>>>
>>> You could also create your own Function to combine 1 and 2 into a single
>>> object and implement your own CombinerAggregator for it. This is how you
>>> could do it with only one aggregate step, and it also lets you store the
>>> values together.
>>>
>>>
>>> On Wed, Jul 9, 2014 at 7:38 AM, c_inconnu <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm new to Trident and I'm trying to solve the following use case. I've
>>>> read the documentation several times and dug quite deep into the code
>>>> base, but couldn't figure out how to do it...
>>>>
>>>> My input tuples have the keys (user, some_int_1, some_int_2).
>>>> My output should be the sum of some_int_1 and the sum of some_int_2
>>>> (two fields, not one) by user, i.e. (user, total_some_int_1,
>>>> total_some_int_2).
>>>>
>>>> I've tried something like this:
>>>>
>>>> myStream.groupBy(new Fields("user"))
>>>>         .persistentAggregate(new MemoryMapState.Factory(),
>>>>                              new Fields("some_int_1", "some_int_2"),
>>>>                              new Sum(),
>>>>                              new Fields("total_some_int_1",
>>>>                                         "total_some_int_2"))
>>>>
>>>> but got
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Combiner
>>>> aggs only take a single field as input. Got this instead: [
>>>> total_some_int_1, total_some_int_2]
>>>>
>>>> What is the correct way of doing this?
>>>> Thanks
>>>>
>>>>
>>
>
