Hi everybody,

I want to use a simple topology that is basically composed of two
spouts, and therefore two streams.

On one of the streams I apply a simple filter "F1", then groupBy() and
persistentAggregate() (with a MemoryMapState.Factory() and a basic
CombinerAggregator).
I then call newStream() to push the results, and finally apply a filter "F2".

On the other stream I apply a simple filter "F3".

Finally, I join the two streams (I join the outputs of F2 and F3).

Each stream works perfectly on its own: both F2 and F3 produce
tuples when I build and test the two topologies separately (i.e. with
the final join removed from the topology).

But if I add the join, the first stream (the one with
persistentAggregate()) "hangs", and no tuples get out of the
topology.

After hours of analysis, I think the problem comes from
persistentAggregate().

I don't know why, but it seems that the CombinerAggregator never emits
the combined tuples.

In my analysis I can see that the combine() method is called for some tuples.
From what I have understood from the documentation, the init() method
of the CombinerAggregator, as well as the combine() method, is called
for every tuple. For every tuple of the batch except the last one,
persistentAggregate emits the output of the zero() method, and for the
last one it should directly emit the result of combine().
In my case, I think the final emit, which should send the result to
the next bolt, never happens: combine() is called, but no tuples get
out of persistentAggregate.
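For comparison, the Trident API overview describes CombinerAggregator this way: init() runs on each input tuple, combine() merges values until only one is left, and zero() is emitted when a partition has no tuples. The plain-Java sketch below models that folding so it can be run without Storm. The Combiner interface, CountCombiner, aggregate() and updateState() are hypothetical stand-ins that only mirror the three CombinerAggregator methods; they are not the Storm API, and updateState() is an approximation of how persistentAggregate merges a batch result into the stored value.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CombinerSketch {
    // Hypothetical stand-in mirroring Trident's CombinerAggregator<T> methods;
    // a plain Object[] replaces TridentTuple so the sketch runs without Storm.
    interface Combiner<T> {
        T init(Object[] tuple);
        T combine(T val1, T val2);
        T zero();
    }

    // Counts tuples: each tuple contributes 1, values are summed.
    static class CountCombiner implements Combiner<Long> {
        public Long init(Object[] tuple) { return 1L; }
        public Long combine(Long a, Long b) { return a + b; }
        public Long zero() { return 0L; }
    }

    // Folds one batch partition into a single value: start from zero() and
    // combine in init() of every tuple. An empty partition yields zero().
    static <T> T aggregate(Combiner<T> c, List<Object[]> batch) {
        T acc = c.zero();
        for (Object[] tuple : batch) {
            acc = c.combine(acc, c.init(tuple));
        }
        return acc;
    }

    // Approximation of the state update: with no stored value the batch
    // result is kept as-is, otherwise it is combined with the stored value.
    static <T> T updateState(Combiner<T> c, T stored, T batchResult) {
        return stored == null ? batchResult : c.combine(stored, batchResult);
    }

    public static void main(String[] args) {
        CountCombiner count = new CountCombiner();
        List<Object[]> batch = Arrays.asList(
                new Object[]{"a"}, new Object[]{"b"}, new Object[]{"c"});
        long batchCount = aggregate(count, batch);
        long stored = updateState(count, 5L, batchCount);
        System.out.println(batchCount + " " + stored); // prints "3 8"
        System.out.println(aggregate(count, Collections.<Object[]>emptyList())); // prints "0"
    }
}
```

In this model a single value comes out per batch (per group key, in the grouped case), not one output per input tuple.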

As I said at the beginning, if I build the first topology on its own
(without the join), the aggregation works as expected.

Does anyone have an idea?

I can probably extract a simple test case if necessary.

Any help would be appreciated,
Michel Hummel
