copy from :
http://storm.incubator.apache.org/documentation/Trident-API-Overview.html
Sometimes you want to execute multiple aggregators at the same time. This
is called chaining and can be accomplished like this:
java mystream.chainedAgg() .partitionAggregate(new Count(), new
Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new
Fields("sum")) .chainEnd()
This code will run the Count and Sum aggregators on each partition. The
output will contain a single tuple with the fields [“count”, “sum”].
2014-07-10 23:20 GMT+08:00 Can Gencer <[email protected]>:
> Hi David,
>
> Another option would be to create two seperate persistentAggregates from
> the groupedStream as Sam mentioned, and then use topology.join() to join
> them on the group key ("user", in your case)
>
>
> On Wed, Jul 9, 2014 at 8:01 PM, David DIDIER <[email protected]> wrote:
>
>> thanks for the pointer!
>>
>>
>> 2014-07-09 20:52 GMT+02:00 Danijel Schiavuzzi <[email protected]>:
>>
>> Take a look at the custom CountSumSum CombinerAggregator here:
>>> https://github.com/dschiavu/trident-mssql/blob/master/src/test/java/storm/trident/mssql/MssqlStateTopology.java
>>>
>>> Basically, you implement your own Aggregator which may output only one
>>> Field, but you store a java.util.List into the field, so you may aggregate
>>> multiple input fields and pack them into a List which is subsequently
>>> dispatched into a State implementation's multiPut() or multiGet() method,
>>> where you can then unpack the single values from the List and store them
>>> separately into your persistent storage (for example, into separate columns
>>> of an SQL table).
>>>
>>> The above example provides a working example for this.
>>>
>>>
>>> On Wednesday, July 9, 2014, c_inconnu <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm new to trident and I'm trying to solve the following use case. I've
>>>> read several times the documentation and I've been quite deep in the code
>>>> base but couldn't figure out how to do it...
>>>>
>>>> My input tuples have the keys (user, some_int_1, some_int_2).
>>>> My output should be the sum of some_int_1 and the sum of some_int_2
>>>> (two fields, not one) by user, ie. (user, total_some_int_1,
>>>> total_some_int_2).
>>>>
>>>> I've tried something like that:
>>>>
>>>> myStream.groupBy(new Fields("user"))
>>>> .persistentAggregate(new MemoryMapState.Factory(),
>>>> new Fields("some_int_1", "some_int_2"),
>>>> new Sum(),
>>>> new Fields("total_some_int_1", "total_
>>>> some_int_2"))
>>>>
>>>> but got
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Combiner
>>>> aggs only take a single field as input. Got this instead:
>>>> [total_some_int_1,
>>>> total_some_int_2]
>>>>
>>>> what is the correct way of doing this ?
>>>> thanks
>>>>
>>>>
>>>
>>> --
>>> Danijel Schiavuzzi
>>>
>>> E: [email protected]
>>> W: www.schiavuzzi.com
>>> T: +385989035562
>>> Skype: danijels7
>>>
>>
>>
>