Hello,
To illustrate the issue, here is an example of a hanging Trident topology:
===========================
final FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 6,
        new Values("the cow jumped over the moon"));

final FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("word2", "count"), 6,
        new Values("the", 2),
        new Values("cow", 1),
        new Values("jumped", 1),
        new Values("over", 1),
        new Values("the", 1),
        new Values("moon", 1));

final TridentTopology topology = new TridentTopology();
final Stream wordCounts1 = topology.newStream("spout", spout1)
        .filter(new Debug("1"))
        .each(new Fields("sentence"), new Split(), new Fields("word1"))
        .groupBy(new Fields("word1"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
        .parallelismHint(2)
        .newValuesStream()
        .filter(new Debug("11"));
final Stream wordCounts2 = topology.newStream("spout2", spout2)
        .filter(new Debug("2"));

topology.join(wordCounts1, new Fields("word1"), wordCounts2, new Fields("word2"),
        new Fields("key", "a", "b"), JoinType.INNER)
        .filter(new Debug("J"));

final Config stormConfig = new Config();
final LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", stormConfig, topology.build());
Utils.sleep(30 * 1000L);
cluster.killTopology("test");
cluster.shutdown();
==========================
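
For completeness, the Split function used above is assumed to be the usual
sentence splitter from the Trident word-count example (Debug is the built-in
Trident filter that prints each tuple); a minimal sketch, with Storm 1.x
package names:
==========================
// Assumed implementation of Split: emits one "word1" tuple per
// whitespace-separated word of the incoming "sentence" field.
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for (String word : tuple.getString(0).split(" ")) {
            collector.emit(new Values(word));
        }
    }
}
==========================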

As you can see in the output (below), the aggregated tuples (11) are not printed:
<Fri Nov 25 17:57:39 CET 2016> DEBUG(1): [the cow jumped over the moon]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [the, 2]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [cow, 1]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [jumped, 1]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [over, 1]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [the, 1]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [moon, 1]


If I replace the persistentAggregate with an aggregate, the topology works fine:
==========================
final FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 6,
        new Values("the cow jumped over the moon"));

final FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("word2", "count"), 6,
        new Values("the", 2),
        new Values("cow", 1),
        new Values("jumped", 1),
        new Values("over", 1),
        new Values("the", 1),
        new Values("moon", 1));

final TridentTopology topology = new TridentTopology();
final Stream wordCounts1 = topology.newStream("spout", spout1)
        .filter(new Debug("1"))
        .each(new Fields("sentence"), new Split(), new Fields("word1"))
        .groupBy(new Fields("word1"))
        .aggregate(new Count(), new Fields("count"))
        .parallelismHint(2)
        .filter(new Debug("11"));
final Stream wordCounts2 = topology.newStream("spout2", spout2)
        .filter(new Debug("2"));

topology.join(wordCounts1, new Fields("word1"), wordCounts2, new Fields("word2"),
        new Fields("key", "a", "b"), JoinType.INNER)
        .filter(new Debug("J"));

final Config stormConfig = new Config();
final LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", stormConfig, topology.build());
Utils.sleep(30 * 1000L);
cluster.killTopology("test");
cluster.shutdown();
==========================
As you can see in the output (below), the aggregated tuples (11) as well
as the joined tuples (J) are printed:
<Fri Nov 25 18:05:35 CET 2016> DEBUG(1): [the cow jumped over the moon]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [over, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [the, 2]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [cow, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [jumped, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [over, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [the, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [moon, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [the, 2]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [moon, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [jumped, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [cow, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [over, 1, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [the, 2, 2]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [the, 2, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [moon, 1, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [jumped, 1, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [cow, 1, 1]

Does anyone have an idea?

2016-11-25 0:00 GMT+01:00 Michel Hummel <[email protected]>:
> Hi everybody,
>
> I want to use a simple topology which is basically composed of 2
> spouts, so 2 streams.
>
> On one of the streams I apply a simple filter "F1", then groupBy(), then
> persistentAggregate() (with a MemoryMapState.Factory() and a basic
> CombinerAggregator).
> I call newValuesStream() to push the results and finally apply a filter "F2".
>
> On the other stream I apply a simple filter "F3".
>
> Finally I do a join between the 2 streams (I join the output of F2 and F3).
>
> Each stream works perfectly on its own: F2 as well as F3 produces
> tuples (I mean if I build and test the 2 topologies after deleting the
> final join from the topology).
>
> But if I add the join, the first stream (with the
> persistentAggregate()) "hangs", and no tuples get out of the
> topology.
>
> After hours of analysis, I think that the problem comes from the
> persistentAggregate.
>
> I don't know why, but it seems that the CombinerAggregator never emits
> the combined tuples.
>
>
> I see in my analysis that the "combine" method is called for "some" tuples.
> From what I have understood from the documentation, the init() method
> of the CombinerAggregator as well as the "combine" method is called
> for every tuple. For every tuple of the batch except the last, the
> persistentAggregate emits the result of the "zero" method as output,
> and for the last it should emit directly the result of the combine method.
> Well, in my case I think that the final emit which should send the
> result to the next bolt is never done: "combine" is called but no
> tuples get out of the persistentAggregate.
>
> Like I said at the beginning, if I build the first topology (without
> the join) the aggregation is done as expected.
>
> Does someone have an idea?
>
> I can probably extract a simple test case if necessary.
>
> Any help would be appreciated,
> Michel Hummel
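
For reference on the init()/combine()/zero() contract mentioned in the quoted
message, the built-in Count aggregator used in the example is roughly the
following (a sketch, assuming Storm 1.x package names):
==========================
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

// Roughly what the built-in Count CombinerAggregator looks like:
// init() is called once per input tuple, combine() merges two partial
// counts, and zero() supplies the identity value.
public class Count implements CombinerAggregator<Long> {
    @Override
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    @Override
    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    @Override
    public Long zero() {
        return 0L;
    }
}
==========================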
