Do you have any insight into how DRPC plays into this? The groupBy bolt boundary makes perfect sense and I understand how that maps to some collection of bolts that would process different groupings (depending on parallelism). What stumps me are the cases where adding multiple DRPC spouts to the topology seems to result in the whole things being duplicated for each spout. I can see some extra tracking mechanisms get stood up to track DRPC requests and then match request with response but still not sure why that wouldn't scale linearly with # of DRPC spouts.
On Tue, Jun 17, 2014 at 8:05 PM, P. Taylor Goetz <[email protected]> wrote: > Not at the moment but I will be adding that functionality (trident state) > to the storm-hbase project very soon. Currently it only supports MapState. > > -Taylor > > On Jun 17, 2014, at 6:09 PM, Andrew Serff <[email protected]> wrote: > > I'm currently using Redis, but I'm by no means tied to it. Are there any > example of using either to do that? > > Andrew > > > On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz <[email protected]> > wrote: > >> Andrew/Adam, >> >> Partitioning operations like groupBy() form the bolt boundaries in >> trident topologies, so the more you have the more bolts you will have and >> thus, potentially, more network transfer. >> >> What backing store are you using for persistence? If you are using >> something with counter support like HBase or Cassandra you could leverage >> that in combination with tridents exactly once semantics to let it handle >> the counting, and potentially greatly reduce the complexity of your >> topology. >> >> -Taylor >> >> On Jun 17, 2014, at 5:15 PM, Adam Lewis <[email protected]> wrote: >> >> I, too, am eagerly awaiting a reply from the list on this topic. I hit >> up against max topology size limits doing something similar with trident. >> There are definitely "linear" changes to a trident topology that result in >> quadratic growth of the "compiled" storm topology size, such as adding DRPC >> spouts. Sadly the compilation process of trident to plain storm remains >> somewhat opaque to me and I haven't had time to dig deeper. My work around >> has been to limit myself to one DRPC spout per topology and >> programmatically build multiple topologies for the variations (which >> results in a lot of structural and functional duplication of deployed >> topologies, but at least not code duplication). >> >> Trident presents a seemingly nice abstraction, but from my point of view >> it is a leaky one if I need to understand the compilation process to know >> why adding a single DRPC spout double the topology size. >> >> >> >> On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff <[email protected]> wrote: >> >>> Is there no one out there that can help with this? If I use this >>> paradigm, my topology ends up having like 170 bolts. Then I add DRPC >>> stream and I have like 50 spouts. All of this adds up to a topology that I >>> can't even submit because it's too large (and i've bumped the trident max >>> to 50mb already...). It seems like I'm thinking about this wrong, but I >>> haven't be able to come up with another way to do it. I don't really see >>> how using vanilla Storm would help, maybe someone can offer some guidance? >>> >>> Thanks >>> Andrew >>> >>> >>> On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff <[email protected]> wrote: >>> >>>> Hello, >>>> >>>> I'm new to using Trident and had a few questions about the best way to >>>> do things in this framework. I'm trying to build a real-time streaming >>>> aggregation system and Trident seems to have a very easy framework to allow >>>> me to do that. I have a basic setup working, but as I am adding more >>>> counters, the performance becomes very slow and eventually I start having >>>> many failures. At the basic level here is what I want to do: >>>> >>>> Have an incoming stream that is using a KafkaSpout to read data from. >>>> I take the Kafka stream, parse it and output multiple fields. >>>> I then want many different counters for those fields in the data. >>>> >>>> For example, say it was the twitter stream. I may want to count: >>>> - A counter for each username I come across. So how many times I have >>>> received a tweet from each user >>>> - A counter for each hashtag so you know how many tweets mention a >>>> hashtag >>>> - Binned counters based on date for each tweet (i.e. how many tweets in >>>> 2014, June 2014, June 08 2014, etc). >>>> >>>> The list could continue, but this can add up to hundreds of counters >>>> running in real time. Right now I have something like the following: >>>> >>>> TridentTopology topology = new TridentTopology(); >>>> KafkaSpout spout = new KafkaSpout(kafkaConfig); >>>> Stream stream = topology.newStream("messages", spout).shuffle() >>>> .each(new Fields("str"), new FieldEmitter(), new >>>> Fields("username", "hashtag")); >>>> >>>> stream.groupBy(new Fields("username")) >>>> .persistentAggregate(stateFactory, new >>>> Fields("username"), new Count(), new Fields("count")) >>>> .parallelismHint(6); >>>> stream.groupBy(new Fields("hashtag")) >>>> .persistentAggregate(stateFactory, new >>>> Fields("hashtag"), new Count(), new Fields("count")) >>>> .parallelismHint(6); >>>> >>>> Repeat something similar for everything you want to have a unique count >>>> for. >>>> >>>> I end up having hundreds of GroupBys each that has an aggregator for >>>> each. I have so far only run this on my local machine and not on a cluster >>>> yet, but I'm wondering if this is the correct design for something like >>>> this or if there is a better way to distribute this within Trident to make >>>> it more efficient. >>>> >>>> Any suggestions would be appreciated! >>>> Thanks! >>>> Andrew >>>> >>> >>> >> >
