Is there no one out there who can help with this?  If I use this paradigm,
my topology ends up having something like 170 bolts.  Then I add the DRPC
stream and I have around 50 spouts.  All of this adds up to a topology that
I can't even submit because it's too large (and I've already bumped the
Trident max to 50 MB...).  It seems like I'm thinking about this wrong, but
I haven't been able to come up with another way to do it.  I don't really
see how using vanilla Storm would help; maybe someone can offer some
guidance?
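
For reference, the limit bump I mentioned was along these lines. I'm
assuming the setting in play is nimbus.thrift.max_buffer_size (the exact
knob may differ), which has to go in storm.yaml on the nimbus host:

# storm.yaml on the nimbus host (sketch; assuming this is the limit being hit)
nimbus.thrift.max_buffer_size: 52428800   # roughly 50 MB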

Thanks
Andrew


On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff <[email protected]> wrote:

> Hello,
>
> I'm new to Trident and had a few questions about the best way to do
> things in this framework.  I'm trying to build a real-time streaming
> aggregation system, and Trident seems to offer a very easy framework for
> doing that.  I have a basic setup working, but as I add more counters,
> performance becomes very slow and eventually I start seeing many
> failures.  At the basic level, here is what I want to do:
>
> - Have an incoming stream that uses a KafkaSpout to read data.
> - Parse the Kafka stream and emit multiple fields.
> - Keep many different counters for those fields in the data.
>
> For example, say it were the Twitter stream.  I may want:
> - A counter for each username I come across, i.e. how many tweets I have
> received from each user
> - A counter for each hashtag, so I know how many tweets mention it
> - Binned counters keyed on date for each tweet (i.e. how many tweets in
> 2014, in June 2014, on June 08 2014, etc.; a sketch of the binning follows).
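>
> Producing the bin keys could look something like this (the method name
> and date formats here are just illustrative):
>
> import java.text.SimpleDateFormat;
> import java.util.Date;
>
> // Emit one bin key per granularity: year, year-month, year-month-day.
> String[] dateBins(Date ts) {
>     return new String[] {
>             new SimpleDateFormat("yyyy").format(ts),
>             new SimpleDateFormat("yyyy-MM").format(ts),
>             new SimpleDateFormat("yyyy-MM-dd").format(ts),
>     };
> }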
>
> The list could continue, but this can add up to hundreds of counters
> running in real time.  Right now I have something like the following:
>
> import backtype.storm.tuple.Fields;
> import storm.kafka.KafkaSpout;
> import storm.trident.Stream;
> import storm.trident.TridentTopology;
> import storm.trident.operation.builtin.Count;
>
> TridentTopology topology = new TridentTopology();
> KafkaSpout spout = new KafkaSpout(kafkaConfig);
> Stream stream = topology.newStream("messages", spout)
>         .shuffle()
>         .each(new Fields("str"), new FieldEmitter(),
>               new Fields("username", "hashtag"));
>
> stream.groupBy(new Fields("username"))
>         .persistentAggregate(stateFactory, new Fields("username"),
>                              new Count(), new Fields("count"))
>         .parallelismHint(6);
> stream.groupBy(new Fields("hashtag"))
>         .persistentAggregate(stateFactory, new Fields("hashtag"),
>                              new Count(), new Fields("count"))
>         .parallelismHint(6);
>
> Repeat something similar for everything you want a unique count of.
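>
> For instance, the date bins would just become yet another copy of the
> same block (the "datebin" field name is illustrative; the FieldEmitter
> would have to emit it):
>
> stream.groupBy(new Fields("datebin"))
>         .persistentAggregate(stateFactory, new Fields("datebin"),
>                              new Count(), new Fields("count"))
>         .parallelismHint(6);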
>
> I end up with hundreds of groupBy calls, each with its own aggregator.
> So far I have only run this on my local machine, not on a cluster, but
> I'm wondering whether this is the right design for something like this,
> or whether there is a better way to distribute the work within Trident
> to make it more efficient.
>
> Any suggestions would be appreciated!
> Thanks!
> Andrew
>
