Hi David,

Another option would be to create two separate persistentAggregates from
the groupedStream, as Sam mentioned, and then use topology.join() to join
them on the group key ("user", in your case).


On Wed, Jul 9, 2014 at 8:01 PM, David DIDIER <[email protected]> wrote:

> Thanks for the pointer!
>
>
> 2014-07-09 20:52 GMT+02:00 Danijel Schiavuzzi <[email protected]>:
>
>> Take a look at the custom CountSumSum CombinerAggregator here:
>> https://github.com/dschiavu/trident-mssql/blob/master/src/test/java/storm/trident/mssql/MssqlStateTopology.java
>>
>> Basically, you implement your own Aggregator which may output only one
>> Field, but you store a java.util.List into the field, so you may aggregate
>> multiple input fields and pack them into a List which is subsequently
>> dispatched into a State implementation's multiPut() or multiGet() method,
>> where you can then unpack the single values from the List and store them
>> separately into your persistent storage (for example, into separate columns
>> of an SQL table).
>>
>> The linked topology is a working example of this.
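
The List-packing idea described above can be sketched in plain Java. This is only a minimal illustration, not the code from the linked repository: Combiner is a hypothetical stand-in that mirrors the init/combine/zero shape of Trident's CombinerAggregator (the real init() receives a TridentTuple), and SumSum and SumSumDemo are made-up names. No Storm classes are used, so the sketch runs on its own.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in that mirrors only the init/combine/zero shape of
// Trident's CombinerAggregator; the real init() receives a TridentTuple.
interface Combiner<T> {
    T init(long[] tuple);   // turn one input tuple into a partial result
    T combine(T a, T b);    // merge two partial results
    T zero();               // identity value used when there is no input
}

// Sums two input fields at once and packs both totals into a single List,
// so the aggregator still emits exactly one output field.
class SumSum implements Combiner<List<Long>> {
    @Override
    public List<Long> init(long[] tuple) {
        return Arrays.asList(tuple[0], tuple[1]);
    }

    @Override
    public List<Long> combine(List<Long> a, List<Long> b) {
        return Arrays.asList(a.get(0) + b.get(0), a.get(1) + b.get(1));
    }

    @Override
    public List<Long> zero() {
        return Arrays.asList(0L, 0L);
    }
}

class SumSumDemo {
    // Folds the tuples of one group (one user) the way a combiner is applied.
    static List<Long> fold(long[][] tuples) {
        Combiner<List<Long>> agg = new SumSum();
        List<Long> acc = agg.zero();
        for (long[] t : tuples) {
            acc = agg.combine(acc, agg.init(t));
        }
        return acc;
    }

    public static void main(String[] args) {
        // Each row is (some_int_1, some_int_2) for the same user.
        List<Long> totals = fold(new long[][] {{1, 10}, {2, 20}, {3, 30}});
        // A State implementation would unpack the List here,
        // for example into one column per total.
        System.out.println("total_some_int_1=" + totals.get(0)
                + ", total_some_int_2=" + totals.get(1));
    }
}
```

In a real topology the same logic would live in a class implementing Trident's CombinerAggregator, declared with a single output field, and the State's multiPut() would split the List back into its two values.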
>>
>>
>> On Wednesday, July 9, 2014, c_inconnu <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I'm new to Trident and I'm trying to solve the following use case. I've
>>> read the documentation several times and dug quite deep into the code
>>> base, but couldn't figure out how to do it...
>>>
>>> My input tuples have the keys (user, some_int_1, some_int_2).
>>> My output should be the sum of some_int_1 and the sum of some_int_2
>>> (two fields, not one) by user, ie. (user, total_some_int_1,
>>> total_some_int_2).
>>>
>>> I've tried something like that:
>>>
>>> myStream.groupBy(new Fields("user"))
>>>         .persistentAggregate(new MemoryMapState.Factory(),
>>>                              new Fields("some_int_1", "some_int_2"),
>>>                              new Sum(),
>>>                              new Fields("total_some_int_1", "total_some_int_2"))
>>>
>>> but got
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Combiner
>>> aggs only take a single field as input. Got this instead: [total_some_int_1,
>>> total_some_int_2]
>>>
>>> What is the correct way of doing this?
>>> Thanks
>>>
>>>
>>
>> --
>> Danijel Schiavuzzi
>>
>> E: [email protected]
>> W: www.schiavuzzi.com
>> T: +385989035562
>> Skype: danijels7
>>
>
>
