I don't think this works for persistentAggregate.

On Thu, Jul 10, 2014 at 4:30 PM, Xuehui He <[email protected]> wrote:

> Copied from:
> http://storm.incubator.apache.org/documentation/Trident-API-Overview.html
>
>
> Sometimes you want to execute multiple aggregators at the same time. This
> is called chaining and can be accomplished like this:
>
> mystream.chainedAgg()
>         .partitionAggregate(new Count(), new Fields("count"))
>         .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
>         .chainEnd()
>
> This code will run the Count and Sum aggregators on each partition. The
> output will contain a single tuple with the fields [“count”, “sum”].
>
>
> 2014-07-10 23:20 GMT+08:00 Can Gencer <[email protected]>:
>
>> Hi David,
>>
>> Another option would be to create two separate persistentAggregates from
>> the groupedStream, as Sam mentioned, and then use topology.join() to join
>> them on the group key ("user", in your case).
>>
>>
>> On Wed, Jul 9, 2014 at 8:01 PM, David DIDIER <[email protected]>
>> wrote:
>>
>>> thanks for the pointer!
>>>
>>>
>>> 2014-07-09 20:52 GMT+02:00 Danijel Schiavuzzi <[email protected]>:
>>>
>>>> Take a look at the custom CountSumSum CombinerAggregator here:
>>>> https://github.com/dschiavu/trident-mssql/blob/master/src/test/java/storm/trident/mssql/MssqlStateTopology.java
>>>>
>>>> Basically, you implement your own Aggregator which may output only one
>>>> Field, but you store a java.util.List into the field, so you may aggregate
>>>> multiple input fields and pack them into a List which is subsequently
>>>> dispatched into a State implementation's multiPut() or multiGet() method,
>>>> where you can then unpack the single values from the List and store them
>>>> separately into your persistent storage (for example, into separate columns
>>>> of an SQL table).
>>>>
>>>> The code linked above is a working example of this.
>>>>
>>>>
>>>> On Wednesday, July 9, 2014, c_inconnu <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm new to Trident and I'm trying to solve the following use case.
>>>>> I've read the documentation several times and dug quite deep into the
>>>>> code base, but couldn't figure out how to do it...
>>>>>
>>>>> My input tuples have the keys (user, some_int_1, some_int_2).
>>>>> My output should be the sum of some_int_1 and the sum of some_int_2
>>>>> (two fields, not one) by user, i.e. (user, total_some_int_1,
>>>>> total_some_int_2).
>>>>>
>>>>> I've tried something like that:
>>>>>
>>>>> myStream.groupBy(new Fields("user"))
>>>>>         .persistentAggregate(new MemoryMapState.Factory(),
>>>>>                              new Fields("some_int_1", "some_int_2"),
>>>>>                              new Sum(),
>>>>>                              new Fields("total_some_int_1", "total_some_int_2"))
>>>>>
>>>>> but got
>>>>>
>>>>> Exception in thread "main" java.lang.IllegalArgumentException:
>>>>> Combiner aggs only take a single field as input. Got this instead:
>>>>> [total_some_int_1, total_some_int_2]
>>>>>
>>>>> What is the correct way of doing this?
>>>>> Thanks
>>>>>
>>>>>
>>>>
>>>> --
>>>> Danijel Schiavuzzi
>>>>
>>>> E: [email protected]
>>>> W: www.schiavuzzi.com
>>>> T: +385989035562
>>>> Skype: danijels7
>>>>
>>>
>>>
>>
>
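
For reference, here is a minimal sketch of the List-packing idea Danijel describes above. It is written against a local stand-in interface rather than Storm itself, so it runs without the Storm jars. The names Combiner, SumPair and aggregateByUser are hypothetical. The interface only mirrors the init/combine/zero shape of Trident's CombinerAggregator, and the in-memory map is an assumption standing in for a real State.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SumPairSketch {

    // Hypothetical stand-in with the same three-method shape as Trident's
    // CombinerAggregator<T>. The real init() takes a TridentTuple; a plain
    // List is used here so the sketch has no Storm dependency.
    interface Combiner<T> {
        T init(List<Object> tuple);
        T combine(T a, T b);
        T zero();
    }

    // Packs both running sums into one List, so the aggregator still
    // produces a single output value.
    static class SumPair implements Combiner<List<Long>> {
        @Override
        public List<Long> init(List<Object> tuple) {
            // Assumed tuple layout: [some_int_1, some_int_2]
            return Arrays.asList(((Number) tuple.get(0)).longValue(),
                                 ((Number) tuple.get(1)).longValue());
        }

        @Override
        public List<Long> combine(List<Long> a, List<Long> b) {
            return Arrays.asList(a.get(0) + b.get(0), a.get(1) + b.get(1));
        }

        @Override
        public List<Long> zero() {
            return Arrays.asList(0L, 0L);
        }
    }

    // Simulates groupBy("user") followed by the aggregation, keeping the
    // per-user totals in a HashMap instead of a Trident State.
    static Map<String, List<Long>> aggregateByUser(List<List<Object>> tuples) {
        SumPair agg = new SumPair();
        Map<String, List<Long>> totals = new HashMap<>();
        for (List<Object> t : tuples) {
            // Assumed tuple layout: [user, some_int_1, some_int_2]
            String user = (String) t.get(0);
            List<Long> current = totals.getOrDefault(user, agg.zero());
            totals.put(user, agg.combine(current, agg.init(t.subList(1, 3))));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<List<Object>> tuples = new ArrayList<>();
        tuples.add(Arrays.<Object>asList("alice", 1, 10));
        tuples.add(Arrays.<Object>asList("bob", 2, 20));
        tuples.add(Arrays.<Object>asList("alice", 3, 30));
        System.out.println(aggregateByUser(tuples));
    }
}
```

In a real topology the same three methods would presumably live in a class implementing Trident's CombinerAggregator, with one output field holding the List, and the State's multiPut() would unpack the two totals into separate columns as described in the quoted message.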
