Dimitris, I have a suggestion you could experiment by changing your topology. 
Start with just the spout and first layer of bolts and observe if you get the 
expected performance from it from a higher parallelism. Keep adding bolts until 
you find the bottleneck.
Do  you have  any database calls or any other external calls?



> On Jul 26, 2015, at 7:09 AM, Dimitris Sarlis <[email protected]> wrote:
> 
> @Enno:
> I tried to increase the number of Kafka servers but it didn't help.
> 
> My current experiments are executed with disabled log messages, so this does 
> not affect the performance. Even if I wrote to logs, as you said it shouldn't 
> matter since I'm also increasing the number of machines in the cluster.
> 
> I'll try to find out if any resource contention is a factor.
> 
> I've also checked that my tasks actually perform the heavy weight work I give 
> them. Specifically, as I put more weight I see that bolt capacity as well as 
> execute, process latency also increase, so the tasks are actually performing 
> the heavy work. I don't think there is     any JVM code elimination, as you 
> suggested.
> 
> The logs for the nimbus and supervisors seem normal, I didn't came across any 
> errors or exceptions.
> 
> @Srinath:
> I tested what you suggested but I get similar numbers as before.
> 
> 
> 
>> On 26/07/2015 02:45 μμ, Srinath C wrote:
>> Dimitris,
>> 
>> If I understand correctly, the bolt does nothing if it receives a tuple it 
>> has processed. So instead of randomly emitting the tuple, you could as well 
>> make sure that the bolt does not emit the tuple to itself. You can assert 
>> this by comparing the randomly selected component ID with its own ID which 
>> you can extract in the prepare() call from storm context.
>> 
>> This way you will considerably reduce the number of tuples that flow through 
>> the system even as you increase the workers.
>> Let me know if this works/fares better in your tests.
>> 
>> 
>> On Sun, Jul 26, 2015 at 4:51 PM, Enno Shioji <[email protected]> wrote:
>>> Hmmmm
>>> 
>>> The number looks quite similar, so I wonder if there is a common 
>>> bottleneck. For example, it could be that your Kafka servers are the 
>>> limiting factor (although I'd doubt this, at least if the Storm task's take 
>>> sufficient time as then comparatively the load on Kafka will be smaller).
>>> 
>>> Another place to look would be the output (in your case log file?). But if 
>>> you are increasing the number of machines in the Storm cluster, that 
>>> doesn't explain it either as you'll have more disks.
>>> 
>>> Resource contention can also be a factor; e.g. if you are using VMs and the 
>>> physical machines have already tons of other VMs allocated, or something 
>>> wrong with the network etc. then the results may be confusing.
>>> 
>>> One thing to look out for when you fake a heavy weight task is JVM's dead 
>>> code elimination. Depending on how you do it, the compiler can optimise the 
>>> entire work away. It may be worth to see if your tasks are actually taking 
>>> significant time to solve. A common trick to avoid this is to have this in 
>>> your code:
>>> 
>>> if (result_of_fake_work == very_unlikely):
>>>     System.out.println("")
>>> 
>>> If the task is taking something like 1 sec that would definitely be long 
>>> enough.
>>> 
>>> 
>>> Otherwise I'd be examining the logs / console to see if there's some 
>>> problem with the cluster. I've seen cases where supervisors report into 
>>> nimbus but can't communicate to each other, in effect creating "dud" 
>>> workers that makes the topology work very slow.
>>> 
>>> 
>>> 
>>> 
>>>> On Sun, Jul 26, 2015 at 11:16 AM, Dimitris Sarlis <[email protected]> 
>>>> wrote:
>>>> I tried increasing the number of Kafka messages and made the tasks more 
>>>> heavy weight. As a result the total experiment took around 13-14 minutes 
>>>> to complete. I also increased the number of bolts not by creating 
>>>> different ones but by changing the parallelism hint (e.g. 16 bolts vs 4 
>>>> bolts with 4 parallelism hint). That helped a bit and at least increasing 
>>>> the number of total workers is not slower. But still the system's 
>>>> scalability is not good. For example, with 4 spouts/ 4 bolts I get ~14min, 
>>>> 8/8 give ~14min, 16/16 give ~12min, 20/20 give ~13min.
>>>> 
>>>> Any ideas what else I could check and experiment with? 
>>>> 
>>>>> On 25/07/2015 08:46 μμ, Enno Shioji wrote:
>>>>> Oh and one last thing.. You may want to consider keeping a counter in the 
>>>>> bolt and only log.INFO every 10K or 100K times or something, as writing 
>>>>> to log files could become an artificial bottleneck.
>>>>> 
>>>>>> On Sat, Jul 25, 2015 at 6:42 PM, Enno Shioji <[email protected]> wrote:
>>>>>> I see.
>>>>>> 
>>>>>> One thing, you prob. want to make the else branch ("Already processed 
>>>>>> record") heavy weight, too as that branch would be *extremely* light 
>>>>>> weight. 
>>>>>> 
>>>>>> On Sat, Jul 25, 2015 at 6:33                                             
>>>>>>   PM, Dimitris Sarlis <[email protected]> wrote:
>>>>>>> I've already tried large queue sizes, specifically I've set it to 32768.
>>>>>>> 
>>>>>>> I'll also try to make the tasks more heavy weight as you suggest and 
>>>>>>> see how things go.
>>>>>>> 
>>>>>>> 
>>>>>>>> On 25/07/2015 08:00 μμ, Enno Shioji wrote:
>>>>>>>> Forgot to note; re: queue sizes, in addition to what the article 
>>>>>>>> mentions you may also need to increase the MAX_PENDING config.
>>>>>>>> 
>>>>>>>> On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:
>>>>>>>>> I see....
>>>>>>>>> 
>>>>>>>>> A few suggestions:
>>>>>>>>> 
>>>>>>>>>  - Pump enough messages into Kafka that the entire test takes at 
>>>>>>>>> least more than a few minutes
>>>>>>>>>     - If the test is very short, results can often be misleading
>>>>>>>>> 
>>>>>>>>>  - Use sufficiently large queue sizes
>>>>>>>>>     - Generally the lighter the tasks are, the longer queue you need 
>>>>>>>>> to avoid worker starvation. Since your task looks very light weight, 
>>>>>>>>> the default queue size may not be sufficient. There are a few places 
>>>>>>>>> to tune. Have a look at this article: 
>>>>>>>>> http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>>>>>>>>> 
>>>>>>>>>  - Make the task a bit more heavy weight
>>>>>>>>>     - Getting higher throughput through parallelising is generally    
>>>>>>>>>                                                        hard when the 
>>>>>>>>> tasks are very light weight. If you make each tasks a bit more heavy 
>>>>>>>>> weight (e.g. by generating more random numbers per invocation), it 
>>>>>>>>> may be easier to observe the performance increase (this way you may 
>>>>>>>>> also be able to avoid the message buffer tuning thing mentioned above)
>>>>>>>>> 
>>>>>>>>>  - Try batched topologies
>>>>>>>>>     - Rationale is similar to above because batching will effectively 
>>>>>>>>>                                                           make tasks 
>>>>>>>>> more "heavy weight".
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis 
>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>> Inside my bolts I ack every tuple I receive for processing. 
>>>>>>>>>> Furthermore, I've set the TOPOLOGY_MAX_SPOUT_PENDING to 1000 to 
>>>>>>>>>> throttle how many tuples remain pending at a given moment. The bolts 
>>>>>>>>>> don't have any HashMap or other data structure. In fact, my code is 
>>>>>>>>>> pretty simple:
>>>>>>>>>> 
>>>>>>>>>> public void execute(Tuple tuple) {
>>>>>>>>>>         if (!tuple.getString(0).contains("!")) {
>>>>>>>>>>             Random ran =                                             
>>>>>>>>>>               new Random();
>>>>>>>>>>             int worker = ran.nextInt(boltNo) + 1;
>>>>>>>>>>             List<Integer> l = _topo.getComponentTasks("worker" + 
>>>>>>>>>> worker);
>>>>>>>>>>             String out = tuple.getString(0) + "!";
>>>>>>>>>>             _collector.emitDirect(l.get(0), new Values(out));
>>>>>>>>>>         }
>>>>>>>>>>         else {
>>>>>>>>>>             LOG.info("Already processed record: " +                  
>>>>>>>>>>                                          tuple.getString(0));
>>>>>>>>>>         }
>>>>>>>>>>         _collector.ack(tuple);
>>>>>>>>>>     }
>>>>>>>>>> 
>>>>>>>>>> As far as the slowdown is concerned, the rate of data processing is 
>>>>>>>>>> constant from start to end. But when I increase the number of 
>>>>>>>>>> spouts/bolts, the system takes more time to process the same amount 
>>>>>>>>>> of records (500000 records to be exact). So, the throughput drops 
>>>>>>>>>> because the rate of data digestion is smaller. This is consistent as 
>>>>>>>>>> I increase the number of workers, I have tested till 20 spouts/ 20 
>>>>>>>>>> bolts.
>>>>>>>>>> 
>>>>>>>>>> I set the number of Kafka partitions exactly equal to the number of 
>>>>>>>>>> spouts each time to take advantage of the parallelism offered. 
>>>>>>>>>> 
>>>>>>>>>> I don't know if it helps, but I also tried a simpler topology, where 
>>>>>>>>>> bolts are not connected with each other (so it's basically           
>>>>>>>>>>                                                 a tree like 
>>>>>>>>>> topology) and I observed the same drop in throughput but not to the 
>>>>>>>>>> same extent. The increase in processing time is almost negligible.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> On 25/07/2015 07:08 μμ, Enno Shioji wrote:
>>>>>>>>>>> My first guess would be that there is something with the topology, 
>>>>>>>>>>> like number of pending messages increasing abnormally due to tuples 
>>>>>>>>>>> not being acked/too many tuples generated, JVM heap shortage caused 
>>>>>>>>>>> on workers due to something being retained in bolts (like an ever 
>>>>>>>>>>> growing HashMap) etc.
>>>>>>>>>>> 
>>>>>>>>>>> Does the slowdown happen gradually, or                              
>>>>>>>>>>>                              is it immediately slow?
>>>>>>>>>>> 
>>>>>>>>>>> Another random guess is that you don't have enough number of Kafka 
>>>>>>>>>>> partitions, hence not                                               
>>>>>>>>>>>             using your new workers but I wouldn't expect this to 
>>>>>>>>>>> make things significantly slower. 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis 
>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>> Morgan,
>>>>>>>>>>>> 
>>>>>>>>>>>> I hardly think that this could be the problem. The topology is 
>>>>>>>>>>>> deployed over a 14 node cluster with 56 total cores                
>>>>>>>>>>>>                                            and 96GB RAM. So when I 
>>>>>>>>>>>> jump from 8 to 16 workers, I think I am still far below my 
>>>>>>>>>>>> hardware limitations.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 25/07/2015 06:45 μμ, Morgan W09 wrote:
>>>>>>>>>>>>> It could be possible that you're reaching a                       
>>>>>>>>>>>>>                                     hardware limitation. The jump 
>>>>>>>>>>>>> from 8 to 16 total bolt/workers could be more than you hardware 
>>>>>>>>>>>>> can handle efficiently.                                           
>>>>>>>>>>>>>                 So it's starting to                               
>>>>>>>>>>>>>                             have to switch out processes and 
>>>>>>>>>>>>> their memory, which can have substantial overhead causing your 
>>>>>>>>>>>>> program to slow down. 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis 
>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>> Yes, it listens to its own output. For example, if I have two 
>>>>>>>>>>>>>> bolts (bolt1 and bolt2), I perform the following:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> bolt1.directGrouping("bolt1");
>>>>>>>>>>>>>> bolt1.directGrouping("bolt2");
>>>>>>>>>>>>>> bolt2.directGrouping("bolt1");
>>>>>>>>>>>>>> bolt2.directGrouping("bolt2");
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I know that this could possibly lead to a cycle, but right now 
>>>>>>>>>>>>>> the bolts I'm trying to run perform the following:
>>>>>>>>>>>>>> if the inputRecord doesn't contain a "!" {
>>>>>>>>>>>>>>     append a "!"
>>>>>>>>>>>>>>     emit to a random node
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> else {
>>>>>>>>>>>>>>     do nothing with the record
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Dimitris
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 25/07/2015 06:03 μμ, Enno Shioji wrote:
>>>>>>>>>>>>>>> > Each bolt is connected with itself as well as with each one 
>>>>>>>>>>>>>>> > of the other bolts
>>>>>>>>>>>>>>> You mean the bolt                                               
>>>>>>>>>>>>>>>             listens to its own output?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis 
>>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I'm trying to run a topology in Storm and I am facing some 
>>>>>>>>>>>>>>>> scalability issues. Specifically, I have a topology where 
>>>>>>>>>>>>>>>> KafkaSpouts read from a Kafka queue and emit messages to bolts 
>>>>>>>>>>>>>>>> which are connected with each other through directGrouping. 
>>>>>>>>>>>>>>>> (Each bolt is connected with itself as well as with each one 
>>>>>>>>>>>>>>>> of the other bolts). Spouts subscribe to                       
>>>>>>>>>>>>>>>>                                     bolts with 
>>>>>>>>>>>>>>>> shuffleGrouping. I observe that                                
>>>>>>>>>>>>>>>>                            when I increase the                 
>>>>>>>>>>>>>>>>                                           number of spouts and 
>>>>>>>>>>>>>>>> bolts proportionally, I don't get                              
>>>>>>>>>>>>>>>>                              the speedup                       
>>>>>>>>>>>>>>>>                                     I'm expecting to. In fact, 
>>>>>>>>>>>>>>>> my topology                                                    
>>>>>>>>>>>>>>>>        seems to run slower and for the same amount of data, it 
>>>>>>>>>>>>>>>> takes more time to complete. For example, when I increase 
>>>>>>>>>>>>>>>> spouts from 4->8 and bolts from 4->8, it takes longer          
>>>>>>>>>>>>>>>>                                                  to process 
>>>>>>>>>>>>>>>> the same amount of kafka messages.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Any ideas why this is happening? Thanks in advance.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Dimitris Sarlis
> 

Reply via email to