Hi everybody. I want to use a simple topology that is basically composed of 2 spouts, and therefore 2 streams.
On one of the streams I apply a simple filter "F1", then groupBy() and persistentAggregate() (with a MemoryMapState.Factory() and a basic CombinerAggregator), then I call newStream() to push the results, and finally I apply a filter "F2". On the other stream I apply a simple filter "F3". Finally, I join the two streams (I join the outputs of F2 and F3).

Each stream works perfectly on its own: F2 and F3 both produce tuples if I build and test the two topologies separately, without the final join. But when I add the join, the first stream (the one with persistentAggregate()) "hangs", and no tuples come out of the topology.

After hours of analysis, I think the problem comes from persistentAggregate. I don't know why, but it seems that the CombinerAggregator never emits the combined tuples. My analysis shows that the combine() method is called for "some" tuples.

From what I understand from the documentation, the init() method of the CombinerAggregator, as well as the combine() method, is called for every tuple. For every tuple of the batch except the last, persistentAggregate emits the value of zero() as output, and for the last one it should directly emit the result of combine(). In my case, I think the final emit, which should send the result to the next bolt, is never done: combine() is called, but no tuples come out of persistentAggregate. As I said at the beginning, if I build the first topology alone (without the join), the aggregation is done as expected.

Does anyone have an idea? I can probably extract a simple test case if necessary.

Any help would be appreciated,
Michel Hummel
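To make the init()/combine()/zero() contract discussed above a bit more concrete, here is a standalone sketch that does not depend on Storm. The Combiner interface only mirrors the shape of Trident's CombinerAggregator, and applyBatch is a hypothetical, simplified model of what persistentAggregate does with one batch (fold each group starting from zero(), then combine the batch result with the value already held in the state). It is not Trident's actual execution code; the names Combiner, COUNT and applyBatch are illustrative only, and each "tuple" is reduced to a String that doubles as its groupBy key.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombinerSketch {

    // Mirrors the shape of Trident's CombinerAggregator<T> (init / combine / zero),
    // but takes a plain String "tuple" so the sketch runs without Storm on the classpath.
    interface Combiner<T> {
        T init(String tuple);   // value contributed by a single tuple
        T combine(T a, T b);    // merges two partial values
        T zero();               // starting value for a group
    }

    // A counting combiner, similar in spirit to a basic count aggregator.
    static final Combiner<Long> COUNT = new Combiner<Long>() {
        public Long init(String tuple) { return 1L; }
        public Long combine(Long a, Long b) { return a + b; }
        public Long zero() { return 0L; }
    };

    // Hypothetical, simplified model of persistentAggregate over one batch.
    // The tuple string itself is used as the groupBy key.
    // Returns the updated values for the keys touched by this batch.
    static <T> Map<String, T> applyBatch(List<String> batch, Combiner<T> agg, Map<String, T> state) {
        // 1. Fold every tuple of the batch into its group, starting from zero().
        Map<String, T> batchValues = new HashMap<>();
        for (String tuple : batch) {
            T current = batchValues.containsKey(tuple) ? batchValues.get(tuple) : agg.zero();
            batchValues.put(tuple, agg.combine(current, agg.init(tuple)));
        }
        // 2. Combine each batch result with the value already stored for that key
        //    (the in-memory map plays the role of the persistent state).
        Map<String, T> updated = new HashMap<>();
        for (Map.Entry<String, T> e : batchValues.entrySet()) {
            T stored = state.get(e.getKey());
            T next = (stored == null) ? e.getValue() : agg.combine(stored, e.getValue());
            state.put(e.getKey(), next);
            updated.put(e.getKey(), next);
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        // Two batches: the second one only touches key "a".
        System.out.println(applyBatch(Arrays.asList("a", "b", "a"), COUNT, state));
        System.out.println(applyBatch(Arrays.asList("a"), COUNT, state));
    }
}
```

In this simplified model each key yields one updated value per batch, after the whole batch has been folded, rather than one emit per tuple.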