Dimitris, I have a suggestion: you could experiment by changing your topology. Start with just the spout and the first layer of bolts, and observe whether you get the expected performance from higher parallelism. Keep adding bolts until you find the bottleneck. Do you have any database calls or other external calls?
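As a rough sketch of this incremental approach, this is what the stripped-down first stage might look like with the backtype.storm API of that era; the component names, parallelism values, and Kafka spout settings are placeholders rather than details from the thread, and ProbeBolt is an illustrative stand-in for the first layer of bolts:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    public class ScalingProbeTopology {

        // Stand-in for the first layer of bolts: does the per-tuple work but emits nothing,
        // so the probe measures only spout -> first-layer throughput.
        public static class ProbeBolt extends BaseBasicBolt {
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                tuple.getString(0);   // the real per-tuple work would go here
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                // no output streams in the probe
            }
        }

        public static void main(String[] args) throws Exception {
            // Kafka spout wired the same way as in the full topology (connection details assumed).
            SpoutConfig spoutConf = new SpoutConfig(
                    new ZkHosts("zk1:2181"), "input-topic", "/kafka-spout", "scaling-probe");
            spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka-spout", new KafkaSpout(spoutConf), 4);
            // First layer only; add the next layer back once this stage scales as expected.
            builder.setBolt("worker1", new ProbeBolt(), 4).shuffleGrouping("kafka-spout");

            Config conf = new Config();
            conf.setNumWorkers(4);
            StormSubmitter.submitTopology("scaling-probe", conf, builder.createTopology());
        }
    }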
> On Jul 26, 2015, at 7:09 AM, Dimitris Sarlis <[email protected]> wrote:
>
> @Enno:
> I tried to increase the number of Kafka servers but it didn't help.
>
> My current experiments are executed with log messages disabled, so this does not affect the performance. Even if I wrote to logs, as you said it shouldn't matter, since I'm also increasing the number of machines in the cluster.
>
> I'll try to find out whether any resource contention is a factor.
>
> I've also checked that my tasks actually perform the heavyweight work I give them. Specifically, as I add more weight I see that bolt capacity as well as execute and process latency also increase, so the tasks are actually doing the heavy work. I don't think there is any JVM code elimination, as you suggested.
>
> The logs for the nimbus and supervisors seem normal; I didn't come across any errors or exceptions.
>
> @Srinath:
> I tested what you suggested but I get similar numbers as before.
>
>> On 26/07/2015 02:45 μμ, Srinath C wrote:
>> Dimitris,
>>
>> If I understand correctly, the bolt does nothing if it receives a tuple it has already processed. So instead of emitting the tuple to a random bolt, you could just as well make sure that the bolt does not emit the tuple to itself. You can enforce this by comparing the randomly selected component ID with the bolt's own ID, which you can extract from the Storm context in the prepare() call.
>>
>> This way you will considerably reduce the number of tuples that flow through the system even as you increase the workers.
>> Let me know if this works/fares better in your tests.
>>
>> On Sun, Jul 26, 2015 at 4:51 PM, Enno Shioji <[email protected]> wrote:
>>> Hmmmm
>>>
>>> The numbers look quite similar, so I wonder if there is a common bottleneck. For example, it could be that your Kafka servers are the limiting factor (although I'd doubt this, at least if the Storm tasks take sufficient time, since comparatively the load on Kafka would then be smaller).
>>>
>>> Another place to look would be the output (in your case the log file?). But if you are increasing the number of machines in the Storm cluster, that doesn't explain it either, as you'll have more disks.
>>>
>>> Resource contention can also be a factor; e.g. if you are using VMs and the physical machines already have tons of other VMs allocated, or something is wrong with the network, etc., then the results may be confusing.
>>>
>>> One thing to look out for when you fake a heavyweight task is the JVM's dead code elimination. Depending on how you do it, the compiler can optimise the entire workload away. It may be worth checking whether your tasks are actually taking significant time. A common trick to avoid this is to have something like this in your code:
>>>
>>> if (result_of_fake_work == very_unlikely) {
>>>     System.out.println("");
>>> }
>>>
>>> If the task is taking something like 1 sec, that would definitely be long enough.
>>>
>>> Otherwise I'd be examining the logs / console to see if there's some problem with the cluster. I've seen cases where supervisors report into nimbus but can't communicate with each other, in effect creating "dud" workers that make the topology run very slowly.
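A minimal, self-contained sketch of the dead-code-elimination guard Enno describes, assuming the fake work is a loop of random-number generation as discussed elsewhere in the thread; the class and variable names are illustrative, not taken from the thread:

    import java.util.Random;

    // Illustrative only: fake CPU-bound work whose result feeds a branch,
    // so the JIT cannot prove the loop is dead and optimise it away.
    public class FakeWork {
        private static final Random RAND = new Random();

        // Burns roughly `iterations` rounds of work and returns a value the caller must consume.
        public static long burn(int iterations) {
            long acc = 0;
            for (int i = 0; i < iterations; i++) {
                acc += RAND.nextInt();
            }
            return acc;
        }

        public static void main(String[] args) {
            long result = burn(50_000_000);
            // The "very unlikely" check from the thread: because the result is used, the work survives.
            if (result == Long.MIN_VALUE) {
                System.out.println("");
            }
        }
    }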
>>>> On Sun, Jul 26, 2015 at 11:16 AM, Dimitris Sarlis <[email protected]> wrote:
>>>> I tried increasing the number of Kafka messages and made the tasks more heavyweight. As a result, the total experiment took around 13-14 minutes to complete.
>>>>
>>>> I also increased the number of bolts, not by creating different ones but by raising the parallelism hint (e.g. 16 bolts vs. 4 bolts with a parallelism hint of 4). That helped a bit, and at least increasing the total number of workers is no longer slower. But the system's scalability is still not good. For example, with 4 spouts / 4 bolts I get ~14 min, 8/8 gives ~14 min, 16/16 gives ~12 min, and 20/20 gives ~13 min.
>>>>
>>>> Any ideas what else I could check and experiment with?
>>>>
>>>>> On 25/07/2015 08:46 μμ, Enno Shioji wrote:
>>>>> Oh, and one last thing: you may want to consider keeping a counter in the bolt and only calling log.INFO every 10K or 100K tuples or so, as writing to log files could become an artificial bottleneck.
>>>>>
>>>>>> On Sat, Jul 25, 2015 at 6:42 PM, Enno Shioji <[email protected]> wrote:
>>>>>> I see.
>>>>>>
>>>>>> One thing: you probably want to make the else branch ("Already processed record") heavyweight too, as that branch would otherwise be *extremely* lightweight.
>>>>>>
>>>>>> On Sat, Jul 25, 2015 at 6:33 PM, Dimitris Sarlis <[email protected]> wrote:
>>>>>>> I've already tried large queue sizes; specifically, I've set them to 32768.
>>>>>>>
>>>>>>> I'll also try to make the tasks more heavyweight as you suggest and see how things go.
>>>>>>>
>>>>>>>> On 25/07/2015 08:00 μμ, Enno Shioji wrote:
>>>>>>>> Forgot to note; re: queue sizes, in addition to what the article mentions you may also need to increase the MAX_PENDING config.
>>>>>>>>
>>>>>>>> On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:
>>>>>>>>> I see....
>>>>>>>>>
>>>>>>>>> A few suggestions:
>>>>>>>>>
>>>>>>>>> - Pump enough messages into Kafka that the entire test takes at least a few minutes.
>>>>>>>>>   - If the test is very short, results can often be misleading.
>>>>>>>>>
>>>>>>>>> - Use sufficiently large queue sizes.
>>>>>>>>>   - Generally, the lighter the tasks are, the longer the queues you need to avoid worker starvation. Since your task looks very lightweight, the default queue size may not be sufficient. There are a few places to tune. Have a look at this article: http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>>>>>>>>>
>>>>>>>>> - Make the task a bit more heavyweight.
>>>>>>>>>   - Getting higher throughput through parallelising is generally hard when the tasks are very lightweight. If you make each task a bit more heavyweight (e.g. by generating more random numbers per invocation), it may be easier to observe the performance increase (this way you may also be able to avoid the message buffer tuning mentioned above).
>>>>>>>>>
>>>>>>>>> - Try batched topologies.
>>>>>>>>>   - The rationale is similar to the above: batching will effectively make tasks more "heavyweight".
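For reference, a sketch of where these knobs live in the topology Config, using the backtype.storm API current at the time; the values are illustrative, not recommendations from the thread:

    import backtype.storm.Config;

    public class QueueTuning {
        // Illustrative values only; see the Michael Noll article above for how the buffers interact.
        public static Config tunedConfig() {
            Config conf = new Config();
            // Cap on in-flight (unacked) tuples per spout task -- the MAX_PENDING setting mentioned above.
            conf.setMaxSpoutPending(5000);
            // Internal disruptor queue sizes (must be powers of two).
            conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
            conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
            conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 1024);
            return conf;
        }
    }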
>>>>>>>>>> On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis <[email protected]> wrote:
>>>>>>>>>> Inside my bolts I ack every tuple I receive for processing. Furthermore, I've set TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle how many tuples remain pending at a given moment. The bolts don't have any HashMap or other data structure. In fact, my code is pretty simple:
>>>>>>>>>>
>>>>>>>>>> public void execute(Tuple tuple) {
>>>>>>>>>>     if (!tuple.getString(0).contains("!")) {
>>>>>>>>>>         Random ran = new Random();
>>>>>>>>>>         int worker = ran.nextInt(boltNo) + 1;
>>>>>>>>>>         List<Integer> l = _topo.getComponentTasks("worker" + worker);
>>>>>>>>>>         String out = tuple.getString(0) + "!";
>>>>>>>>>>         _collector.emitDirect(l.get(0), new Values(out));
>>>>>>>>>>     } else {
>>>>>>>>>>         LOG.info("Already processed record: " + tuple.getString(0));
>>>>>>>>>>     }
>>>>>>>>>>     _collector.ack(tuple);
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> As far as the slowdown is concerned, the rate of data processing is constant from start to end. But when I increase the number of spouts/bolts, the system takes more time to process the same amount of records (500000 records, to be exact). So the throughput drops, because the rate of data ingestion is lower. This is consistent as I increase the number of workers; I have tested up to 20 spouts / 20 bolts.
>>>>>>>>>>
>>>>>>>>>> I set the number of Kafka partitions exactly equal to the number of spouts each time, to take advantage of the parallelism offered.
>>>>>>>>>>
>>>>>>>>>> I don't know if it helps, but I also tried a simpler topology where bolts are not connected with each other (so it's basically a tree-like topology) and I observed the same drop in throughput, but not to the same extent. The increase in processing time is almost negligible.
>>>>>>>>>>
>>>>>>>>>>> On 25/07/2015 07:08 μμ, Enno Shioji wrote:
>>>>>>>>>>> My first guess would be that there is something wrong with the topology, like the number of pending messages increasing abnormally due to tuples not being acked or too many tuples being generated, or a JVM heap shortage on the workers due to something being retained in the bolts (like an ever-growing HashMap), etc.
>>>>>>>>>>>
>>>>>>>>>>> Does the slowdown happen gradually, or is it immediately slow?
>>>>>>>>>>>
>>>>>>>>>>> Another random guess is that you don't have enough Kafka partitions and hence are not using your new workers, but I wouldn't expect this to make things significantly slower.
>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <[email protected]> wrote:
>>>>>>>>>>>> Morgan,
>>>>>>>>>>>>
>>>>>>>>>>>> I hardly think that this could be the problem. The topology is deployed over a 14-node cluster with 56 total cores and 96GB RAM. So when I jump from 8 to 16 workers, I think I am still far below my hardware limits.
>>>>>>>>>>>>
>>>>>>>>>>>>> On 25/07/2015 06:45 μμ, Morgan W09 wrote:
>>>>>>>>>>>>> It could be that you're reaching a hardware limitation. The jump from 8 to 16 total bolts/workers could be more than your hardware can handle efficiently, so it's starting to have to swap processes and their memory in and out, which can have substantial overhead and cause your program to slow down.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis <[email protected]> wrote:
>>>>>>>>>>>>>> Yes, it listens to its own output.
>>>>>>>>>>>>>> For example, if I have two bolts (bolt1 and bolt2), I wire them up as follows:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> bolt1.directGrouping("bolt1");
>>>>>>>>>>>>>> bolt1.directGrouping("bolt2");
>>>>>>>>>>>>>> bolt2.directGrouping("bolt1");
>>>>>>>>>>>>>> bolt2.directGrouping("bolt2");
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I know that this could possibly lead to a cycle, but right now the bolts I'm trying to run do the following:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> if the inputRecord doesn't contain a "!" {
>>>>>>>>>>>>>>     append a "!"
>>>>>>>>>>>>>>     emit to a random node
>>>>>>>>>>>>>> } else {
>>>>>>>>>>>>>>     do nothing with the record
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dimitris
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 25/07/2015 06:03 μμ, Enno Shioji wrote:
>>>>>>>>>>>>>>> > Each bolt is connected with itself as well as with each one of the other bolts
>>>>>>>>>>>>>>> You mean the bolt listens to its own output?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis <[email protected]> wrote:
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm trying to run a topology in Storm and I am facing some scalability issues. Specifically, I have a topology where KafkaSpouts read from a Kafka queue and emit messages to bolts, which are connected with each other through directGrouping. (Each bolt is connected with itself as well as with each one of the other bolts.) The bolts subscribe to the spouts with shuffleGrouping. I observe that when I increase the number of spouts and bolts proportionally, I don't get the speedup I'm expecting. In fact, my topology seems to run slower, and for the same amount of data it takes more time to complete. For example, when I increase the spouts from 4 to 8 and the bolts from 4 to 8, it takes longer to process the same amount of Kafka messages.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Any ideas why this is happening? Thanks in advance.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Dimitris Sarlis
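As a closing illustration of Srinath's suggestion applied to the execute() method quoted earlier in the thread: the bolt learns its own component ID in prepare() and redraws the random target until it is not itself. The field names mirror the snippet above, but the surrounding class, the constructor, and the direct-stream declaration are reconstructed assumptions, not code from the thread:

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import java.util.Map;
    import java.util.Random;

    public class WorkerBolt extends BaseRichBolt {
        private OutputCollector _collector;
        private TopologyContext _topo;
        private String _myId;              // this bolt's own component ID, e.g. "worker3"
        private final int boltNo;          // total number of worker bolts (assumed constructor argument)
        private final Random ran = new Random();

        public WorkerBolt(int boltNo) {
            this.boltNo = boltNo;
        }

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            _topo = context;
            _myId = context.getThisComponentId();   // extracted once, per Srinath's suggestion
        }

        @Override
        public void execute(Tuple tuple) {
            // Only forward records that have not been marked yet, and only if there is
            // at least one other worker bolt to send them to.
            if (!tuple.getString(0).contains("!") && boltNo > 1) {
                String target;
                do {
                    target = "worker" + (ran.nextInt(boltNo) + 1);
                } while (target.equals(_myId));      // never emit to ourselves
                int taskId = _topo.getComponentTasks(target).get(0);
                _collector.emitDirect(taskId, new Values(tuple.getString(0) + "!"));
            }
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declared as a direct stream because tuples are sent with emitDirect
            // and consumed via directGrouping.
            declarer.declare(true, new Fields("record"));
        }
    }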
