David,
Take a look at the example here:
https://github.com/dschiavu/trident-mssql/blob/master/src/test/java/storm/trident/mssql/MssqlStateTopology.java
The CountSumSum aggregator in that file (or a variant of it) is just what you need.
Reproduced here for convenience:
class CountSumSum implements CombinerAggregator<List<Number>> {
    @Override
    public List<Number> init(TridentTuple tuple) {
        return Lists.newArrayList(1L, (Number) tuple.getValue(0), (Number) tuple.getValue(1));
    }

    @Override
    public List<Number> combine(List<Number> val1, List<Number> val2) {
        return Lists.newArrayList(
                Numbers.add(val1.get(0), val2.get(0)),
                Numbers.add(val1.get(1), val2.get(1)),
                Numbers.add(val1.get(2), val2.get(2)));
    }

    @Override
    public List<Number> zero() {
        return Lists.newArrayList((Number) 0, (Number) 0, (Number) 0);
    }
}
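To make the init/combine/zero contract concrete, here is a rough plain-Java sketch with no Storm dependency. The class CountSumSumSketch and its aggregate() helper are hypothetical names used only for illustration, and it uses Long directly instead of Number to keep the arithmetic simple. It only mimics how partial results get folded together; Trident itself decides when and in what grouping combine() is called, which is why combine() has to be associative.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the CombinerAggregator contract (hypothetical, no Storm dependency).
final class CountSumSumSketch {
    // Mirrors init(): one input row with two numeric values becomes [1, b, c].
    static List<Long> init(long b, long c) {
        return Arrays.asList(1L, b, c);
    }

    // Mirrors combine(): element-wise addition of two partial results.
    static List<Long> combine(List<Long> x, List<Long> y) {
        return Arrays.asList(
                x.get(0) + y.get(0),
                x.get(1) + y.get(1),
                x.get(2) + y.get(2));
    }

    // Mirrors zero(): the identity element for combine().
    static List<Long> zero() {
        return Arrays.asList(0L, 0L, 0L);
    }

    // Folds a batch of rows sequentially, starting from zero().
    static List<Long> aggregate(long[][] rows) {
        List<Long> acc = zero();
        for (long[] row : rows) {
            acc = combine(acc, init(row[0], row[1]));
        }
        return acc;
    }

    public static void main(String[] args) {
        long[][] rows = { {10, 100}, {20, 200}, {30, 300} };
        System.out.println(aggregate(rows)); // prints [3, 60, 600]
    }
}
```

The single List that comes out is what lets one aggregator carry several totals through a single output field.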
Usage example:
stream.persistentAggregate(MssqlState.newFactory(config),
        new Fields("a", "b", "c"),
        new CountSumSum(),
        new Fields("CountSumSum"));
The output tuple field "CountSumSum" is a List containing the count of "a"
and the sums of "b" and "c" respectively, and is what your State
implementation will receive in its multiPut() method to be stored into your
database.
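
For the unpacking step, a minimal sketch of what the storage side could do with those values. MultiPutSketch and toRows() are hypothetical names, not part of Storm or trident-mssql; the parameters only imitate the (keys, vals) shape of a multiPut() call, where keys.get(i) is the group key for vals.get(i). With a transactional or opaque state the values may additionally be wrapped, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper showing how a packed [count, sum, sum] list can be split into columns.
final class MultiPutSketch {
    // keys.get(i) is the group key for vals.get(i), as in a multiPut(keys, vals) call.
    static List<Object[]> toRows(List<List<Object>> keys, List<List<Number>> vals) {
        List<Object[]> rows = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            List<Number> packed = vals.get(i);
            rows.add(new Object[] {
                keys.get(i).get(0),         // group key, e.g. the user
                packed.get(0).longValue(),  // count
                packed.get(1).longValue(),  // first sum
                packed.get(2).longValue()   // second sum
            });
        }
        return rows;
    }

    public static void main(String[] args) {
        List<List<Object>> keys = Arrays.asList(
                Arrays.<Object>asList("alice"),
                Arrays.<Object>asList("bob"));
        List<List<Number>> vals = Arrays.asList(
                Arrays.<Number>asList(3L, 60L, 600L),
                Arrays.<Number>asList(1L, 5L, 7L));
        // Each row could then be bound to one INSERT/UPDATE with separate columns.
        for (Object[] row : toRows(keys, vals)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```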
Hope this helps,
Regards,
Danijel
On Thu, Jul 10, 2014 at 7:44 PM, David DIDIER <[email protected]> wrote:
> Thank you all.
>
> - chainedAgg...: indeed, it does not work with persistentAggregate.
> - join: I guess it would work, but it seems a little far-fetched just to
> sum up 2 fields, which in my case are closely linked (download and upload
> sizes). Is it worth the splitting, the possible network overhead, and then
> the joining?
>
> Maybe I'm thinking about all this the wrong way, since it seems to me a
> really simple use case?
>
>
> 2014-07-10 19:08 GMT+02:00 Sam Goodwin <[email protected]>:
>
> I don't think this works for persistentAggregate.
>>
>>
>> On Thu, Jul 10, 2014 at 4:30 PM, Xuehui He <[email protected]>
>> wrote:
>>
>>> copy from :
>>> http://storm.incubator.apache.org/documentation/Trident-API-Overview.html
>>>
>>>
>>> Sometimes you want to execute multiple aggregators at the same time.
>>> This is called chaining and can be accomplished like this:
>>>
>>> mystream.chainedAgg()
>>>         .partitionAggregate(new Count(), new Fields("count"))
>>>         .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
>>>         .chainEnd()
>>>
>>> This code will run the Count and Sum aggregators on each partition. The
>>> output will contain a single tuple with the fields [“count”, “sum”].
>>>
>>>
>>> 2014-07-10 23:20 GMT+08:00 Can Gencer <[email protected]>:
>>>
>>>> Hi David,
>>>>
>>>> Another option would be to create two separate persistentAggregates
>>>> from the groupedStream, as Sam mentioned, and then use topology.join() to
>>>> join them on the group key ("user", in your case).
>>>>
>>>>
>>>> On Wed, Jul 9, 2014 at 8:01 PM, David DIDIER <[email protected]>
>>>> wrote:
>>>>
>>>>> thanks for the pointer!
>>>>>
>>>>>
>>>>> 2014-07-09 20:52 GMT+02:00 Danijel Schiavuzzi <[email protected]>
>>>>> :
>>>>>
>>>>> Take a look at the custom CountSumSum CombinerAggregator here:
>>>>>> https://github.com/dschiavu/trident-mssql/blob/master/src/test/java/storm/trident/mssql/MssqlStateTopology.java
>>>>>>
>>>>>> Basically, you implement your own Aggregator which may output only
>>>>>> one Field, but you store a java.util.List into the field, so you may
>>>>>> aggregate multiple input fields and pack them into a List which is
>>>>>> subsequently dispatched into a State implementation's multiPut() or
>>>>>> multiGet() method, where you can then unpack the single values from the
>>>>>> List and store them separately into your persistent storage (for example,
>>>>>> into separate columns of an SQL table).
>>>>>>
>>>>>> The example linked above is a working implementation of this.
>>>>>>
>>>>>>
>>>>>> On Wednesday, July 9, 2014, c_inconnu <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm new to Trident and I'm trying to solve the following use case.
>>>>>>> I've read the documentation several times and dug quite deep into
>>>>>>> the code base, but couldn't figure out how to do it...
>>>>>>>
>>>>>>> My input tuples have the keys (user, some_int_1, some_int_2).
>>>>>>> My output should be the sum of some_int_1 and the sum of some_int_2
>>>>>>> (two fields, not one) by user, i.e. (user, total_some_int_1,
>>>>>>> total_some_int_2).
>>>>>>>
>>>>>>> I've tried something like that:
>>>>>>>
>>>>>>> myStream.groupBy(new Fields("user"))
>>>>>>>         .persistentAggregate(new MemoryMapState.Factory(),
>>>>>>>                 new Fields("some_int_1", "some_int_2"),
>>>>>>>                 new Sum(),
>>>>>>>                 new Fields("total_some_int_1", "total_some_int_2"))
>>>>>>>
>>>>>>> but got
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.IllegalArgumentException:
>>>>>>> Combiner aggs only take a single field as input. Got this instead:
>>>>>>> [total_some_int_1, total_some_int_2]
>>>>>>>
>>>>>>> what is the correct way of doing this ?
>>>>>>> thanks
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Danijel Schiavuzzi
>>>>>>
>>>>>> E: [email protected]
>>>>>> W: www.schiavuzzi.com
>>>>>> T: +385989035562
>>>>>> Skype: danijels7
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
--
Danijel Schiavuzzi
E: [email protected]
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7