I'd like to cache aggregation results in memory (in my IBackingMap
implementation) for, say, 10 seconds and then do an upsert to my final data
storage (MySQL in this case). This is, of course, to avoid writing to the
database 1000 times a second, as our load is rather heavy.
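
To make the idea concrete, here is a rough sketch of the buffering I have in
mind, in plain Java with no Storm dependencies. The class and method names
(WriteBehindBuffer, add, flushIfDue) are made up for illustration, the sink
is only a stand-in for the MySQL upsert, and the sketch deliberately leaves
open who calls flushIfDue:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper, not a Storm/Trident class: sums values in memory and flushes in batches. */
class WriteBehindBuffer {
    private final long flushIntervalMs;
    private final Consumer<Map<String, Long>> sink; // stand-in for the MySQL upsert
    private final Map<String, Long> pending = new HashMap<>();
    private long windowStartMs = -1; // -1 means no window is currently open

    WriteBehindBuffer(long flushIntervalMs, Consumer<Map<String, Long>> sink) {
        this.flushIntervalMs = flushIntervalMs;
        this.sink = sink;
    }

    /** Adds delta to the running sum for key; the first write opens a new window. */
    void add(String key, long delta, long nowMs) {
        if (windowStartMs < 0) {
            windowStartMs = nowMs;
        }
        pending.merge(key, delta, Long::sum);
    }

    /** Hands a snapshot to the sink and clears the buffer once the window is old enough. */
    boolean flushIfDue(long nowMs) {
        if (windowStartMs < 0 || nowMs - windowStartMs < flushIntervalMs) {
            return false; // nothing buffered, or the window is still open
        }
        sink.accept(new HashMap<>(pending)); // copy, so the sink never sees later mutations
        pending.clear();
        windowStartMs = -1;
        return true;
    }
}
```

Anything still sitting in the buffer is lost if the worker dies before the
next flush, so this trades durability for fewer writes.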

My question is, which entity should take care of the "flushing" and how? I
have a topology as follows:

        Stream stream = topology.newStream("logspout", spout)
            .parallelismHint(8)
            .each(new Fields("json"),
                  new ProcessJsonFunction(),
                  new Fields("ad_id", "zone", "analytics"))
            .groupBy(new Fields("ad_id", "zone"))
            .persistentAggregate(AnalyticsBackingMap.FACTORY,
                                 new Fields("analytics"),
                                 new AnalyticsSum(),
                                 new Fields("analytics_aggregate"))
            .newValuesStream()
            .each(new Fields("analytics_aggregate"),
                  new ProcessResultFunction(),
                  new Fields());

I was thinking I'd put the flushing logic into ProcessResultFunction. But
then I would have to be able to signal my IBackingMap implementation to clear
its state, since I'm dealing with time windows. And I cannot figure out a
proper way to do this. :o

- Matti
