I'd like to cache aggregation results in memory (in my IBackingMap
implementation) for, say, 10 seconds and then do an upsert to my final data
storage (MySQL in this case). The point, of course, is to avoid writing
1000 times a second, since our load is rather heavy.
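Here is a minimal sketch of that write-behind idea. It rests on several assumptions. `BackingMap` is a hypothetical stand-in that mirrors the two methods of Trident's `IBackingMap` (`multiGet` and `multiPut`), so the example compiles without Storm on the classpath. `WriteBehindBackingMap`, the injected clock, and the `sink` callback are also hypothetical; the `sink` is where a batched MySQL upsert would go. Reads are served from memory, and changed entries go to the sink at most once per interval. That check runs whenever a batch is written.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical stand-in mirroring the two methods of Trident's IBackingMap<T>,
// declared here only so the sketch compiles without Storm on the classpath.
interface BackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Hypothetical write-behind map: reads come from memory, and changed entries
// are handed to `sink` at most once per flush interval.
class WriteBehindBackingMap<T> implements BackingMap<T> {
    private final Map<List<Object>, T> cache = new HashMap<>();
    private final Map<List<Object>, T> dirty = new HashMap<>();
    private final long flushIntervalMillis;
    private final LongSupplier clock;                   // injectable for testing
    private final Consumer<Map<List<Object>, T>> sink;  // e.g. a batched MySQL upsert
    private long lastFlushMillis;

    WriteBehindBackingMap(long flushIntervalMillis, LongSupplier clock,
                          Consumer<Map<List<Object>, T>> sink) {
        this.flushIntervalMillis = flushIntervalMillis;
        this.clock = clock;
        this.sink = sink;
        this.lastFlushMillis = clock.getAsLong();
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<T> result = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            // null is treated as "no previous value"; a real implementation would
            // likely fall back to MySQL on a cache miss (e.g. after a restart).
            result.add(cache.get(key));
        }
        return result;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        for (int i = 0; i < keys.size(); i++) {
            cache.put(keys.get(i), vals.get(i));
            dirty.put(keys.get(i), vals.get(i)); // later values overwrite earlier ones
        }
        long now = clock.getAsLong();
        if (now - lastFlushMillis >= flushIntervalMillis) {
            if (!dirty.isEmpty()) {
                sink.accept(new HashMap<>(dirty)); // one upsert per interval, not per batch
                dirty.clear();
            }
            lastFlushMillis = now;
        }
    }
}
```

The flush is only checked inside `multiPut`, so nothing is written while no batches arrive. Also, anything still buffered when a worker dies is lost. That probably weakens whatever transactional guarantees the Trident state was meant to provide for the buffered interval.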
My question is: which entity should take care of the "flushing", and how? I
have a topology as follows:
    Stream stream = topology.newStream("logspout", spout)
            .parallelismHint(8)
            .each(new Fields("json"),
                  new ProcessJsonFunction(),
                  new Fields("ad_id", "zone", "analytics"))
            .groupBy(new Fields("ad_id", "zone"))
            .persistentAggregate(AnalyticsBackingMap.FACTORY,
                                 new Fields("analytics"),
                                 new AnalyticsSum(),
                                 new Fields("analytics_aggregate"))
            .newValuesStream()
            .each(new Fields("analytics_aggregate"),
                  new ProcessResultFunction(),
                  new Fields());
I was thinking of putting the flushing logic into ProcessResultFunction, but
then I would have to be able to signal my IBackingMap implementation to clear
its state, since I'm dealing with time windows. I can't figure out a proper
way to do this. :o
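One way to avoid signaling the map at all is to make the time window part of the key. This sketch assumes each tuple carries an event timestamp, which the post doesn't say. A function could then emit a hypothetical `window` field (the timestamp divided by the window length), and the stream could be grouped by `window`, `ad_id` and `zone`. Each window becomes its own key, so the backing map only has to flush and evict keys from windows that have already closed. `Windows` and `WindowedCache` are illustrative names. This is plain Java and uses no Storm types.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper: maps an event timestamp to a fixed-size window index,
// e.g. 12_500 ms with 10_000 ms windows -> window 1.
final class Windows {
    private Windows() {}

    static long bucketOf(long timestampMillis, long windowMillis) {
        return Math.floorDiv(timestampMillis, windowMillis);
    }
}

// Hypothetical in-memory aggregate keyed by (window, ad_id, zone), the same
// shape the grouping key would have if "window" were added to groupBy.
class WindowedCache {
    private final Map<List<Object>, Long> sums = new HashMap<>();

    void add(long window, String adId, String zone, long value) {
        sums.merge(List.<Object>of(window, adId, zone), value, Long::sum);
    }

    // Removes and returns every entry whose window is older than currentWindow;
    // those windows are assumed complete, so they can be upserted once and dropped.
    Map<List<Object>, Long> evictBefore(long currentWindow) {
        Map<List<Object>, Long> closed = new HashMap<>();
        Iterator<Map.Entry<List<Object>, Long>> it = sums.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<List<Object>, Long> entry = it.next();
            long window = (Long) entry.getKey().get(0);
            if (window < currentWindow) {
                closed.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
        return closed;
    }

    int size() {
        return sums.size();
    }
}
```

In the topology, this would roughly mean emitting `window` from the JSON-parsing function and calling `.groupBy(new Fields("window", "ad_id", "zone"))`. The backing map could then evict closed windows itself after flushing, instead of waiting for a signal from ProcessResultFunction. An event that arrives after its window was evicted would start a fresh partial sum, so the MySQL upsert would likely need to add to the stored value rather than overwrite it.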
- Matti