Hello, a beginner question coming up. I'm trying to build analytics
crunching with Storm Trident; a continuous stream of events of which I need
to group/aggregate things and then write the aggregated results over a
time-slice into a database for quick access later on. I am starting with
the following topology:

        TridentState state = topology.newStream("logspout", spout)
            .parallelismHint(8).each(new Fields("json"),
                                     new ProcessJsonFunction(),
                                     new Fields("ad_id", "zone",
                                                "impressions", "clicks"))
            .groupBy(new Fields("ad_id", "zone"))
            .chainedAgg()
            .aggregate(new Fields("impressions"), new Sum(),
                       new Fields("impressions_sum"))
            .aggregate(new Fields("clicks"), new Sum(),
                       new Fields("clicks_sum"))
            .chainEnd()
            .partitionPersist(new AnalyticsStateFactory(),
                              new Fields("impressions_sum", "clicks_sum"),
                              new AnalyticsStateUpdater());

And then in my class AnalyticsStateUpdater:s method updateState() I would
like to store the aggregated values ("impressions_sum", "clicks_sum") per
key-"bucket" -- and here I ran into problems; how do I - in the
StateUpdater - know to which groupBy() "bucket" the aggregated data belongs
to? In other words I would need to get the key formed of the values of the
fields ("ad_id", "zone"). The aggregated values themselves end up properly
Sum():ed in the StateUpdater.

I am aware this is probably trivial but from the Trident documentation (or
the lack of it) I cannot seem to figure out how to do this.

BR,

- Matti

Reply via email to