Oh and one last thing.. You may want to consider keeping a counter in the
bolt and only log.INFO every 10K or 100K times or something, as writing to
log files could become an artificial bottleneck.

On Sat, Jul 25, 2015 at 6:42 PM, Enno Shioji <[email protected]> wrote:

> I see.
>
> One thing, you prob. want to make the else branch ("Already processed
> record") heavy weight, too as that branch would be *extremely* light
> weight.
>
> On Sat, Jul 25, 2015 at 6:33 PM, Dimitris Sarlis <[email protected]>
> wrote:
>
>>  I've already tried large queue sizes, specifically I've set it to 32768.
>>
>> I'll also try to make the tasks more heavy weight as you suggest and see
>> how things go.
>>
>>
>> On 25/07/2015 08:00 μμ, Enno Shioji wrote:
>>
>> Forgot to note; re: queue sizes, in addition to what the article mentions
>> you may also need to increase the MAX_PENDING config.
>>
>> On Sat, Jul 25, 2015 at 5:58 PM, Enno Shioji <[email protected]> wrote:
>>
>>> I see....
>>>
>>>  A few suggestions:
>>>
>>>   - Pump enough messages into Kafka that the entire test takes at least
>>> more than a few minutes
>>>     - If the test is very short, results can often be misleading
>>>
>>>   - Use sufficiently large queue sizes
>>>     - Generally the lighter the tasks are, the longer queue you need to
>>> avoid worker starvation. Since your task looks very light weight, the
>>> default queue size may not be sufficient. There are a few places to tune.
>>> Have a look at this article:
>>> http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>>>
>>>   - Make the task a bit more heavy weight
>>>     - Getting higher throughput through parallelising is generally hard
>>> when the tasks are very light weight. If you make each tasks a bit more
>>> heavy weight (e.g. by generating more random numbers per invocation), it
>>> may be easier to observe the performance increase (this way you may also be
>>> able to avoid the message buffer tuning thing mentioned above)
>>>
>>>   - Try batched topologies
>>>     - Rationale is similar to above because batching will effectively
>>> make tasks more "heavy weight".
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Jul 25, 2015 at 5:23 PM, Dimitris Sarlis <[email protected]>
>>> wrote:
>>>
>>>>  Inside my bolts I ack every tuple I receive for processing.
>>>> Furthermore, I've set the TOPOLOGY_MAX_SPOUT_PENDING to 1000 to throttle
>>>> how many tuples remain pending at a given moment. The bolts don't have any
>>>> HashMap or other data structure. In fact, my code is pretty simple:
>>>>
>>>> public void execute(Tuple tuple) {
>>>>         if (!tuple.getString(0).contains("!")) {
>>>>             Random ran = new Random();
>>>>             int worker = ran.nextInt(boltNo) + 1;
>>>>             List<Integer> l = _topo.getComponentTasks("worker" +
>>>> worker);
>>>>             String out = tuple.getString(0) + "!";
>>>>             _collector.emitDirect(l.get(0), new Values(out));
>>>>         }
>>>>         else {
>>>>             LOG.info("Already processed record: " + tuple.getString(0));
>>>>         }
>>>>         _collector.ack(tuple);
>>>>     }
>>>>
>>>> As far as the slowdown is concerned, the rate of data processing is
>>>> constant from start to end. But when I increase the number of spouts/bolts,
>>>> the system takes more time to process the same amount of records (500000
>>>> records to be exact). So, the throughput drops because the rate of data
>>>> digestion is smaller. This is consistent as I increase the number of
>>>> workers, I have tested till 20 spouts/ 20 bolts.
>>>>
>>>> I set the number of Kafka partitions exactly equal to the number of
>>>> spouts each time to take advantage of the parallelism offered.
>>>>
>>>> I don't know if it helps, but I also tried a simpler topology, where
>>>> bolts are not connected with each other (so it's basically a tree like
>>>> topology) and I observed the same drop in throughput but not to the same
>>>> extent. The increase in processing time is almost negligible.
>>>>
>>>>
>>>> On 25/07/2015 07:08 μμ, Enno Shioji wrote:
>>>>
>>>>  My first guess would be that there is something with the topology,
>>>> like number of pending messages increasing abnormally due to tuples not
>>>> being acked/too many tuples generated, JVM heap shortage caused on workers
>>>> due to something being retained in bolts (like an ever growing HashMap) 
>>>> etc.
>>>>
>>>>  Does the slowdown happen gradually, or is it immediately slow?
>>>>
>>>>  Another random guess is that you don't have enough number of Kafka
>>>> partitions, hence not using your new workers but I wouldn't expect this to
>>>> make things significantly slower.
>>>>
>>>>
>>>>
>>>> On Sat, Jul 25, 2015 at 4:48 PM, Dimitris Sarlis <[email protected]>
>>>> wrote:
>>>>
>>>>>  Morgan,
>>>>>
>>>>> I hardly think that this could be the problem. The topology is
>>>>> deployed over a 14 node cluster with 56 total cores and 96GB RAM. So when 
>>>>> I
>>>>> jump from 8 to 16 workers, I think I am still far below my hardware
>>>>> limitations.
>>>>>
>>>>>
>>>>> On 25/07/2015 06:45 μμ, Morgan W09 wrote:
>>>>>
>>>>> It could be possible that you're reaching a hardware limitation. The
>>>>> jump from 8 to 16 total bolt/workers could be more than you hardware can
>>>>> handle efficiently. So it's starting to have to switch out processes and
>>>>> their memory, which can have substantial overhead causing your program to
>>>>> slow down.
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Jul 25, 2015 at 10:36 AM, Dimitris Sarlis <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>  Yes, it listens to its own output. For example, if I have two bolts
>>>>>> (bolt1 and bolt2), I perform the following:
>>>>>>
>>>>>> bolt1.directGrouping("bolt1");
>>>>>> bolt1.directGrouping("bolt2");
>>>>>> bolt2.directGrouping("bolt1");
>>>>>> bolt2.directGrouping("bolt2");
>>>>>>
>>>>>> I know that this could possibly lead to a cycle, but right now the
>>>>>> bolts I'm trying to run perform the following:
>>>>>> if the inputRecord doesn't contain a "!" {
>>>>>>     append a "!"
>>>>>>     emit to a random node
>>>>>> }
>>>>>> else {
>>>>>>     do nothing with the record
>>>>>> }
>>>>>>
>>>>>> Dimitris
>>>>>>
>>>>>>
>>>>>> On 25/07/2015 06:03 μμ, Enno Shioji wrote:
>>>>>>
>>>>>> > Each bolt is connected with itself as well as with each one of the
>>>>>> other bolts
>>>>>> You mean the bolt listens to its own output?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Jul 25, 2015 at 1:29 PM, Dimitris Sarlis <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm trying to run a topology in Storm and I am facing some
>>>>>>> scalability issues. Specifically, I have a topology where KafkaSpouts 
>>>>>>> read
>>>>>>> from a Kafka queue and emit messages to bolts which are connected with 
>>>>>>> each
>>>>>>> other through directGrouping. (Each bolt is connected with itself as 
>>>>>>> well
>>>>>>> as with each one of the other bolts). Spouts subscribe to bolts with
>>>>>>> shuffleGrouping. I observe that when I increase the number of spouts and
>>>>>>> bolts proportionally, I don't get the speedup I'm expecting to. In 
>>>>>>> fact, my
>>>>>>> topology seems to run slower and for the same amount of data, it takes 
>>>>>>> more
>>>>>>> time to complete. For example, when I increase spouts from 4->8 and 
>>>>>>> bolts
>>>>>>> from 4->8, it takes longer to process the same amount of kafka messages.
>>>>>>>
>>>>>>> Any ideas why this is happening? Thanks in advance.
>>>>>>>
>>>>>>> Best,
>>>>>>> Dimitris Sarlis
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>

Reply via email to