I don't think this works for persistentAggregate.
On Thu, Jul 10, 2014 at 4:30 PM, Xuehui He <[email protected]> wrote: > copy from : > http://storm.incubator.apache.org/documentation/Trident-API-Overview.html > > > Sometimes you want to execute multiple aggregators at the same time. This > is called chaining and can be accomplished like this: > > java mystream.chainedAgg() .partitionAggregate(new Count(), new > Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new > Fields("sum")) .chainEnd() > > This code will run the Count and Sum aggregators on each partition. The > output will contain a single tuple with the fields [“count”, “sum”]. > > > 2014-07-10 23:20 GMT+08:00 Can Gencer <[email protected]>: > >> Hi David, >> >> Another option would be to create two seperate persistentAggregates from >> the groupedStream as Sam mentioned, and then use topology.join() to join >> them on the group key ("user", in your case) >> >> >> On Wed, Jul 9, 2014 at 8:01 PM, David DIDIER <[email protected]> >> wrote: >> >>> thanks for the pointer! >>> >>> >>> 2014-07-09 20:52 GMT+02:00 Danijel Schiavuzzi <[email protected]>: >>> >>> Take a look at the custom CountSumSum CombinerAggregator here: >>>> https://github.com/dschiavu/trident-mssql/blob/master/src/test/java/storm/trident/mssql/MssqlStateTopology.java >>>> >>>> Basically, you implement your own Aggregator which may output only one >>>> Field, but you store a java.util.List into the field, so you may aggregate >>>> multiple input fields and pack them into a List which is subsequently >>>> dispatched into a State implementation's multiPut() or multiGet() method, >>>> where you can then unpack the single values from the List and store them >>>> separately into your persistent storage (for example, into separate columns >>>> of an SQL table). >>>> >>>> The above example provides a working example for this. >>>> >>>> >>>> On Wednesday, July 9, 2014, c_inconnu <[email protected]> wrote: >>>> >>>>> Hi, >>>>> >>>>> I'm new to trident and I'm trying to solve the following use case. >>>>> I've read several times the documentation and I've been quite deep in the >>>>> code base but couldn't figure out how to do it... >>>>> >>>>> My input tuples have the keys (user, some_int_1, some_int_2). >>>>> My output should be the sum of some_int_1 and the sum of some_int_2 >>>>> (two fields, not one) by user, ie. (user, total_some_int_1, >>>>> total_some_int_2). >>>>> >>>>> I've tried something like that: >>>>> >>>>> myStream.groupBy(new Fields("user")) >>>>> .persistentAggregate(new MemoryMapState.Factory(), >>>>> new Fields("some_int_1", "some_int_2"), >>>>> new Sum(), >>>>> new Fields("total_some_int_1", "total_ >>>>> some_int_2")) >>>>> >>>>> but got >>>>> >>>>> Exception in thread "main" java.lang.IllegalArgumentException: >>>>> Combiner aggs only take a single field as input. Got this instead: [ >>>>> total_some_int_1, total_some_int_2] >>>>> >>>>> what is the correct way of doing this ? >>>>> thanks >>>>> >>>>> >>>> >>>> -- >>>> Danijel Schiavuzzi >>>> >>>> E: [email protected] >>>> W: www.schiavuzzi.com >>>> T: +385989035562 >>>> Skype: danijels7 >>>> >>> >>> >> >
