Is there no one out there who can help with this? If I use this paradigm, my topology ends up with something like 170 bolts. Then I add the DRPC stream and I have around 50 spouts. All of this adds up to a topology that I can't even submit because it's too large (and I've already bumped the Trident max to 50 MB...). It seems like I'm thinking about this wrong, but I haven't been able to come up with another way to do it. I don't really see how using vanilla Storm would help either; maybe someone can offer some guidance?
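To make it concrete, the only other shape I can picture is collapsing all the counters into one grouped stream: have a single function emit a composite (countertype, key) pair for every counter a message touches, then do one groupBy over both fields. This is an untested sketch, and CounterKeyEmitter plus the parse* helpers are made-up names standing in for my real parsing:

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.tuple.TridentTuple;

// Hypothetical function: fans each message out into one (countertype, key)
// tuple per counter, so every counter becomes a row in one state map
// instead of its own groupBy/persistentAggregate chain.
public class CounterKeyEmitter extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String msg = tuple.getString(0);
        collector.emit(new Values("username", parseUsername(msg)));
        collector.emit(new Values("hashtag", parseHashtag(msg)));
        collector.emit(new Values("date", parseDateBin(msg)));
        // ...one emit per counter type
    }
    // Placeholders for the real parsing logic:
    private String parseUsername(String msg) { return msg; }
    private String parseHashtag(String msg) { return msg; }
    private String parseDateBin(String msg) { return msg; }
}

// Topology wiring: a single groupBy over the composite key replaces the
// hundreds of per-counter chains.
TridentTopology topology = new TridentTopology();
topology.newStream("messages", spout).shuffle()
        .each(new Fields("str"), new CounterKeyEmitter(),
              new Fields("countertype", "key"))
        .groupBy(new Fields("countertype", "key"))
        .persistentAggregate(stateFactory, new Count(), new Fields("count"))
        .parallelismHint(6);

That would keep the topology one aggregation wide no matter how many counters I add, but it also funnels every count into a single state map, so I'm not sure whether that's the idiomatic Trident answer or whether it just moves the bottleneck. Am I on the right track?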
Thanks,
Andrew

On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff <[email protected]> wrote:

> Hello,
>
> I'm new to using Trident and had a few questions about the best way to do
> things in this framework. I'm trying to build a real-time streaming
> aggregation system, and Trident seems to offer a very easy framework for
> that. I have a basic setup working, but as I add more counters, the
> performance becomes very slow and eventually I start seeing many failures.
> At the basic level, here is what I want to do:
>
> I have an incoming stream that uses a KafkaSpout to read data.
> I take the Kafka stream, parse it, and output multiple fields.
> I then want many different counters for those fields in the data.
>
> For example, say it was the Twitter stream. I may want to count:
> - A counter for each username I come across, i.e. how many tweets I have
>   received from each user
> - A counter for each hashtag, so you know how many tweets mention it
> - Binned counters based on date for each tweet (e.g. how many tweets in
>   2014, in June 2014, on June 08 2014, etc.)
>
> The list could continue, and it can add up to hundreds of counters running
> in real time. Right now I have something like the following:
>
> TridentTopology topology = new TridentTopology();
> KafkaSpout spout = new KafkaSpout(kafkaConfig);
> Stream stream = topology.newStream("messages", spout).shuffle()
>         .each(new Fields("str"), new FieldEmitter(),
>               new Fields("username", "hashtag"));
>
> stream.groupBy(new Fields("username"))
>       .persistentAggregate(stateFactory, new Fields("username"),
>                            new Count(), new Fields("count"))
>       .parallelismHint(6);
> stream.groupBy(new Fields("hashtag"))
>       .persistentAggregate(stateFactory, new Fields("hashtag"),
>                            new Count(), new Fields("count"))
>       .parallelismHint(6);
>
> I repeat something similar for everything I want a unique count for, so I
> end up with hundreds of groupBy calls, each with its own aggregator. So
> far I have only run this on my local machine, not on a cluster, but I'm
> wondering whether this is the correct design for something like this, or
> whether there is a better way to distribute the work within Trident to
> make it more efficient.
>
> Any suggestions would be appreciated!
> Thanks!
> Andrew
