thank you all - chainedAgg... : indeed it does not work with persistentAggregate. - join : I guess it should work but it seems a little far fetched just to sum up 2 fields, which are in my case closely linked (download and upload sizes) ; is it worth the splitting, the possible network overhead then the joining ?
Maybe I'm thinking all this wrong since it seems to me a really simple use case ? 2014-07-10 19:08 GMT+02:00 Sam Goodwin <[email protected]>: > I don't think this works for persistentAggregate. > > > On Thu, Jul 10, 2014 at 4:30 PM, Xuehui He <[email protected]> > wrote: > >> copy from : >> http://storm.incubator.apache.org/documentation/Trident-API-Overview.html >> >> >> Sometimes you want to execute multiple aggregators at the same time. This >> is called chaining and can be accomplished like this: >> >> java mystream.chainedAgg() .partitionAggregate(new Count(), new >> Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new >> Fields("sum")) .chainEnd() >> >> This code will run the Count and Sum aggregators on each partition. The >> output will contain a single tuple with the fields [“count”, “sum”]. >> >> >> 2014-07-10 23:20 GMT+08:00 Can Gencer <[email protected]>: >> >>> Hi David, >>> >>> Another option would be to create two seperate persistentAggregates from >>> the groupedStream as Sam mentioned, and then use topology.join() to join >>> them on the group key ("user", in your case) >>> >>> >>> On Wed, Jul 9, 2014 at 8:01 PM, David DIDIER <[email protected]> >>> wrote: >>> >>>> thanks for the pointer! >>>> >>>> >>>> 2014-07-09 20:52 GMT+02:00 Danijel Schiavuzzi <[email protected]>: >>>> >>>> Take a look at the custom CountSumSum CombinerAggregator here: >>>>> https://github.com/dschiavu/trident-mssql/blob/master/src/test/java/storm/trident/mssql/MssqlStateTopology.java >>>>> >>>>> Basically, you implement your own Aggregator which may output only one >>>>> Field, but you store a java.util.List into the field, so you may aggregate >>>>> multiple input fields and pack them into a List which is subsequently >>>>> dispatched into a State implementation's multiPut() or multiGet() method, >>>>> where you can then unpack the single values from the List and store them >>>>> separately into your persistent storage (for example, into separate >>>>> columns >>>>> of an SQL table). >>>>> >>>>> The above example provides a working example for this. >>>>> >>>>> >>>>> On Wednesday, July 9, 2014, c_inconnu <[email protected]> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> I'm new to trident and I'm trying to solve the following use case. >>>>>> I've read several times the documentation and I've been quite deep in the >>>>>> code base but couldn't figure out how to do it... >>>>>> >>>>>> My input tuples have the keys (user, some_int_1, some_int_2). >>>>>> My output should be the sum of some_int_1 and the sum of some_int_2 >>>>>> (two fields, not one) by user, ie. (user, total_some_int_1, >>>>>> total_some_int_2). >>>>>> >>>>>> I've tried something like that: >>>>>> >>>>>> myStream.groupBy(new Fields("user")) >>>>>> .persistentAggregate(new MemoryMapState.Factory(), >>>>>> new Fields("some_int_1", "some_int_2"), >>>>>> new Sum(), >>>>>> new Fields("total_some_int_1", "total_ >>>>>> some_int_2")) >>>>>> >>>>>> but got >>>>>> >>>>>> Exception in thread "main" java.lang.IllegalArgumentException: >>>>>> Combiner aggs only take a single field as input. Got this instead: [ >>>>>> total_some_int_1, total_some_int_2] >>>>>> >>>>>> what is the correct way of doing this ? >>>>>> thanks >>>>>> >>>>>> >>>>> >>>>> -- >>>>> Danijel Schiavuzzi >>>>> >>>>> E: [email protected] >>>>> W: www.schiavuzzi.com >>>>> T: +385989035562 >>>>> Skype: danijels7 >>>>> >>>> >>>> >>> >> >
