Forgot to note: regarding queue sizes, in addition to what the article mentions, you may also need to increase the MAX_PENDING config.
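For reference, a minimal sketch of where these knobs live, assuming the pre-1.0 backtype.storm packages this 2015 thread would have used and assuming "MAX_PENDING" refers to topology.max.spout.pending; the buffer keys are the ones covered in the Michael Noll article linked below, and all the numbers are illustrative starting points rather than values recommended in this thread:

    import backtype.storm.Config;

    public final class TuningSketch {
        public static Config buffersAndPending() {
            Config conf = new Config();
            // Cap on un-acked tuples per spout task (TOPOLOGY_MAX_SPOUT_PENDING).
            conf.setMaxSpoutPending(5000);
            // Internal disruptor queue sizes from the article below (must be powers of two).
            conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
            conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
            conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
            conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
            return conf;
        }
    }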
On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:

> I see....
>
> A few suggestions:
>
>    - Pump enough messages into Kafka that the entire test takes at least
>    a few minutes.
>       - If the test is very short, results can often be misleading.
>
>    - Use sufficiently large queue sizes.
>       - Generally, the lighter the tasks are, the longer the queues you
>       need to avoid worker starvation. Since your task looks very
>       lightweight, the default queue size may not be sufficient. There are
>       a few places to tune. Have a look at this article:
>       http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>
>    - Make the task a bit more heavyweight.
>       - Getting higher throughput through parallelising is generally hard
>       when the tasks are very lightweight. If you make each task a bit
>       more heavyweight (e.g. by generating more random numbers per
>       invocation), it may be easier to observe the performance increase
>       (this way you may also be able to avoid the message-buffer tuning
>       mentioned above).
>
>    - Try batched topologies.
>       - The rationale is similar to the above, because batching will
>       effectively make tasks more "heavyweight".
>
> On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis <[email protected]>
> wrote:
>
>> Inside my bolts I ack every tuple I receive for processing. Furthermore,
>> I've set TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle how many tuples
>> remain pending at a given moment. The bolts don't have any HashMap or
>> other data structure. In fact, my code is pretty simple:
>>
>> public void execute(Tuple tuple) {
>>     if (!tuple.getString(0).contains("!")) {
>>         Random ran = new Random();
>>         int worker = ran.nextInt(boltNo) + 1;
>>         List<Integer> l = _topo.getComponentTasks("worker" + worker);
>>         String out = tuple.getString(0) + "!";
>>         _collector.emitDirect(l.get(0), new Values(out));
>>     } else {
>>         LOG.info("Already processed record: " + tuple.getString(0));
>>     }
>>     _collector.ack(tuple);
>> }
>>
>> As far as the slowdown is concerned, the rate of data processing is
>> constant from start to end. But when I increase the number of
>> spouts/bolts, the system takes more time to process the same amount of
>> records (500,000 records, to be exact). So the throughput drops because
>> the rate of data ingestion is lower. This is consistent as I increase the
>> number of workers; I have tested up to 20 spouts / 20 bolts.
>>
>> I set the number of Kafka partitions exactly equal to the number of
>> spouts each time, to take advantage of the parallelism offered.
>>
>> I don't know if it helps, but I also tried a simpler topology where the
>> bolts are not connected with each other (so it's basically a tree-like
>> topology), and I observed the same drop in throughput but not to the same
>> extent; the increase in processing time is almost negligible.
>>
>> On 25/07/2015 07:08 PM, Enno Shioji wrote:
>>
>> My first guess would be that there is something wrong with the topology,
>> like the number of pending messages increasing abnormally because tuples
>> are not being acked or too many tuples are generated, or a JVM heap
>> shortage on the workers because something is being retained in the bolts
>> (like an ever-growing HashMap), etc.
>>
>> Does the slowdown happen gradually, or is it immediately slow?
>>
>> Another random guess is that you don't have enough Kafka partitions and
>> hence aren't using your new workers, but I wouldn't expect this to make
>> things significantly slower.
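To make the "heavier task" suggestion concrete, here is a sketch of a variant of the execute() method quoted above that burns some extra CPU per tuple. The class name HeavierWorkerBolt, the EXTRA_WORK knob, and the reuse of a single Random instance are illustrative choices, not code from the thread; the direct-stream declaration assumes, as in the original, that downstream bolts subscribe with directGrouping.

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;

    public class HeavierWorkerBolt extends BaseRichBolt {
        private static final int EXTRA_WORK = 10000; // illustrative knob: random draws per tuple
        private final int boltNo;
        private OutputCollector collector;
        private TopologyContext context;
        private Random ran;

        public HeavierWorkerBolt(int boltNo) {
            this.boltNo = boltNo;
        }

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.context = context;
            this.ran = new Random(); // reused across tuples instead of allocated per call
        }

        @Override
        public void execute(Tuple tuple) {
            String msg = tuple.getString(0);
            if (!msg.contains("!")) {
                long sink = 0;
                for (int i = 0; i < EXTRA_WORK; i++) {
                    sink += ran.nextInt(); // extra per-tuple work so each task is non-trivial
                }
                int worker = ran.nextInt(boltNo) + 1;
                List<Integer> tasks = context.getComponentTasks("worker" + worker);
                // Direct emit to the chosen task, as in the original execute();
                // the sink bit is folded in only so the busy-work loop is not dead code.
                collector.emitDirect(tasks.get(0), new Values(msg + "!" + (sink & 1)));
            }
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declared as a direct stream, since consumers use directGrouping.
            declarer.declare(true, new Fields("record"));
        }
    }

Per the rationale above, if throughput starts to grow as spouts and bolts are added, the earlier flat scaling was probably dominated by per-tuple overhead and queueing rather than by compute.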
>>
>> On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <[email protected]>
>> wrote:
>>
>>> Morgan,
>>>
>>> I hardly think that this could be the problem. The topology is deployed
>>> over a 14-node cluster with 56 total cores and 96 GB of RAM. So when I
>>> jump from 8 to 16 workers, I think I am still far below my hardware
>>> limitations.
>>>
>>> On 25/07/2015 06:45 PM, Morgan W09 wrote:
>>>
>>> It could be possible that you're reaching a hardware limitation. The
>>> jump from 8 to 16 total bolts/workers could be more than your hardware
>>> can handle efficiently, so it's starting to have to swap processes and
>>> their memory in and out, which can have substantial overhead and cause
>>> your program to slow down.
>>>
>>> On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis <[email protected]>
>>> wrote:
>>>
>>>> Yes, it listens to its own output. For example, if I have two bolts
>>>> (bolt1 and bolt2), I perform the following:
>>>>
>>>> bolt1.directGrouping("bolt1");
>>>> bolt1.directGrouping("bolt2");
>>>> bolt2.directGrouping("bolt1");
>>>> bolt2.directGrouping("bolt2");
>>>>
>>>> I know that this could possibly lead to a cycle, but right now the
>>>> bolts I'm trying to run perform the following:
>>>>
>>>> if the inputRecord doesn't contain a "!" {
>>>>     append a "!"
>>>>     emit to a random node
>>>> } else {
>>>>     do nothing with the record
>>>> }
>>>>
>>>> Dimitris
>>>>
>>>> On 25/07/2015 06:03 PM, Enno Shioji wrote:
>>>>
>>>> > Each bolt is connected with itself as well as with each one of the
>>>> > other bolts
>>>>
>>>> You mean the bolt listens to its own output?
>>>>
>>>> On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to run a topology in Storm and I am facing some
>>>>> scalability issues. Specifically, I have a topology where KafkaSpouts
>>>>> read from a Kafka queue and emit messages to bolts, which are connected
>>>>> with each other through directGrouping (each bolt is connected with
>>>>> itself as well as with each one of the other bolts). The spouts feed the
>>>>> bolts via shuffleGrouping. I observe that when I increase the number of
>>>>> spouts and bolts proportionally, I don't get the speedup I'm expecting.
>>>>> In fact, my topology seems to run slower, and for the same amount of
>>>>> data it takes more time to complete. For example, when I increase the
>>>>> spouts from 4 to 8 and the bolts from 4 to 8, it takes longer to process
>>>>> the same amount of Kafka messages.
>>>>>
>>>>> Any ideas why this is happening? Thanks in advance.
>>>>>
>>>>> Best,
>>>>> Dimitris Sarlis
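For completeness, a minimal sketch of the wiring described in the thread: n spouts feeding n bolts via shuffleGrouping, and every bolt listening to every bolt, itself included, via directGrouping. The class name ScalingTopologySketch and the "spout"/"worker" component names are placeholders; TestWordSpout is only a stand-in for the storm-kafka KafkaSpout actually used (its construction is omitted here), and the bolt reuses the HeavierWorkerBolt sketch above. Only the grouping pattern comes from the thread.

    import backtype.storm.generated.StormTopology;
    import backtype.storm.testing.TestWordSpout;
    import backtype.storm.topology.BoltDeclarer;
    import backtype.storm.topology.TopologyBuilder;

    public final class ScalingTopologySketch {
        public static StormTopology build(int n) {
            TopologyBuilder builder = new TopologyBuilder();

            // Stand-in spouts; the real topology uses KafkaSpout with the
            // Kafka partition count equal to n.
            for (int i = 1; i <= n; i++) {
                builder.setSpout("spout" + i, new TestWordSpout());
            }

            // Every bolt reads from every spout (shuffleGrouping) and from
            // every bolt including itself (directGrouping).
            for (int i = 1; i <= n; i++) {
                BoltDeclarer bolt = builder.setBolt("worker" + i, new HeavierWorkerBolt(n));
                for (int j = 1; j <= n; j++) {
                    bolt.shuffleGrouping("spout" + j);
                    bolt.directGrouping("worker" + j);
                }
            }
            return builder.createTopology();
        }
    }

Note that with this fully connected wiring the number of component-to-component subscriptions grows roughly quadratically in n (n x n spout-to-bolt links plus n x n bolt-to-bolt links), which may be part of why this version degrades more than the tree-like variant mentioned earlier in the thread.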
