Forgot to note; re: queue sizes, in addition to what the article mentions
you may also need to increase the MAX_PENDING config.

On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:

> I see....
>
> A few suggestions:
>
>  - Pump enough messages into Kafka that the entire test takes at least
> more than a few minutes
>     - If the test is very short, results can often be misleading
>
>  - Use sufficiently large queue sizes
>     - Generally the lighter the tasks are, the longer queue you need to
> avoid worker starvation. Since your task looks very light weight, the
> default queue size may not be sufficient. There are a few places to tune.
> Have a look at this article:
> http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>
>  - Make the task a bit more heavy weight
>     - Getting higher throughput through parallelising is generally hard
> when the tasks are very light weight. If you make each tasks a bit more
> heavy weight (e.g. by generating more random numbers per invocation), it
> may be easier to observe the performance increase (this way you may also be
> able to avoid the message buffer tuning thing mentioned above)
>
>  - Try batched topologies
>     - Rationale is similar to above because batching will effectively make
> tasks more "heavy weight".
>
>
>
>
>
>
>
>
> On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis <[email protected]>
> wrote:
>
>>  Inside my bolts I ack every tuple I receive for processing. Furthermore,
>> I've set the TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle how many tuples
>> remain pending at a given moment. The bolts don't have any HashMap or other
>> data structure. In fact, my code is pretty simple:
>>
>> public void execute(Tuple tuple) {
>>         if (!tuple.getString(0).contains("!")) {
>>             Random ran = new Random();
>>             int worker = ran.nextInt(boltNo) + 1;
>>             List<Integer> l = _topo.getComponentTasks("worker" + worker);
>>             String out = tuple.getString(0) + "!";
>>             _collector.emitDirect(l.get(0), new Values(out));
>>         }
>>         else {
>>             LOG.info("Already processed record: " + tuple.getString(0));
>>         }
>>         _collector.ack(tuple);
>>     }
>>
>> As far as the slowdown is concerned, the rate of data processing is
>> constant from start to end. But when I increase the number of spouts/bolts,
>> the system takes more time to process the same amount of records (500000
>> records to be exact). So, the throughput drops because the rate of data
>> digestion is smaller. This is consistent as I increase the number of
>> workers, I have tested till 20 spouts/ 20 bolts.
>>
>> I set the number of Kafka partitions exactly equal to the number of
>> spouts each time to take advantage of the parallelism offered.
>>
>> I don't know if it helps, but I also tried a simpler topology, where
>> bolts are not connected with each other (so it's basically a tree like
>> topology) and I observed the same drop in throughput but not to the same
>> extent. The increase in processing time is almost negligible.
>>
>>
>> On 25/07/2015 07:08 μμ, Enno Shioji wrote:
>>
>>  My first guess would be that there is something with the topology, like
>> number of pending messages increasing abnormally due to tuples not being
>> acked/too many tuples generated, JVM heap shortage caused on workers due to
>> something being retained in bolts (like an ever growing HashMap) etc.
>>
>>  Does the slowdown happen gradually, or is it immediately slow?
>>
>>  Another random guess is that you don't have enough number of Kafka
>> partitions, hence not using your new workers but I wouldn't expect this to
>> make things significantly slower.
>>
>>
>>
>> On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <[email protected]>
>> wrote:
>>
>>>  Morgan,
>>>
>>> I hardly think that this could be the problem. The topology is deployed
>>> over a 14 node cluster with 56 total cores and 96GB RAM. So when I jump
>>> from 8 to 16 workers, I think I am still far below my hardware limitations.
>>>
>>>
>>> On 25/07/2015 06:45 μμ, Morgan W09 wrote:
>>>
>>> It could be possible that you're reaching a hardware limitation. The
>>> jump from 8 to 16 total bolt/workers could be more than you hardware can
>>> handle efficiently. So it's starting to have to switch out processes and
>>> their memory, which can have substantial overhead causing your program to
>>> slow down.
>>>
>>>
>>>
>>> On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis <[email protected]>
>>> wrote:
>>>
>>>>  Yes, it listens to its own output. For example, if I have two bolts
>>>> (bolt1 and bolt2), I perform the following:
>>>>
>>>> bolt1.directGrouping("bolt1");
>>>> bolt1.directGrouping("bolt2");
>>>> bolt2.directGrouping("bolt1");
>>>> bolt2.directGrouping("bolt2");
>>>>
>>>> I know that this could possibly lead to a cycle, but right now the
>>>> bolts I'm trying to run perform the following:
>>>> if the inputRecord doesn't contain a "!" {
>>>>     append a "!"
>>>>     emit to a random node
>>>> }
>>>> else {
>>>>     do nothing with the record
>>>> }
>>>>
>>>> Dimitris
>>>>
>>>>
>>>> On 25/07/2015 06:03 μμ, Enno Shioji wrote:
>>>>
>>>> > Each bolt is connected with itself as well as with each one of the
>>>> other bolts
>>>> You mean the bolt listens to its own output?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to run a topology in Storm and I am facing some scalability
>>>>> issues. Specifically, I have a topology where KafkaSpouts read from a 
>>>>> Kafka
>>>>> queue and emit messages to bolts which are connected with each other
>>>>> through directGrouping. (Each bolt is connected with itself as well as 
>>>>> with
>>>>> each one of the other bolts). Spouts subscribe to bolts with
>>>>> shuffleGrouping. I observe that when I increase the number of spouts and
>>>>> bolts proportionally, I don't get the speedup I'm expecting to. In fact, 
>>>>> my
>>>>> topology seems to run slower and for the same amount of data, it takes 
>>>>> more
>>>>> time to complete. For example, when I increase spouts from 4->8 and bolts
>>>>> from 4->8, it takes longer to process the same amount of kafka messages.
>>>>>
>>>>> Any ideas why this is happening? Thanks in advance.
>>>>>
>>>>> Best,
>>>>> Dimitris Sarlis
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>

Reply via email to