Oh, and one last thing: you may want to consider keeping a counter in the bolt and only calling LOG.info every 10K or 100K tuples or so, as writing to the log file could become an artificial bottleneck.
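A minimal sketch of that counter idea, assuming the LOG and _collector fields from the execute() snippet quoted further down in this thread (the field name and the 100K threshold are only illustrative):

    // Illustrative only: count the "already processed" tuples and log
    // a summary every 100,000 hits instead of logging each one.
    private long alreadyProcessed = 0;

    public void execute(Tuple tuple) {
        if (!tuple.getString(0).contains("!")) {
            // ... normal processing path, unchanged ...
        } else {
            alreadyProcessed++;
            if (alreadyProcessed % 100000 == 0) {
                LOG.info("Already processed records so far: " + alreadyProcessed);
            }
        }
        _collector.ack(tuple);
    }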
On Sat, Jul 25, 2015 at 6:42 PM, Enno Shioji <[email protected]> wrote:

> I see.
>
> One thing: you probably want to make the else branch ("Already processed
> record") heavier weight too, as that branch would be *extremely* light
> weight.
>
> On Sat, Jul 25, 2015 at 6:33 PM, Dimitris Sarlis <[email protected]>
> wrote:
>
>> I've already tried large queue sizes; specifically, I've set the queue
>> size to 32768.
>>
>> I'll also try to make the tasks more heavy weight as you suggest and see
>> how things go.
>>
>> On 25/07/2015 08:00 PM, Enno Shioji wrote:
>>
>> Forgot to note; re: queue sizes, in addition to what the article
>> mentions, you may also need to increase the MAX_PENDING config.
>>
>> On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:
>>
>>> I see...
>>>
>>> A few suggestions:
>>>
>>> - Pump enough messages into Kafka that the entire test takes at least
>>>   a few minutes.
>>>   - If the test is very short, results can often be misleading.
>>>
>>> - Use sufficiently large queue sizes.
>>>   - Generally, the lighter the tasks are, the longer the queues you
>>>     need to avoid worker starvation. Since your task looks very light
>>>     weight, the default queue sizes may not be sufficient. There are a
>>>     few places to tune. Have a look at this article:
>>>     http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>>>
>>> - Make the task a bit more heavy weight.
>>>   - Getting higher throughput through parallelising is generally hard
>>>     when the tasks are very light weight. If you make each task a bit
>>>     more heavy weight (e.g. by generating more random numbers per
>>>     invocation), it may be easier to observe the performance increase
>>>     (this way you may also be able to avoid the message buffer tuning
>>>     mentioned above).
>>>
>>> - Try batched topologies.
>>>   - The rationale is similar to the above, because batching effectively
>>>     makes tasks more "heavy weight".
>>>
>>> On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis <[email protected]>
>>> wrote:
>>>
>>>> Inside my bolts I ack every tuple I receive for processing.
>>>> Furthermore, I've set TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle
>>>> how many tuples remain pending at a given moment. The bolts don't have
>>>> any HashMap or other data structure. In fact, my code is pretty simple:
>>>>
>>>> public void execute(Tuple tuple) {
>>>>     if (!tuple.getString(0).contains("!")) {
>>>>         Random ran = new Random();
>>>>         int worker = ran.nextInt(boltNo) + 1;
>>>>         List<Integer> l = _topo.getComponentTasks("worker" + worker);
>>>>         String out = tuple.getString(0) + "!";
>>>>         _collector.emitDirect(l.get(0), new Values(out));
>>>>     } else {
>>>>         LOG.info("Already processed record: " + tuple.getString(0));
>>>>     }
>>>>     _collector.ack(tuple);
>>>> }
>>>>
>>>> As far as the slowdown is concerned, the rate of data processing is
>>>> constant from start to end. But when I increase the number of
>>>> spouts/bolts, the system takes more time to process the same amount of
>>>> records (500,000 records, to be exact). So the throughput drops because
>>>> the rate of data ingestion is lower. This is consistent as I increase
>>>> the number of workers; I have tested up to 20 spouts / 20 bolts.
>>>>
>>>> I set the number of Kafka partitions exactly equal to the number of
>>>> spouts each time to take advantage of the parallelism offered.
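As a rough illustration of the pending/queue-size tuning discussed above: a minimal sketch, assuming the topology is configured through a backtype.storm.Config object. The buffer keys are the Storm 0.9-era settings covered in the article linked above; the values shown are only examples, not recommendations.

    import backtype.storm.Config;

    // Illustrative values only -- tune against your own measurements.
    Config conf = new Config();
    conf.setNumWorkers(8);                                     // e.g. one worker per node in use
    conf.setMaxSpoutPending(1000);                             // TOPOLOGY_MAX_SPOUT_PENDING
    conf.put("topology.executor.receive.buffer.size", 32768);  // per-executor incoming queue
    conf.put("topology.executor.send.buffer.size", 32768);     // per-executor outgoing queue
    conf.put("topology.transfer.buffer.size", 1024);           // per-worker transfer queue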
>>>>
>>>> I don't know if it helps, but I also tried a simpler topology, where
>>>> the bolts are not connected with each other (so it's basically a
>>>> tree-like topology), and I observed the same drop in throughput, but
>>>> not to the same extent. The increase in processing time is almost
>>>> negligible.
>>>>
>>>> On 25/07/2015 07:08 PM, Enno Shioji wrote:
>>>>
>>>> My first guess would be that there is something with the topology,
>>>> like the number of pending messages increasing abnormally due to
>>>> tuples not being acked / too many tuples being generated, or a JVM
>>>> heap shortage on the workers due to something being retained in the
>>>> bolts (like an ever-growing HashMap), etc.
>>>>
>>>> Does the slowdown happen gradually, or is it immediately slow?
>>>>
>>>> Another random guess is that you don't have enough Kafka partitions
>>>> and hence are not using your new workers, but I wouldn't expect this
>>>> to make things significantly slower.
>>>>
>>>> On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <[email protected]>
>>>> wrote:
>>>>
>>>>> Morgan,
>>>>>
>>>>> I hardly think that this could be the problem. The topology is
>>>>> deployed over a 14-node cluster with 56 total cores and 96GB RAM. So
>>>>> when I jump from 8 to 16 workers, I think I am still far below my
>>>>> hardware limitations.
>>>>>
>>>>> On 25/07/2015 06:45 PM, Morgan W09 wrote:
>>>>>
>>>>> It could be possible that you're reaching a hardware limitation. The
>>>>> jump from 8 to 16 total bolts/workers could be more than your
>>>>> hardware can handle efficiently, so it's starting to have to swap out
>>>>> processes and their memory, which can have substantial overhead,
>>>>> causing your program to slow down.
>>>>>
>>>>> On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Yes, it listens to its own output. For example, if I have two bolts
>>>>>> (bolt1 and bolt2), I perform the following:
>>>>>>
>>>>>> bolt1.directGrouping("bolt1");
>>>>>> bolt1.directGrouping("bolt2");
>>>>>> bolt2.directGrouping("bolt1");
>>>>>> bolt2.directGrouping("bolt2");
>>>>>>
>>>>>> I know that this could possibly lead to a cycle, but right now the
>>>>>> bolts I'm trying to run perform the following:
>>>>>>
>>>>>> if the inputRecord doesn't contain a "!" {
>>>>>>     append a "!"
>>>>>>     emit to a random node
>>>>>> } else {
>>>>>>     do nothing with the record
>>>>>> }
>>>>>>
>>>>>> Dimitris
>>>>>>
>>>>>> On 25/07/2015 06:03 PM, Enno Shioji wrote:
>>>>>>
>>>>>> > Each bolt is connected with itself as well as with each one of the
>>>>>> > other bolts
>>>>>> You mean the bolt listens to its own output?
>>>>>>
>>>>>> On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm trying to run a topology in Storm and I am facing some
>>>>>>> scalability issues. Specifically, I have a topology where
>>>>>>> KafkaSpouts read from a Kafka queue and emit messages to bolts
>>>>>>> which are connected with each other through directGrouping. (Each
>>>>>>> bolt is connected with itself as well as with each one of the other
>>>>>>> bolts.) The bolts subscribe to the spouts with shuffleGrouping. I
>>>>>>> observe that when I increase the number of spouts and bolts
>>>>>>> proportionally, I don't get the speedup I'm expecting. In fact, my
>>>>>>> topology seems to run slower, and for the same amount of data it
>>>>>>> takes more time to complete.
>>>>>>> For example, when I increase the spouts from 4 to 8 and the bolts
>>>>>>> from 4 to 8, it takes longer to process the same amount of Kafka
>>>>>>> messages.
>>>>>>>
>>>>>>> Any ideas why this is happening? Thanks in advance.
>>>>>>>
>>>>>>> Best,
>>>>>>> Dimitris Sarlis
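For reference, a minimal sketch of how a topology like the one described in the original message might be wired up. The component names, the ZooKeeper/Kafka settings, and the WorkerBolt class are placeholders, not taken from the thread; package names assume Storm 0.9.x with the storm-kafka spout.

    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.ZkHosts;

    // Hypothetical wiring for 2 bolts, following the groupings described above.
    BrokerHosts hosts = new ZkHosts("zk1:2181");              // placeholder ZooKeeper address
    SpoutConfig spoutConf = new SpoutConfig(hosts, "records", "/kafka", "scaling-test");

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new KafkaSpout(spoutConf), 2);  // spout parallelism = #partitions

    // Each bolt subscribes to the spout via shuffleGrouping and to every
    // bolt (including itself) via directGrouping.
    builder.setBolt("worker1", new WorkerBolt(), 1)           // WorkerBolt is a placeholder class
           .shuffleGrouping("spout")
           .directGrouping("worker1")
           .directGrouping("worker2");
    builder.setBolt("worker2", new WorkerBolt(), 1)
           .shuffleGrouping("spout")
           .directGrouping("worker1")
           .directGrouping("worker2");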
