Hi guys,
I am a little confused about using Trident to read streaming data. I am
trying to reuse my existing Storm spout in Trident, so I just pass the spout
to the Trident topology. The spout reads from a socket server continuously.
Then I partitionBy a key and apply an aggregator after it:

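topology.merge(spoutStreamList)
    // split each raw "line" into key, activity and anotherfield
    .each(new Fields("line"), new DeserializeFunction(),
          new Fields("key", "activity", "anotherfield"))
    // send all tuples with the same key to the same partition
    .partitionBy(new Fields("key"))
    // MyAggregator counts activities, so it needs "activity" as input too
    .partitionAggregate(new Fields("key", "activity"), new MyAggregator(),
          new Fields("stat"));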

In MyAggregator, I just count the different activities. My question is:
when does Trident call the complete method of the Aggregator? How does
Trident know that all the data for a partition has arrived? Or is there
some configuration setting that decides when to call complete, e.g. after
a certain number of entries?
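To make the question concrete, here is a stripped-down sketch of what MyAggregator does, written in plain Java so it runs without Storm. It assumes the per-batch lifecycle described in the Trident API overview (init before a batch partition, aggregate once per tuple, complete once that batch partition has been processed). The Aggregator interface below only mirrors the shape of Trident's Aggregator; ActivityCounter, runBatch, and the List standing in for TridentCollector are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Storm-free sketch (hypothetical names). It ASSUMES the documented Trident
// semantics: init() once per batch partition, aggregate() once per tuple,
// complete() once after that batch partition has been fully processed.
public class AggregatorLifecycleSketch {

    // Mirrors the shape of Trident's Aggregator<T>; a plain List<Object>
    // stands in for TridentCollector and a String for the "activity" field.
    interface Aggregator<T> {
        T init(Object batchId, List<Object> out);
        void aggregate(T state, String activity, List<Object> out);
        void complete(T state, List<Object> out);
    }

    // Hypothetical stand-in for MyAggregator: counts activities per batch partition.
    static class ActivityCounter implements Aggregator<Map<String, Integer>> {
        public Map<String, Integer> init(Object batchId, List<Object> out) {
            return new HashMap<>(); // fresh state for every batch partition
        }

        public void aggregate(Map<String, Integer> counts, String activity, List<Object> out) {
            counts.merge(activity, 1, Integer::sum);
        }

        public void complete(Map<String, Integer> counts, List<Object> out) {
            out.add(counts); // the "stat" emitted for this batch partition
        }
    }

    // Drives one batch partition through the aggregator, the way the
    // assumed lifecycle describes.
    static List<Object> runBatch(Object batchId, List<String> activities) {
        Aggregator<Map<String, Integer>> agg = new ActivityCounter();
        List<Object> out = new ArrayList<>();
        Map<String, Integer> state = agg.init(batchId, out);
        for (String a : activities) {
            agg.aggregate(state, a, out);
        }
        agg.complete(state, out); // only once the batch partition is exhausted
        return out;
    }

    public static void main(String[] args) {
        // Two consecutive batches: counts do not carry over between them.
        System.out.println(runBatch(1L, List.of("click", "view", "click")));
        System.out.println(runBatch(2L, List.of("view")));
    }
}
```

If that assumption holds, the counts are per batch rather than over the whole stream, which is exactly the part I am not sure about.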

The reason I am asking is that after the partitionAggregate, I need to
partition by "anotherfield", only for entries where anotherfield is not
null, and then do another aggregation. But if Trident does not know when a
partition ends, I cannot filter out entries by checking whether anotherfield
is null, because it might get filled in later in the stream.
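Here is a rough plain-Java sketch of the second stage as I picture it within a single batch: drop entries whose anotherfield is null, then count per anotherfield value. Row and countByAnotherField are hypothetical names, and the sketch keeps no state across batches, so an entry whose anotherfield is only filled in later would be counted in whichever batch it arrives in (or never), which is the case I am worried about.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Storm-free sketch (hypothetical names) of the second stage within ONE batch:
// filter out null anotherfield values, then count per anotherfield value.
public class SecondStageSketch {

    // Hypothetical stand-in for a tuple with "key", "activity", "anotherfield".
    static final class Row {
        final String key;
        final String activity;
        final String anotherField;

        Row(String key, String activity, String anotherField) {
            this.key = key;
            this.activity = activity;
            this.anotherField = anotherField;
        }
    }

    static Map<String, Integer> countByAnotherField(List<Row> batch) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for stable printing
        batch.stream()
             .filter(r -> r.anotherField != null) // "only if anotherfield is not null"
             .forEach(r -> counts.merge(r.anotherField, 1, Integer::sum));
        return counts;
    }

    public static void main(String[] args) {
        List<Row> batch = Arrays.asList(
            new Row("k1", "click", "a"),
            new Row("k1", "view", null), // dropped in this batch
            new Row("k2", "click", "a"),
            new Row("k2", "view", "b"));
        System.out.println(countByAnotherField(batch)); // {a=2, b=1}
    }
}
```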

Thanks very much!
Chen
